Skip to content

File schema_class_truncator.cpp

File List > cubrid > src > object > schema_class_truncator.cpp

Go to the documentation of this file

/*
 *
 * Copyright 2016 CUBRID Corporation
 *
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 *
 */

#include "schema_class_truncator.hpp"

#include "authenticate.h"
#include "dbi.h"
#include "db.h"
#include "dbtype_function.h"
#include "execute_statement.h"
#include "network_interface_cl.h"

#include <algorithm>

using namespace cubschema;

namespace cubschema
{
  /*
   * This class is used internally by class_truncator while truncating, which could end up truncating several classes.
   * This context is in charge of truncating each of truncating classes and saving the context of each.
   */
  class class_truncate_context final
  {
    public:
      using cons_predicate = std::function<bool (const SM_CLASS_CONSTRAINT &)>;
      using saved_cons_predicate = std::function<bool (const SM_CONSTRAINT_INFO &)>;

      int save_constraints_or_clear (cons_predicate pred = nullptr);
      int drop_saved_constraints (saved_cons_predicate pred = nullptr);
      int truncate_heap ();
      int restore_constraints (saved_cons_predicate pred = nullptr);
      int reset_serials ();

      class_truncate_context (const OID &class_oid);
      class_truncate_context (class_truncate_context &&other);
      ~class_truncate_context ();

      class_truncate_context (const class_truncate_context &other) = delete;
      class_truncate_context &operator= (const class_truncate_context &other) = delete;
      class_truncate_context &operator= (const class_truncate_context &&other) = delete;

    private:
      MOP m_mop = NULL;
      SM_CLASS *m_class = NULL;
      SM_CONSTRAINT_INFO *m_unique_info = NULL;
      SM_CONSTRAINT_INFO *m_fk_info = NULL;
      SM_CONSTRAINT_INFO *m_index_info = NULL;
  };

  /*
   * collect_trun_classes () - Collects OIDs of truncatable classes regarding the CASCADE option
   *   return: NO_ERROR on success, non-zero for ERROR
   *   class_mop(in):
   *   is_cascade(in): whether to cascade TRUNCATE to FK-referring classes
   */
  int
  class_truncator::collect_trun_classes (MOP class_mop, bool is_cascade)
  {
    int error = NO_ERROR;
    SM_CLASS *class_ = NULL;
    SM_CLASS_CONSTRAINT *pk_constraint = NULL;
    SM_FOREIGN_KEY_INFO *fk_ref;
    OID *fk_cls_oid;

    error = au_fetch_class (class_mop, &class_, AU_FETCH_READ, DB_AUTH_ALTER);
    if (error != NO_ERROR || class_ == NULL)
      {
    assert (er_errid () != NO_ERROR);
    return er_errid ();
      }

    m_trun_classes.emplace (*ws_oid (class_mop));

    pk_constraint = classobj_find_cons_primary_key (class_->constraints);
    if (pk_constraint == NULL || classobj_is_pk_referred (class_mop, pk_constraint->fk_info, false, NULL) == false)
      {
    /* if no PK or FK-referred, it can be truncated */
    return NO_ERROR;
      }

    /* Now, there is a PK, and are some FKs-referring to the PK */

    if (!is_cascade)
      {
    er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_TRUNCATE_PK_REFERRED, 1, pk_constraint->fk_info->name);
    return ER_TRUNCATE_PK_REFERRED;
      }

    /* Find FK-child classes to cascade. */
    for (fk_ref = pk_constraint->fk_info; fk_ref; fk_ref = fk_ref->next)
      {
    if (fk_ref->delete_action == SM_FOREIGN_KEY_CASCADE)
      {
        if (m_trun_classes.find (fk_ref->self_oid) != m_trun_classes.end ())
          {
        continue;       /* already checked */
          }

        MOP fk_child_mop = ws_mop (&fk_ref->self_oid, NULL);
        if (fk_child_mop == NULL)
          {
        assert (er_errid () != NO_ERROR);
        return er_errid ();
          }

        int partition_type = DB_NOT_PARTITIONED_CLASS;
        error = sm_partitioned_class_type (fk_child_mop, &partition_type, NULL, NULL);
        if (error != NO_ERROR)
          {
        return error;
          }

        if (partition_type == DB_PARTITION_CLASS)
          {
        /*
         * FKs of all partition classes refers to a class that the parittioned class of them referes to.
         * But, partition class will be processed when the partitioned class is done.
         */
        continue;
          }

        error = collect_trun_classes (fk_child_mop, is_cascade);
        if (error != NO_ERROR)
          {
        return error;
          }
      }
    else
      {
        er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_TRUNCATE_CANT_CASCADE, 1, fk_ref->name);
        return ER_TRUNCATE_CANT_CASCADE;
      }
      }

    return NO_ERROR;
  }

  /*
   * truncate () - truncates classes colleted through collect_trun_classes()
   *   return: NO_ERROR on success, non-zero for ERROR
   *
   *   NOTE: this function truncates several classes which are bond in a partitioning or some foreign keys.
   *   truncating a class consists of a few steps, each of which is done one by one across all the classess.
   *   this horizontal processing is necessary that all FKs has to be processed before PKs.
   */
  int
  class_truncator::truncate (const bool is_cascade)
  {
    int error = NO_ERROR;
    std::vector<class_truncate_context> contexts;

    assert (m_class_mop != NULL);

    error = collect_trun_classes (m_class_mop, is_cascade);
    if (error != NO_ERROR)
      {
    return error;
      }

    try
      {
    std::for_each (m_trun_classes.begin(), m_trun_classes.end(), [&contexts] (const OID& oid)
    {
      contexts.emplace_back (oid);
    });
      }
    catch (int &error)
      {
    // exception from sm_context constructor
    return error;
      }

    /* Save constraints. Or, remove instances from the constraint if impossible. FK first, then non-FK */
    for (auto &context : contexts)
      {
    auto cons_predicate = [] (const SM_CLASS_CONSTRAINT& cons) -> bool
    {
      return cons.type == SM_CONSTRAINT_FOREIGN_KEY;
    };

    error = context.save_constraints_or_clear (cons_predicate);
    if (error != NO_ERROR)
      {
        return error;
      }

    /* FK must be dropped earlier than PK, because of referencing */
    error = context.drop_saved_constraints ();
    if (error != NO_ERROR)
      {
        return error;
      }
      }

    for (auto &context : contexts)
      {
    auto cons_predicate = [] (const SM_CLASS_CONSTRAINT& cons) -> bool
    {
      return cons.type != SM_CONSTRAINT_FOREIGN_KEY;
    };

    error = context.save_constraints_or_clear (cons_predicate);
    if (error != NO_ERROR)
      {
        return error;
      }

    auto saved_cons_predicate = [] (const SM_CONSTRAINT_INFO& cons_info) -> bool
    {
      return cons_info.constraint_type != DB_CONSTRAINT_FOREIGN_KEY;
    };

    error = context.drop_saved_constraints (saved_cons_predicate);
    if (error != NO_ERROR)
      {
        return error;
      }
      }

    /* Destroy heap file. Or, delete instances from the heap if impossible */
    for (auto &context : contexts)
      {
    error = context.truncate_heap();
    if (error != NO_ERROR)
      {
        return error;
      }
      }

    /* Restore constraints. non-FK first, and then FK */
    for (auto &context : contexts)
      {
    auto saved_cons_predicate = [] (const SM_CONSTRAINT_INFO& cons_info) -> bool
    {
      return cons_info.constraint_type != DB_CONSTRAINT_FOREIGN_KEY;
    };

    error = context.restore_constraints (saved_cons_predicate);
    if (error != NO_ERROR)
      {
        return error;
      }
      }

    for (auto &context : contexts)
      {
    auto saved_cons_predicate = [] (const SM_CONSTRAINT_INFO& cons_info) -> bool
    {
      return cons_info.constraint_type == DB_CONSTRAINT_FOREIGN_KEY;
    };
    error = context.restore_constraints (saved_cons_predicate);
    if (error != NO_ERROR)
      {
        return error;
      }
      }

    /* reset auto_increment starting value */
    for (auto &context : contexts)
      {
    error = context.reset_serials ();
    if (error != NO_ERROR)
      {
        return error;
      }
      }

    return error;
  }

  class_truncate_context::class_truncate_context (const OID &class_oid)
  {
    int error = NO_ERROR;
    m_mop = ws_mop (&class_oid, NULL);
    if (m_mop == NULL)
      {
    assert (er_errid () != NO_ERROR);
    throw er_errid ();
      }

    /* We need to flush everything so that the server logs the inserts that happened before the truncate. We need this in
     * order to make sure that a rollback takes us into a consistent state. If we can prove that simply discarding the
     * objects would work correctly we would be able to remove this call. However, it's better to be safe than sorry. */
    error = sm_flush_and_decache_objects (m_mop, true);
    if (error != NO_ERROR)
      {
    throw error;
      }

    error = au_fetch_class (m_mop, &m_class, AU_FETCH_WRITE, DB_AUTH_ALTER);
    if (error != NO_ERROR || m_class == NULL)
      {
    assert (er_errid () != NO_ERROR);
    throw er_errid ();
      }

    m_class->load_index_from_heap = 0;
  }

  class_truncate_context::class_truncate_context (class_truncate_context &&other) :
    m_mop (other.m_mop),
    m_class (other.m_class),
    m_unique_info (other.m_unique_info),
    m_fk_info (other.m_fk_info),
    m_index_info (other.m_index_info)
  {
    other.m_mop = NULL;
    other.m_class = NULL;
    other.m_unique_info = NULL;
    other.m_fk_info = NULL;
    other.m_index_info = NULL;
  }

  class_truncate_context::~class_truncate_context ()
  {
    if (m_class != NULL)
      {
    m_class->load_index_from_heap = 1;
      }

    if (m_unique_info != NULL)
      {
    sm_free_constraint_info (&m_unique_info);
      }
    if (m_fk_info != NULL)
      {
    sm_free_constraint_info (&m_fk_info);
      }
    if (m_index_info != NULL)
      {
    sm_free_constraint_info (&m_index_info);
      }
  }

  /*
   * save_constraints_or_clear () - save constraints of the the class,
   *                                or remove instance manually if it is impossible.
   *
   *   return: error code or NO_ERROR
   *   pred(in): only constraints which meet this condition are processed
   */
  int
  class_truncate_context::save_constraints_or_clear (cons_predicate pred)
  {
    int error = NO_ERROR;
    SM_CLASS_CONSTRAINT *c = NULL;

    for (c = m_class->constraints; c; c = c->next)
      {
    if (pred && !pred (*c))
      {
        continue;
      }

    if (!SM_IS_CONSTRAINT_INDEX_FAMILY (c->type))
      {
        assert (c->type == SM_CONSTRAINT_NOT_NULL);
        continue;
      }

    if ((c->type == SM_CONSTRAINT_PRIMARY_KEY && classobj_is_pk_referred (m_mop, c->fk_info, false, NULL))
        || !sm_is_possible_to_recreate_constraint (m_mop, m_class, c))
      {
        /*
         * In these cases, We cannot drop and recreate the index as this might be
         * too costly, so we just remove the instances of the current class.
         */
        error = locator_remove_class_from_index (ws_oid (m_mop), &c->index_btid, &m_class->header.ch_heap);
        if (error != NO_ERROR)
          {
        return error;
          }
      }
    else
      {
        /* All the OIDs in the index should belong to the current class, so it is safe to drop and create the
         * constraint again. We save the information required to recreate the constraint. */

        if (SM_IS_CONSTRAINT_UNIQUE_FAMILY (c->type))
          {
        if (sm_save_constraint_info (&m_unique_info, c) != NO_ERROR)
          {
            return error;
          }
          }
        else if (c->type == SM_CONSTRAINT_FOREIGN_KEY)
          {
        if (sm_save_constraint_info (&m_fk_info, c) != NO_ERROR)
          {
            return error;
          }
          }
        else
          {
        if (sm_save_constraint_info (&m_index_info, c) != NO_ERROR)
          {
            return error;
          }
          }
      }
      }

    return error;
  }

  /*
   * drop_saved_constraints () - drop constraints saved in save_constraints_or_clear().
   *
   *   return: error code or NO_ERROR
   *   pred(in): only constraints which meet this condition are processed
   */
  int
  class_truncate_context::drop_saved_constraints (saved_cons_predicate pred)
  {
    DB_CTMPL *ctmpl = NULL;
    SM_CONSTRAINT_INFO *saved = NULL;
    int error = NO_ERROR;

    /* Even for a class, FK must be dropped earlier than PK, because of the self-referencing case */
    if (m_fk_info != NULL)
      {
    ctmpl = dbt_edit_class (m_mop);
    if (ctmpl == NULL)
      {
        assert (er_errid () != NO_ERROR);
        error = er_errid ();
        return error;
      }

    for (saved = m_fk_info; saved != NULL; saved = saved->next)
      {
        if (pred && !pred (*saved))
          {
        continue;
          }

        error =
            dbt_drop_constraint (ctmpl, saved->constraint_type, saved->name, (const char **) saved->att_names, 0);
        if (error != NO_ERROR)
          {
        dbt_abort_class (ctmpl);
        return error;
          }
      }

    if (dbt_finish_class (ctmpl) == NULL)
      {
        dbt_abort_class (ctmpl);
        assert (er_errid () != NO_ERROR);
        error = er_errid ();
        return error;
      }
      }

    for (saved = m_unique_info; saved != NULL; saved = saved->next)
      {
    if (pred && !pred (*saved))
      {
        continue;
      }

    error = sm_drop_constraint (m_mop, saved->constraint_type,
                    saved->name, (const char **) saved->att_names, 0, false);
    if (error != NO_ERROR)
      {
        return error;
      }
      }

    for (saved = m_index_info; saved != NULL; saved = saved->next)
      {
    if (pred && !pred (*saved))
      {
        continue;
      }

    error = sm_drop_index (m_mop, saved->name);
    if (error != NO_ERROR)
      {
        return error;
      }
      }

    return error;
  }

  /*
   * truncate_heap () - truncate the heap of the class.
   *
   *   return: error code or NO_ERROR
   */
  int
  class_truncate_context::truncate_heap ()
  {
    /*
     * case 1: REUSE_OID
     * case 2: DONT_REUSE_OID with no domain referring to this class
     * case 3: DONT_REUSE_OID with some domains referring to this class
     *
     * case 1 and 2: destory heap, case 3: use DELETE query to delete all records.
     */
    if (!sm_is_reuse_oid_class (m_mop))
      {
    /*
     * This is the case 3.
     * Check if (1) there is a doamin referring to this class or (2) there is a general object domain in any user table (non system class).
     * We check it using SELECT to _db_domain.
     */
    int error = NO_ERROR;
    DB_SESSION *session = NULL;
    DB_QUERY_RESULT *result = NULL;
    STATEMENT_ID stmt_id;
    DB_VALUE value;
    char select_query[DB_MAX_IDENTIFIER_LENGTH + 256] = { 0 };
    constexpr int CNT_CATCLS_OBJECTS = 6;
    DB_BIGINT cnt_refers = CNT_CATCLS_OBJECTS + 1;
    int au_save;

    const char *class_name = db_get_class_name (m_mop);
    if (class_name == NULL)
      {
        return ER_FAILED;
      }

    /*
     * !!CAUTION!!
     * If [data_type] is DB_TYPE_OBJECT and [class_of] is NULL, it is a general object domain, but we have to check only user classes.
     * To do this, we use an walkaround in which we count the number of general object domains in existing system catalogs
     * and if the SELECT result is over this, we asuume that there are some general object domain in some user class.
     *
     * The number is now 6 and hard-coded, so we MUST consider it when add or remove a general object domain in a system class.
     * If it is changed, we MUST also change the value of CNT_CATCLS_OBJECTS.
     *
     * We add a QA test case to confirm there are only 6 general object domains in system classes, which will help notice this constraint
     * and this test case also has to be changed along if CNT_CATCLS_OBJECTS is changed.
     *
     * See CBRD-23983 and CBRD-25697 for the details.
     */

    AU_DISABLE (au_save);

    (void) snprintf (select_query, sizeof (select_query),
             "SELECT COUNT(*) FROM [_db_domain] WHERE [data_type]=%d AND ([class_of].[unique_name]='%s' OR [class_of] IS NULL) AND ROWNUM <= %d",
             DB_TYPE_OBJECT, class_name, CNT_CATCLS_OBJECTS + 1);

    session = db_open_buffer (select_query);
    if (session == NULL)
      {
        assert (er_errid () != NO_ERROR);
        error = er_errid ();
        AU_ENABLE (au_save);
        return error;
      }

    stmt_id = db_compile_statement (session);
    if (stmt_id != 1)
      {
        assert (er_errid () != NO_ERROR);
        error = er_errid ();
        db_close_session (session);
        AU_ENABLE (au_save);
        return error;
      }

    error = db_execute_statement_local (session, stmt_id, &result);
    if (error < 0)
      {
        db_close_session (session);
        AU_ENABLE (au_save);
        return error;
      }

    error = db_query_first_tuple (result);
    if (error < 0)
      {
        db_query_end (result);
        db_close_session (session);
        AU_ENABLE (au_save);
        return error;
      }

    error = db_query_get_tuple_value (result, 0, &value);
    if (error != NO_ERROR)
      {
        db_query_end (result);
        db_close_session (session);
        AU_ENABLE (au_save);
        return error;
      }

    cnt_refers = db_get_bigint (&value);

    db_query_end (result);
    db_close_session (session);

    AU_ENABLE (au_save);

    if (cnt_refers > CNT_CATCLS_OBJECTS)
      {
        return sm_truncate_using_delete (m_mop);
      }
      }

    /* Now, the heap is REUSE_OID, or DONT_REUSE_OID with no domain referring to this class. (case 1, 2) */
    return sm_truncate_using_destroy_heap (m_mop);
  }

  /*
   * restore_constraints () - restore constraints saved in save_constraints_or_clear().
   *
   *   return: error code or NO_ERROR
   *   pred(in): only constraints which meet this condition are processed
   */
  int
  class_truncate_context::restore_constraints (saved_cons_predicate pred)
  {
    DB_CTMPL *ctmpl = NULL;
    SM_CONSTRAINT_INFO *saved = NULL;
    int error = NO_ERROR;

    /* Normal index must be created earlier than unique constraint or FK, because of shared btree case. */
    for (saved = m_index_info; saved != NULL; saved = saved->next)
      {
    if (pred && !pred (*saved))
      {
        continue;
      }

    error = sm_add_constraint (m_mop, saved->constraint_type, saved->name, (const char **) saved->att_names,
                   saved->asc_desc, saved->prefix_length, false, saved->filter_predicate,
                   saved->func_index_info, saved->comment, saved->index_status);
    if (error != NO_ERROR)
      {
        return error;
      }
      }

    /* Even for a class, PK must be created earlier than FK, because of the self-referencing case */
    for (saved = m_unique_info; saved != NULL; saved = saved->next)
      {
    if (pred && !pred (*saved))
      {
        continue;
      }

    error = sm_add_constraint (m_mop, saved->constraint_type, saved->name, (const char **) saved->att_names,
                   saved->asc_desc, saved->prefix_length, false, saved->filter_predicate,
                   saved->func_index_info, saved->comment, saved->index_status);
    if (error != NO_ERROR)
      {
        return error;
      }
      }

    /* To drop all xasl cache related class, we need to touch class. */
    ctmpl = dbt_edit_class (m_mop);
    if (ctmpl == NULL)
      {
    assert (er_errid () != NO_ERROR);
    error = er_errid ();
    return error;
      }

    for (saved = m_fk_info; saved != NULL; saved = saved->next)
      {
    if (pred && !pred (*saved))
      {
        continue;
      }

    error = dbt_add_foreign_key (ctmpl, saved->name, (const char **) saved->att_names, saved->ref_cls_name,
                     (const char **) saved->ref_attrs, saved->fk_delete_action,
                     saved->fk_update_action, saved->comment);

    if (error != NO_ERROR)
      {
        dbt_abort_class (ctmpl);
        return error;
      }
      }

    if (dbt_finish_class (ctmpl) == NULL)
      {
    dbt_abort_class (ctmpl);
    assert (er_errid () != NO_ERROR);
    error = er_errid ();
    return error;
      }

    return error;
  }

  /*
   * reset_serials () - reset serials used by the class.
   *
   *   return: error code or NO_ERROR
   */
  int
  class_truncate_context::reset_serials ()
  {
    int au_save = 0;
    SM_ATTRIBUTE *att = NULL;
    int error = NO_ERROR;

    for (att = db_get_attributes (m_mop); att != NULL; att = db_attribute_next (att))
      {
    if (att->auto_increment != NULL)
      {
        AU_DISABLE (au_save);
        error = do_reset_auto_increment_serial (att->auto_increment);
        AU_ENABLE (au_save);

        if (error != NO_ERROR)
          {
        return error;
          }
      }
      }

    return error;
  }
}