Skip to content

File log_append.cpp

File List > cubrid > src > transaction > log_append.cpp

Go to the documentation of this file

/*
 * Copyright 2008 Search Solution Corporation
 * Copyright 2016 CUBRID Corporation
 *
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 *
 */

#include "log_append.hpp"

#include "file_manager.h"
#include "log_compress.h"
#include "log_impl.h"
#include "log_manager.h"
#include "log_record.hpp"
#include "page_buffer.h"
#include "perf_monitor.h"
#include "thread_entry.hpp"
#include "thread_manager.hpp"
#include "vacuum.h"
// XXX: SHOULD BE THE LAST INCLUDE HEADER
#include "memory_wrapper.hpp"

#if !defined(SERVER_MODE)
static LOG_ZIP *log_zip_undo = NULL;
static LOG_ZIP *log_zip_redo = NULL;
static char *log_data_ptr = NULL;
static int log_data_length = 0;
#endif
bool log_Zip_support = false;
int log_Zip_min_size_to_compress = 255;

size_t
LOG_PRIOR_LSA_LAST_APPEND_OFFSET ()
{
  return LOGAREA_SIZE;
}

static void log_prior_lsa_append_align ();
static void log_prior_lsa_append_advance_when_doesnot_fit (size_t length);
static void log_prior_lsa_append_add_align (size_t add);
static int prior_lsa_gen_postpone_record (THREAD_ENTRY *thread_p, LOG_PRIOR_NODE *node, LOG_RCVINDEX rcvindex,
    LOG_DATA_ADDR *addr, int length, const char *data);
static int prior_lsa_gen_dbout_redo_record (THREAD_ENTRY *thread_p, LOG_PRIOR_NODE *node, LOG_RCVINDEX rcvindex,
    int length, const char *data);
static int prior_lsa_gen_record (THREAD_ENTRY *thread_p, LOG_PRIOR_NODE *node, LOG_RECTYPE rec_type, int length,
                 const char *data);
static int prior_lsa_gen_undoredo_record_from_crumbs (THREAD_ENTRY *thread_p, LOG_PRIOR_NODE *node,
    LOG_RCVINDEX rcvindex, LOG_DATA_ADDR *addr, int num_ucrumbs, const LOG_CRUMB *ucrumbs, int num_rcrumbs,
    const LOG_CRUMB *rcrumbs);
static int prior_lsa_gen_2pc_prepare_record (THREAD_ENTRY *thread_p, LOG_PRIOR_NODE *node, int gtran_length,
    const char *gtran_data, int lock_length, const char *lock_data);
static int prior_lsa_gen_end_chkpt_record (THREAD_ENTRY *thread_p, LOG_PRIOR_NODE *node, int tran_length,
    const char *tran_data, int topop_length, const char *topop_data);
static int prior_lsa_copy_undo_data_to_node (LOG_PRIOR_NODE *node, int length, const char *data);
static int prior_lsa_copy_redo_data_to_node (LOG_PRIOR_NODE *node, int length, const char *data);
static int prior_lsa_copy_undo_crumbs_to_node (LOG_PRIOR_NODE *node, int num_crumbs, const LOG_CRUMB *crumbs);
static int prior_lsa_copy_redo_crumbs_to_node (LOG_PRIOR_NODE *node, int num_crumbs, const LOG_CRUMB *crumbs);
static void prior_lsa_start_append (THREAD_ENTRY *thread_p, LOG_PRIOR_NODE *node, LOG_TDES *tdes);
static void prior_lsa_end_append (THREAD_ENTRY *thread_p, LOG_PRIOR_NODE *node);
static void prior_lsa_append_data (int length);
static LOG_LSA prior_lsa_next_record_internal (THREAD_ENTRY *thread_p, LOG_PRIOR_NODE *node, LOG_TDES *tdes,
    int with_lock);
static void prior_update_header_mvcc_info (const LOG_LSA &record_lsa, MVCCID mvccid);
static char *log_append_get_data_ptr (THREAD_ENTRY *thread_p);
static bool log_append_realloc_data_ptr (THREAD_ENTRY *thread_p, int length);

log_data_addr::log_data_addr (const VFID *vfid_arg, PAGE_PTR pgptr_arg, PGLENGTH offset_arg)
  : vfid (vfid_arg)
  , pgptr (pgptr_arg)
  , offset (offset_arg)
{
}

log_append_info::log_append_info ()
  : vdes (NULL_VOLDES)
  , nxio_lsa (NULL_LSA)
  , prev_lsa (NULL_LSA)
  , log_pgptr (NULL)
  , appending_page_tde_encrypted (false)
{

}

log_append_info::log_append_info (const log_append_info &other)
  : vdes (other.vdes)
  , nxio_lsa {other.nxio_lsa.load ()}
  , prev_lsa (other.prev_lsa)
  , log_pgptr (other.log_pgptr)
  , appending_page_tde_encrypted (other.appending_page_tde_encrypted)
{

}

LOG_LSA
log_append_info::get_nxio_lsa () const
{
  return nxio_lsa.load ();
}

void
log_append_info::set_nxio_lsa (const LOG_LSA &next_io_lsa)
{
  nxio_lsa.store (next_io_lsa);
}

log_prior_lsa_info::log_prior_lsa_info ()
  : prior_lsa (NULL_LSA)
  , prev_lsa (NULL_LSA)
  , prior_list_header (NULL)
  , prior_list_tail (NULL)
  , list_size (0)
  , prior_flush_list_header (NULL)
  , prior_lsa_mutex ()
{
}

void
LOG_RESET_APPEND_LSA (const LOG_LSA *lsa)
{
  // todo - concurrency safe-guard
  log_Gl.hdr.append_lsa = *lsa;
  log_Gl.prior_info.prior_lsa = *lsa;
}

void
LOG_RESET_PREV_LSA (const LOG_LSA *lsa)
{
  // todo - concurrency safe-guard
  log_Gl.append.prev_lsa = *lsa;
  log_Gl.prior_info.prev_lsa = *lsa;
}

char *
LOG_APPEND_PTR ()
{
  // todo - concurrency safe-guard
  return log_Gl.append.log_pgptr->area + log_Gl.hdr.append_lsa.offset;
}

bool
log_prior_has_worker_log_records (THREAD_ENTRY *thread_p)
{
  LOG_CS_ENTER (thread_p);

  std::unique_lock<std::mutex> ulock (log_Gl.prior_info.prior_lsa_mutex);
  LOG_LSA nxio_lsa = log_Gl.append.get_nxio_lsa ();

  if (!LSA_EQ (&nxio_lsa, &log_Gl.prior_info.prior_lsa))
    {
      LOG_PRIOR_NODE *node;

      assert (LSA_LT (&nxio_lsa, &log_Gl.prior_info.prior_lsa));
      node = log_Gl.prior_info.prior_list_header;
      while (node != NULL)
    {
      if (node->log_header.trid != LOG_SYSTEM_TRANID)
        {
          ulock.unlock ();
          LOG_CS_EXIT (thread_p);
          return true;
        }
      node = node->next;
    }
    }

  ulock.unlock ();

  LOG_CS_EXIT (thread_p);

  return false;
}

void
log_append_init_zip ()
{
  if (!prm_get_bool_value (PRM_ID_LOG_COMPRESS))
    {
      log_Zip_support = false;
      return;
    }

#if defined(SERVER_MODE)
  log_Zip_support = true;
#else
  log_zip_undo = log_zip_alloc (IO_PAGESIZE);
  log_zip_redo = log_zip_alloc (IO_PAGESIZE);
  log_data_length = IO_PAGESIZE * 2;
  log_data_ptr = (char *) malloc (log_data_length);
  if (log_data_ptr == NULL)
    {
      er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_OUT_OF_VIRTUAL_MEMORY, 1, (size_t) log_data_length);
    }

  if (log_zip_undo == NULL || log_zip_redo == NULL || log_data_ptr == NULL)
    {
      log_Zip_support = false;
      if (log_zip_undo)
    {
      log_zip_free (log_zip_undo);
      log_zip_undo = NULL;
    }
      if (log_zip_redo)
    {
      log_zip_free (log_zip_redo);
      log_zip_redo = NULL;
    }
      if (log_data_ptr)
    {
      free_and_init (log_data_ptr);
      log_data_length = 0;
    }
    }
  else
    {
      log_Zip_support = true;
    }
#endif
}

void
log_append_final_zip ()
{
  if (!log_Zip_support)
    {
      return;
    }

#if defined (SERVER_MODE)
#else
  if (log_zip_undo)
    {
      log_zip_free (log_zip_undo);
      log_zip_undo = NULL;
    }
  if (log_zip_redo)
    {
      log_zip_free (log_zip_redo);
      log_zip_redo = NULL;
    }
  if (log_data_ptr)
    {
      free_and_init (log_data_ptr);
      log_data_length = 0;
    }
#endif
}

/*
 * prior_lsa_alloc_and_copy_data -
 *
 * return: new node
 *
 *   rec_type(in):
 *   rcvindex(in):
 *   addr(in):
 *   ulength(in):
 *   udata(in):
 *   rlength(in):
 *   rdata(in):
 */
LOG_PRIOR_NODE *
prior_lsa_alloc_and_copy_data (THREAD_ENTRY *thread_p, LOG_RECTYPE rec_type, LOG_RCVINDEX rcvindex,
                   LOG_DATA_ADDR *addr, int ulength, const char *udata, int rlength, const char *rdata)
{
  LOG_PRIOR_NODE *node;
  int error_code = NO_ERROR;

  node = (LOG_PRIOR_NODE *) malloc (sizeof (LOG_PRIOR_NODE));
  if (node == NULL)
    {
      er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_OUT_OF_VIRTUAL_MEMORY, 1, sizeof (LOG_PRIOR_NODE));
      return NULL;
    }

  node->log_header.type = rec_type;

  node->tde_encrypted = false;

  node->data_header_length = 0;
  node->data_header = NULL;
  node->ulength = 0;
  node->udata = NULL;
  node->rlength = 0;
  node->rdata = NULL;
  node->next = NULL;

  switch (rec_type)
    {
    case LOG_UNDOREDO_DATA:
    case LOG_DIFF_UNDOREDO_DATA:
    case LOG_UNDO_DATA:
    case LOG_REDO_DATA:
    case LOG_MVCC_UNDOREDO_DATA:
    case LOG_MVCC_DIFF_UNDOREDO_DATA:
    case LOG_MVCC_REDO_DATA:
    case LOG_MVCC_UNDO_DATA:
      /* We shouldn't be here */
      /* Use prior_lsa_alloc_and_copy_crumbs instead */
      assert_release (false);
      error_code = ER_FAILED;
      break;

    case LOG_DBEXTERN_REDO_DATA:
      error_code = prior_lsa_gen_dbout_redo_record (thread_p, node, rcvindex, rlength, rdata);
      break;

    case LOG_POSTPONE:
      assert (ulength == 0 && udata == NULL);

      error_code = prior_lsa_gen_postpone_record (thread_p, node, rcvindex, addr, rlength, rdata);
      break;

    case LOG_2PC_PREPARE:
      assert (addr == NULL);
      error_code = prior_lsa_gen_2pc_prepare_record (thread_p, node, ulength, udata, rlength, rdata);
      break;
    case LOG_END_CHKPT:
      assert (addr == NULL);
      error_code = prior_lsa_gen_end_chkpt_record (thread_p, node, ulength, udata, rlength, rdata);
      break;

    case LOG_RUN_POSTPONE:
    case LOG_COMPENSATE:
    case LOG_SAVEPOINT:

    case LOG_DUMMY_HEAD_POSTPONE:

    case LOG_DUMMY_CRASH_RECOVERY:
    case LOG_DUMMY_HA_SERVER_STATE:
    case LOG_DUMMY_OVF_RECORD:
    case LOG_DUMMY_GENERIC:
    case LOG_SUPPLEMENTAL_INFO:

    case LOG_2PC_COMMIT_DECISION:
    case LOG_2PC_ABORT_DECISION:
    case LOG_COMMIT_WITH_POSTPONE:
    case LOG_COMMIT_WITH_POSTPONE_OBSOLETE:
    case LOG_SYSOP_START_POSTPONE:
    case LOG_COMMIT:
    case LOG_ABORT:
    case LOG_2PC_COMMIT_INFORM_PARTICPS:
    case LOG_2PC_ABORT_INFORM_PARTICPS:
    case LOG_SYSOP_END:
    case LOG_REPLICATION_DATA:
    case LOG_REPLICATION_STATEMENT:
    case LOG_2PC_START:
    case LOG_START_CHKPT:
    case LOG_SYSOP_ATOMIC_START:
      assert (rlength == 0 && rdata == NULL);

      error_code = prior_lsa_gen_record (thread_p, node, rec_type, ulength, udata);
      break;

    default:
      break;
    }

  if (error_code == NO_ERROR)
    {
      return node;
    }
  else
    {
      if (node != NULL)
    {
      if (node->data_header != NULL)
        {
          free_and_init (node->data_header);
        }
      if (node->udata != NULL)
        {
          free_and_init (node->udata);
        }
      if (node->rdata != NULL)
        {
          free_and_init (node->rdata);
        }
      free_and_init (node);
    }

      return NULL;
    }
}

/*
 * prior_lsa_alloc_and_copy_crumbs -
 *
 * return: new node
 *
 *   rec_type(in):
 *   rcvindex(in):
 *   addr(in):
 *   num_ucrumbs(in):
 *   ucrumbs(in):
 *   num_rcrumbs(in):
 *   rcrumbs(in):
 */
LOG_PRIOR_NODE *
prior_lsa_alloc_and_copy_crumbs (THREAD_ENTRY *thread_p, LOG_RECTYPE rec_type, LOG_RCVINDEX rcvindex,
                 LOG_DATA_ADDR *addr, const int num_ucrumbs,
                 const LOG_CRUMB *ucrumbs, const int num_rcrumbs, const LOG_CRUMB *rcrumbs)
{
  LOG_PRIOR_NODE *node;
  int error = NO_ERROR;

  node = (LOG_PRIOR_NODE *) malloc (sizeof (LOG_PRIOR_NODE));
  if (node == NULL)
    {
      er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_OUT_OF_VIRTUAL_MEMORY, 1, sizeof (LOG_PRIOR_NODE));
      return NULL;
    }

  node->log_header.type = rec_type;

  node->tde_encrypted = false;

  node->data_header_length = 0;
  node->data_header = NULL;
  node->ulength = 0;
  node->udata = NULL;
  node->rlength = 0;
  node->rdata = NULL;
  node->next = NULL;

  switch (rec_type)
    {
    case LOG_UNDOREDO_DATA:
    case LOG_DIFF_UNDOREDO_DATA:
    case LOG_UNDO_DATA:
    case LOG_REDO_DATA:
    case LOG_MVCC_UNDOREDO_DATA:
    case LOG_MVCC_DIFF_UNDOREDO_DATA:
    case LOG_MVCC_UNDO_DATA:
    case LOG_MVCC_REDO_DATA:
      error = prior_lsa_gen_undoredo_record_from_crumbs (thread_p, node, rcvindex, addr, num_ucrumbs, ucrumbs,
          num_rcrumbs, rcrumbs);
      break;

    default:
      /* Unhandled */
      assert_release (false);
      error = ER_FAILED;
      break;
    }

  if (error == NO_ERROR)
    {
      return node;
    }
  else
    {
      if (node != NULL)
    {
      if (node->data_header != NULL)
        {
          free_and_init (node->data_header);
        }
      if (node->udata != NULL)
        {
          free_and_init (node->udata);
        }
      if (node->rdata != NULL)
        {
          free_and_init (node->rdata);
        }
      free_and_init (node);
    }
      return NULL;
    }
}

/*
 * prior_lsa_copy_undo_data_to_node -
 *
 * return: error code or NO_ERROR
 *
 *   node(in/out):
 *   length(in):
 *   data(in):
 */
static int
prior_lsa_copy_undo_data_to_node (LOG_PRIOR_NODE *node, int length, const char *data)
{
  if (length <= 0 || data == NULL)
    {
      return NO_ERROR;
    }

  node->udata = (char *) malloc (length);
  if (node->udata == NULL)
    {
      er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_OUT_OF_VIRTUAL_MEMORY, 1, (size_t) length);
      return ER_OUT_OF_VIRTUAL_MEMORY;
    }

  memcpy (node->udata, data, length);

  node->ulength = length;

  return NO_ERROR;
}

/*
 * prior_lsa_copy_redo_data_to_node -
 *
 * return: error code or NO_ERROR
 *
 *   node(in/out):
 *   length(in):
 *   data(in):
 */
static int
prior_lsa_copy_redo_data_to_node (LOG_PRIOR_NODE *node, int length, const char *data)
{
  if (length <= 0 || data == NULL)
    {
      return NO_ERROR;
    }

  node->rdata = (char *) malloc (length);
  if (node->rdata == NULL)
    {
      er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_OUT_OF_VIRTUAL_MEMORY, 1, (size_t) length);
      return ER_OUT_OF_VIRTUAL_MEMORY;
    }

  memcpy (node->rdata, data, length);

  node->rlength = length;

  return NO_ERROR;
}

/*
 * prior_lsa_copy_undo_crumbs_to_node -
 *
 * return: error code or NO_ERROR
 *
 *   node(in/out):
 *   num_crumbs(in):
 *   crumbs(in):
 */
static int
prior_lsa_copy_undo_crumbs_to_node (LOG_PRIOR_NODE *node, int num_crumbs, const LOG_CRUMB *crumbs)
{
  int i, length;
  char *ptr;

  /* Safe guard: either num_crumbs is 0 or crumbs array is not NULL */
  assert (num_crumbs == 0 || crumbs != NULL);

  for (i = 0, length = 0; i < num_crumbs; i++)
    {
      length += crumbs[i].length;
    }

  assert (node->udata == NULL);
  if (length > 0)
    {
      node->udata = (char *) malloc (length);
      if (node->udata == NULL)
    {
      er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_OUT_OF_VIRTUAL_MEMORY, 1, (size_t) length);
      return ER_OUT_OF_VIRTUAL_MEMORY;
    }

      ptr = node->udata;
      for (i = 0; i < num_crumbs; i++)
    {
      memcpy (ptr, crumbs[i].data, crumbs[i].length);
      ptr += crumbs[i].length;
    }
    }

  node->ulength = length;
  return NO_ERROR;
}

/*
 * prior_lsa_copy_redo_crumbs_to_node -
 *
 * return: error code or NO_ERROR
 *
 *   node(in/out):
 *   num_crumbs(in):
 *   crumbs(in):
 */
static int
prior_lsa_copy_redo_crumbs_to_node (LOG_PRIOR_NODE *node, int num_crumbs, const LOG_CRUMB *crumbs)
{
  int i, length;
  char *ptr;

  /* Safe guard: either num_crumbs is 0 or crumbs array is not NULL */
  assert (num_crumbs == 0 || crumbs != NULL);

  for (i = 0, length = 0; i < num_crumbs; i++)
    {
      length += crumbs[i].length;
    }

  assert (node->rdata == NULL);
  if (length > 0)
    {
      node->rdata = (char *) malloc (length);
      if (node->rdata == NULL)
    {
      er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_OUT_OF_VIRTUAL_MEMORY, 1, (size_t) length);
      return ER_OUT_OF_VIRTUAL_MEMORY;
    }

      ptr = node->rdata;
      for (i = 0; i < num_crumbs; i++)
    {
      memcpy (ptr, crumbs[i].data, crumbs[i].length);
      ptr += crumbs[i].length;
    }
    }

  node->rlength = length;

  return NO_ERROR;
}

/*
 * prior_lsa_gen_undoredo_record_from_crumbs () - Generate undoredo or MVCC
 *                        undoredo log record.
 *
 * return       : Error code.
 * thread_p (in)    : Thread entry.
 * node (in)        : Log prior node.
 * rcvindex (in)    : Index of recovery function.
 * addr (in)        : Logged data address.
 * num_ucrumbs (in) : Number of undo data crumbs.
 * ucrumbs (in)     : Undo data crumbs.
 * num_rcrumbs (in) : Number of redo data crumbs.
 * rcrumbs (in)     : Redo data crumbs.
 */
static int
prior_lsa_gen_undoredo_record_from_crumbs (THREAD_ENTRY *thread_p, LOG_PRIOR_NODE *node, LOG_RCVINDEX rcvindex,
    LOG_DATA_ADDR *addr, int num_ucrumbs, const LOG_CRUMB *ucrumbs,
    int num_rcrumbs, const LOG_CRUMB *rcrumbs)
{
  LOG_REC_REDO *redo_p = NULL;
  LOG_REC_UNDO *undo_p = NULL;
  LOG_REC_UNDOREDO *undoredo_p = NULL;
  LOG_REC_MVCC_REDO *mvcc_redo_p = NULL;
  LOG_REC_MVCC_UNDO *mvcc_undo_p = NULL;
  LOG_REC_MVCC_UNDOREDO *mvcc_undoredo_p = NULL;
  LOG_DATA *log_data_p = NULL;
  LOG_VACUUM_INFO *vacuum_info_p = NULL;
  VPID *vpid = NULL;
  int error_code = NO_ERROR;
  int i;
  int ulength, rlength, *data_header_ulength_p = NULL, *data_header_rlength_p = NULL;
  int total_length;
  MVCCID *mvccid_p = NULL;
  LOG_TDES *tdes = NULL;
  char *data_ptr = NULL, *tmp_ptr = NULL;
  char *undo_data = NULL, *redo_data = NULL;
  LOG_ZIP *zip_undo = NULL, *zip_redo = NULL;
  bool is_mvcc_op = LOG_IS_MVCC_OP_RECORD_TYPE (node->log_header.type);
  bool has_undo = false;
  bool has_redo = false;
  bool is_undo_zip = false, is_redo_zip = false, is_diff = false;
  bool can_zip = false;

  assert (node->log_header.type != LOG_DIFF_UNDOREDO_DATA && node->log_header.type != LOG_MVCC_DIFF_UNDOREDO_DATA);
  assert (num_ucrumbs == 0 || ucrumbs != NULL);
  assert (num_rcrumbs == 0 || rcrumbs != NULL);

  zip_undo = log_append_get_zip_undo (thread_p);
  zip_redo = log_append_get_zip_redo (thread_p);

  ulength = 0;
  for (i = 0; i < num_ucrumbs; i++)
    {
      ulength += ucrumbs[i].length;
    }
  assert (0 <= ulength);

  rlength = 0;
  for (i = 0; i < num_rcrumbs; i++)
    {
      rlength += rcrumbs[i].length;
    }
  assert (0 <= rlength);

  /* Check if we have undo or redo and if we can zip */
  if (LOG_IS_UNDOREDO_RECORD_TYPE (node->log_header.type))
    {
      has_undo = true;
      has_redo = true;
      can_zip = log_Zip_support && (zip_undo != NULL || ulength == 0) && (zip_redo != NULL || rlength == 0);
    }
  else if (LOG_IS_REDO_RECORD_TYPE (node->log_header.type))
    {
      has_redo = true;
      can_zip = log_Zip_support && zip_redo;
    }
  else
    {
      /* UNDO type */
      assert (LOG_IS_UNDO_RECORD_TYPE (node->log_header.type));
      has_undo = true;
      can_zip = log_Zip_support && zip_undo;
    }

  if (can_zip == true && (ulength >= log_Zip_min_size_to_compress || rlength >= log_Zip_min_size_to_compress))
    {
      /* Try to zip undo and/or redo data */
      total_length = 0;
      if (ulength > 0)
    {
      total_length += ulength;
    }
      if (rlength > 0)
    {
      total_length += rlength;
    }

      if (log_append_realloc_data_ptr (thread_p, total_length))
    {
      data_ptr = log_append_get_data_ptr (thread_p);
    }

      if (data_ptr != NULL)
    {
      tmp_ptr = data_ptr;

      if (ulength >= log_Zip_min_size_to_compress)
        {
          assert (has_undo == true);

          undo_data = data_ptr;

          for (i = 0; i < num_ucrumbs; i++)
        {
          memcpy (tmp_ptr, (char *) ucrumbs[i].data, ucrumbs[i].length);
          tmp_ptr += ucrumbs[i].length;
        }

          assert (CAST_BUFLEN (tmp_ptr - undo_data) == ulength);
        }

      if (rlength >= log_Zip_min_size_to_compress)
        {
          assert (has_redo == true);

          redo_data = tmp_ptr;

          for (i = 0; i < num_rcrumbs; i++)
        {
          (void) memcpy (tmp_ptr, (char *) rcrumbs[i].data, rcrumbs[i].length);
          tmp_ptr += rcrumbs[i].length;
        }

          assert (CAST_BUFLEN (tmp_ptr - redo_data) == rlength);
        }

      assert (CAST_BUFLEN (tmp_ptr - data_ptr) == total_length
          || ulength < log_Zip_min_size_to_compress || rlength < log_Zip_min_size_to_compress);

      if (ulength >= log_Zip_min_size_to_compress && rlength >= log_Zip_min_size_to_compress)
        {
          (void) log_diff (ulength, undo_data, rlength, redo_data);

          is_undo_zip = log_zip (zip_undo, ulength, undo_data);
          is_redo_zip = log_zip (zip_redo, rlength, redo_data);

          if (is_redo_zip)
        {
          is_diff = true;
        }
        }
      else
        {
          if (ulength >= log_Zip_min_size_to_compress)
        {
          is_undo_zip = log_zip (zip_undo, ulength, undo_data);
        }
          if (rlength >= log_Zip_min_size_to_compress)
        {
          is_redo_zip = log_zip (zip_redo, rlength, redo_data);
        }
        }
    }
    }

  if (is_diff)
    {
      /* Set diff UNDOREDO type */
      assert (has_redo && has_undo);
      if (is_mvcc_op)
    {
      node->log_header.type = LOG_MVCC_DIFF_UNDOREDO_DATA;
    }
      else
    {
      node->log_header.type = LOG_DIFF_UNDOREDO_DATA;
    }
    }

  /* Compute the length of data header */
  switch (node->log_header.type)
    {
    case LOG_MVCC_UNDO_DATA:
      node->data_header_length = sizeof (LOG_REC_MVCC_UNDO);
      break;
    case LOG_UNDO_DATA:
      node->data_header_length = sizeof (LOG_REC_UNDO);
      break;
    case LOG_MVCC_REDO_DATA:
      node->data_header_length = sizeof (LOG_REC_MVCC_REDO);
      break;
    case LOG_REDO_DATA:
      node->data_header_length = sizeof (LOG_REC_REDO);
      break;
    case LOG_MVCC_UNDOREDO_DATA:
    case LOG_MVCC_DIFF_UNDOREDO_DATA:
      node->data_header_length = sizeof (LOG_REC_MVCC_UNDOREDO);
      break;
    case LOG_UNDOREDO_DATA:
    case LOG_DIFF_UNDOREDO_DATA:
      node->data_header_length = sizeof (LOG_REC_UNDOREDO);
      break;
    default:
      assert (0);
      break;
    }

  /* Allocate memory for data header */
  node->data_header = (char *) malloc (node->data_header_length);
  if (node->data_header == NULL)
    {
      er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_OUT_OF_VIRTUAL_MEMORY, 1, sizeof (LOG_REC_UNDOREDO));
      error_code = ER_OUT_OF_VIRTUAL_MEMORY;
      goto error;
    }

#if !defined (NDEBUG)
  /* Suppress valgrind complaint. */
  memset (node->data_header, 0, node->data_header_length);
#endif // DEBUG

  /* Fill the data header fields */
  switch (node->log_header.type)
    {
    case LOG_MVCC_UNDO_DATA:
      /* Use undo data from MVCC undo structure */
      mvcc_undo_p = (LOG_REC_MVCC_UNDO *) node->data_header;

      /* Must also fill vacuum info */
      vacuum_info_p = &mvcc_undo_p->vacuum_info;

      /* Must also fill MVCCID field */
      mvccid_p = &mvcc_undo_p->mvccid;

      [[fallthrough]];
    case LOG_UNDO_DATA:
      undo_p = (node->log_header.type == LOG_UNDO_DATA ? (LOG_REC_UNDO *) node->data_header : &mvcc_undo_p->undo);

      data_header_ulength_p = &undo_p->length;
      log_data_p = &undo_p->data;
      break;

    case LOG_MVCC_REDO_DATA:
      /* Use redo data from MVCC redo structure */
      mvcc_redo_p = (LOG_REC_MVCC_REDO *) node->data_header;

      /* Must also fill MVCCID field */
      mvccid_p = &mvcc_redo_p->mvccid;

      [[fallthrough]];
    case LOG_REDO_DATA:
      redo_p = (node->log_header.type == LOG_REDO_DATA ? (LOG_REC_REDO *) node->data_header : &mvcc_redo_p->redo);

      data_header_rlength_p = &redo_p->length;
      log_data_p = &redo_p->data;
      break;

    case LOG_MVCC_UNDOREDO_DATA:
    case LOG_MVCC_DIFF_UNDOREDO_DATA:
      /* Use undoredo data from MVCC undoredo structure */
      mvcc_undoredo_p = (LOG_REC_MVCC_UNDOREDO *) node->data_header;

      /* Must also fill vacuum info */
      vacuum_info_p = &mvcc_undoredo_p->vacuum_info;

      /* Must also fill MVCCID field */
      mvccid_p = &mvcc_undoredo_p->mvccid;

      [[fallthrough]];
    case LOG_UNDOREDO_DATA:
    case LOG_DIFF_UNDOREDO_DATA:
      undoredo_p = ((node->log_header.type == LOG_UNDOREDO_DATA || node->log_header.type == LOG_DIFF_UNDOREDO_DATA)
            ? (LOG_REC_UNDOREDO *) node->data_header : &mvcc_undoredo_p->undoredo);

      data_header_ulength_p = &undoredo_p->ulength;
      data_header_rlength_p = &undoredo_p->rlength;
      log_data_p = &undoredo_p->data;
      break;

    default:
      assert (0);
      break;
    }

  /* Fill log data fields */
  assert (log_data_p != NULL);

  log_data_p->rcvindex = rcvindex;
  log_data_p->offset = addr->offset;

  if (addr->pgptr != NULL)
    {
      vpid = pgbuf_get_vpid_ptr (addr->pgptr);
      log_data_p->pageid = vpid->pageid;
      log_data_p->volid = vpid->volid;
    }
  else
    {
      log_data_p->pageid = NULL_PAGEID;
      log_data_p->volid = NULL_VOLID;
    }

  if (mvccid_p != NULL)
    {
      /* Fill mvccid field */

      /* Must be an MVCC operation */
      assert (LOG_IS_MVCC_OP_RECORD_TYPE (node->log_header.type));
      assert (LOG_IS_MVCC_OPERATION (rcvindex));

      tdes = LOG_FIND_CURRENT_TDES (thread_p);
      if (tdes == NULL || !MVCCID_IS_VALID (tdes->mvccinfo.id))
    {
      assert_release (false);
      error_code = ER_FAILED;
      goto error;
    }
      else
    {
      if (!tdes->mvccinfo.sub_ids.empty ())
        {
          *mvccid_p = tdes->mvccinfo.sub_ids.back ();
        }
      else
        {
          *mvccid_p = tdes->mvccinfo.id;
        }
    }
    }

  if (vacuum_info_p != NULL)
    {
      /* Fill vacuum info field */

      /* Must be an UNDO or UNDOREDO MVCC operation */
      assert (node->log_header.type == LOG_MVCC_UNDO_DATA || node->log_header.type == LOG_MVCC_UNDOREDO_DATA
          || node->log_header.type == LOG_MVCC_DIFF_UNDOREDO_DATA);
      assert (LOG_IS_MVCC_OPERATION (rcvindex));

      if (addr->vfid != NULL)
    {
      VFID_COPY (&vacuum_info_p->vfid, addr->vfid);
    }
      else
    {
      if (rcvindex == RVES_NOTIFY_VACUUM)
        {
          VFID_SET_NULL (&vacuum_info_p->vfid);
        }
      else
        {
          /* We require VFID for vacuum */
          assert_release (false);
          error_code = ER_FAILED;
          goto error;
        }
    }

      /* Initialize previous MVCC op log lsa - will be completed later */
      LSA_SET_NULL (&vacuum_info_p->prev_mvcc_op_log_lsa);
    }

  if (is_undo_zip)
    {
      assert (has_undo && (data_header_ulength_p != NULL));

      *data_header_ulength_p = MAKE_ZIP_LEN (zip_undo->data_length);
      error_code = prior_lsa_copy_undo_data_to_node (node, zip_undo->data_length, (char *) zip_undo->log_data);
    }
  else if (has_undo)
    {
      assert (data_header_ulength_p != NULL);

      *data_header_ulength_p = ulength;
      error_code = prior_lsa_copy_undo_crumbs_to_node (node, num_ucrumbs, ucrumbs);
    }

  if (is_redo_zip)
    {
      assert (has_redo && (data_header_rlength_p != NULL));

      *data_header_rlength_p = MAKE_ZIP_LEN (zip_redo->data_length);
      error_code = prior_lsa_copy_redo_data_to_node (node, zip_redo->data_length, (char *) zip_redo->log_data);
    }
  else if (has_redo)
    {
      *data_header_rlength_p = rlength;
      error_code = prior_lsa_copy_redo_crumbs_to_node (node, num_rcrumbs, rcrumbs);
    }

  if (error_code != NO_ERROR)
    {
      goto error;
    }

  return error_code;

error:
  if (node->data_header != NULL)
    {
      free_and_init (node->data_header);
    }
  if (node->udata != NULL)
    {
      free_and_init (node->udata);
    }
  if (node->rdata != NULL)
    {
      free_and_init (node->rdata);
    }

  return error_code;
}

/*
 * prior_lsa_gen_postpone_record -
 *
 * return: error code or NO_ERROR
 *
 *   node(in/out):
 *   rcvindex(in):
 *   addr(in):
 *   length(in):
 *   data(in):
 */
static int
prior_lsa_gen_postpone_record (THREAD_ENTRY *thread_p, LOG_PRIOR_NODE *node, LOG_RCVINDEX rcvindex,
                   LOG_DATA_ADDR *addr, int length, const char *data)
{
  LOG_REC_REDO *redo;
  VPID *vpid;
  int error_code = NO_ERROR;

  node->data_header_length = sizeof (LOG_REC_REDO);
  node->data_header = (char *) malloc (node->data_header_length);
  if (node->data_header == NULL)
    {
      er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_OUT_OF_VIRTUAL_MEMORY, 1, (size_t) node->data_header_length);
      return ER_OUT_OF_VIRTUAL_MEMORY;
    }
  redo = (LOG_REC_REDO *) node->data_header;

  redo->data.rcvindex = rcvindex;
  if (addr->pgptr != NULL)
    {
      vpid = pgbuf_get_vpid_ptr (addr->pgptr);
      redo->data.pageid = vpid->pageid;
      redo->data.volid = vpid->volid;
    }
  else
    {
      redo->data.pageid = NULL_PAGEID;
      redo->data.volid = NULL_VOLID;
    }
  redo->data.offset = addr->offset;

  redo->length = length;
  error_code = prior_lsa_copy_redo_data_to_node (node, redo->length, data);

  return error_code;
}

/*
 * prior_lsa_gen_dbout_redo_record -
 *
 * return: error code or NO_ERROR
 *
 *   node(in/out):
 *   rcvindex(in):
 *   length(in):
 *   data(in):
 */
static int
prior_lsa_gen_dbout_redo_record (THREAD_ENTRY *thread_p, LOG_PRIOR_NODE *node, LOG_RCVINDEX rcvindex, int length,
                 const char *data)
{
  LOG_REC_DBOUT_REDO *dbout_redo;
  int error_code = NO_ERROR;

  node->data_header_length = sizeof (LOG_REC_DBOUT_REDO);
  node->data_header = (char *) malloc (node->data_header_length);
  if (node->data_header == NULL)
    {
      er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_OUT_OF_VIRTUAL_MEMORY, 1, (size_t) node->data_header_length);
      return ER_OUT_OF_VIRTUAL_MEMORY;
    }
  dbout_redo = (LOG_REC_DBOUT_REDO *) node->data_header;

  dbout_redo->rcvindex = rcvindex;
  dbout_redo->length = length;

  error_code = prior_lsa_copy_redo_data_to_node (node, dbout_redo->length, data);

  return error_code;
}

/*
 * prior_lsa_gen_2pc_prepare_record -
 *
 * return: error code or NO_ERROR
 *
 *   node(in/out):
 *   gtran_length(in):
 *   gtran_data(in):
 *   lock_length(in):
 *   lock_data(in):
 */
static int
prior_lsa_gen_2pc_prepare_record (THREAD_ENTRY *thread_p, LOG_PRIOR_NODE *node, int gtran_length,
                  const char *gtran_data, int lock_length, const char *lock_data)
{
  int error_code = NO_ERROR;

  node->data_header_length = sizeof (LOG_REC_2PC_PREPCOMMIT);
  node->data_header = (char *) malloc (node->data_header_length);
  if (node->data_header == NULL)
    {
      er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_OUT_OF_VIRTUAL_MEMORY, 1, (size_t) node->data_header_length);
      return ER_OUT_OF_VIRTUAL_MEMORY;
    }

  if (gtran_length > 0)
    {
      error_code = prior_lsa_copy_undo_data_to_node (node, gtran_length, gtran_data);
    }
  if (lock_length > 0)
    {
      error_code = prior_lsa_copy_redo_data_to_node (node, lock_length, lock_data);
    }

  return error_code;
}

/*
 * prior_lsa_gen_end_chkpt_record -
 *
 * return: error code or NO_ERROR
 *
 *   node(in/out):
 *   tran_length(in):
 *   tran_data(in):
 *   topop_length(in):
 *   topop_data(in):
 */
static int
prior_lsa_gen_end_chkpt_record (THREAD_ENTRY *thread_p, LOG_PRIOR_NODE *node, int tran_length, const char *tran_data,
                int topop_length, const char *topop_data)
{
  int error_code = NO_ERROR;

  node->data_header_length = sizeof (LOG_REC_CHKPT);
  node->data_header = (char *) malloc (node->data_header_length);
  if (node->data_header == NULL)
    {
      er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_OUT_OF_VIRTUAL_MEMORY, 1, (size_t) node->data_header_length);
      return ER_OUT_OF_VIRTUAL_MEMORY;
    }

  if (tran_length > 0)
    {
      error_code = prior_lsa_copy_undo_data_to_node (node, tran_length, tran_data);
    }
  if (topop_length > 0)
    {
      error_code = prior_lsa_copy_redo_data_to_node (node, topop_length, topop_data);
    }

  return error_code;
}

/*
 * prior_lsa_gen_record -
 *
 * return: error code or NO_ERROR
 *
 *   node(in/out):
 *   rec_type(in):
 *   length(in):
 *   data(in):
 */
static int
prior_lsa_gen_record (THREAD_ENTRY *thread_p, LOG_PRIOR_NODE *node, LOG_RECTYPE rec_type, int length,
              const char *data)
{
  int error_code = NO_ERROR;

  node->data_header_length = 0;
  switch (rec_type)
    {
    case LOG_DUMMY_HEAD_POSTPONE:
    case LOG_DUMMY_CRASH_RECOVERY:
    case LOG_DUMMY_OVF_RECORD:
    case LOG_DUMMY_GENERIC:
    case LOG_2PC_COMMIT_DECISION:
    case LOG_2PC_ABORT_DECISION:
    case LOG_2PC_COMMIT_INFORM_PARTICPS:
    case LOG_2PC_ABORT_INFORM_PARTICPS:
    case LOG_START_CHKPT:
    case LOG_SYSOP_ATOMIC_START:
      assert (length == 0 && data == NULL);
      break;

    case LOG_RUN_POSTPONE:
      node->data_header_length = sizeof (LOG_REC_RUN_POSTPONE);
      break;

    case LOG_COMPENSATE:
      node->data_header_length = sizeof (LOG_REC_COMPENSATE);
      break;

    case LOG_DUMMY_HA_SERVER_STATE:
      assert (length == 0 && data == NULL);
      node->data_header_length = sizeof (LOG_REC_HA_SERVER_STATE);
      break;

    case LOG_SAVEPOINT:
      node->data_header_length = sizeof (LOG_REC_SAVEPT);
      break;

    case LOG_COMMIT_WITH_POSTPONE:
      node->data_header_length = sizeof (LOG_REC_START_POSTPONE);
      break;

    case LOG_COMMIT_WITH_POSTPONE_OBSOLETE:
      node->data_header_length = sizeof (LOG_REC_START_POSTPONE_OBSOLETE);
      break;

    case LOG_SYSOP_START_POSTPONE:
      node->data_header_length = sizeof (LOG_REC_SYSOP_START_POSTPONE);
      break;

    case LOG_COMMIT:
    case LOG_ABORT:
      assert (length == 0 && data == NULL);
      node->data_header_length = sizeof (LOG_REC_DONETIME);
      break;

    case LOG_SYSOP_END:
      node->data_header_length = sizeof (LOG_REC_SYSOP_END);
      break;

    case LOG_REPLICATION_DATA:
    case LOG_REPLICATION_STATEMENT:
      node->data_header_length = sizeof (LOG_REC_REPLICATION);
      break;

    case LOG_2PC_START:
      node->data_header_length = sizeof (LOG_REC_2PC_START);
      break;

    case LOG_END_CHKPT:
      node->data_header_length = sizeof (LOG_REC_CHKPT);
      break;
    case LOG_SUPPLEMENTAL_INFO:
      node->data_header_length = sizeof (LOG_REC_SUPPLEMENT);
      break;
    default:
      break;
    }

  if (node->data_header_length > 0)
    {
      node->data_header = (char *) malloc (node->data_header_length);
      if (node->data_header == NULL)
    {
      er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_OUT_OF_VIRTUAL_MEMORY, 1, (size_t) node->data_header_length);
      return ER_OUT_OF_VIRTUAL_MEMORY;
    }

#if !defined (NDEBUG)
      /* Suppress valgrind complaint. */
      memset (node->data_header, 0, node->data_header_length);
#endif // DEBUG
    }

  if (length > 0)
    {
      error_code = prior_lsa_copy_undo_data_to_node (node, length, data);
    }

  return error_code;
}

static void
prior_update_header_mvcc_info (const LOG_LSA &record_lsa, MVCCID mvccid)
{
  assert (MVCCID_IS_VALID (mvccid));
  if (!log_Gl.hdr.does_block_need_vacuum)
    {
      // first mvcc record for this block
      log_Gl.hdr.oldest_visible_mvccid = log_Gl.mvcc_table.get_global_oldest_visible ();
      log_Gl.hdr.newest_block_mvccid = mvccid;
    }
  else
    {
      // sanity checks
      assert (MVCCID_IS_VALID (log_Gl.hdr.oldest_visible_mvccid));
      assert (MVCCID_IS_VALID (log_Gl.hdr.newest_block_mvccid));
      assert (log_Gl.hdr.oldest_visible_mvccid <= mvccid);
      assert (!log_Gl.hdr.mvcc_op_log_lsa.is_null ());
      assert (vacuum_get_log_blockid (log_Gl.hdr.mvcc_op_log_lsa.pageid) == vacuum_get_log_blockid (record_lsa.pageid));

      if (log_Gl.hdr.newest_block_mvccid < mvccid)
    {
      log_Gl.hdr.newest_block_mvccid = mvccid;
    }
    }
  log_Gl.hdr.mvcc_op_log_lsa = record_lsa;
  log_Gl.hdr.does_block_need_vacuum = true;
}

/*
 * prior_lsa_next_record_internal -
 *
 * return: start lsa of log record
 *
 *   node(in/out):
 *   tdes(in/out):
 *   with_lock(in):
 */
static LOG_LSA
prior_lsa_next_record_internal (THREAD_ENTRY *thread_p, LOG_PRIOR_NODE *node, LOG_TDES *tdes, int with_lock)
{
  LOG_LSA start_lsa;
  LOG_REC_MVCC_UNDO *mvcc_undo = NULL;
  LOG_REC_MVCC_UNDOREDO *mvcc_undoredo = NULL;
  LOG_VACUUM_INFO *vacuum_info = NULL;
  MVCCID mvccid = MVCCID_NULL;

  if (with_lock == LOG_PRIOR_LSA_WITHOUT_LOCK)
    {
      log_Gl.prior_info.prior_lsa_mutex.lock ();
    }

  prior_lsa_start_append (thread_p, node, tdes);

  LSA_COPY (&start_lsa, &node->start_lsa);

  if (LOG_ISRESTARTED () && log_Gl.hdr.does_block_need_vacuum)
    {
      assert (!LSA_ISNULL (&log_Gl.hdr.mvcc_op_log_lsa));
      if (vacuum_get_log_blockid (log_Gl.hdr.mvcc_op_log_lsa.pageid) != vacuum_get_log_blockid (start_lsa.pageid))
    {
      assert (vacuum_get_log_blockid (log_Gl.hdr.mvcc_op_log_lsa.pageid)
          <= (vacuum_get_log_blockid (start_lsa.pageid) - 1));

      vacuum_produce_log_block_data (thread_p);
    }
    }

  /* Is this a valid MVCC operations: 1. node must be undoredo/undo type and must have undo data. 2. record index must
   * the index of MVCC operations. */
  if (node->log_header.type == LOG_MVCC_UNDO_DATA || node->log_header.type == LOG_MVCC_UNDOREDO_DATA
      || node->log_header.type == LOG_MVCC_DIFF_UNDOREDO_DATA
      || (node->log_header.type == LOG_SYSOP_END
      && ((LOG_REC_SYSOP_END *) node->data_header)->type == LOG_SYSOP_END_LOGICAL_MVCC_UNDO))
    {
      /* Link the log record to previous MVCC delete/update log record */
      /* Will be used by vacuum */
      if (node->log_header.type == LOG_MVCC_UNDO_DATA)
    {
      /* Read from mvcc_undo structure */
      mvcc_undo = (LOG_REC_MVCC_UNDO *) node->data_header;
      vacuum_info = &mvcc_undo->vacuum_info;
      mvccid = mvcc_undo->mvccid;
    }
      else if (node->log_header.type == LOG_SYSOP_END)
    {
      /* Read from mvcc_undo structure */
      mvcc_undo = & ((LOG_REC_SYSOP_END *) node->data_header)->mvcc_undo;
      vacuum_info = &mvcc_undo->vacuum_info;
      mvccid = mvcc_undo->mvccid;
    }
      else
    {
      /* Read for mvcc_undoredo structure */
      assert (node->log_header.type == LOG_MVCC_UNDOREDO_DATA
          || node->log_header.type == LOG_MVCC_DIFF_UNDOREDO_DATA);

      mvcc_undoredo = (LOG_REC_MVCC_UNDOREDO *) node->data_header;
      vacuum_info = &mvcc_undoredo->vacuum_info;
      mvccid = mvcc_undoredo->mvccid;
    }

      /* Save previous mvcc operation log lsa to vacuum info */
      LSA_COPY (&vacuum_info->prev_mvcc_op_log_lsa, &log_Gl.hdr.mvcc_op_log_lsa);

      vacuum_er_log (VACUUM_ER_LOG_LOGGING,
             "log mvcc op at (%lld, %d) and create link with log_lsa(%lld, %d)",
             LSA_AS_ARGS (&node->start_lsa), LSA_AS_ARGS (&log_Gl.hdr.mvcc_op_log_lsa));

      prior_update_header_mvcc_info (start_lsa, mvccid);
    }
  else if (node->log_header.type == LOG_SYSOP_START_POSTPONE)
    {
      /* we need the system operation start postpone LSA for recovery. we have to save it under prior_lsa_mutex
       * protection.
       * at the same time, tdes->rcv.atomic_sysop_start_lsa must be reset if it was inside this system op. */
      LOG_REC_SYSOP_START_POSTPONE *sysop_start_postpone = NULL;

      assert (LSA_ISNULL (&tdes->rcv.sysop_start_postpone_lsa));
      tdes->rcv.sysop_start_postpone_lsa = start_lsa;

      sysop_start_postpone = (LOG_REC_SYSOP_START_POSTPONE *) node->data_header;
      if (LSA_LT (&sysop_start_postpone->sysop_end.lastparent_lsa, &tdes->rcv.atomic_sysop_start_lsa))
    {
      /* atomic system operation finished. */
      LSA_SET_NULL (&tdes->rcv.atomic_sysop_start_lsa);
    }

      /* for correct checkpoint, this state change must be done under the protection of prior_lsa_mutex */
      tdes->state = TRAN_UNACTIVE_TOPOPE_COMMITTED_WITH_POSTPONE;
    }
  else if (node->log_header.type == LOG_SYSOP_END)
    {
      /* reset tdes->rcv.sysop_start_postpone_lsa and tdes->rcv.atomic_sysop_start_lsa, if this system op is not nested.
       * we'll use lastparent_lsa to check if system op is nested or not. */
      LOG_REC_SYSOP_END *sysop_end = NULL;

      sysop_end = (LOG_REC_SYSOP_END *) node->data_header;
      if (!LSA_ISNULL (&tdes->rcv.atomic_sysop_start_lsa)
      && LSA_LT (&sysop_end->lastparent_lsa, &tdes->rcv.atomic_sysop_start_lsa))
    {
      /* atomic system operation finished. */
      LSA_SET_NULL (&tdes->rcv.atomic_sysop_start_lsa);
    }
      if (!LSA_ISNULL (&tdes->rcv.sysop_start_postpone_lsa)
      && LSA_LT (&sysop_end->lastparent_lsa, &tdes->rcv.sysop_start_postpone_lsa))
    {
      /* atomic system operation finished. */
      LSA_SET_NULL (&tdes->rcv.sysop_start_postpone_lsa);
    }
    }
  else if (node->log_header.type == LOG_COMMIT_WITH_POSTPONE
       || node->log_header.type == LOG_COMMIT_WITH_POSTPONE_OBSOLETE)
    {
      /* we need the commit with postpone LSA for recovery. we have to save it under prior_lsa_mutex protection */
      tdes->rcv.tran_start_postpone_lsa = start_lsa;
    }
  else if (node->log_header.type == LOG_SYSOP_ATOMIC_START)
    {
      /* same as with system op start postpone, we need to save these log records lsa */
      assert (LSA_ISNULL (&tdes->rcv.atomic_sysop_start_lsa));
      tdes->rcv.atomic_sysop_start_lsa = start_lsa;
    }
  else if (node->log_header.type == LOG_COMMIT || node->log_header.type == LOG_ABORT)
    {
      /* mark the commit/abort in the transaction,  */
      assert (tdes->commit_abort_lsa.is_null ());
      LSA_COPY (&tdes->commit_abort_lsa, &start_lsa);
    }

  log_prior_lsa_append_advance_when_doesnot_fit (node->data_header_length);
  log_prior_lsa_append_add_align (node->data_header_length);

  if (node->ulength > 0)
    {
      prior_lsa_append_data (node->ulength);
    }

  if (node->rlength > 0)
    {
      prior_lsa_append_data (node->rlength);
    }

  /* END append */
  prior_lsa_end_append (thread_p, node);

  if (log_Gl.prior_info.prior_list_tail == NULL)
    {
      log_Gl.prior_info.prior_list_header = node;
      log_Gl.prior_info.prior_list_tail = node;
    }
  else
    {
      log_Gl.prior_info.prior_list_tail->next = node;
      log_Gl.prior_info.prior_list_tail = node;
    }

  /* list_size in bytes */
  log_Gl.prior_info.list_size += (sizeof (LOG_PRIOR_NODE) + node->data_header_length + node->ulength + node->rlength);

  if (with_lock == LOG_PRIOR_LSA_WITHOUT_LOCK)
    {
      log_Gl.prior_info.prior_lsa_mutex.unlock ();

      if (log_Gl.prior_info.list_size >= (INT64) logpb_get_memsize ())
    {
      perfmon_inc_stat (thread_p, PSTAT_PRIOR_LSA_LIST_MAXED);

#if defined(SERVER_MODE)
      if (!log_is_in_crash_recovery ())
        {
          log_wakeup_log_flush_daemon ();

          thread_sleep (1); /* 1msec */
        }
      else
        {
          LOG_CS_ENTER (thread_p);
          logpb_prior_lsa_append_all_list (thread_p);
          LOG_CS_EXIT (thread_p);
        }
#else
      LOG_CS_ENTER (thread_p);
      logpb_prior_lsa_append_all_list (thread_p);
      LOG_CS_EXIT (thread_p);
#endif
    }
    }

  tdes->num_log_records_written++;

  return start_lsa;
}

LOG_LSA
prior_lsa_next_record (THREAD_ENTRY *thread_p, LOG_PRIOR_NODE *node, log_tdes *tdes)
{
  return prior_lsa_next_record_internal (thread_p, node, tdes, LOG_PRIOR_LSA_WITHOUT_LOCK);
}

LOG_LSA
prior_lsa_next_record_with_lock (THREAD_ENTRY *thread_p, LOG_PRIOR_NODE *node, log_tdes *tdes)
{
  return prior_lsa_next_record_internal (thread_p, node, tdes, LOG_PRIOR_LSA_WITH_LOCK);
}

int
prior_set_tde_encrypted (log_prior_node *node, LOG_RCVINDEX recvindex)
{
  if (!tde_is_loaded())
    {
      er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_TDE_CIPHER_IS_NOT_LOADED, 0);
      return ER_TDE_CIPHER_IS_NOT_LOADED;
    }

  tde_er_log ("prior_set_tde_encrypted(): rcvindex = %s\n", rv_rcvindex_string (recvindex));

  node->tde_encrypted = true;

  return NO_ERROR;
}

bool
prior_is_tde_encrypted (const log_prior_node *node)
{
  return node->tde_encrypted;
}

/*
 * prior_lsa_start_append:
 *
 *   node(in/out):
 *   tdes(in):
 */
static void
prior_lsa_start_append (THREAD_ENTRY *thread_p, LOG_PRIOR_NODE *node, LOG_TDES *tdes)
{
  /* Does the new log record fit in this page ? */
  log_prior_lsa_append_advance_when_doesnot_fit (sizeof (LOG_RECORD_HEADER));

  node->log_header.trid = tdes->trid;

  /*
   * Link the record with the previous transaction record for quick undos.
   * Link the record backward for backward traversal of the log.
   */
  LSA_COPY (&node->start_lsa, &log_Gl.prior_info.prior_lsa);

  if (tdes->is_system_worker_transaction () && !tdes->is_under_sysop ())
    {
      // lose the link to previous record
      LSA_SET_NULL (&node->log_header.prev_tranlsa);
      LSA_SET_NULL (&tdes->head_lsa);
      LSA_SET_NULL (&tdes->tail_lsa);
    }
  else
    {
      LSA_COPY (&node->log_header.prev_tranlsa, &tdes->tail_lsa);

      LSA_COPY (&tdes->tail_lsa, &log_Gl.prior_info.prior_lsa);

      /*
       * Is this the first log record of transaction ?
       */
      if (LSA_ISNULL (&tdes->head_lsa))
    {
      LSA_COPY (&tdes->head_lsa, &tdes->tail_lsa);
    }

      LSA_COPY (&tdes->undo_nxlsa, &log_Gl.prior_info.prior_lsa);
    }

  /*
   * Remember the address of new append record
   */
  LSA_COPY (&node->log_header.back_lsa, &log_Gl.prior_info.prev_lsa);
  LSA_SET_NULL (&node->log_header.forw_lsa);

  LSA_COPY (&log_Gl.prior_info.prev_lsa, &log_Gl.prior_info.prior_lsa);

  /*
   * Set the page dirty, increase and align the append offset
   */
  log_prior_lsa_append_add_align (sizeof (LOG_RECORD_HEADER));
}

/*
 * prior_lsa_end_append -
 *
 * return:
 *
 *   node(in/out):
 */
static void
prior_lsa_end_append (THREAD_ENTRY *thread_p, LOG_PRIOR_NODE *node)
{
  log_prior_lsa_append_align ();
  log_prior_lsa_append_advance_when_doesnot_fit (sizeof (LOG_RECORD_HEADER));

  LSA_COPY (&node->log_header.forw_lsa, &log_Gl.prior_info.prior_lsa);
}

static void
prior_lsa_append_data (int length)
{
  int copy_length;      /* Amount of contiguous data that can be copied */
  int current_offset;
  int last_offset;

  if (length == 0)
    {
      return;
    }

  /*
   * Align if needed,
   * don't set it dirty since this function has not updated
   */
  log_prior_lsa_append_align ();

  current_offset = (int) log_Gl.prior_info.prior_lsa.offset;
  last_offset = (int) LOG_PRIOR_LSA_LAST_APPEND_OFFSET ();

  /* Does data fit completely in current page ? */
  if ((current_offset + length) >= last_offset)
    {
      while (length > 0)
    {
      if (current_offset >= last_offset)
        {
          /*
           * Get next page and set the current one dirty
           */
          log_Gl.prior_info.prior_lsa.pageid++;
          log_Gl.prior_info.prior_lsa.offset = 0;

          current_offset = 0;
          last_offset = (int) LOG_PRIOR_LSA_LAST_APPEND_OFFSET ();
        }
      /* Find the amount of contiguous data that can be copied */
      if (current_offset + length >= last_offset)
        {
          copy_length = CAST_BUFLEN (last_offset - current_offset);
        }
      else
        {
          copy_length = length;
        }

      current_offset += copy_length;
      length -= copy_length;
      log_Gl.prior_info.prior_lsa.offset += copy_length;
    }
    }
  else
    {
      log_Gl.prior_info.prior_lsa.offset += length;
    }

  /*
   * Align the data for future appends.
   * Indicate that modifications were done
   */
  log_prior_lsa_append_align ();
}

LOG_ZIP *
log_append_get_zip_undo (THREAD_ENTRY *thread_p)
{
#if defined (SERVER_MODE)
  if (thread_p == NULL)
    {
      thread_p = thread_get_thread_entry_info ();
    }

  if (thread_p == NULL)
    {
      return NULL;
    }
  else
    {
      if (thread_p->log_zip_undo == NULL)
    {
      thread_p->log_zip_undo = log_zip_alloc (IO_PAGESIZE);
    }
      return (LOG_ZIP *) thread_p->log_zip_undo;
    }
#else
  return log_zip_undo;
#endif
}

LOG_ZIP *
log_append_get_zip_redo (THREAD_ENTRY *thread_p)
{
#if defined (SERVER_MODE)
  if (thread_p == NULL)
    {
      thread_p = thread_get_thread_entry_info ();
    }

  if (thread_p == NULL)
    {
      return NULL;
    }
  else
    {
      if (thread_p->log_zip_redo == NULL)
    {
      thread_p->log_zip_redo = log_zip_alloc (IO_PAGESIZE);
    }
      return (LOG_ZIP *) thread_p->log_zip_redo;
    }
#else
  return log_zip_redo;
#endif
}

/*
 * log_append_realloc_data_ptr -
 *
 * return:
 *
 *   data_length(in):
 *   length(in):
 *
 * NOTE:
 */
static bool
log_append_realloc_data_ptr (THREAD_ENTRY *thread_p, int length)
{
  char *data_ptr;
  int alloc_len;
#if defined (SERVER_MODE)
  if (thread_p == NULL)
    {
      thread_p = thread_get_thread_entry_info ();
    }

  if (thread_p == NULL)
    {
      return false;
    }

  if (thread_p->log_data_length < length)
    {
      alloc_len = ((int) CEIL_PTVDIV (length, IO_PAGESIZE)) * IO_PAGESIZE;

      data_ptr = (char *) realloc (thread_p->log_data_ptr, alloc_len);
      if (data_ptr == NULL)
    {
      er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_OUT_OF_VIRTUAL_MEMORY, 1, (size_t) alloc_len);
      if (thread_p->log_data_ptr)
        {
          free_and_init (thread_p->log_data_ptr);
        }
      thread_p->log_data_length = 0;
      return false;
    }
      else
    {
      thread_p->log_data_ptr = data_ptr;
      thread_p->log_data_length = alloc_len;
    }
    }
  return true;
#else
  if (log_data_length < length)
    {
      alloc_len = ((int) CEIL_PTVDIV (length, IO_PAGESIZE)) * IO_PAGESIZE;

      data_ptr = (char *) realloc (log_data_ptr, alloc_len);
      if (data_ptr == NULL)
    {
      er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_OUT_OF_VIRTUAL_MEMORY, 1, (size_t) alloc_len);
      if (log_data_ptr)
        {
          free_and_init (log_data_ptr);
        }
      log_data_length = 0;
      return false;
    }
      else
    {
      log_data_ptr = data_ptr;
      log_data_length = alloc_len;
    }
    }

  return true;
#endif
}

/*
 * log_append_get_data_ptr  -
 *
 * return:
 *
 */
static char *
log_append_get_data_ptr (THREAD_ENTRY *thread_p)
{
#if defined (SERVER_MODE)
  if (thread_p == NULL)
    {
      thread_p = thread_get_thread_entry_info ();
    }

  if (thread_p == NULL)
    {
      return NULL;
    }
  else
    {
      if (thread_p->log_data_ptr == NULL)
    {
      thread_p->log_data_length = IO_PAGESIZE * 2;
      thread_p->log_data_ptr = (char *) malloc (thread_p->log_data_length);

      if (thread_p->log_data_ptr == NULL)
        {
          thread_p->log_data_length = 0;
          er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_OUT_OF_VIRTUAL_MEMORY, 1,
              (size_t) thread_p->log_data_length);
        }
    }
      return thread_p->log_data_ptr;
    }
#else
  return log_data_ptr;
#endif
}

static void
log_prior_lsa_append_align ()
{
  assert (log_Gl.prior_info.prior_lsa.offset >= 0);

  log_Gl.prior_info.prior_lsa.offset = DB_ALIGN (log_Gl.prior_info.prior_lsa.offset, DOUBLE_ALIGNMENT);
  if ((size_t) log_Gl.prior_info.prior_lsa.offset >= (size_t) LOGAREA_SIZE)
    {
      log_Gl.prior_info.prior_lsa.pageid++;
      log_Gl.prior_info.prior_lsa.offset = 0;
    }
}

static void
log_prior_lsa_append_advance_when_doesnot_fit (size_t length)
{
  assert (log_Gl.prior_info.prior_lsa.offset >= 0);

  if ((size_t) log_Gl.prior_info.prior_lsa.offset + length >= (size_t) LOGAREA_SIZE)
    {
      log_Gl.prior_info.prior_lsa.pageid++;
      log_Gl.prior_info.prior_lsa.offset = 0;
    }
}

static void
log_prior_lsa_append_add_align (size_t add)
{
  assert (log_Gl.prior_info.prior_lsa.offset >= 0);

  log_Gl.prior_info.prior_lsa.offset += (add);
  log_prior_lsa_append_align ();
}