Skip to content

File replication.c

File List > cubrid > src > transaction > replication.c

Go to the documentation of this file

/*
 * Copyright 2008 Search Solution Corporation
 * Copyright 2016 CUBRID Corporation
 *
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 *
 */

/*
 * replication.c -
 */

#ident "$Id$"

#include "replication.h"

#include "dbtype.h"
#include "heap_file.h"
#include "log_lsa.hpp"
#include "object_primitive.h"
#include "object_representation.h"
#include "schema_system_catalog_constants.h"

#include <assert.h>
#include <stdio.h>
// XXX: SHOULD BE THE LAST INCLUDE HEADER
#include "memory_wrapper.hpp"

/*
 * EXTERN TO ALL SERVER RECOVERY FUNCTION CODED SOMEWHERE ELSE
 */

#define     REPL_LOG_IS_NOT_EXISTS(tran_index)                            \
               (log_Gl.trantable.all_tdes[(tran_index)]->num_repl_records == 0)
#define     REPL_LOG_IS_FULL(tran_index)                                  \
               (log_Gl.trantable.all_tdes[(tran_index)]->num_repl_records     \
                 == log_Gl.trantable.all_tdes[(tran_index)]->cur_repl_record+1)

static const int REPL_LOG_INFO_ALLOC_SIZE = 100;

#if defined(SERVER_MODE) || defined(SA_MODE)
static int repl_log_info_alloc (LOG_TDES * tdes, int arr_size, bool need_realloc);
#endif /* SERVER_MODE || SA_MODE */

#if defined(SERVER_MODE) || defined(SA_MODE)

/*
 * repl_data_insert_log_dump - dump the "DATA INSERT" replication log
 *
 * return:
 *
 *   length(in): length of the data
 *   data(in): log data
 *
 * NOTE:
 */
void
repl_data_insert_log_dump (FILE * fp, int length, void *data)
{
  char *class_name;
  DB_VALUE key;
  char *ptr;

  ptr = or_unpack_string_nocopy ((char *) data, &class_name);
  ptr = or_unpack_mem_value (ptr, &key);
  fprintf (fp, "      class_name: %s\n", class_name);
  fprintf (fp, "      pk_value: ");
  db_value_print (&key);
  pr_clear_value (&key);
  fprintf (fp, "\n");
  fflush (fp);
}

#if defined (ENABLE_UNUSED_FUNCTION)
/*
 * repl_data_udpate_log_dump - dump the "DATA UPDATE" replication log
 *
 * return:
 *
 *   length(in): length of the data
 *   data(in): log data
 *
 * NOTE:
 */
void
repl_data_udpate_log_dump (FILE * fp, int length, void *data)
{
  /* currently same logic as insert case, but I can't gaurantee it's true after this... */
  repl_data_insert_log_dump (fp, length, data);
}

/*
 * repl_data_delete_log_dump - dump the "DATA DELETE" replication log
 *
 * return:
 *
 *   length(in): length of the data
 *   data(in): log data
 *
 * NOTE:
 */
void
repl_data_delete_log_dump (FILE * fp, int length, void *data)
{
  /* currently same logic as insert case, but I can't gaurantee it's true after this... */
  repl_data_insert_log_dump (fp, length, data);
}

#endif

/*
 * repl_schema_log_dump -
 *
 * return:
 *
 *   length(in):
 *   data(in):
 *
 * NOTE:
 */
void
repl_schema_log_dump (FILE * fp, int length, void *data)
{
  int type;
  char *class_name, *ddl;
  char *ptr;

  ptr = or_unpack_int ((char *) data, &type);
  ptr = or_unpack_string_nocopy (ptr, &class_name);
  ptr = or_unpack_string_nocopy (ptr, &ddl);
  fprintf (fp, "      type: %d\n", type);
  fprintf (fp, "      class_name: %s\n", class_name);
  fprintf (fp, "      DDL: %s\n", ddl);
  fflush (fp);
}

/*
 * repl_log_info_alloc - log info area allocation
 *
 * return: Error Code
 *
 *   tdes(in): transaction descriptor
 *   arr_size(in): array size to be allocated
 *   need_realloc(in): 0-> initial allocation (malloc), otherwise "realloc"
 *
 * NOTE:This function allocates the memory for the log info area of the
 *       target transaction. It is called when the transaction tries to to
 *   insert/update/delete operation. If the transaction do read operation
 *   only, no memory is allocated.
 *
 *   The allocation size is defined by a constant - REPL_LOG_INFO_ALLOC_SIZE
 *   We need to set the size for the user request ?
 */
static int
repl_log_info_alloc (LOG_TDES * tdes, int arr_size, bool need_realloc)
{
  int i = 0, k;
  int error = NO_ERROR;

  if (need_realloc == false)
    {
      i = arr_size * DB_SIZEOF (LOG_REPL_RECORD);
      tdes->repl_records = (LOG_REPL_RECORD *) malloc (i);
      if (tdes->repl_records == NULL)
    {
      error = ER_REPL_ERROR;
      er_set (ER_WARNING_SEVERITY, ARG_FILE_LINE, ER_REPL_ERROR, 1, "can't allocate memory");
      return error;
    }
      tdes->num_repl_records = arr_size;
      k = 0;
    }
  else
    {
      i = tdes->num_repl_records + arr_size;
      tdes->repl_records = (LOG_REPL_RECORD *) realloc (tdes->repl_records, i * DB_SIZEOF (LOG_REPL_RECORD));
      if (tdes->repl_records == NULL)
    {
      error = ER_REPL_ERROR;
      er_set (ER_WARNING_SEVERITY, ARG_FILE_LINE, ER_REPL_ERROR, 1, "can't allocate memory");
      return error;
    }
      k = tdes->num_repl_records;
      tdes->num_repl_records = i;
    }

  for (i = k; i < tdes->num_repl_records; i++)
    {
      tdes->repl_records[i].repl_data = NULL;
    }

  return error;
}

/*
 * repl_add_update_lsa - update the LSA of the target transaction lo
 *
 * return: NO_ERROR or error code
 *
 *   inst_oid(in): OID of the instance
 *
 * NOTE:For update operation, the server does "Heap stuff" after "Index Stuff".
 *     In order to reduce the cost of replication log, we generates a
 *     replication log at the point of indexing  processing step.
 *     (During index processing, the primary key value is fetched ..)
 *     After Heap operation, we have to set the target LSA into the
 *     replication log.
 *
 *     For update case, this function is called by locator_update_force().
 *     In the case of insert/delete cases, when the replication log info. is
 *     generated, the server already has the target transaction log(HEAP_INSERT
 *     or HEAP_DELETE).
 *     But, for the udpate case, the server doesn't has the target log when
 *     it generates the replication log. So, the server has to find out the
 *     location of replication record and match with the target transaction
 *     log after heap_update(). This is done by locator_update_force().
 */
int
repl_add_update_lsa (THREAD_ENTRY * thread_p, const OID * inst_oid)
{
  int tran_index;
  LOG_TDES *tdes;
  LOG_REPL_RECORD *repl_rec;
  int i;
  bool find = false;
  int error = NO_ERROR;

  tran_index = LOG_FIND_THREAD_TRAN_INDEX (thread_p);

  tdes = LOG_FIND_TDES (tran_index);
  if (tdes == NULL)
    {
      return ER_FAILED;
    }

  /* If suppress_replication flag is set, do not write replication log. */
  if (tdes->suppress_replication != 0)
    {
      return NO_ERROR;
    }

  for (i = tdes->cur_repl_record - 1; i >= 0; i--)
    {
      repl_rec = (LOG_REPL_RECORD *) (&tdes->repl_records[i]);
      if (OID_EQ (&repl_rec->inst_oid, inst_oid) && !LSA_ISNULL (&tdes->repl_update_lsa))
    {
      assert (repl_rec->rcvindex == RVREPL_DATA_UPDATE || repl_rec->rcvindex == RVREPL_DATA_UPDATE_START
          || repl_rec->rcvindex == RVREPL_DATA_UPDATE_END);
      if (repl_rec->rcvindex == RVREPL_DATA_UPDATE || repl_rec->rcvindex == RVREPL_DATA_UPDATE_START
          || repl_rec->rcvindex == RVREPL_DATA_UPDATE_END)
        {
          LSA_COPY (&repl_rec->lsa, &tdes->repl_update_lsa);
          LSA_SET_NULL (&tdes->repl_update_lsa);
          LSA_SET_NULL (&tdes->repl_insert_lsa);
          find = true;
          break;
        }
    }
    }

  if (find == false && prm_get_bool_value (PRM_ID_DEBUG_REPLICATION_DATA))
    {
      _er_log_debug (ARG_FILE_LINE, "can't find out the UPDATE LSA");
    }

  return error;
}

/*
 * repl_log_insert - insert a replication info to the transaction descriptor
 *
 * return: NO_ERROR or error code
 *
 *   class_oid(in): OID of the class
 *   inst_oid(in): OID of the instance
 *   log_type(in): log type (DATA or SCHEMA)
 *   rcvindex(in): recovery index (INSERT or DELETE or UPDATE)
 *   key_dbvalue(in): Primary Key value
 *
 * NOTE:insert a replication log info to the transaction descriptor (tdes)
 */
int
repl_log_insert (THREAD_ENTRY * thread_p, const OID * class_oid, const OID * inst_oid, LOG_RECTYPE log_type,
         LOG_RCVINDEX rcvindex, DB_VALUE * key_dbvalue, REPL_INFO_TYPE repl_info)
{
  int tran_index;
  LOG_TDES *tdes;
  LOG_REPL_RECORD *repl_rec;
  TDE_ALGORITHM tde_algo = TDE_ALGORITHM_NONE;
  char *class_name = NULL;
  char *ptr;
  int error = NO_ERROR, strlen;

  tran_index = LOG_FIND_THREAD_TRAN_INDEX (thread_p);
  tdes = LOG_FIND_TDES (tran_index);
  if (tdes == NULL)
    {
      return ER_FAILED;
    }

  /* If suppress_replication flag is set, do not write replication log. */
  if (tdes->suppress_replication != 0)
    {
      /* clear repl lsa in tdes since no replication log will be written */
      LSA_SET_NULL (&tdes->repl_insert_lsa);
      LSA_SET_NULL (&tdes->repl_update_lsa);

      return NO_ERROR;
    }

  if (thread_p->no_logging && tdes->fl_mark_repl_recidx == -1)
    {
      return NO_ERROR;
    }

  /* check the replication log array status, if we need to alloc? */
  if (REPL_LOG_IS_NOT_EXISTS (tran_index)
      && ((error = repl_log_info_alloc (tdes, REPL_LOG_INFO_ALLOC_SIZE, false)) != NO_ERROR))
    {
      return error;
    }
  /* the replication log array is full? re-alloc? */
  else if (REPL_LOG_IS_FULL (tran_index)
       && (error = repl_log_info_alloc (tdes, REPL_LOG_INFO_ALLOC_SIZE, true)) != NO_ERROR)
    {
      return error;
    }

  repl_rec = (LOG_REPL_RECORD *) (&tdes->repl_records[tdes->cur_repl_record]);
  repl_rec->repl_type = log_type;
  repl_rec->tde_encrypted = false;

  repl_rec->rcvindex = rcvindex;
  if (rcvindex == RVREPL_DATA_UPDATE)
    {
      switch (repl_info)
    {
    case REPL_INFO_TYPE_RBR_START:
      repl_rec->rcvindex = RVREPL_DATA_UPDATE_START;
      break;
    case REPL_INFO_TYPE_RBR_END:
      repl_rec->rcvindex = RVREPL_DATA_UPDATE_END;
      break;
    case REPL_INFO_TYPE_RBR_NORMAL:
    default:
      break;
    }

    }
  COPY_OID (&repl_rec->inst_oid, inst_oid);

  /* make the common info for the data replication */
  if (log_type == LOG_REPLICATION_DATA)
    {
      char *ptr_to_packed_key_value_size = NULL;
      int packed_key_len = 0;

      if (heap_get_class_name (thread_p, class_oid, &class_name) != NO_ERROR || class_name == NULL)
    {
      ASSERT_ERROR_AND_SET (error);
      if (error == NO_ERROR)
        {
          error = ER_REPL_ERROR;
          er_set (ER_WARNING_SEVERITY, ARG_FILE_LINE, ER_REPL_ERROR, 1, "can't get class_name");
        }
      return error;
    }

      if (heap_get_class_tde_algorithm (thread_p, class_oid, &tde_algo) != NO_ERROR)
    {
      ASSERT_ERROR_AND_SET (error);
      if (error == NO_ERROR)
        {
          error = ER_REPL_ERROR;
        }
      return error;
    }

      repl_rec->tde_encrypted = tde_algo != TDE_ALGORITHM_NONE;

      repl_rec->length = OR_INT_SIZE;   /* packed_key_value_size */
      repl_rec->length += or_packed_string_length (class_name, &strlen);
      repl_rec->length += OR_VALUE_ALIGNED_SIZE (key_dbvalue);

      ptr = (char *) malloc (repl_rec->length);
      if (ptr == NULL)
    {
      error = ER_REPL_ERROR;
      er_set (ER_WARNING_SEVERITY, ARG_FILE_LINE, ER_REPL_ERROR, 1, "can't allocate memory");
      free_and_init (class_name);
      return error;
    }

#if !defined (NDEBUG)
      /* Suppress valgrind complaint. */
      memset (ptr, 0, repl_rec->length);
#endif // DEBUG

      repl_rec->repl_data = ptr;

      /* first 4 bytes are for packed_key_len which will be filled after packing the value. */
      ptr_to_packed_key_value_size = ptr;
      ptr += OR_INT_SIZE;

      ptr = or_pack_string_with_length (ptr, class_name, strlen);
      ptr = or_pack_mem_value (ptr, key_dbvalue, &packed_key_len);

      /* fill the length of disk image of pk */
      or_pack_int (ptr_to_packed_key_value_size, packed_key_len);
    }
  else
    {
      repl_rec->repl_data = NULL;
      repl_rec->length = 0;
    }
  repl_rec->must_flush = LOG_REPL_COMMIT_NEED_FLUSH;

  switch (rcvindex)
    {
    case RVREPL_DATA_INSERT:
      if (!LSA_ISNULL (&tdes->repl_insert_lsa))
    {
      LSA_COPY (&repl_rec->lsa, &tdes->repl_insert_lsa);
      LSA_SET_NULL (&tdes->repl_insert_lsa);
      LSA_SET_NULL (&tdes->repl_update_lsa);
    }
      break;
    case RVREPL_DATA_UPDATE:
      /*
       * for the update case, this function is called before the heap
       * file update, so we don't need to LSA for update log here.
       */
      LSA_SET_NULL (&repl_rec->lsa);
      break;
    case RVREPL_DATA_DELETE:
      /*
       * for the delete case, we don't need to find out the target
       * LSA. Delete is operation is possible without "After Image"
       */
      if (LSA_ISNULL (&tdes->tail_lsa))
    {
      LSA_COPY (&repl_rec->lsa, &log_Gl.prior_info.prior_lsa);
    }
      else
    {
      LSA_COPY (&repl_rec->lsa, &tdes->tail_lsa);
    }
      break;
    default:
      break;
    }
  tdes->cur_repl_record++;

  /* if flush marking is started, mark "must_flush" at current log except the log conflicts with previous logs due to
   * same instance update */
  if (tdes->fl_mark_repl_recidx != -1)
    {
      LOG_REPL_RECORD *recsp = tdes->repl_records;
      int i;

      if (strcmp (class_name, CT_SERIAL_NAME) != 0)
    {
      for (i = 0; i < tdes->fl_mark_repl_recidx; i++)
        {
          if (recsp[i].must_flush == LOG_REPL_COMMIT_NEED_FLUSH && OID_EQ (&recsp[i].inst_oid, &repl_rec->inst_oid))
        {
          break;
        }
        }

      if (i >= tdes->fl_mark_repl_recidx)
        {
          repl_rec->must_flush = LOG_REPL_NEED_FLUSH;
        }
    }
      else
    {
      repl_rec->must_flush = LOG_REPL_NEED_FLUSH;
    }
    }

  if (class_name != NULL)
    {
      free_and_init (class_name);
    }

  return error;
}

/*
 * repl_log_insert_schema - insert a replication info(schema) to the
 *                          transaction descriptor
 *
 * return: NO_ERROR or error code
 *
 *   repl_schema(in):
 *
 * NOTE:insert a replication log info(schema) to the transaction
 *      descriptor (tdes)
 */
int
repl_log_insert_statement (THREAD_ENTRY * thread_p, REPL_INFO_SBR * repl_info)
{
  int tran_index;
  LOG_TDES *tdes;
  LOG_REPL_RECORD *repl_rec;
  char *ptr;
  int error = NO_ERROR, strlen1, strlen2, strlen3, strlen4;

  tran_index = LOG_FIND_THREAD_TRAN_INDEX (thread_p);
  tdes = LOG_FIND_TDES (tran_index);
  if (tdes == NULL)
    {
      return ER_FAILED;
    }

  /* If suppress_replication flag is set, do not write replication log. */
  if (tdes->suppress_replication != 0)
    {
      return NO_ERROR;
    }

  /* check the replication log array status, if we need to alloc? */
  if (REPL_LOG_IS_NOT_EXISTS (tran_index)
      && ((error = repl_log_info_alloc (tdes, REPL_LOG_INFO_ALLOC_SIZE, false)) != NO_ERROR))
    {
      return error;
    }
  /* the replication log array is full? re-alloc? */
  else if (REPL_LOG_IS_FULL (tran_index)
       && (error = repl_log_info_alloc (tdes, REPL_LOG_INFO_ALLOC_SIZE, true)) != NO_ERROR)
    {
      return error;
    }

  repl_rec = (LOG_REPL_RECORD *) (&tdes->repl_records[tdes->cur_repl_record]);
  repl_rec->repl_type = LOG_REPLICATION_STATEMENT;
  repl_rec->tde_encrypted = false;

  repl_rec->rcvindex = RVREPL_STATEMENT;
  repl_rec->must_flush = LOG_REPL_COMMIT_NEED_FLUSH;
  OID_SET_NULL (&repl_rec->inst_oid);

  /* make the common info for the schema replication */
  repl_rec->length = (OR_INT_SIZE   /* REPL_INFO_SCHEMA.statement_type */
              + or_packed_string_length (repl_info->name, &strlen1)
              + or_packed_string_length (repl_info->stmt_text, &strlen2)
              + or_packed_string_length (repl_info->db_user, &strlen3)
              + or_packed_string_length (repl_info->sys_prm_context, &strlen4));

  repl_rec->repl_data = (char *) malloc (repl_rec->length);
  if (repl_rec->repl_data == NULL)
    {
      error = ER_REPL_ERROR;
      er_set (ER_WARNING_SEVERITY, ARG_FILE_LINE, ER_REPL_ERROR, 1, "can't allocate memory");
      return error;
    }
  ptr = repl_rec->repl_data;
  ptr = or_pack_int (ptr, repl_info->statement_type);
  ptr = or_pack_string_with_length (ptr, repl_info->name, strlen1);
  ptr = or_pack_string_with_length (ptr, repl_info->stmt_text, strlen2);
  ptr = or_pack_string_with_length (ptr, repl_info->db_user, strlen3);
  ptr = or_pack_string_with_length (ptr, repl_info->sys_prm_context, strlen4);

  if (prm_get_bool_value (PRM_ID_DEBUG_REPLICATION_DATA))
    {
      _er_log_debug (ARG_FILE_LINE,
             "repl_log_insert_statement: repl_info_sbr { type %d, name %s, stmt_txt %s, user %s, "
             "sys_prm_context %s }\n", repl_info->statement_type, repl_info->name, repl_info->stmt_text,
             repl_info->db_user, repl_info->sys_prm_context);
    }
  LSA_COPY (&repl_rec->lsa, &tdes->tail_lsa);

  if (tdes->fl_mark_repl_recidx != -1 && tdes->cur_repl_record >= tdes->fl_mark_repl_recidx)
    {
      /*
       * statement replication does not check log conflicts, so
       * use repl_start_flush_mark with caution.
       */
      repl_rec->must_flush = LOG_REPL_NEED_FLUSH;
    }

  tdes->cur_repl_record++;

  return error;
}

/*
 * repl_start_flush_mark -
 *
 * return:
 *
 * NOTE:start to mark "must_flush" for repl records to be appended after this
 */
void
repl_start_flush_mark (THREAD_ENTRY * thread_p)
{
  LOG_TDES *tdes;

  tdes = LOG_FIND_CURRENT_TDES (thread_p);

  if (tdes == NULL)
    {
      er_set (ER_FATAL_ERROR_SEVERITY, ARG_FILE_LINE, ER_LOG_UNKNOWN_TRANINDEX, 1,
          LOG_FIND_THREAD_TRAN_INDEX (thread_p));
      return;
    }
  if (tdes->fl_mark_repl_recidx == -1)
    {
      tdes->fl_mark_repl_recidx = tdes->cur_repl_record;
    }               /* else already started, return */
  return;
}

/*
 * repl_end_flush_mark -
 *
 * return:
 *
 *   need_undo(in):
 *
 * NOTE:end to mark "must_flush" for repl records
 */
void
repl_end_flush_mark (THREAD_ENTRY * thread_p, bool need_undo)
{
  LOG_TDES *tdes;
  int i;

  tdes = LOG_FIND_CURRENT_TDES (thread_p);

  if (tdes == NULL)
    {
      er_set (ER_FATAL_ERROR_SEVERITY, ARG_FILE_LINE, ER_LOG_UNKNOWN_TRANINDEX, 1,
          LOG_FIND_THREAD_TRAN_INDEX (thread_p));
      return;
    }
  if (need_undo)
    {
      LOG_REPL_RECORD *recsp = tdes->repl_records;
      for (i = tdes->fl_mark_repl_recidx; i < tdes->cur_repl_record; i++)
    {
      /* initialize repl records to be marked as flush */
      free_and_init (recsp[i].repl_data);
    }
      tdes->cur_repl_record = tdes->fl_mark_repl_recidx;
    }
  tdes->fl_mark_repl_recidx = -1;

  return;
}

/*
 * repl_log_abort_after_lsa -
 *
 * return:
 *
 *   tdes (in) :
 *   start_lsa (in) :
 *
 */
int
repl_log_abort_after_lsa (LOG_TDES * tdes, LOG_LSA * start_lsa)
{
  LOG_REPL_RECORD *repl_rec_arr;
  int i;

  repl_rec_arr = tdes->repl_records;
  for (i = 0; i < tdes->cur_repl_record; i++)
    {
      if (LSA_GT (&repl_rec_arr[i].lsa, start_lsa))
    {
      repl_rec_arr[i].must_flush = LOG_REPL_DONT_NEED_FLUSH;
    }
    }

  return NO_ERROR;
}

#if defined(CUBRID_DEBUG)
/*
 * repl_debug_info - DEBUGGING Function, print out the replication log info.
 *
 * return:
 *
 * NOTE:Dump the replication log info to the stdout.
 */
void
repl_debug_info ()
{
  LOG_TDES *tdes;
  LOG_REPL_RECORD *repl_rec;
  int tnum, rnum;
  char *class_name;
  DB_VALUE key;
  char *ptr;

  fprintf (stdout, "REPLICATION LOG DUMP -- Memory\n");
  for (tnum = 0; tnum < log_Gl.trantable.num_assigned_indices; tnum++)
    {
      fprintf (stdout, "*************************************************\n");
      tdes = log_Gl.trantable.all_tdes[tnum];
      fprintf (stdout, "For the Trid : %d\n", (int) tdes->trid);
      for (rnum = 0; rnum < tdes->cur_repl_record; rnum++)
    {
      fprintf (stdout, "   RECORD # %d\n", rnum);
      repl_rec = (LOG_REPL_RECORD *) (&tdes->repl_records[rnum]);
      fprintf (stdout, "      type: %s\n", log_to_string (repl_rec->repl_type));
      fprintf (stdout, "      OID: %d - %d - %d\n", repl_rec->inst_oid.volid, repl_rec->inst_oid.pageid,
           repl_rec->inst_oid.slotid);
      ptr = or_unpack_string_nocopy (repl_rec->repl_data, &class_name);
      ptr = or_unpack_mem_value (ptr, &key);
      fprintf (stdout, "      class_name: %s\n", class_name);
      fprintf (stdout, "      LSA: %lld | %d\n", repl_rec->lsa.pageid, repl_rec->lsa.offset);
      db_value_print (&key);
      fprintf (stdout, "\n----------------------------------------------\n");
      pr_clear_value (&key);
    }
    }
  fflush (stdout);
}
#endif /* CUBRID_DEBUG */

#endif /* SERVER_MODE || SA_MODE */