/*
 * Copyright 2008 Search Solution Corporation
 * Copyright 2016 CUBRID Corporation
 *
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 *
 */

/*
 * log_recovery.c -
 */

#ident "$Id$"

#include "log_recovery.h"

#include "boot_sr.h"
#include "locator_sr.h"
#include "log_impl.h"
#include "log_lsa.hpp"
#include "log_manager.h"
#include "log_reader.hpp"
#include "log_recovery_redo_perf.hpp"
#include "log_recovery_redo_parallel.hpp"
#include "log_system_tran.hpp"
#include "log_volids.hpp"
#include "message_catalog.h"
#include "msgcat_set_log.hpp"
#include "object_representation.h"
#include "page_buffer.h"
#include "porting_inline.hpp"
#include "recovery.h"
#include "slotted_page.h"
#include "system_parameter.h"
#include "thread_manager.hpp"
// XXX: SHOULD BE THE LAST INCLUDE HEADER
#include "memory_wrapper.hpp"

static void log_rv_undo_record (THREAD_ENTRY * thread_p, LOG_LSA * log_lsa, LOG_PAGE * log_page_p,
                LOG_RCVINDEX rcvindex, const VPID * rcv_vpid, LOG_RCV * rcv,
                const LOG_LSA * rcv_lsa_ptr, LOG_TDES * tdes, LOG_ZIP * undo_unzip_ptr);

static bool log_rv_find_checkpoint (THREAD_ENTRY * thread_p, VOLID volid, LOG_LSA * rcv_lsa);
static int log_rv_analysis_undo_redo (THREAD_ENTRY * thread_p, int tran_id, LOG_LSA * log_lsa);
static int log_rv_analysis_dummy_head_postpone (THREAD_ENTRY * thread_p, int tran_id, LOG_LSA * log_lsa);
static int log_rv_analysis_postpone (THREAD_ENTRY * thread_p, int tran_id, LOG_LSA * log_lsa);
static int log_rv_analysis_run_postpone (THREAD_ENTRY * thread_p, int tran_id, LOG_LSA * log_lsa, LOG_PAGE * log_page_p,
                     LOG_LSA * check_point);
static int log_rv_analysis_compensate (THREAD_ENTRY * thread_p, int tran_id, LOG_LSA * log_lsa, LOG_PAGE * log_page_p);
static int log_rv_analysis_commit_with_postpone (THREAD_ENTRY * thread_p, int tran_id, LOG_LSA * log_lsa,
                         LOG_PAGE * log_page_p, LOG_LSA * prev_lsa, bool is_media_crash,
                         time_t * stop_at, bool * did_incom_recovery);
static int log_rv_analysis_commit_with_postpone_obsolete (THREAD_ENTRY * thread_p, int tran_id, LOG_LSA * log_lsa,
                              LOG_PAGE * log_page_p);
static int log_rv_analysis_sysop_start_postpone (THREAD_ENTRY * thread_p, int tran_id, LOG_LSA * log_lsa,
                         LOG_PAGE * log_page_p);
static int log_rv_analysis_atomic_sysop_start (THREAD_ENTRY * thread_p, int tran_id, LOG_LSA * log_lsa);
static int log_rv_analysis_complete (THREAD_ENTRY * thread_p, int tran_id, LOG_LSA * log_lsa, LOG_PAGE * log_page_p,
                     LOG_LSA * prev_lsa, bool is_media_crash, time_t * stop_at,
                     bool * did_incom_recovery);
static int log_rv_analysis_sysop_end (THREAD_ENTRY * thread_p, int tran_id, LOG_LSA * log_lsa, LOG_PAGE * log_page_p);
static int log_rv_analysis_start_checkpoint (LOG_LSA * log_lsa, LOG_LSA * start_lsa, bool * may_use_checkpoint);
static int log_rv_analysis_end_checkpoint (THREAD_ENTRY * thread_p, LOG_LSA * log_lsa, LOG_PAGE * log_page_p,
                       LOG_LSA * check_point, LOG_LSA * start_redo_lsa, bool * may_use_checkpoint,
                       bool * may_need_synch_checkpoint_2pc);
static int log_rv_analysis_save_point (THREAD_ENTRY * thread_p, int tran_id, LOG_LSA * log_lsa);
static int log_rv_analysis_2pc_prepare (THREAD_ENTRY * thread_p, int tran_id, LOG_LSA * log_lsa);
static int log_rv_analysis_2pc_start (THREAD_ENTRY * thread_p, int tran_id, LOG_LSA * log_lsa);
static int log_rv_analysis_2pc_commit_decision (THREAD_ENTRY * thread_p, int tran_id, LOG_LSA * log_lsa);
static int log_rv_analysis_2pc_abort_decision (THREAD_ENTRY * thread_p, int tran_id, LOG_LSA * log_lsa);
static int log_rv_analysis_2pc_commit_inform_particps (THREAD_ENTRY * thread_p, int tran_id, LOG_LSA * log_lsa);
static int log_rv_analysis_2pc_abort_inform_particps (THREAD_ENTRY * thread_p, int tran_id, LOG_LSA * log_lsa);
static int log_rv_analysis_2pc_recv_ack (THREAD_ENTRY * thread_p, int tran_id, LOG_LSA * log_lsa);
static int log_rv_analysis_log_end (int tran_id, LOG_LSA * log_lsa);
static void log_rv_analysis_record (THREAD_ENTRY * thread_p, LOG_RECTYPE log_type, int tran_id, LOG_LSA * log_lsa,
                    LOG_PAGE * log_page_p, LOG_LSA * check_point, LOG_LSA * prev_lsa,
                    LOG_LSA * start_lsa, LOG_LSA * start_redo_lsa, bool is_media_crash,
                    time_t * stop_at, bool * did_incom_recovery, bool * may_use_checkpoint,
                    bool * may_need_synch_checkpoint_2pc);
static bool log_is_page_of_record_broken (THREAD_ENTRY * thread_p, const LOG_LSA * log_lsa,
                      const LOG_RECORD_HEADER * log_rec_header);
static void log_recovery_analysis (THREAD_ENTRY * thread_p, LOG_LSA * start_lsa, LOG_LSA * start_redolsa,
                   LOG_LSA * end_redo_lsa, bool ismedia_crash, time_t * stopat,
                   bool * did_incom_recovery, INT64 * num_redo_log_records);
static bool log_recovery_needs_skip_logical_redo (THREAD_ENTRY * thread_p, TRANID tran_id, LOG_RECTYPE log_rtype,
                          LOG_RCVINDEX rcv_index, const LOG_LSA * lsa);
#if defined (SERVER_MODE)
static int log_recovery_get_redo_parallel_count ();
#endif

static void log_recovery_redo (THREAD_ENTRY * thread_p, const LOG_LSA * start_redolsa, const LOG_LSA * end_redo_lsa);
static void log_recovery_abort_interrupted_sysop (THREAD_ENTRY * thread_p, LOG_TDES * tdes,
                          const LOG_LSA * postpone_start_lsa);
static void log_recovery_finish_sysop_postpone (THREAD_ENTRY * thread_p, LOG_TDES * tdes);
static void log_recovery_finish_postpone (THREAD_ENTRY * thread_p, LOG_TDES * tdes);
static void log_recovery_finish_all_postpone (THREAD_ENTRY * thread_p);
static void log_recovery_abort_atomic_sysop (THREAD_ENTRY * thread_p, LOG_TDES * tdes);
static void log_recovery_abort_all_atomic_sysops (THREAD_ENTRY * thread_p);
static void log_recovery_undo (THREAD_ENTRY * thread_p);
static void log_recovery_notpartof_archives (THREAD_ENTRY * thread_p, int start_arv_num, const char *info_reason);
static bool log_unformat_ahead_volumes (THREAD_ENTRY * thread_p, VOLID volid, VOLID * start_volid);
static void log_recovery_notpartof_volumes (THREAD_ENTRY * thread_p);
static void log_recovery_resetlog (THREAD_ENTRY * thread_p, const LOG_LSA * new_append_lsa,
                   const LOG_LSA * new_prev_lsa);
static int log_recovery_find_first_postpone (THREAD_ENTRY * thread_p, LOG_LSA * ret_lsa, LOG_LSA * start_postpone_lsa,
                         LOG_TDES * tdes);

static int log_rv_record_modify_internal (THREAD_ENTRY * thread_p, LOG_RCV * rcv, bool is_undo);
static int log_rv_undoredo_partial_changes_recursive (THREAD_ENTRY * thread_p, OR_BUF * rcv_buf, RECDES * record,
                              bool is_undo);

STATIC_INLINE PAGE_PTR log_rv_redo_fix_page (THREAD_ENTRY * thread_p, const VPID * vpid_rcv)
  __attribute__ ((ALWAYS_INLINE));

static void log_rv_simulate_runtime_worker (THREAD_ENTRY * thread_p, LOG_TDES * tdes);
static void log_rv_end_simulation (THREAD_ENTRY * thread_p);
static void log_find_unilaterally_largest_undo_lsa (THREAD_ENTRY * thread_p, LOG_LSA & max_undo_lsa);

STATIC_INLINE UINT64 log_cnt_pages_containing_lsa (const log_lsa * from_lsa, const log_lsa * to_lsa);

/*
 * CRASH RECOVERY PROCESS
 */

/*
 * log_rv_undo_record - EXECUTE AN UNDO RECORD
 *
 * return: nothing
 *
 *   log_lsa(in/out): Log address identifier containing the log record
 *   log_page_p(in/out): Pointer to page where data starts (Set as a side
 *              effect to the page where data ends)
 *   rcvindex(in): Index to recovery functions
 *   rcv_vpid(in): Address of page to recover
 *   rcv(in/out): Recovery structure for recovery function
 *   rcv_undo_lsa(in): Address of the undo record
 *   tdes(in/out): State structure of transaction undoing data
 *   undo_unzip_ptr(in):
 *
 * NOTE: Execute an undo log record during the restart recovery undo phase.
 *              A compensating log record for operation (page level) logging is
 *              written by the current function. For logical level logging,
 *              the undo function is responsible for logging a redo record,
 *              which is converted into a compensating record by the log
 *              manager.
 *
 *              This function is very similar to log_rollback_rec; however,
 *              page locking is not done, and the transaction index that is
 *              running is set to the one in the tdes. This function probably
 *              should not have been duplicated just for these few differences.
 */
static void
log_rv_undo_record (THREAD_ENTRY * thread_p, LOG_LSA * log_lsa, LOG_PAGE * log_page_p, LOG_RCVINDEX rcvindex,
            const VPID * rcv_vpid, LOG_RCV * rcv, const LOG_LSA * rcv_undo_lsa, LOG_TDES * tdes,
            LOG_ZIP * undo_unzip_ptr)
{
  char *area = NULL;
  TRAN_STATE save_state;    /* The current state of the transaction. Must be returned to this state */
  bool is_zip = false;
  int error_code = NO_ERROR;
  thread_type save_thread_type = thread_type::TT_NONE;

  if (thread_p == NULL)
    {
      /* Thread entry info is required. */
      thread_p = thread_get_thread_entry_info ();
    }

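  /* Temporarily assume the runtime worker context of the transaction being undone, so that the undo functions run
   * with the proper transaction context; this is reverted by log_rv_end_simulation () at the end of this function. */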
  log_rv_simulate_runtime_worker (thread_p, tdes);

  if (MVCCID_IS_VALID (rcv->mvcc_id))
    {
      /* Assign the MVCCID to transaction. */

      assert (LOG_IS_MVCC_OPERATION (rcvindex));

      logtb_rv_assign_mvccid_for_undo_recovery (thread_p, rcv->mvcc_id);
    }

  /*
   * Fetch the page for physical log records. The page is not locked since the
   * recovery process is the only one running. If the page does not exist
   * anymore or there are problems fetching the page, continue anyhow, so that
   * compensating records are logged.
   */

  if (RCV_IS_LOGICAL_LOG (rcv_vpid, rcvindex))
    {
      rcv->pgptr = NULL;
    }
  else
    {
      rcv->pgptr = pgbuf_fix (thread_p, rcv_vpid, OLD_PAGE, PGBUF_LATCH_WRITE, PGBUF_UNCONDITIONAL_LATCH);
      if (rcv->pgptr == NULL)
    {
      assert (false);
    }
    }

  /* GET BEFORE DATA */

  /*
   * If data is contained in only one buffer, pass pointer directly.
   * Otherwise, allocate a contiguous area, copy the data and pass this area.
   * At the end deallocate the area.
   */

  if (ZIP_CHECK (rcv->length))
    {               /* check compress data */
      rcv->length = (int) GET_ZIP_LEN (rcv->length);    /* convert compress length */
      is_zip = true;
    }

  if (log_lsa->offset + rcv->length < (int) LOGAREA_SIZE)
    {
      rcv->data = (char *) log_page_p->area + log_lsa->offset;
      log_lsa->offset += rcv->length;
    }
  else
    {
      /* Need to copy the data into a contiguous area */
      area = (char *) malloc (rcv->length);
      if (area == NULL)
    {
      logpb_fatal_error (thread_p, true, ARG_FILE_LINE, "log_rv_undo_record");
      goto end;
    }
      /* Copy the data */
      logpb_copy_from_log (thread_p, area, rcv->length, log_lsa, log_page_p);
      rcv->data = area;
    }

  if (is_zip)
    {
      if (log_unzip (undo_unzip_ptr, rcv->length, (char *) rcv->data))
    {
      rcv->length = (int) undo_unzip_ptr->data_length;
      rcv->data = (char *) undo_unzip_ptr->log_data;
    }
      else
    {
      logpb_fatal_error (thread_p, true, ARG_FILE_LINE, "log_rv_undo_record");
      goto end;
    }
    }

  /* Now call the UNDO recovery function */
  if (rcv->pgptr != NULL || RCV_IS_LOGICAL_LOG (rcv_vpid, rcvindex))
    {
      /*
       * Write a compensating log record for operation page level logging.
       * For logical level logging, the recovery undo function must log a
       * redo/CLR record to describe the undo.
       */

      if (rcvindex == RVBT_MVCC_INCREMENTS_UPD)
    {
      /* nothing to do during recovery */
    }
      else if (rcvindex == RVBT_MVCC_NOTIFY_VACUUM || rcvindex == RVES_NOTIFY_VACUUM)
    {
      /* nothing to do */
    }
      else if (rcvindex == RVBT_LOG_GLOBAL_UNIQUE_STATS_COMMIT)
    {
      /* this only modifies in-memory data that is flushed to disk only on checkpoints. We need to execute undo
       * every time recovery is run, and we cannot compensate it. */
      error_code = (*RV_fun[rcvindex].undofun) (thread_p, rcv);
      assert (error_code == NO_ERROR);
    }
      else if (RCV_IS_LOGICAL_COMPENSATE_MANUAL (rcvindex))
    {
      /* B-tree logical logs will add a regular compensate in the modified pages. They do not require a logical
       * compensation since the "undone" page can be accessed and logged. Only no-page logical operations require
       * logical compensation. */
      /* Invoke Undo recovery function */
      LSA_COPY (&rcv->reference_lsa, &tdes->undo_nxlsa);
      error_code = (*RV_fun[rcvindex].undofun) (thread_p, rcv);
      if (error_code != NO_ERROR)
        {
          logpb_fatal_error (thread_p, true, ARG_FILE_LINE,
                 "log_rv_undo_record: Error applying compensation at log_lsa=(%lld, %d), "
                 "rcv = {mvccid=%llu, offset = %d, data_length = %d}",
                 (long long int) rcv_undo_lsa->pageid, (int) rcv_undo_lsa->offset,
                 (unsigned long long int) rcv->mvcc_id, (int) rcv->offset, (int) rcv->length);
        }
      else if (RCV_IS_BTREE_LOGICAL_LOG (rcvindex) && prm_get_bool_value (PRM_ID_LOG_BTREE_OPS))
        {
          _er_log_debug (ARG_FILE_LINE,
                 "BTREE_UNDO: Successfully executed undo/compensate for log entry at "
                 "lsa=%lld|%d, before lsa=%lld|%d, undo_nxlsa=%lld|%d. "
                 "Transaction=%d, rcvindex=%d.\n", (long long int) rcv_undo_lsa->pageid,
                 (int) rcv_undo_lsa->offset, (long long int) log_lsa->pageid, (int) log_lsa->offset,
                 (long long int) tdes->undo_nxlsa.pageid, (int) tdes->undo_nxlsa.offset, tdes->tran_index,
                 rcvindex);
        }
    }
      else if (!RCV_IS_LOGICAL_LOG (rcv_vpid, rcvindex))
    {
      log_append_compensate (thread_p, rcvindex, rcv_vpid, rcv->offset, rcv->pgptr, rcv->length, rcv->data, tdes);

      error_code = (*RV_fun[rcvindex].undofun) (thread_p, rcv);

      if (error_code != NO_ERROR)
        {
          VPID vpid;

          if (rcv->pgptr != NULL)
        {
          pgbuf_get_vpid (rcv->pgptr, &vpid);
        }
          else
        {
          VPID_SET_NULL (&vpid);
        }

          logpb_fatal_error (thread_p, true, ARG_FILE_LINE,
                 "log_rv_undo_record: Error applying compensation at log_lsa=(%lld, %d), "
                 "rcv = {mvccid=%llu, vpid=(%d, %d), offset = %d, data_length = %d}",
                 (long long int) rcv_undo_lsa->pageid, (int) rcv_undo_lsa->offset,
                 (unsigned long long int) rcv->mvcc_id, (int) vpid.pageid, (int) vpid.volid,
                 (int) rcv->offset, (int) rcv->length);
          goto end;
        }
    }
      else
    {
      /* Logical logging? This is a logical undo. For now, we also use a logical compensation, meaning that we
       * open a system operation that is committed & compensated at the same time.
       * However, there might be cases when compensation is not necessarily logical. If the compensation can be
       * made in a single log record and can be attached to a page, the system operation becomes useless. Take the
       * example of some b-tree cases for compensations. There might be other cases too.
       */
      save_state = tdes->state;

      LSA_COPY (&rcv->reference_lsa, &tdes->undo_nxlsa);
      log_sysop_start (thread_p);

#if defined(CUBRID_DEBUG)
      {
        /* TODO: What is this? We might remove the block. */
        LOG_LSA check_tail_lsa;

        LSA_COPY (&check_tail_lsa, &tdes->tail_lsa);
        (void) (*RV_fun[rcvindex].undofun) (rcv);

        /*
         * Make sure that a CLR was logged.
         *
         * If we do not log anything and the logical undo_nxlsa is not the
         * tail, give a warning, unless it is a temporary file deletion.
         *
         * WARNING: the if condition is different from the one of normal
         *          rollback.
         */

        if (LSA_EQ (&check_tail_lsa, &tdes->tail_lsa) && !LSA_EQ (rcv_undo_lsa, &tdes->tail_lsa))
          {
        er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_LOG_MISSING_COMPENSATING_RECORD, 1,
            rv_rcvindex_string (rcvindex));
          }
      }
#else /* CUBRID_DEBUG */
      (void) (*RV_fun[rcvindex].undofun) (thread_p, rcv);
#endif /* CUBRID_DEBUG */

      log_sysop_end_logical_compensate (thread_p, &rcv->reference_lsa);
      tdes->state = save_state;
    }
    }
  else
    {
      log_append_compensate (thread_p, rcvindex, rcv_vpid, rcv->offset, NULL, rcv->length, rcv->data, tdes);
      /*
       * Unable to fetch page of volume... May need media recovery on such
       * page
       */
      er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_LOG_MAYNEED_MEDIA_RECOVERY, 1,
          fileio_get_volume_label (rcv_vpid->volid, PEEK));
    }

end:
  if (area != NULL)
    {
      free_and_init (area);
    }

  if (rcv->pgptr != NULL)
    {
      pgbuf_unfix (thread_p, rcv->pgptr);
    }

  /* Convert thread back to system transaction. */
  log_rv_end_simulation (thread_p);
}

/*
 * log_rv_redo_record - EXECUTE A REDO RECORD
 *
 * return: nothing
 *
 *   log_pgptr_reader(in/out): Log reader positioned at the (possibly zipped) redo data of the
 *               record; advanced as a side effect while the data is read
 *   redofun(in): Function to invoke to redo the data
 *   rcv(in/out): Recovery structure for recovery function (Set as a side
 *               effect)
 *   rcv_lsa_ptr(in): Reset data page (rcv->pgptr) to this LSA
 *   undo_length(in): Length of the undo data, used when the redo image is a diff
 *   undo_data(in): Undo data, used when the redo image is a diff
 *   redo_unzip(in/out): functions as an externally managed (longer lived) buffer space for the
 *        extraction to perform its work; the pointer to the buffer is then passed - non-owningly -
 *        to the rcv structure for further processing; it is therefore mandatory that the contents
 *        of the buffer do not change until the rcv data has been consumed by the recovery function
 *
 * NOTE: Execute a redo log record.
 */
void
log_rv_redo_record (THREAD_ENTRY * thread_p, log_reader & log_pgptr_reader,
            int (*redofun) (THREAD_ENTRY * thread_p, LOG_RCV *), LOG_RCV * rcv,
            const LOG_LSA * rcv_lsa_ptr, int undo_length, const char *undo_data, LOG_ZIP & redo_unzip)
{
  int error_code;

  /* Note that the data page rcv->pgptr has been fetched by the caller */

  if (log_rv_get_unzip_and_diff_redo_log_data
      (thread_p, log_pgptr_reader, rcv, undo_length, undo_data, redo_unzip) != NO_ERROR)
    {
      logpb_fatal_error (thread_p, true, ARG_FILE_LINE, "log_rv_redo_record");
      return;
    }

  if (redofun != NULL)
    {
      error_code = (*redofun) (thread_p, rcv);
      if (error_code != NO_ERROR)
    {
      VPID vpid;
      if (rcv->pgptr != NULL)
        {
          pgbuf_get_vpid (rcv->pgptr, &vpid);
        }
      else
        {
          VPID_SET_NULL (&vpid);
        }

      logpb_fatal_error (thread_p, true, ARG_FILE_LINE,
                 "log_rv_redo_record: Error applying redo record at log_lsa=(%lld, %d), "
                 "rcv = {mvccid=%llu, vpid=(%d, %d), offset = %d, data_length = %d}",
                 (long long int) rcv_lsa_ptr->pageid, (int) rcv_lsa_ptr->offset,
                 (long long int) rcv->mvcc_id, (int) vpid.pageid, (int) vpid.volid, (int) rcv->offset,
                 (int) rcv->length);
    }
    }
  else
    {
      er_log_debug (ARG_FILE_LINE,
            "log_rv_redo_record: WARNING.. There is not a"
            " REDO function to execute. May produce recovery problems.");
    }

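  /* Stamp the data page with the LSA of the record just applied, so that a later LSA check (e.g.
   * log_rv_fix_page_and_check_redo_is_needed) sees the change as already applied and does not redo it. */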
  if (rcv->pgptr != NULL)
    {
      (void) pgbuf_set_lsa (thread_p, rcv->pgptr, rcv_lsa_ptr);
    }
}

/* log_rv_fix_page_and_check_redo_is_needed - fix the page and check whether it still exists and, if so, whether the
 *                    log record has not already been applied to it
 *
 * return: boolean - true if the redo still needs to be applied, false otherwise
 *
 *  thread_p(in):
 *  page_vpid(in): page identifier
 *  rcv(in/out): recovery structure; rcv.pgptr is set to the fixed page when redo is needed
 *  rcvindex(in): recovery index of log record to redo
 *  rcv_lsa(in): LSA of the log record to redo; compared against the page LSA
 *  end_redo_lsa(in):
 */
bool
log_rv_fix_page_and_check_redo_is_needed (THREAD_ENTRY * thread_p, const VPID & page_vpid, log_rcv & rcv,
                      LOG_RCVINDEX rcvindex, const log_lsa & rcv_lsa, const LOG_LSA & end_redo_lsa)
{
  assert (rcv.pgptr == nullptr);

  if (!VPID_ISNULL (&page_vpid))
    {
      rcv.pgptr = log_rv_redo_fix_page (thread_p, &page_vpid);
      if (rcv.pgptr == nullptr)
    {
      /* the page was changed and also deallocated in the meantime, no need to apply redo */
      // only acceptable during recovery, not acceptable for replication
      assert (log_is_in_crash_recovery ());
      return false;
    }
    }

  if (rcv.pgptr != nullptr)
    {
      /* LSA of data page for log record to redo */
      const log_lsa *const rcv_page_ptr = pgbuf_get_lsa (rcv.pgptr);
      /*
       * Do we need to execute the redo operation ?
       * If page_lsa >= lsa... already updated. In this case make sure
       * that the redo is not far from the end_redo_lsa (TODO: how to ensure this last bit?)
       */
      assert (rcv_page_ptr != nullptr);
      assert (end_redo_lsa.is_null () || *rcv_page_ptr <= end_redo_lsa);
      if (rcv_lsa <= *rcv_page_ptr)
    {
      /* already applied, make sure to unfix the page */
      // only acceptable during recovery, not acceptable for replication
      assert (log_is_in_crash_recovery ());
      pgbuf_unfix_and_init (thread_p, rcv.pgptr);
      return false;
    }
    }

  return true;
}
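
/*
 * Illustrative sketch only (not compiled code): a minimal per-record redo step that combines the helpers in this
 * file. It assumes the caller has already decoded the record's VPID, recovery index and LSA, and holds a log_reader
 * positioned at the redo data. Variable names such as reader, rcv_vpid, rcv_lsa, redo_unzip and the surrounding
 * variables are hypothetical placeholders.
 *
 *   LOG_RCV rcv;
 *   // rcv.length, rcv.offset and rcv.mvcc_id are assumed to be filled from the record's data header (elided here)
 *   rcv.pgptr = NULL;
 *   if (log_rv_fix_page_and_check_redo_is_needed (thread_p, rcv_vpid, rcv, rcvindex, rcv_lsa, end_redo_lsa))
 *     {
 *       // the page still exists and its LSA is older than the record: apply the redo and unfix the page
 *       log_rv_redo_record (thread_p, reader, RV_fun[rcvindex].redofun, &rcv, &rcv_lsa, undo_length, undo_data,
 *                           redo_unzip);
 *       if (rcv.pgptr != NULL)
 *         {
 *           pgbuf_unfix_and_init (thread_p, rcv.pgptr);
 *         }
 *     }
 */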

/* log_rv_need_sync_redo - check whether a redo log record is one of those that need to be applied synchronously
 *
 * return: boolean
 *
 *  a_rcv_vpid(in): log record vpid; can be a null vpid
 *  a_rcvindex(in): recovery index of log record to redo
 */
bool
log_rv_need_sync_redo (const vpid & a_rcv_vpid, LOG_RCVINDEX a_rcvindex)
{
  if (VPID_ISNULL (&a_rcv_vpid))
    {
      // if the log record does not modify a certain page (ie: the vpid is null)
      return true;
    }

  switch (a_rcvindex)
    {
    case RVDK_NEWVOL:
    case RVDK_FORMAT:
    case RVDK_INITMAP:
    case RVDK_EXPAND_VOLUME:
    case RVDK_VOLHEAD_EXPAND:
      // Creating/expanding a volume must be completed before applying other changes to the new pages
      return true;
    case RVDK_RESERVE_SECTORS:
    case RVDK_UNRESERVE_SECTORS:
      // Sector reservation is handled synchronously for better control; may be changed to async
      return true;
    default:
      return false;
    }
}
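
/*
 * Illustrative sketch only (the actual dispatch logic lives in the redo phase and its parallel machinery, see
 * log_recovery_redo_parallel.hpp): log_rv_need_sync_redo is meant to split redo work between the main recovery
 * thread and asynchronous redo workers. The names apply_record_synchronously, dispatcher and job below are
 * hypothetical.
 *
 *   if (log_rv_need_sync_redo (rcv_vpid, rcvindex))
 *     {
 *       // volume creation/expansion, sector (un)reservation and pageless records are applied here, in log order,
 *       // before any dependent per-page redo is handed out
 *       apply_record_synchronously (thread_p, job);
 *     }
 *   else
 *     {
 *       dispatcher.add (job);   // per-page redo may proceed asynchronously
 *     }
 */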

/*
 * log_rv_find_checkpoint - FIND RECOVERY CHECKPOINT
 *
 * return: true (always, so that the fileio_map_mounted iteration continues over all volumes)
 *
 *   volid(in): Volume identifier
 *   rcv_lsa(in/out): Recovery log sequence address
 *
 * NOTE: Find the recovery checkpoint address of the given volume. If
 *              it is smaller than rcv_lsa, rcv_lsa is reset to such value.
 */
static bool
log_rv_find_checkpoint (THREAD_ENTRY * thread_p, VOLID volid, LOG_LSA * rcv_lsa)
{
  LOG_LSA chkpt_lsa;        /* Checkpoint LSA of volume */
  int ret = NO_ERROR;

  ret = disk_get_checkpoint (thread_p, volid, &chkpt_lsa);
  if (LSA_ISNULL (rcv_lsa) || LSA_LT (&chkpt_lsa, rcv_lsa))
    {
      LSA_COPY (rcv_lsa, &chkpt_lsa);
    }

  return true;
}

/*
 * log_rv_get_unzip_log_data - GET UNZIPPED (UNDO or REDO) LOG DATA FROM LOG
 *
 * return: error code
 *
 *   length(in): log data size
 *   log_pgptr_reader(in/out): (output invariant) reader will be correctly aligned after reading the needed data
 *   unzip_ptr(in/out): must be pre-allocated; the data is extracted into it and, if its internal
 *                buffer is not large enough, it is re-alloc'ed internally
 *   is_zip(out): a helper flag which is only needed for the situation where
 *                this function is used for the extraction of the 'redo' log data
 *
 * NOTE: if the log data is zipped, it is unzipped into unzip_ptr; otherwise,
 *               the raw log data is copied into unzip_ptr as is
 */
int
log_rv_get_unzip_log_data (THREAD_ENTRY * thread_p, int length, log_reader & log_pgptr_reader,
               LOG_ZIP * unzip_ptr, bool & is_zip)
{
  char *area_ptr = nullptr; /* Temporary working pointer */
  // *INDENT-OFF*
  std::unique_ptr<char[]> area;
  // *INDENT-ON*

  /*
   * If data is contained in only one buffer, pass pointer directly.
   * Otherwise, allocate a contiguous area, copy the data and pass this area.
   * At the end, the area de-allocates itself, so make sure that wherever
   * a pointer to this data ends up, the data is copied before the end of the scope.
   */
  is_zip = ZIP_CHECK (length);
  const int unzip_length = (is_zip) ? (int) GET_ZIP_LEN (length) : length;

  const bool fits_in_current_page = log_pgptr_reader.does_fit_in_current_page (unzip_length);
  if (fits_in_current_page)
    {
      /* Data is contained in one buffer */
      // *INDENT-OFF*
      area_ptr = const_cast<char*> (log_pgptr_reader.reinterpret_cptr<char> ());
      // *INDENT-ON*
    }
  else
    {
      /* Need to copy the data into a contiguous area */
      area.reset (new char[unzip_length]);
      area_ptr = area.get ();
      log_pgptr_reader.copy_from_log (area_ptr, unzip_length);
    }

  if (is_zip)
    {
      /* will re-alloc the buffer internally if current buffer is not enough */
      if (!log_unzip (unzip_ptr, unzip_length, area_ptr))
    {
      logpb_fatal_error (thread_p, true, ARG_FILE_LINE, "log_rv_get_unzip_log_data");
      return ER_FAILED;
    }
    }
  else
    {
      /* explicitly re-alloc the buffer if needed */
      if (unzip_ptr->buf_size < unzip_length)
    {
      if (!log_zip_realloc_if_needed (*unzip_ptr, unzip_length))
        {
          logpb_fatal_error (thread_p, true, ARG_FILE_LINE, "log_rv_get_unzip_log_data");
          return ER_FAILED;
        }
    }
      assert (unzip_length <= unzip_ptr->buf_size);
      unzip_ptr->data_length = unzip_length;
      memcpy (unzip_ptr->log_data, area_ptr, unzip_length);
    }

  if (fits_in_current_page)
    {
      log_pgptr_reader.add_align (unzip_length);
    }
  else
    {
      /* only align; advance was performed while copying from log into the supplied buffer */
      log_pgptr_reader.align ();
    }

  return NO_ERROR;
}

/*
 * log_rv_get_unzip_and_diff_redo_log_data - GET UNZIPPED and DIFFED REDO LOG DATA FROM LOG
 *
 * return: error code
 *
 *   log_pgptr_reader(in/out): log reader positioned at the redo data; the redo data length is taken from rcv->length
 *   rcv(in/out): Recovery structure for recovery function
 *   undo_length(in): undo data length
 *   undo_data(in): undo data
 *   redo_unzip(out): extracted redo data; required to be passed by address because
 *                    it also functions as an internal working buffer
 *
 * NOTE: if the redo data is zipped, it is unzipped and, when undo data is available, diffed
 *               against the undo image to reconstruct the full redo image
 * TODO: after refactoring, it's easier to supply (or not) a pointer to the undo unzip
 * TODO: separate in 2 based on diffing or not and fork outside
 */
int
log_rv_get_unzip_and_diff_redo_log_data (THREAD_ENTRY * thread_p, log_reader & log_pgptr_reader,
                     LOG_RCV * rcv, int undo_length, const char *undo_data, LOG_ZIP & redo_unzip)
{
  bool is_zip = false;
  LOG_ZIP *local_unzip_ptr = nullptr;

  if (log_rv_get_unzip_log_data (thread_p, rcv->length, log_pgptr_reader, &redo_unzip, is_zip) != NO_ERROR)
    {
      logpb_fatal_error (thread_p, true, ARG_FILE_LINE, "log_rv_get_unzip_and_diff_redo_log_data");
      return ER_FAILED;
    }

  if (is_zip)
    {
      if (undo_length > 0 && undo_data != nullptr)
    {
      (void) log_diff (undo_length, undo_data, redo_unzip.data_length, redo_unzip.log_data);
    }
    }
  // *INDENT-OFF*
  rcv->length = static_cast<int> (redo_unzip.data_length);
  rcv->data = static_cast<char*> (redo_unzip.log_data);
  // *INDENT-ON*

  return NO_ERROR;
}
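
/*
 * Note on the diff step above: when the redo image is logged in zipped form and undo data is available, the redo
 * bytes stored in the log are only a difference against the undo image; log_diff combines the two buffers so that
 * redo_unzip.log_data ends up holding the full redo image, which is then handed to the recovery function through
 * rcv->data / rcv->length.
 */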

/*
 * log_recovery - Recover information
 *
 * return: nothing
 *
 *   ismedia_crash(in): Are we recovering from a media crash?
 *   stopat(in):
 *
 */
void
log_recovery (THREAD_ENTRY * thread_p, int ismedia_crash, time_t * stopat)
{
  LOG_TDES *rcv_tdes;       /* Tran. descriptor for the recovery phase */
  int rcv_tran_index;       /* Saved transaction index */
  LOG_RECORD_HEADER *eof;   /* End of the log record */
  LOG_LSA rcv_lsa;      /* Where to start the recovery */
  LOG_LSA start_redolsa;    /* Where to start redo phase */
  LOG_LSA end_redo_lsa;     /* Where to stop the redo phase */
  bool did_incom_recovery;
  int tran_index;
  INT64 num_redo_log_records;
  int error_code = NO_ERROR;

  assert (LOG_CS_OWN_WRITE_MODE (thread_p));

  /* Save the transaction index and find the transaction descriptor */

  tran_index = LOG_FIND_THREAD_TRAN_INDEX (thread_p);
  rcv_tdes = LOG_FIND_TDES (tran_index);
  if (rcv_tdes == NULL)
    {
      er_set (ER_FATAL_ERROR_SEVERITY, ARG_FILE_LINE, ER_LOG_UNKNOWN_TRANINDEX, 1, tran_index);
      logpb_fatal_error (thread_p, true, ARG_FILE_LINE, "log_recovery:LOG_FIND_TDES");
      return;
    }

  rcv_tran_index = tran_index;
  rcv_tdes->state = TRAN_RECOVERY;

  if (LOG_HAS_LOGGING_BEEN_IGNORED ())
    {
      /*
       * Your database is corrupted since it crashed when logging was ignored
       */
      er_set (ER_FATAL_ERROR_SEVERITY, ARG_FILE_LINE, ER_LOG_CORRUPTED_DB_DUE_CRASH_NOLOGGING, 0);
      logpb_fatal_error (thread_p, true, ARG_FILE_LINE, "log_recovery:LOG_HAS_LOGGING_BEEN_IGNORED");
      log_Gl.hdr.has_logging_been_skipped = false;
    }

  er_log_debug (ARG_FILE_LINE, "RECOVERY: start with %lld|%d and stop at %lld", LSA_AS_ARGS (&log_Gl.hdr.chkpt_lsa),
        stopat != NULL ? *stopat : -1);

  /* Find the starting LSA for the analysis phase */

  LSA_COPY (&rcv_lsa, &log_Gl.hdr.chkpt_lsa);
  if (ismedia_crash != false)
    {
      /*
       * Media crash, we may have to start from an older checkpoint...
       * check disk headers
       */
      (void) fileio_map_mounted (thread_p, (bool (*)(THREAD_ENTRY *, VOLID, void *)) log_rv_find_checkpoint, &rcv_lsa);
    }
  else
    {
      /*
       * We do incomplete recovery only when we are coming from a media crash.
       * That is, we are restarting from a backup
       */
      if (stopat != NULL)
    {
      *stopat = -1;
    }
    }

  /* Notify vacuum it may need to recover the lost block data.
   * There are two possible cases here:
   * 1. recovery finds MVCC op log records after last checkpoint, so vacuum can start its recovery from last MVCC op
   *    log record.
   * 2. no MVCC op log record is found, so vacuum has to start recovery from checkpoint LSA. It will go
   *    backwards record by record until it either finds a MVCC op log record or until it reaches last block in
   *    vacuum data.
   */
  vacuum_notify_server_crashed (&rcv_lsa);

  /*
   * First,  ANALYSIS the log to find the state of the transactions
   * Second, REDO going forward
   * Last,   UNDO going backwards
   */

  er_set (ER_NOTIFICATION_SEVERITY, ARG_FILE_LINE, ER_LOG_RECOVERY_STARTED, 0);

  log_Gl.rcv_phase = LOG_RECOVERY_ANALYSIS_PHASE;

  er_set (ER_NOTIFICATION_SEVERITY, ARG_FILE_LINE, ER_LOG_RECOVERY_ANALYSIS_STARTED, 0);

  log_recovery_analysis (thread_p, &rcv_lsa, &start_redolsa, &end_redo_lsa, ismedia_crash, stopat,
             &did_incom_recovery, &num_redo_log_records);

  er_set (ER_NOTIFICATION_SEVERITY, ARG_FILE_LINE, ER_LOG_RECOVERY_PHASE_FINISHED, 1, "ANALYSIS");

  LSA_COPY (&log_Gl.chkpt_redo_lsa, &start_redolsa);

  LOG_SET_CURRENT_TRAN_INDEX (thread_p, rcv_tran_index);
  if (logpb_fetch_start_append_page (thread_p) != NO_ERROR)
    {
      logpb_fatal_error (thread_p, true, ARG_FILE_LINE, "log_recovery:logpb_fetch_start_append_page");
      // dead-ended. should not reach here
      return;
    }

  if (did_incom_recovery == false)
    {
      /* Read the End of file record to find out the previous address */
      eof = (LOG_RECORD_HEADER *) LOG_APPEND_PTR ();
      LOG_RESET_PREV_LSA (&eof->back_lsa);
    }

#if !defined(SERVER_MODE)
  LSA_COPY (&log_Gl.final_restored_lsa, &log_Gl.hdr.append_lsa);
#endif /* SERVER_MODE */

  log_append_empty_record (thread_p, LOG_DUMMY_CRASH_RECOVERY, NULL);

  /*
   * Save the crash point lsa for use during the remaining recovery
   * phases.
   */
  LSA_COPY (&log_Gl.rcv_phase_lsa, &rcv_tdes->tail_lsa);

  /* Redo phase */
  log_Gl.rcv_phase = LOG_RECOVERY_REDO_PHASE;

  LOG_SET_CURRENT_TRAN_INDEX (thread_p, rcv_tran_index);

  er_set (ER_NOTIFICATION_SEVERITY, ARG_FILE_LINE, ER_LOG_RECOVERY_REDO_STARTED, 2,
      log_cnt_pages_containing_lsa (&start_redolsa, &end_redo_lsa), num_redo_log_records);

  log_recovery_redo (thread_p, &start_redolsa, &end_redo_lsa);

  er_set (ER_NOTIFICATION_SEVERITY, ARG_FILE_LINE, ER_LOG_RECOVERY_PHASE_FINISHED, 1, "REDO");

  boot_reset_db_parm (thread_p);

  /* Undo phase */
  log_Gl.rcv_phase = LOG_RECOVERY_UNDO_PHASE;

  LOG_SET_CURRENT_TRAN_INDEX (thread_p, rcv_tran_index);

  /* ER_LOG_RECOVERY_REDO_STARTED logging is inside log_recovery_undo() */

  log_recovery_undo (thread_p);

  er_set (ER_NOTIFICATION_SEVERITY, ARG_FILE_LINE, ER_LOG_RECOVERY_PHASE_FINISHED, 1, "UNDO");

  boot_reset_db_parm (thread_p);

  // *INDENT-OFF*
  log_system_tdes::rv_final ();
  // *INDENT-ON*

  if (did_incom_recovery == true)
    {
      log_recovery_notpartof_volumes (thread_p);
    }

  /* Client loose ends */
  rcv_tdes->state = TRAN_ACTIVE;

  LOG_SET_CURRENT_TRAN_INDEX (thread_p, rcv_tran_index);

  (void) logtb_set_num_loose_end_trans (thread_p);

  /* Try to finish any 2PC blocked transactions */
  if (log_Gl.trantable.num_coord_loose_end_indices > 0 || log_Gl.trantable.num_prepared_loose_end_indices > 0)
    {

      log_Gl.rcv_phase = LOG_RECOVERY_FINISH_2PC_PHASE;

      LOG_SET_CURRENT_TRAN_INDEX (thread_p, rcv_tran_index);

      log_2pc_recovery (thread_p);

      /* Check number of loose end transactions again.. */
      rcv_tdes->state = TRAN_ACTIVE;

      LOG_SET_CURRENT_TRAN_INDEX (thread_p, rcv_tran_index);

      (void) logtb_set_num_loose_end_trans (thread_p);
    }

  /* Dismount any archive and checkpoint the database */
  logpb_decache_archive_info (thread_p);

  LOG_CS_EXIT (thread_p);
  (void) logpb_checkpoint (thread_p);
  LOG_CS_ENTER (thread_p);

  /* Flush all dirty pages */
  logpb_flush_pages_direct (thread_p);
  (void) pgbuf_flush_all (thread_p, NULL_VOLID);
  (void) fileio_synchronize_all (thread_p);

  logpb_flush_header (thread_p);

  /* re-cache Tracker */
  error_code = locator_initialize (thread_p);
  if (error_code != NO_ERROR)
    {
      assert (false);
      logpb_fatal_error (thread_p, true, ARG_FILE_LINE, "log_recovery:locator_initialize");
      // dead-ended. should not reach here
      return;
    }

  /* Remove all class representations. */
  error_code = heap_classrepr_restart_cache ();
  if (error_code != NO_ERROR)
    {
      assert (false);
      logpb_fatal_error (thread_p, true, ARG_FILE_LINE, "log_recovery:heap_classrepr_restart_cache");
      // dead-ended. should not reach here
      return;
    }

  er_set (ER_NOTIFICATION_SEVERITY, ARG_FILE_LINE, ER_LOG_RECOVERY_FINISHED, 0);
}

/*
 * log_rv_analysis_undo_redo -
 *
 * return: error code
 *
 *   tran_id(in):
 *   lsa(in/out):
 * Note:
 */
static int
log_rv_analysis_undo_redo (THREAD_ENTRY * thread_p, int tran_id, LOG_LSA * log_lsa)
{
  LOG_TDES *tdes;

  /*
   * If this is the first time the transaction is seen, assign a new
   * index to describe it and assume that the transaction was active
   * at the time of the crash, and thus it will be unilaterally
   * aborted. The truth of this statement will be found by reading the
   * rest of the log
   */
  tdes = logtb_rv_find_allocate_tran_index (thread_p, tran_id, log_lsa);
  if (tdes == NULL)
    {
      logpb_fatal_error (thread_p, true, ARG_FILE_LINE, "log_rv_analysis_undo_redo");
      return ER_FAILED;
    }

  /* New tail and next to undo */
  LSA_COPY (&tdes->tail_lsa, log_lsa);
  LSA_COPY (&tdes->undo_nxlsa, &tdes->tail_lsa);

  return NO_ERROR;
}

/*
 * log_rv_analysis_dummy_head_postpone -
 *
 * return: error code
 *
 *   tran_id(in):
 *   lsa(in/out):
 * Note:
 */
static int
log_rv_analysis_dummy_head_postpone (THREAD_ENTRY * thread_p, int tran_id, LOG_LSA * log_lsa)
{
  LOG_TDES *tdes;

  /*
   * If this is the first time the transaction is seen, assign a new
   * index to describe it and assume that the transaction was active
   * at the time of the crash, and thus it will be unilaterally
   * aborted. The truth of this statement will be found by reading the
   * rest of the log
   */
  tdes = logtb_rv_find_allocate_tran_index (thread_p, tran_id, log_lsa);
  if (tdes == NULL)
    {
      logpb_fatal_error (thread_p, true, ARG_FILE_LINE, "log_rv_analysis_dummy_head_postpone");
      return ER_FAILED;
    }

  /* New tail and next to undo */
  LSA_COPY (&tdes->tail_lsa, log_lsa);
  LSA_COPY (&tdes->undo_nxlsa, &tdes->tail_lsa);

  /* if first postpone, then set address late */
  if (LSA_ISNULL (&tdes->posp_nxlsa))
    {
      LSA_COPY (&tdes->posp_nxlsa, &tdes->tail_lsa);
    }

  return NO_ERROR;
}

/*
 * log_rv_analysis_postpone -
 *
 * return: error code
 *
 *   tran_id(in):
 *   lsa(in/out):
 *
 * Note:
 */
static int
log_rv_analysis_postpone (THREAD_ENTRY * thread_p, int tran_id, LOG_LSA * log_lsa)
{
  LOG_TDES *tdes;

  /*
   * If this is the first time the transaction is seen, assign a new
   * index to describe it and assume that the transaction was active
   * at the time of the crash, and thus it will be unilaterally
   * aborted. The truth of this statement will be found by reading the
   * rest of the log
   */
  tdes = logtb_rv_find_allocate_tran_index (thread_p, tran_id, log_lsa);
  if (tdes == NULL)
    {
      logpb_fatal_error (thread_p, true, ARG_FILE_LINE, "log_rv_analysis_postpone");
      return ER_FAILED;
    }

  /* if first postpone, then set address early */
  if (LSA_ISNULL (&tdes->posp_nxlsa))
    {
      LSA_COPY (&tdes->posp_nxlsa, &tdes->tail_lsa);
    }

  /* New tail and next to undo */
  LSA_COPY (&tdes->tail_lsa, log_lsa);
  LSA_COPY (&tdes->undo_nxlsa, &tdes->tail_lsa);

  return NO_ERROR;
}

/*
 * log_rv_analysis_run_postpone -
 *
 * return: error code
 *
 *   tran_id(in):
 *   lsa(in/out):
 *   log_page_p(in/out):
 *   check_point(in/out):
 *
 * Note:
 */
static int
log_rv_analysis_run_postpone (THREAD_ENTRY * thread_p, int tran_id, LOG_LSA * log_lsa, LOG_PAGE * log_page_p,
                  LOG_LSA * check_point)
{
  LOG_TDES *tdes;
  LOG_REC_RUN_POSTPONE *run_posp;

  tdes = logtb_rv_find_allocate_tran_index (thread_p, tran_id, log_lsa);
  if (tdes == NULL)
    {
      logpb_fatal_error (thread_p, true, ARG_FILE_LINE, "log_rv_analysis_run_postpone");
      return ER_FAILED;
    }

  if (tdes->state != TRAN_UNACTIVE_WILL_COMMIT && tdes->state != TRAN_UNACTIVE_COMMITTED_WITH_POSTPONE
      && tdes->state != TRAN_UNACTIVE_TOPOPE_COMMITTED_WITH_POSTPONE)
    {
      /*
       * If we are coming from a checkpoint, this is the result of a
       * system error since the transaction must already have been in
       * one of these states.
       * If we are not coming from a checkpoint, it is likely that
       * we are in a commit point of either a transaction or a top
       * operation.
       */
      if (!LSA_ISNULL (check_point))
    {
      er_log_debug (ARG_FILE_LINE,
            "log_recovery_analysis: SYSTEM ERROR\n Incorrect state = %s\n at log_rec at %lld|%d\n"
            " for transaction = %d (index %d).\n State should have been either of\n %s\n %s\n %s\n",
            log_state_string (tdes->state), (long long int) log_lsa->pageid, (int) log_lsa->offset,
            tdes->trid, tdes->tran_index, log_state_string (TRAN_UNACTIVE_WILL_COMMIT),
            log_state_string (TRAN_UNACTIVE_COMMITTED_WITH_POSTPONE),
            log_state_string (TRAN_UNACTIVE_TOPOPE_COMMITTED_WITH_POSTPONE));
    }
      /*
       * Continue the execution by guessing that the transaction has
       * been committed
       */
      if (tdes->topops.last == -1)
    {
      tdes->state = TRAN_UNACTIVE_COMMITTED_WITH_POSTPONE;
    }
      else
    {
      tdes->state = TRAN_UNACTIVE_TOPOPE_COMMITTED_WITH_POSTPONE;
    }
    }

  if (tdes->state == TRAN_UNACTIVE_COMMITTED_WITH_POSTPONE)
    {
      /* Nothing to undo */
      LSA_SET_NULL (&tdes->undo_nxlsa);
    }

  LSA_COPY (&tdes->tail_lsa, log_lsa);

  /*
   * Need to read the log_run_postpone record to reset the posp_nxlsa
   * of transaction or top action to the value of log_ref
   */

  /* Read the DATA HEADER */
  LOG_READ_ADD_ALIGN (thread_p, sizeof (LOG_RECORD_HEADER), log_lsa, log_page_p);
  LOG_READ_ADVANCE_WHEN_DOESNT_FIT (thread_p, sizeof (LOG_REC_RUN_POSTPONE), log_lsa, log_page_p);

  run_posp = (LOG_REC_RUN_POSTPONE *) ((char *) log_page_p->area + log_lsa->offset);

  if (tdes->state == TRAN_UNACTIVE_TOPOPE_COMMITTED_WITH_POSTPONE)
    {
      /* Reset start of postpone transaction for the top action */
      LSA_COPY (&tdes->topops.stack[tdes->topops.last].posp_lsa, &run_posp->ref_lsa);
    }
  else
    {
      assert (tdes->state == TRAN_UNACTIVE_WILL_COMMIT || tdes->state == TRAN_UNACTIVE_COMMITTED_WITH_POSTPONE);

      /* Reset start of postpone transaction */
      LSA_COPY (&tdes->posp_nxlsa, &run_posp->ref_lsa);
    }

  return NO_ERROR;
}

/*
 * log_rv_analysis_compensate -
 *
 * return: error code
 *
 *   tran_id(in):
 *   lsa(in/out):
 *   log_page_p(in/out):
 *
 * Note:
 */
static int
log_rv_analysis_compensate (THREAD_ENTRY * thread_p, int tran_id, LOG_LSA * log_lsa, LOG_PAGE * log_page_p)
{
  LOG_TDES *tdes;
  LOG_REC_COMPENSATE *compensate;

  /*
   * If this is the first time the transaction is seen, assign a new
   * index to describe it and assume that the transaction was active
   * at the time of the crash, and thus it will be unilaterally
   * aborted. The truth of this statement will be found by reading the
   * rest of the log
   */
  tdes = logtb_rv_find_allocate_tran_index (thread_p, tran_id, log_lsa);
  if (tdes == NULL)
    {
      logpb_fatal_error (thread_p, true, ARG_FILE_LINE, "log_rv_analysis_compensate");
      return ER_FAILED;
    }

  /*
   * Need to read the compensating record to set the next undo address
   */

  /* Read the DATA HEADER */
  LOG_READ_ADD_ALIGN (thread_p, sizeof (LOG_RECORD_HEADER), log_lsa, log_page_p);
  LOG_READ_ADVANCE_WHEN_DOESNT_FIT (thread_p, sizeof (LOG_REC_COMPENSATE), log_lsa, log_page_p);

  compensate = (LOG_REC_COMPENSATE *) ((char *) log_page_p->area + log_lsa->offset);
  LSA_COPY (&tdes->undo_nxlsa, &compensate->undo_nxlsa);

  return NO_ERROR;
}

/*
 * log_rv_analysis_commit_with_postpone -
 *
 * return: error code
 *
 *   tran_id(in):
 *   log_lsa(in/out):
 *   log_page_p(in/out):
 *   lsa(in/out):
 *   is_media_crash(in):
 *   stop_at(in):
 *   did_incom_recovery(in/out):
 *
 * Note:
 */
static int
log_rv_analysis_commit_with_postpone (THREAD_ENTRY * thread_p, int tran_id, LOG_LSA * log_lsa, LOG_PAGE * log_page_p,
                      LOG_LSA * prev_lsa, bool is_media_crash, time_t * stop_at,
                      bool * did_incom_recovery)
{
  LOG_TDES *tdes;
  LOG_REC_START_POSTPONE *start_posp;
  time_t last_at_time;
  char time_val[CTIME_MAX];
  LOG_LSA record_header_lsa;

  /*
   * If this is the first time, the transaction is seen. Assign a new
   * index to describe it. The transaction was in the process of
   * getting committed at this point.
   */
  tdes = logtb_rv_find_allocate_tran_index (thread_p, tran_id, log_lsa);
  if (tdes == NULL)
    {
      logpb_fatal_error (thread_p, true, ARG_FILE_LINE, "log_rv_analysis_commit_with_postpone");
      return ER_FAILED;
    }

  LSA_COPY (&record_header_lsa, log_lsa);
  LOG_READ_ADD_ALIGN (thread_p, sizeof (LOG_RECORD_HEADER), log_lsa, log_page_p);
  LOG_READ_ADVANCE_WHEN_DOESNT_FIT (thread_p, sizeof (LOG_REC_START_POSTPONE), log_lsa, log_page_p);
  start_posp = (LOG_REC_START_POSTPONE *) ((char *) log_page_p->area + log_lsa->offset);

  if (is_media_crash)
    {
      last_at_time = (time_t) start_posp->at_time;
      if (stop_at != NULL && *stop_at != (time_t) (-1) && difftime (*stop_at, last_at_time) < 0)
    {
#if !defined(NDEBUG)
      if (prm_get_bool_value (PRM_ID_LOG_TRACE_DEBUG))
        {
          fprintf (stdout, msgcat_message (MSGCAT_CATALOG_CUBRID, MSGCAT_SET_LOG, MSGCAT_LOG_STARTS));
          (void) ctime_r (&last_at_time, time_val);
          fprintf (stdout,
               msgcat_message (MSGCAT_CATALOG_CUBRID, MSGCAT_SET_LOG, MSGCAT_LOG_INCOMPLTE_MEDIA_RECOVERY),
               record_header_lsa.pageid, record_header_lsa.offset, time_val);
          fprintf (stdout, msgcat_message (MSGCAT_CATALOG_CUBRID, MSGCAT_SET_LOG, MSGCAT_LOG_STARTS));
          fflush (stdout);
        }
#endif /* !NDEBUG */
      /*
       * Reset the log active and stop the recovery process at this
       * point. Before resetting the log, make sure that we are not
       * holding a page.
       */
      log_lsa->pageid = NULL_PAGEID;
      log_recovery_resetlog (thread_p, &record_header_lsa, prev_lsa);
      *did_incom_recovery = true;
    }
    }
  else
    {
      /*
       * Need to read the start postpone record to set the postpone address
       * of the transaction
       */
      tdes->state = TRAN_UNACTIVE_COMMITTED_WITH_POSTPONE;

      /* Nothing to undo */
      LSA_SET_NULL (&tdes->undo_nxlsa);
      LSA_COPY (&tdes->tail_lsa, log_lsa);
      tdes->rcv.tran_start_postpone_lsa = tdes->tail_lsa;

      LSA_COPY (&tdes->posp_nxlsa, &start_posp->posp_lsa);
    }

  return NO_ERROR;
}

/*
 * log_rv_analysis_commit_with_postpone_obsolete-
 *
 * return: error code
 *
 *   tran_id(in):
 *   lsa(in/out):
 *   log_page_p(in/out):
 *
 * Note:
 */
static int
log_rv_analysis_commit_with_postpone_obsolete (THREAD_ENTRY * thread_p, int tran_id, LOG_LSA * log_lsa,
                           LOG_PAGE * log_page_p)
{
  LOG_TDES *tdes;
  LOG_REC_START_POSTPONE_OBSOLETE *start_posp;

  /*
   * If this is the first time, the transaction is seen. Assign a new
   * index to describe it. The transaction was in the process of
   * getting committed at this point.
   */
  tdes = logtb_rv_find_allocate_tran_index (thread_p, tran_id, log_lsa);
  if (tdes == NULL)
    {
      logpb_fatal_error (thread_p, true, ARG_FILE_LINE, "log_rv_analysis_commit_with_postpone_obsolete");
      return ER_FAILED;
    }

  tdes->state = TRAN_UNACTIVE_COMMITTED_WITH_POSTPONE;

  /* Nothing to undo */
  LSA_SET_NULL (&tdes->undo_nxlsa);
  LSA_COPY (&tdes->tail_lsa, log_lsa);
  tdes->rcv.tran_start_postpone_lsa = tdes->tail_lsa;

  /*
   * Need to read the start postpone record to set the postpone address
   * of the transaction
   */
  LOG_READ_ADD_ALIGN (thread_p, sizeof (LOG_RECORD_HEADER), log_lsa, log_page_p);
  LOG_READ_ADVANCE_WHEN_DOESNT_FIT (thread_p, sizeof (LOG_REC_START_POSTPONE_OBSOLETE), log_lsa, log_page_p);

  start_posp = (LOG_REC_START_POSTPONE_OBSOLETE *) ((char *) log_page_p->area + log_lsa->offset);
  LSA_COPY (&tdes->posp_nxlsa, &start_posp->posp_lsa);

  return NO_ERROR;
}

/*
 * log_rv_analysis_sysop_start_postpone - start system op postpone.
 *
 * return: error code
 *
 *   tran_id(in):
 *   lsa(in/out):
 *   log_page_p(in/out):
 *
 * Note:
 */
static int
log_rv_analysis_sysop_start_postpone (THREAD_ENTRY * thread_p, int tran_id, LOG_LSA * log_lsa, LOG_PAGE * log_page_p)
{
  LOG_TDES *tdes;
  LOG_REC_SYSOP_START_POSTPONE *sysop_start_posp;

  /*
   * If this is the first time, the transaction is seen. Assign a new
   * index to describe it. A top system operation was in the process
   * of getting committed at this point.
   */
  tdes = logtb_rv_find_allocate_tran_index (thread_p, tran_id, log_lsa);
  if (tdes == NULL)
    {
      logpb_fatal_error (thread_p, true, ARG_FILE_LINE, "log_rv_analysis_sysop_start_postpone");
      return ER_FAILED;
    }

  LSA_COPY (&tdes->tail_lsa, log_lsa);
  LSA_COPY (&tdes->undo_nxlsa, &tdes->tail_lsa);

  tdes->rcv.sysop_start_postpone_lsa = tdes->tail_lsa;

  /*
   * Need to read the start postpone record to set the start address
   * of top system operation
   */
  LOG_READ_ADD_ALIGN (thread_p, sizeof (LOG_RECORD_HEADER), log_lsa, log_page_p);
  LOG_READ_ADVANCE_WHEN_DOESNT_FIT (thread_p, sizeof (LOG_REC_SYSOP_START_POSTPONE), log_lsa, log_page_p);
  sysop_start_posp = ((LOG_REC_SYSOP_START_POSTPONE *) ((char *) log_page_p->area + log_lsa->offset));

  if (tdes->state == TRAN_UNACTIVE_TOPOPE_COMMITTED_WITH_POSTPONE)
    {
      /* this is not a valid situation */
      assert_release (false);
    }
  else if (sysop_start_posp->sysop_end.type == LOG_SYSOP_END_LOGICAL_RUN_POSTPONE)
    {
      if (sysop_start_posp->sysop_end.run_postpone.is_sysop_postpone)
    {
      /* system op postpone inside system op postpone. not a valid situation */
      assert (false);
    }
      else
    {
      /* no undo. it is possible that the transaction state is TRAN_UNACTIVE_UNILATERALLY_ABORTED because this might
       * be the first log record discovered for current transaction. it will be set correctly when the system op end
       * is found or executed.
       */
      LSA_SET_NULL (&tdes->undo_nxlsa);
    }
    }
  else
    {
      assert (sysop_start_posp->sysop_end.type != LOG_SYSOP_END_ABORT);
    }

  /* update state */
  tdes->state = TRAN_UNACTIVE_TOPOPE_COMMITTED_WITH_POSTPONE;

  if (tdes->topops.max == 0 || (tdes->topops.last + 1) >= tdes->topops.max)
    {
      if (logtb_realloc_topops_stack (tdes, 1) == NULL)
    {
      /* Out of memory */
      logpb_fatal_error (thread_p, true, ARG_FILE_LINE, "log_rv_analysis_sysop_start_postpone");
      return ER_OUT_OF_VIRTUAL_MEMORY;
    }
    }

  /*
   * NOTE if tdes->topops.last >= 0, there is an already
   * defined top system operation. However, I do not think so,
   * due to the nested fashion of top system operations. Outer
   * top nested system operations will come later in the log.
   */

  if (tdes->topops.last == -1)
    {
      tdes->topops.last++;
    }
  else
    {
      /* not expected */
      assert (false);
    }

  LSA_COPY (&tdes->topops.stack[tdes->topops.last].lastparent_lsa, &sysop_start_posp->sysop_end.lastparent_lsa);
  LSA_COPY (&tdes->topops.stack[tdes->topops.last].posp_lsa, &sysop_start_posp->posp_lsa);

  if (LSA_LT (&sysop_start_posp->sysop_end.lastparent_lsa, &tdes->rcv.atomic_sysop_start_lsa))
    {
      /* reset tdes->rcv.atomic_sysop_start_lsa */
      LSA_SET_NULL (&tdes->rcv.atomic_sysop_start_lsa);
    }

  return NO_ERROR;
}

/*
 * log_rv_analysis_atomic_sysop_start () - analyze start atomic system operation
 *
 * return        : error code
 * thread_p (in) : thread entry
 * tran_id (in)  : transaction ID
 * log_lsa (in)  : log record LSA. will be used as marker for start system operation.
 */
static int
log_rv_analysis_atomic_sysop_start (THREAD_ENTRY * thread_p, int tran_id, LOG_LSA * log_lsa)
{
  LOG_TDES *tdes;

  tdes = logtb_rv_find_allocate_tran_index (thread_p, tran_id, log_lsa);
  if (tdes == NULL)
    {
      logpb_fatal_error (thread_p, true, ARG_FILE_LINE, "log_rv_analysis_atomic_sysop_start");
      return ER_FAILED;
    }

  tdes->tail_lsa = *log_lsa;
  tdes->undo_nxlsa = tdes->tail_lsa;

  /* this is a marker for system operations that need to be atomic. They will be rolled back before postpone is finished.
   */
  tdes->rcv.atomic_sysop_start_lsa = *log_lsa;

  return NO_ERROR;
}

/*
 * log_rv_analysis_complete -
 *
 * return: error code
 *
 *   tran_id(in):
 *   log_lsa(in/out):
 *   log_page_p(in/out):
 *   lsa(in/out):
 *   is_media_crash(in):
 *   stop_at(in):
 *   did_incom_recovery(in/out):
 *
 * Note:
 */
static int
log_rv_analysis_complete (THREAD_ENTRY * thread_p, int tran_id, LOG_LSA * log_lsa, LOG_PAGE * log_page_p,
              LOG_LSA * prev_lsa, bool is_media_crash, time_t * stop_at, bool * did_incom_recovery)
{
  LOG_REC_DONETIME *donetime;
  int tran_index;
  time_t last_at_time;
  char time_val[CTIME_MAX];
  LOG_LSA record_header_lsa;

  /*
   * The transaction has been fully completed. Therefore, it was not
   * active at the time of the crash
   */
  tran_index = logtb_find_tran_index (thread_p, tran_id);

  if (is_media_crash != true)
    {
      goto end;
    }

  LSA_COPY (&record_header_lsa, log_lsa);

  /*
   * Need to read the donetime record to find out if we need to stop
   * the recovery at this point.
   */
  LOG_READ_ADD_ALIGN (thread_p, sizeof (LOG_RECORD_HEADER), log_lsa, log_page_p);
  LOG_READ_ADVANCE_WHEN_DOESNT_FIT (thread_p, sizeof (LOG_REC_DONETIME), log_lsa, log_page_p);

  donetime = (LOG_REC_DONETIME *) ((char *) log_page_p->area + log_lsa->offset);
  last_at_time = (time_t) donetime->at_time;
  if (stop_at != NULL && *stop_at != (time_t) (-1) && difftime (*stop_at, last_at_time) < 0)
    {
#if !defined(NDEBUG)
      if (prm_get_bool_value (PRM_ID_LOG_TRACE_DEBUG))
    {
      fprintf (stdout, msgcat_message (MSGCAT_CATALOG_CUBRID, MSGCAT_SET_LOG, MSGCAT_LOG_STARTS));
      (void) ctime_r (&last_at_time, time_val);
      fprintf (stdout,
           msgcat_message (MSGCAT_CATALOG_CUBRID, MSGCAT_SET_LOG, MSGCAT_LOG_INCOMPLTE_MEDIA_RECOVERY),
           record_header_lsa.pageid, record_header_lsa.offset, time_val);
      fprintf (stdout, msgcat_message (MSGCAT_CATALOG_CUBRID, MSGCAT_SET_LOG, MSGCAT_LOG_STARTS));
      fflush (stdout);
    }
#endif /* !NDEBUG */
      /*
       * Reset the log active and stop the recovery process at this
       * point. Before resetting the log, make sure that we are not
       * holding a page.
       */
      log_lsa->pageid = NULL_PAGEID;
      log_recovery_resetlog (thread_p, &record_header_lsa, prev_lsa);
      *did_incom_recovery = true;

      return NO_ERROR;
    }

end:

  /*
   * The transaction has been fully completed. Therefore, it was not
   * active at the time of the crash
   */
  if (tran_index != NULL_TRAN_INDEX)
    {
      logtb_free_tran_index (thread_p, tran_index);
    }

  return NO_ERROR;
}

/* TODO: We need to understand how recovery of system operations really works. We need to find its limitations.
 *   For now, I did find out this:
 *   1. if tdes->topops.last is 0 (cannot be bigger during recovery), it means the transaction should be in
 *      state TRAN_UNACTIVE_TOPOPE_COMMITTED_WITH_POSTPONE. tdes->topops.last may be incremented by
 *      log_rv_analysis_sysop_start_postpone or may be already 0 from checkpoint collected topops.
 *   2. In all other cases it must be -1.
 *   3. We used to consider that the first sys op commit (or abort) that follows is the one that should end the postpone
 *      phase.
 *
 *   Now, for TRAN_UNACTIVE_TOPOPE_COMMITTED_WITH_POSTPONE state we will expect last to be always 0.
 *   We will handle sys op ends like:
 *   - commit, logical undo, logical compensate: we consider this ends the
 *         TRAN_UNACTIVE_TOPOPE_COMMITTED_WITH_POSTPONE phase.
 *   - logical run postpone: it depends on is_sysop_postpone. if true, then
 *         TRAN_UNACTIVE_TOPOPE_COMMITTED_WITH_POSTPONE continues. If it is false, then it commits the system op and
 *         transitions to TRAN_UNACTIVE_COMMITTED_WITH_POSTPONE.
 *   - abort: we assume this was a nested run postpone that got aborted
 *
 *   In the future, I hope we can come up with a more clear and less restrictive system to handle system operations
 *   during recovery.
 */

/*
 * log_rv_analysis_sysop_end () - Analyze system operation commit log record.
 *
 * return      : Error code.
 * thread_p (in)   : Thread entry.
 * tran_id (in)    : Transaction ID.
 * log_lsa (in)    : LSA of log record.
 * log_page_p (in) : Page of log record.
 */
static int
log_rv_analysis_sysop_end (THREAD_ENTRY * thread_p, int tran_id, LOG_LSA * log_lsa, LOG_PAGE * log_page_p)
{
  LOG_TDES *tdes;
  LOG_REC_SYSOP_END *sysop_end;
  bool commit_start_postpone = false;

  /* The top system action is declared as finished. Pop it from the stack of finished actions */
  tdes = logtb_rv_find_allocate_tran_index (thread_p, tran_id, log_lsa);
  if (tdes == NULL)
    {
      logpb_fatal_error (thread_p, true, ARG_FILE_LINE, "log_rv_analysis_complete_topope");
      return ER_FAILED;
    }

  LSA_COPY (&tdes->tail_lsa, log_lsa);
  LSA_COPY (&tdes->undo_nxlsa, &tdes->tail_lsa);
  LSA_COPY (&tdes->tail_topresult_lsa, &tdes->tail_lsa);

  LOG_READ_ADD_ALIGN (thread_p, sizeof (LOG_RECORD_HEADER), log_lsa, log_page_p);
  LOG_READ_ADVANCE_WHEN_DOESNT_FIT (thread_p, sizeof (*sysop_end), log_lsa, log_page_p);
  sysop_end = (LOG_REC_SYSOP_END *) (log_page_p->area + log_lsa->offset);

  LOG_SYSOP_END_TYPE_CHECK (sysop_end->type);

  switch (sysop_end->type)
    {
    case LOG_SYSOP_END_ABORT:
      /* abort does not change state and does not finish TRAN_UNACTIVE_TOPOPE_COMMITTED_WITH_POSTPONE */
      if (tdes->state == TRAN_UNACTIVE_COMMITTED_WITH_POSTPONE)
    {
      /* no undo */
      LSA_SET_NULL (&tdes->undo_nxlsa);
    }
      tdes->rcv.analysis_last_aborted_sysop_lsa = *log_lsa;
      tdes->rcv.analysis_last_aborted_sysop_start_lsa = sysop_end->lastparent_lsa;
      break;

    case LOG_SYSOP_END_COMMIT:
      assert (tdes->state != TRAN_UNACTIVE_COMMITTED_WITH_POSTPONE);
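      /* FALLTHRU */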
    case LOG_SYSOP_END_LOGICAL_UNDO:
    case LOG_SYSOP_END_LOGICAL_MVCC_UNDO:
      /* todo: I think it will be safer to save previous states in nested system operations, rather than rely on context
       *       to guess it. we should consider that for cherry. */
      commit_start_postpone = true;
      break;

    case LOG_SYSOP_END_LOGICAL_COMPENSATE:
      /* compensate undo */
      tdes->undo_nxlsa = sysop_end->compensate_lsa;
      commit_start_postpone = true;
      break;

    case LOG_SYSOP_END_LOGICAL_RUN_POSTPONE:
      /* we have a complicated story here, because logical run postpone can be run in two situations: transaction
       * postpone or system op postpone.
       * logical run postpone in transaction postpone can have other postpones inside it (e.g. file destroy).
       *
       * as a consequence, if state is TRAN_UNACTIVE_TOPOPE_COMMITTED_WITH_POSTPONE, we have an ambiguous case:
       * 1. it can be from a system op postpone and the logical run postpone is run for a postpone log record inside the
       *    system op.
       * 2. it can be from a transaction postpone and this is the postpone phase of logical run postpone system op.
       * the ambiguity is solved with run_postpone.is_sysop_postpone field, which is true if run postpone belongs to
       * system op postpone state and false if it belongs to transaction postpone phase.
       */
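      /* Illustrative example (hypothetical scenario): a logical run postpone executed as part of the transaction
       * postpone phase carries is_sysop_postpone == false; the else branch below then records
       * run_postpone.postpone_lsa into tdes->posp_nxlsa and the transaction resumes/keeps the
       * TRAN_UNACTIVE_COMMITTED_WITH_POSTPONE state. */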

      if (sysop_end->run_postpone.is_sysop_postpone)
    {
      /* run postpone for log record inside a system op */
      if (tdes->topops.last < 0 || tdes->state != TRAN_UNACTIVE_TOPOPE_COMMITTED_WITH_POSTPONE)
        {
          if (tdes->topops.max == 0 && logtb_realloc_topops_stack (tdes, 1) == NULL)
        {
          /* Out of memory */
          logpb_fatal_error (thread_p, true, ARG_FILE_LINE, "log_recovery_analysis");
          return ER_OUT_OF_VIRTUAL_MEMORY;
        }
          tdes->topops.last = 0;
          tdes->state = TRAN_UNACTIVE_TOPOPE_COMMITTED_WITH_POSTPONE;
        }
      tdes->topops.stack[tdes->topops.last].posp_lsa = sysop_end->run_postpone.postpone_lsa;
    }
      else
    {
      /* run postpone for log record inside transaction */
      tdes->posp_nxlsa = sysop_end->run_postpone.postpone_lsa;
      if (tdes->topops.last != -1)
        {
          assert (tdes->topops.last == 0);
          assert (tdes->state == TRAN_UNACTIVE_TOPOPE_COMMITTED_WITH_POSTPONE);
        }
      else
        {
          /* the state must become TRAN_UNACTIVE_COMMITTED_WITH_POSTPONE; it may currently be TRAN_UNACTIVE_UNILATERALLY_ABORTED */
          tdes->state = TRAN_UNACTIVE_COMMITTED_WITH_POSTPONE;
        }

      /* no undo */
      LSA_SET_NULL (&tdes->undo_nxlsa);

      /* TRAN_UNACTIVE_TOPOPE_COMMITTED_WITH_POSTPONE transitions to TRAN_UNACTIVE_COMMITTED_WITH_POSTPONE */
      commit_start_postpone = true;
    }
      break;
    }

  if (tdes->state == TRAN_UNACTIVE_TOPOPE_COMMITTED_WITH_POSTPONE)
    {
      // topops stack is bumped when system operation start postpone is found.
      assert (tdes->topops.last == 0);
      if (commit_start_postpone)
    {
      /* change state to previous state, which is either TRAN_UNACTIVE_COMMITTED_WITH_POSTPONE or
       * TRAN_UNACTIVE_UNILATERALLY_ABORTED. Use tdes->rcv.tran_start_postpone_lsa to determine which case it is. */
      if (!LSA_ISNULL (&tdes->rcv.tran_start_postpone_lsa))
        {
          /* this must be after start postpone */
          assert (LSA_LE (&tdes->rcv.tran_start_postpone_lsa, &sysop_end->lastparent_lsa));
          tdes->state = TRAN_UNACTIVE_COMMITTED_WITH_POSTPONE;
        }
      else
        {
          /* default state in recovery is TRAN_UNACTIVE_UNILATERALLY_ABORTED */
          tdes->state = TRAN_UNACTIVE_UNILATERALLY_ABORTED;
        }
      tdes->topops.last = -1;
    }
      else
    {
      tdes->topops.last = 0;
    }
    }
  else
    {
      assert (tdes->topops.last == -1);
      tdes->topops.last = -1;
    }

  // if this is the end of an atomic system operation or of a system operation postpone phase, now is the time to reset it
  //
  // NOTE - we might actually be in both a system operation postpone phase and an atomic system operation, one nested
  //        in the other. we need to check which one is last; the end sysop should belong to that one.
  //
  // NOTE - I really hate this guessing state system and we really, really should consider a more deterministic way.
  //        Logging ALL started system operations and replicating the system operation stack precisely would really
  //        help us avoid all these ambiguities.
  //

  // do we reset the atomic sysop? the next conditions must be met:
  // 1. is there an atomic system operation started?
  // 2. is the atomic system operation more recent than the start postpone?
  // 3. is the atomic system operation equal to or more recent than the system operation's last parent?
  if (!LSA_ISNULL (&tdes->rcv.atomic_sysop_start_lsa)   /* 1 */
      && LSA_GT (&tdes->rcv.atomic_sysop_start_lsa, &tdes->rcv.sysop_start_postpone_lsa)    /* 2 */
      && LSA_GT (&tdes->rcv.atomic_sysop_start_lsa, &sysop_end->lastparent_lsa) /* 3 */ )
    {
      /* reset tdes->rcv.atomic_sysop_start_lsa */
      LSA_SET_NULL (&tdes->rcv.atomic_sysop_start_lsa);
    }
  // do we reset the sysop start postpone? the next conditions must be met:
  // 1. is there a system operation start postpone in progress?
  // 2. is the system operation start postpone more recent than the atomic system operation?
  // 3. is the system operation start postpone more recent than the system operation's last parent?
  if (!LSA_ISNULL (&tdes->rcv.sysop_start_postpone_lsa)
      && LSA_GT (&tdes->rcv.sysop_start_postpone_lsa, &tdes->rcv.atomic_sysop_start_lsa)
      && LSA_GT (&tdes->rcv.sysop_start_postpone_lsa, &sysop_end->lastparent_lsa))
    {
      /* reset tdes->rcv.sysop_start_postpone_lsa */
      LSA_SET_NULL (&tdes->rcv.sysop_start_postpone_lsa);
    }
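
  // Illustrative example with hypothetical LSAs: if atomic_sysop_start_lsa = 100|0, sysop_start_postpone_lsa = 80|0
  // and sysop_end->lastparent_lsa = 90|0, all three conditions of the first check hold and atomic_sysop_start_lsa is
  // reset; the second check fails because the start postpone (80|0) is older than the last parent (90|0), so
  // sysop_start_postpone_lsa is kept.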

  return NO_ERROR;
}

/*
 * log_rv_analysis_start_checkpoint -
 *
 * return: error code
 *
 *   lsa(in/out):
 *   start_lsa(in/out):
 *   may_use_checkpoint(in/out):
 *
 * Note:
 */
static int
log_rv_analysis_start_checkpoint (LOG_LSA * log_lsa, LOG_LSA * start_lsa, bool * may_use_checkpoint)
{
  /*
   * Use the checkpoint record only if it is the first record in the
   * analysis. If it is not, it is likely that we are restarting from
   * crashes when the multimedia crashes were off. We skip the
   * checkpoint since it can contain stuff which does not exist any
   * longer.
   */

  if (LSA_EQ (log_lsa, start_lsa))
    {
      *may_use_checkpoint = true;
    }

  return NO_ERROR;
}

/*
 * log_rv_analysis_end_checkpoint -
 *
 * return: error code
 *
 *   lsa(in/out):
 *   log_page_p(in/out):
 *   check_point(in/out):
 *   start_redo_lsa(in/out):
 *   may_use_checkpoint(in/out):
 *   may_need_synch_checkpoint_2pc(in/out):
 *
 * Note:
 */
static int
log_rv_analysis_end_checkpoint (THREAD_ENTRY * thread_p, LOG_LSA * log_lsa, LOG_PAGE * log_page_p,
                LOG_LSA * check_point, LOG_LSA * start_redo_lsa, bool * may_use_checkpoint,
                bool * may_need_synch_checkpoint_2pc)
{
  LOG_TDES *tdes;
  LOG_REC_CHKPT *tmp_chkpt;
  LOG_REC_CHKPT chkpt;
  LOG_INFO_CHKPT_TRANS *chkpt_trans;
  LOG_INFO_CHKPT_TRANS *chkpt_one;
  LOG_INFO_CHKPT_SYSOP *chkpt_topops;
  LOG_INFO_CHKPT_SYSOP *chkpt_topone;
  int size;
  void *area;
  int i;

  LOG_PAGE *log_page_local = NULL;
  char log_page_buffer[IO_MAX_PAGE_SIZE + MAX_ALIGNMENT];
  LOG_LSA log_lsa_local;
  LOG_REC_SYSOP_START_POSTPONE sysop_start_postpone;

  int error_code = NO_ERROR;

  /*
   * Use the checkpoint record only if it is the first record in the
   * analysis. If it is not, it is likely that we are restarting from
   * crashes when the multimedia crashes were off. We skip the
   * checkpoint since it can contain stuff which does not exist any
   * longer.
   */

  if (*may_use_checkpoint == false)
    {
      return NO_ERROR;
    }
  *may_use_checkpoint = false;

  /*
   * Read the checkpoint record information to find out the
   * start_redolsa and the active transactions
   */
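
  /* Sketch of the end-of-checkpoint record layout as it is read below (structure names are those used in this
   * function; the exact sizes are implied by the reads):
   *
   *   LOG_RECORD_HEADER
   *   LOG_REC_CHKPT                  (redo_lsa, ntrans, ntops)
   *   LOG_INFO_CHKPT_TRANS[ntrans]   -- transactions active at checkpoint time
   *   LOG_INFO_CHKPT_SYSOP[ntops]    -- top system operations collected at checkpoint time
   */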

  LSA_COPY (check_point, log_lsa);

  /* Read the DATA HEADER */
  LOG_READ_ADD_ALIGN (thread_p, sizeof (LOG_RECORD_HEADER), log_lsa, log_page_p);
  LOG_READ_ADVANCE_WHEN_DOESNT_FIT (thread_p, sizeof (LOG_REC_CHKPT), log_lsa, log_page_p);
  tmp_chkpt = (LOG_REC_CHKPT *) ((char *) log_page_p->area + log_lsa->offset);
  chkpt = *tmp_chkpt;

  /* GET THE CHECKPOINT TRANSACTION INFORMATION */
  LOG_READ_ADD_ALIGN (thread_p, sizeof (LOG_REC_CHKPT), log_lsa, log_page_p);

  /* Now get the data of active transactions */

  area = NULL;
  size = sizeof (LOG_INFO_CHKPT_TRANS) * chkpt.ntrans;
  if (log_lsa->offset + size < (int) LOGAREA_SIZE)
    {
      chkpt_trans = (LOG_INFO_CHKPT_TRANS *) ((char *) log_page_p->area + log_lsa->offset);
      log_lsa->offset += size;
    }
  else
    {
      /* Need to copy the data into a contiguous area */
      area = malloc (size);
      if (area == NULL)
    {
      logpb_fatal_error (thread_p, true, ARG_FILE_LINE, "log_recovery_analysis");
      return ER_OUT_OF_VIRTUAL_MEMORY;
    }
      /* Copy the data */
      logpb_copy_from_log (thread_p, (char *) area, size, log_lsa, log_page_p);
      chkpt_trans = (LOG_INFO_CHKPT_TRANS *) area;
    }

  /* Add the transactions to the transaction table */
  for (i = 0; i < chkpt.ntrans; i++)
    {
      /*
       * If this is the first time the transaction is seen, assign a
       * new index to describe it and assume that the transaction was
       * active at the time of the crash, and thus it will be
       * unilaterally aborted. The truth of this statement will be found
       * by reading the rest of the log.
       */
      tdes = logtb_rv_find_allocate_tran_index (thread_p, chkpt_trans[i].trid, log_lsa);
      if (tdes == NULL)
    {
      if (area != NULL)
        {
          free_and_init (area);
        }

      logpb_fatal_error (thread_p, true, ARG_FILE_LINE, "log_recovery_analysis");
      return ER_FAILED;
    }
      chkpt_one = &chkpt_trans[i];

      /*
       * Clear the transaction since it may have old stuff in it.
       * Use the information that is found in the checkpoint record.
       */
      logtb_clear_tdes (thread_p, tdes);

      tdes->isloose_end = chkpt_one->isloose_end;
      if (chkpt_one->state == TRAN_ACTIVE || chkpt_one->state == TRAN_UNACTIVE_ABORTED)
    {
      tdes->state = TRAN_UNACTIVE_UNILATERALLY_ABORTED;
    }
      else
    {
      tdes->state = chkpt_one->state;
    }
      LSA_COPY (&tdes->head_lsa, &chkpt_one->head_lsa);
      LSA_COPY (&tdes->tail_lsa, &chkpt_one->tail_lsa);
      LSA_COPY (&tdes->undo_nxlsa, &chkpt_one->undo_nxlsa);
      LSA_COPY (&tdes->posp_nxlsa, &chkpt_one->posp_nxlsa);
      LSA_COPY (&tdes->savept_lsa, &chkpt_one->savept_lsa);
      LSA_COPY (&tdes->tail_topresult_lsa, &chkpt_one->tail_topresult_lsa);
      LSA_COPY (&tdes->rcv.tran_start_postpone_lsa, &chkpt_one->start_postpone_lsa);
      tdes->client.set_system_internal_with_user (chkpt_one->user_name);
      if (LOG_ISTRAN_2PC (tdes))
    {
      *may_need_synch_checkpoint_2pc = true;
    }
    }

  if (area != NULL)
    {
      free_and_init (area);
    }

  /*
   * Now add the top system operations that were in the process of
   * commit to these transactions.
   */

  log_page_local = (LOG_PAGE *) PTR_ALIGN (log_page_buffer, MAX_ALIGNMENT);
  log_page_local->hdr.logical_pageid = NULL_PAGEID;
  log_page_local->hdr.offset = NULL_OFFSET;

  if (chkpt.ntops > 0)
    {
      size = sizeof (LOG_INFO_CHKPT_SYSOP) * chkpt.ntops;
      if (log_lsa->offset + size < (int) LOGAREA_SIZE)
    {
      chkpt_topops = ((LOG_INFO_CHKPT_SYSOP *) ((char *) log_page_p->area + log_lsa->offset));
      log_lsa->offset += size;
    }
      else
    {
      /* Need to copy the data into a contiguous area */
      area = malloc (size);
      if (area == NULL)
        {
          logpb_fatal_error (thread_p, true, ARG_FILE_LINE, "log_recovery_analysis");
          return ER_OUT_OF_VIRTUAL_MEMORY;
        }
      /* Copy the data */
      logpb_copy_from_log (thread_p, (char *) area, size, log_lsa, log_page_p);
      chkpt_topops = (LOG_INFO_CHKPT_SYSOP *) area;
    }

      /* Add the top system operations to the transactions */

      for (i = 0; i < chkpt.ntops; i++)
    {
      chkpt_topone = &chkpt_topops[i];
      tdes = logtb_rv_find_allocate_tran_index (thread_p, chkpt_topone->trid, log_lsa);
      if (tdes == NULL)
        {
          if (area != NULL)
        {
          free_and_init (area);
        }

          logpb_fatal_error (thread_p, true, ARG_FILE_LINE, "log_recovery_analysis");
          return ER_FAILED;
        }

      if (tdes->topops.max == 0 || (tdes->topops.last + 1) >= tdes->topops.max)
        {
          if (logtb_realloc_topops_stack (tdes, chkpt.ntops) == NULL)
        {
          if (area != NULL)
            {
              free_and_init (area);
            }

          logpb_fatal_error (thread_p, true, ARG_FILE_LINE, "log_recovery_analysis");
          return ER_OUT_OF_VIRTUAL_MEMORY;
        }
        }

      tdes->rcv.sysop_start_postpone_lsa = chkpt_topone->sysop_start_postpone_lsa;
      tdes->rcv.atomic_sysop_start_lsa = chkpt_topone->atomic_sysop_start_lsa;
      if (!chkpt_topone->sysop_start_postpone_lsa.is_null ())
        {
          // Bump the sysop level to save lastparent_lsa and posp_lsa.
          if (tdes->topops.last == -1)
        {
          tdes->topops.last++;
        }
          else
        {
          assert (tdes->topops.last == 0);
        }

          log_lsa_local = chkpt_topone->sysop_start_postpone_lsa;
          error_code =
        log_read_sysop_start_postpone (thread_p, &log_lsa_local, log_page_local, false, &sysop_start_postpone,
                           NULL, NULL, NULL, NULL);
          if (error_code != NO_ERROR)
        {
          assert (false);
          return error_code;
        }
          tdes->topops.stack[tdes->topops.last].lastparent_lsa = sysop_start_postpone.sysop_end.lastparent_lsa;
          tdes->topops.stack[tdes->topops.last].posp_lsa = sysop_start_postpone.posp_lsa;
        }
    }
    }

  if (LSA_LT (&chkpt.redo_lsa, start_redo_lsa))
    {
      LSA_COPY (start_redo_lsa, &chkpt.redo_lsa);
    }

  if (area != NULL)
    {
      free_and_init (area);
    }

  return NO_ERROR;
}

/*
 * log_rv_analysis_save_point -
 *
 * return: error code
 *
 *   tran_id(in):
 *   lsa(in/out):
 *
 * Note:
 */
static int
log_rv_analysis_save_point (THREAD_ENTRY * thread_p, int tran_id, LOG_LSA * log_lsa)
{
  LOG_TDES *tdes;

  /*
   * If this is the first time the transaction is seen, assign a new
   * index to describe it and assume that the transaction was active
   * at the time of the crash, and thus it will be unilaterally
   * aborted. The truth of this statement will be found by reading the
   * rest of the log.
   */
  tdes = logtb_rv_find_allocate_tran_index (thread_p, tran_id, log_lsa);
  if (tdes == NULL)
    {
      logpb_fatal_error (thread_p, true, ARG_FILE_LINE, "log_rv_analysis_save_point");
      return ER_FAILED;
    }

  /* New tail, next to undo and savepoint */
  LSA_COPY (&tdes->tail_lsa, log_lsa);
  LSA_COPY (&tdes->undo_nxlsa, &tdes->tail_lsa);
  LSA_COPY (&tdes->savept_lsa, &tdes->tail_lsa);

  return NO_ERROR;
}

/*
 * log_rv_analysis_2pc_prepare -
 *
 * return: error code
 *
 *   tran_id(in):
 *   lsa(in/out):
 *
 * Note:
 */
static int
log_rv_analysis_2pc_prepare (THREAD_ENTRY * thread_p, int tran_id, LOG_LSA * log_lsa)
{
  LOG_TDES *tdes;

  /*
   * If this is the first time the transaction is seen, assign a new
   * index to describe it. The transaction has agreed not to be
   * unilaterally aborted.
   * This is a participant of the transaction.
   */

  tdes = logtb_rv_find_allocate_tran_index (thread_p, tran_id, log_lsa);
  if (tdes == NULL)
    {
      logpb_fatal_error (thread_p, true, ARG_FILE_LINE, "log_rv_analysis_2pc_prepare");
      return ER_FAILED;
    }

  tdes->state = TRAN_UNACTIVE_2PC_PREPARE;
  LSA_COPY (&tdes->tail_lsa, log_lsa);

  /* Put a note that prepare_to_commit log record needs to be read during either redo phase, or during
   * finish_commit_protocol phase */
  tdes->gtrid = LOG_2PC_NULL_GTRID;

  return NO_ERROR;
}

/*
 * log_rv_analysis_2pc_start -
 *
 * return: error code
 *
 *   tran_id(in):
 *   lsa(in/out):
 *
 * Note:
 */
static int
log_rv_analysis_2pc_start (THREAD_ENTRY * thread_p, int tran_id, LOG_LSA * log_lsa)
{
  LOG_TDES *tdes;

  /*
   * If this is the first time the transaction is seen, assign a new
   * index to describe it. The transaction was part of the two-phase
   * commit process. This is a coordinator site.
   */
  tdes = logtb_rv_find_allocate_tran_index (thread_p, tran_id, log_lsa);
  if (tdes == NULL)
    {
      logpb_fatal_error (thread_p, true, ARG_FILE_LINE, "log_rv_analysis_2pc_start");
      return ER_FAILED;
    }

  tdes->state = TRAN_UNACTIVE_2PC_COLLECTING_PARTICIPANT_VOTES;
  LSA_COPY (&tdes->tail_lsa, log_lsa);

  /* Put a note that prepare_to_commit log record needs to be read during either redo phase, or during
   * finish_commit_protocol phase */
  tdes->gtrid = LOG_2PC_NULL_GTRID;

  return NO_ERROR;
}

/*
 * log_rv_analysis_2pc_commit_decision -
 *
 * return: error code
 *
 *   tran_id(in):
 *   lsa(in/out):
 *
 * Note:
 */
static int
log_rv_analysis_2pc_commit_decision (THREAD_ENTRY * thread_p, int tran_id, LOG_LSA * log_lsa)
{
  LOG_TDES *tdes;

  /*
   * If this is the first time the transaction is seen, assign a new
   * index to describe it. The transaction was part of the two-phase
   * commit process. A commit decision has been agreed upon.
   * This is a coordinator site.
   */
  tdes = logtb_rv_find_allocate_tran_index (thread_p, tran_id, log_lsa);
  if (tdes == NULL)
    {
      logpb_fatal_error (thread_p, true, ARG_FILE_LINE, "log_rv_analysis_2pc_commit_decision");
      return ER_FAILED;
    }

  tdes->state = TRAN_UNACTIVE_2PC_COMMIT_DECISION;
  LSA_COPY (&tdes->tail_lsa, log_lsa);

  return NO_ERROR;
}

/*
 * log_rv_analysis_2pc_abort_decision -
 *
 * return: error code
 *
 *   tran_id(in):
 *   lsa(in/out):
 *
 * Note:
 */
static int
log_rv_analysis_2pc_abort_decision (THREAD_ENTRY * thread_p, int tran_id, LOG_LSA * log_lsa)
{
  LOG_TDES *tdes;

  /*
   * If this is the first time the transaction is seen, assign a new
   * index to describe it. The transaction was part of the two-phase
   * commit process. An abort decision has been reached.
   * This is a coordinator site.
   */
  tdes = logtb_rv_find_allocate_tran_index (thread_p, tran_id, log_lsa);
  if (tdes == NULL)
    {
      logpb_fatal_error (thread_p, true, ARG_FILE_LINE, "log_rv_analysis_2pc_abort_decision");
      return ER_FAILED;
    }

  tdes->state = TRAN_UNACTIVE_2PC_ABORT_DECISION;
  LSA_COPY (&tdes->tail_lsa, log_lsa);

  return NO_ERROR;
}

/*
 * log_rv_analysis_2pc_commit_inform_particps -
 *
 * return: error code
 *
 *   tran_id(in):
 *   lsa(in/out):
 *
 * Note:
 */
static int
log_rv_analysis_2pc_commit_inform_particps (THREAD_ENTRY * thread_p, int tran_id, LOG_LSA * log_lsa)
{
  LOG_TDES *tdes;

  /*
   * If this is the first time the transaction is seen, assign a new
   * index to describe it. The transaction was part of the two-phase
   * commit process. A commit decision has been agreed upon and the
   * transaction was waiting on acknowledgments from participants.
   * This is a coordinator site.
   */
  tdes = logtb_rv_find_allocate_tran_index (thread_p, tran_id, log_lsa);
  if (tdes == NULL)
    {
      logpb_fatal_error (thread_p, true, ARG_FILE_LINE, "log_rv_analysis_2pc_commit_inform_particps");
      return ER_FAILED;
    }

  tdes->state = TRAN_UNACTIVE_COMMITTED_INFORMING_PARTICIPANTS;
  LSA_COPY (&tdes->tail_lsa, log_lsa);

  return NO_ERROR;
}

/*
 * log_rv_analysis_2pc_abort_inform_particps -
 *
 * return: error code
 *
 *   tran_id(in):
 *   lsa(in/out):
 *
 * Note:
 */
static int
log_rv_analysis_2pc_abort_inform_particps (THREAD_ENTRY * thread_p, int tran_id, LOG_LSA * log_lsa)
{
  LOG_TDES *tdes;

  /*
   * If this is the first time the transaction is seen, assign a new
   * index to describe it. The transaction was part of the two-phase
   * commit process. An abort decision has been reached and the
   * transaction was waiting on acknowledgments from participants.
   * This is a coordinator site.
   */
  tdes = logtb_rv_find_allocate_tran_index (thread_p, tran_id, log_lsa);
  if (tdes == NULL)
    {
      logpb_fatal_error (thread_p, true, ARG_FILE_LINE, "log_recovery_analysis");
      return ER_FAILED;
    }

  tdes->state = TRAN_UNACTIVE_ABORTED_INFORMING_PARTICIPANTS;
  LSA_COPY (&tdes->tail_lsa, log_lsa);

  return NO_ERROR;
}

/*
 * log_rv_analysis_2pc_recv_ack -
 *
 * return: error code
 *
 *   tran_id(in):
 *   lsa(in/out):
 *
 * Note:
 */
static int
log_rv_analysis_2pc_recv_ack (THREAD_ENTRY * thread_p, int tran_id, LOG_LSA * log_lsa)
{
  LOG_TDES *tdes;

  tdes = logtb_rv_find_allocate_tran_index (thread_p, tran_id, log_lsa);
  if (tdes == NULL)
    {
      logpb_fatal_error (thread_p, true, ARG_FILE_LINE, "log_recovery_analysis");
      return ER_FAILED;
    }

  LSA_COPY (&tdes->tail_lsa, log_lsa);

  return NO_ERROR;
}

/*
 * log_rv_analysis_log_end -
 *
 * return: error code
 *
 *   tran_id(in):
 *   lsa(in/out):
 *
 * Note:
 */
static int
log_rv_analysis_log_end (int tran_id, LOG_LSA * log_lsa)
{
  if (!logpb_is_page_in_archive (log_lsa->pageid))
    {
      /*
       * Reset the log header for the recovery undo operation
       */
      LOG_RESET_APPEND_LSA (log_lsa);
      log_Gl.hdr.next_trid = tran_id;
    }

  return NO_ERROR;
}

/*
 * log_rv_analysis_record -
 *
 * return: nothing
 *
 *
 * Note:
 */
static void
log_rv_analysis_record (THREAD_ENTRY * thread_p, LOG_RECTYPE log_type, int tran_id, LOG_LSA * log_lsa,
            LOG_PAGE * log_page_p, LOG_LSA * checkpoint_lsa, LOG_LSA * prev_lsa, LOG_LSA * start_lsa,
            LOG_LSA * start_redo_lsa, bool is_media_crash, time_t * stop_at, bool * did_incom_recovery,
            bool * may_use_checkpoint, bool * may_need_synch_checkpoint_2pc)
{
  switch (log_type)
    {
    case LOG_UNDOREDO_DATA:
    case LOG_DIFF_UNDOREDO_DATA:
    case LOG_UNDO_DATA:
    case LOG_REDO_DATA:
    case LOG_MVCC_UNDOREDO_DATA:
    case LOG_MVCC_DIFF_UNDOREDO_DATA:
    case LOG_MVCC_UNDO_DATA:
    case LOG_MVCC_REDO_DATA:
    case LOG_DBEXTERN_REDO_DATA:
      (void) log_rv_analysis_undo_redo (thread_p, tran_id, log_lsa);
      break;

    case LOG_DUMMY_HEAD_POSTPONE:
      (void) log_rv_analysis_dummy_head_postpone (thread_p, tran_id, log_lsa);
      break;

    case LOG_POSTPONE:
      (void) log_rv_analysis_postpone (thread_p, tran_id, log_lsa);
      break;

    case LOG_RUN_POSTPONE:
      (void) log_rv_analysis_run_postpone (thread_p, tran_id, log_lsa, log_page_p, checkpoint_lsa);
      break;

    case LOG_COMPENSATE:
      (void) log_rv_analysis_compensate (thread_p, tran_id, log_lsa, log_page_p);
      break;

    case LOG_COMMIT_WITH_POSTPONE:
      (void) log_rv_analysis_commit_with_postpone (thread_p, tran_id, log_lsa, log_page_p, prev_lsa, is_media_crash,
                           stop_at, did_incom_recovery);
      break;

    case LOG_COMMIT_WITH_POSTPONE_OBSOLETE:
      (void) log_rv_analysis_commit_with_postpone_obsolete (thread_p, tran_id, log_lsa, log_page_p);
      break;

    case LOG_SYSOP_START_POSTPONE:
      (void) log_rv_analysis_sysop_start_postpone (thread_p, tran_id, log_lsa, log_page_p);
      break;

    case LOG_COMMIT:
    case LOG_ABORT:
      (void) log_rv_analysis_complete (thread_p, tran_id, log_lsa, log_page_p, prev_lsa, is_media_crash, stop_at,
                       did_incom_recovery);
      break;

    case LOG_SYSOP_END:
      log_rv_analysis_sysop_end (thread_p, tran_id, log_lsa, log_page_p);
      break;

    case LOG_START_CHKPT:
      log_rv_analysis_start_checkpoint (log_lsa, start_lsa, may_use_checkpoint);
      break;

    case LOG_END_CHKPT:
      log_rv_analysis_end_checkpoint (thread_p, log_lsa, log_page_p, checkpoint_lsa, start_redo_lsa,
                      may_use_checkpoint, may_need_synch_checkpoint_2pc);
      break;

    case LOG_SAVEPOINT:
      (void) log_rv_analysis_save_point (thread_p, tran_id, log_lsa);
      break;

    case LOG_2PC_PREPARE:
      (void) log_rv_analysis_2pc_prepare (thread_p, tran_id, log_lsa);
      break;

    case LOG_2PC_START:
      (void) log_rv_analysis_2pc_start (thread_p, tran_id, log_lsa);
      break;

    case LOG_2PC_COMMIT_DECISION:
      (void) log_rv_analysis_2pc_commit_decision (thread_p, tran_id, log_lsa);
      break;

    case LOG_2PC_ABORT_DECISION:
      (void) log_rv_analysis_2pc_abort_decision (thread_p, tran_id, log_lsa);
      break;

    case LOG_2PC_COMMIT_INFORM_PARTICPS:
      (void) log_rv_analysis_2pc_commit_inform_particps (thread_p, tran_id, log_lsa);
      break;

    case LOG_2PC_ABORT_INFORM_PARTICPS:
      (void) log_rv_analysis_2pc_abort_inform_particps (thread_p, tran_id, log_lsa);
      break;

    case LOG_2PC_RECV_ACK:
      (void) log_rv_analysis_2pc_recv_ack (thread_p, tran_id, log_lsa);
      break;

    case LOG_END_OF_LOG:
      (void) log_rv_analysis_log_end (tran_id, log_lsa);
      break;

    case LOG_SYSOP_ATOMIC_START:
      (void) log_rv_analysis_atomic_sysop_start (thread_p, tran_id, log_lsa);
      break;

    case LOG_DUMMY_CRASH_RECOVERY:
    case LOG_REPLICATION_DATA:
    case LOG_REPLICATION_STATEMENT:
    case LOG_DUMMY_HA_SERVER_STATE:
    case LOG_DUMMY_OVF_RECORD:
    case LOG_DUMMY_GENERIC:
    case LOG_SUPPLEMENTAL_INFO:
      break;

    case LOG_SMALLER_LOGREC_TYPE:
    case LOG_LARGER_LOGREC_TYPE:
    default:
#if defined(CUBRID_DEBUG)
      er_log_debug (ARG_FILE_LINE,
            "log_recovery_analysis: Unknown record type = %d (%s) ... May be a system error\n", log_type,
            log_to_string (log_type));
#endif /* CUBRID_DEBUG */
      /* If we are here, probably the log is corrupted.  */
      er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_LOG_PAGE_CORRUPTED, 1, log_lsa->pageid);
      assert (false);
      break;
    }
}

/*
 * log_is_page_of_record_broken - check last page of the record
 *
 * return: true, if last page of the record is broken. false, if it is sane
 *
 *   log_lsa(in): Log record address
 *   log_rec_header(in): Log record header
 */
static bool
log_is_page_of_record_broken (THREAD_ENTRY * thread_p, const LOG_LSA * log_lsa,
                  const LOG_RECORD_HEADER * log_rec_header)
{
  char fwd_log_pgbuf[IO_MAX_PAGE_SIZE + MAX_ALIGNMENT];
  char *fwd_aligned_log_pgbuf;
  LOG_PAGE *log_fwd_page_p;
  LOG_LSA fwd_log_lsa;
  bool is_log_page_broken = false;

  assert (log_lsa != NULL && log_rec_header != NULL);

  fwd_aligned_log_pgbuf = PTR_ALIGN (fwd_log_pgbuf, MAX_ALIGNMENT);
  log_fwd_page_p = (LOG_PAGE *) fwd_aligned_log_pgbuf;

  LSA_COPY (&fwd_log_lsa, &log_rec_header->forw_lsa);

  /* TODO - Do we need to handle NULL fwd_log_lsa? */
  if (!LSA_ISNULL (&fwd_log_lsa))
    {
      /* log_Gl.hdr.eof_lsa can have a NULL_LSA value if recovery is started without an active log volume. Its value will be recovered during the log_recovery_analysis process. */
      if (LSA_GE (log_lsa, &fwd_log_lsa)
      || (!LSA_ISNULL (&log_Gl.hdr.eof_lsa) && LSA_GT (&fwd_log_lsa, &log_Gl.hdr.eof_lsa)))
    {
      // check fwd_log_lsa value if it is corrupted or not
      is_log_page_broken = true;
    }
      else
    {
      if (fwd_log_lsa.pageid != log_lsa->pageid
          && (fwd_log_lsa.offset != 0 || fwd_log_lsa.pageid > log_lsa->pageid + 1))
        {
          // The current log record spreads into several log pages.
          // Check whether the last page of the record exists.
          if (logpb_fetch_page (thread_p, &fwd_log_lsa, LOG_CS_FORCE_USE, log_fwd_page_p) != NO_ERROR)
        {
          /* The forward log page does not exist. */
          is_log_page_broken = true;
        }
        }
    }
    }

  return is_log_page_broken;
}

/*
 * log_recovery_analysis - FIND STATE OF TRANSACTIONS AT SYSTEM CRASH
 *
 * return: nothing
 *
 *   start_lsa(in): Starting address for the analysis phase
 *   start_redolsa(in/out): Starting address for redo phase
 *   end_redo_lsa(in):
 *   ismedia_crash(in): Are we recovering from a media crash ?
 *   stopat(in/out): Where to stop the recovery process.
 *                   (It may be set as a side effect to the location of the last
 *                    recovered transaction).
 *   did_incom_recovery(in):
 *
 * NOTE: The recovery analysis phase scans the log forward from the
 *              last checkpoint record reflected in the log and the data
 *              volumes. The transaction table and the starting address for
 *              the redo phase are created. When this phase is finished, we know
 *              the transactions that need to be unilaterally aborted (active)
 *              and the transactions that have to be completed due to postpone
 *              actions and client loose ends.
 */

static void
log_recovery_analysis (THREAD_ENTRY * thread_p, LOG_LSA * start_lsa, LOG_LSA * start_redo_lsa,
               LOG_LSA * end_redo_lsa, bool is_media_crash, time_t * stop_at, bool * did_incom_recovery,
               INT64 * num_redo_log_records)
{
  LOG_LSA checkpoint_lsa = { -1, -1 };
  LOG_LSA lsa;          /* LSA of log record to analyse */
  char log_pgbuf[IO_MAX_PAGE_SIZE + MAX_ALIGNMENT], *aligned_log_pgbuf;
  LOG_PAGE *log_page_p = NULL;  /* Log page pointer where LSA is located */
  LOG_LSA log_lsa;
  LOG_LSA prev_lsa;
  LOG_LSA prev_prev_lsa;
  LOG_LSA first_corrupted_rec_lsa;
  LOG_RECTYPE log_rtype;    /* Log record type */
  LOG_RECORD_HEADER *log_rec = NULL;
  LOG_REC_CHKPT *tmp_chkpt; /* Temp Checkpoint log record */
  LOG_REC_CHKPT chkpt;      /* Checkpoint log record */
  LOG_INFO_CHKPT_TRANS *chkpt_trans;
  time_t last_at_time = -1;
  char time_val[CTIME_MAX];
  bool may_need_synch_checkpoint_2pc = false, may_use_checkpoint = false, is_log_page_corrupted = false;
  int tran_index;
  TRANID tran_id;
  LOG_TDES *tdes;       /* Transaction descriptor */
  void *area = NULL;
  int size, i;
  const int block_size = 4 * ONE_K;
  int start_record_block, end_record_block;
  char null_buffer[block_size + MAX_ALIGNMENT], *null_block;
  int max_num_blocks = LOG_PAGESIZE / block_size;
  int last_checked_page_id = NULL_PAGEID;
  bool is_log_page_broken;

  aligned_log_pgbuf = PTR_ALIGN (log_pgbuf, MAX_ALIGNMENT);
  null_block = PTR_ALIGN (null_buffer, MAX_ALIGNMENT);
  memset (null_block, LOG_PAGE_INIT_VALUE, block_size);

  if (num_redo_log_records != NULL)
    {
      *num_redo_log_records = 0;
    }

  /*
   * Find the committed, aborted, and unilaterally aborted (active) transactions at system crash
   */

  LSA_SET_NULL (&first_corrupted_rec_lsa);
  LSA_COPY (&lsa, start_lsa);

  LSA_COPY (start_redo_lsa, &lsa);
  LSA_COPY (end_redo_lsa, &lsa);
  LSA_COPY (&prev_lsa, &lsa);
  prev_prev_lsa.set_null ();
  *did_incom_recovery = false;

  log_page_p = (LOG_PAGE *) aligned_log_pgbuf;

  is_log_page_broken = false;
  while (!LSA_ISNULL (&lsa))
    {
      /* Fetch the page where the LSA record to undo is located */
      LSA_COPY (&log_lsa, &lsa);

      /* We could fetch only if the log page is not already broken, but it is simpler this way. */
      if (logpb_fetch_page (thread_p, &log_lsa, LOG_CS_FORCE_USE, log_page_p) != NO_ERROR)
    {
      // unable to fetch the current log page.
      is_log_page_broken = true;
    }

      if (is_log_page_broken)
    {
      if (is_media_crash == true)
        {
          if (stop_at != NULL)
        {
          *stop_at = last_at_time;
        }

#if !defined(NDEBUG)
          if (prm_get_bool_value (PRM_ID_LOG_TRACE_DEBUG))
        {
          fprintf (stdout, msgcat_message (MSGCAT_CATALOG_CUBRID, MSGCAT_SET_LOG, MSGCAT_LOG_STARTS));
          (void) ctime_r (&last_at_time, time_val);
          fprintf (stdout,
               msgcat_message (MSGCAT_CATALOG_CUBRID, MSGCAT_SET_LOG,
                       MSGCAT_LOG_INCOMPLTE_MEDIA_RECOVERY), end_redo_lsa->pageid,
               end_redo_lsa->offset, ((last_at_time == -1) ? "???...\n" : time_val));
          fprintf (stdout, msgcat_message (MSGCAT_CATALOG_CUBRID, MSGCAT_SET_LOG, MSGCAT_LOG_STARTS));
          fflush (stdout);
        }
#endif /* !NDEBUG */

          /* if a previous log record exists, reset tdes->tail_lsa/undo_nxlsa to the record previous to end_redo_lsa */
          if (log_rec != NULL)
        {
          LOG_TDES *last_log_tdes = LOG_FIND_TDES (logtb_find_tran_index (thread_p, log_rec->trid));
          if (last_log_tdes != NULL)
            {
              LSA_COPY (&last_log_tdes->tail_lsa, &log_rec->prev_tranlsa);
              LSA_COPY (&last_log_tdes->undo_nxlsa, &log_rec->prev_tranlsa);
              er_log_debug (ARG_FILE_LINE, "%s: trid = %d, tail_lsa=%lld|%d\n",
                    __func__, log_rec->trid, last_log_tdes->tail_lsa.pageid,
                    last_log_tdes->tail_lsa.offset);
            }
        }
          assert (!prev_lsa.is_null ());
          if (logpb_fetch_page (thread_p, &prev_lsa, LOG_CS_FORCE_USE, log_page_p) != NO_ERROR)
        {
          logpb_fatal_error (thread_p, true, ARG_FILE_LINE, "reset log is impossible");
          return;
        }
          log_recovery_resetlog (thread_p, &prev_lsa, &prev_prev_lsa);
          *did_incom_recovery = true;

          log_Gl.mvcc_table.reset_start_mvccid ();
          return;
        }
      else
        {
          if (er_errid () == ER_TDE_CIPHER_IS_NOT_LOADED)
        {
          /* The TDE module has to be loaded because there are some TDE-encrypted log pages */
          logpb_fatal_error (thread_p, true, ARG_FILE_LINE,
                     "log_recovery_analysis: log page %lld has been encrypted (TDE) and cannot be decrypted",
                     log_page_p->hdr.logical_pageid);
        }
          else
        {
          logpb_fatal_error (thread_p, true, ARG_FILE_LINE, "log_recovery_analysis");
        }
          return;
        }
    }

      /* Check all log records in this phase */
      while (!LSA_ISNULL (&lsa) && lsa.pageid == log_lsa.pageid)
    {
      /*
       * If an offset is missing, it is because an incomplete log record was
       * archived. This log_record was completed later. Thus, we have to
       * find the offset by searching for the next log_record in the page
       */
      if (lsa.offset == NULL_OFFSET)
        {
          lsa.offset = log_page_p->hdr.offset;
          if (lsa.offset == NULL_OFFSET)
        {
          /* Continue with next pageid */
          if (logpb_is_page_in_archive (log_lsa.pageid))
            {
              lsa.pageid = log_lsa.pageid + 1;
            }
          else
            {
              lsa.pageid = NULL_PAGEID;
            }
          continue;
        }
        }

      /* If the page changed, check whether it is corrupted. */
      if (last_checked_page_id != log_lsa.pageid)
        {
#if !defined(NDEBUG)
          er_log_debug (ARG_FILE_LINE, "%s: log page %lld, checksum %d\n",
                __func__, log_page_p->hdr.logical_pageid, log_page_p->hdr.checksum);
          if (prm_get_bool_value (PRM_ID_LOGPB_LOGGING_DEBUG))
        {
          fileio_page_hexa_dump ((const char *) log_page_p, LOG_PAGESIZE);
        }
#endif /* !NDEBUG */

          /* Check whether active log pages are corrupted. This may happen in case of partial page flush for instance. */
          if (logpb_page_check_corruption (thread_p, log_page_p, &is_log_page_corrupted) != NO_ERROR)
        {
          logpb_fatal_error (thread_p, true, ARG_FILE_LINE, "log_recovery_analysis");
          return;
        }

          if (is_log_page_corrupted)
        {
          if (logpb_is_page_in_archive (log_lsa.pageid))
            {
              /* Should not happen. */
              logpb_fatal_error (thread_p, true, ARG_FILE_LINE, "log_recovery_analysis");
              return;
            }

          /* Set first corrupted record lsa, if a block is corrupted. */
          logpb_page_get_first_null_block_lsa (thread_p, log_page_p, &first_corrupted_rec_lsa);

          /* Found corrupted log page. */
          if (prm_get_bool_value (PRM_ID_LOGPB_LOGGING_DEBUG))
            {
              _er_log_debug (ARG_FILE_LINE, "%s: log page %lld is corrupted due to partial flush.\n",
                     __func__, (long long int) log_lsa.pageid);
            }
        }

          /* Set last checked page id. */
          last_checked_page_id = log_lsa.pageid;
        }

      /* Find the log record */
      log_lsa.offset = lsa.offset;
      log_rec = LOG_GET_LOG_RECORD_HEADER (log_page_p, &log_lsa);

      if (is_media_crash == true)
        {
          /* Check also the last log page of current record. We need to check after obtaining log_rec. */
          is_log_page_broken = log_is_page_of_record_broken (thread_p, &log_lsa, log_rec);
          if (is_log_page_broken)
        {
          /* The log needs to be reset; this is done in the outer loop. Set end_redo_lsa and the prev LSAs used at the reset. */
          LSA_COPY (end_redo_lsa, &lsa);
          LSA_COPY (&prev_lsa, end_redo_lsa);
          prev_prev_lsa = prev_lsa;
          er_log_debug (ARG_FILE_LINE, "%s: broken record at LSA=%lld|%d ",
                __func__, log_lsa.pageid, log_lsa.offset);
          break;
        }
        }

      /* Check whether null LSA is reached. */
      if (!is_log_page_corrupted)
        {
          /* For safety reasons. Normally, the checksum must detect corrupted pages. */
          if (LSA_ISNULL (&log_rec->forw_lsa)
          && log_rec->type != LOG_END_OF_LOG && !logpb_is_page_in_archive (log_lsa.pageid))
        {
          /* Can't find the end of the log. The next LSA is null. Consider the page corrupted. */
          logpb_page_get_first_null_block_lsa (thread_p, log_page_p, &first_corrupted_rec_lsa);
          is_log_page_corrupted = true;

          er_log_debug (ARG_FILE_LINE,
                "log_recovery_analysis: ** WARNING: An end of the log record was not found."
                "Latest log record at lsa = %lld|%d, first_corrupted_lsa = %lld|%d\n",
                log_lsa.pageid, log_lsa.offset, first_corrupted_rec_lsa.pageid,
                first_corrupted_rec_lsa.offset);
        }
          else if (log_rec->forw_lsa.pageid == log_lsa.pageid)
        {
          /* Quick fix. Sometimes page corruption is not detected. Check whether the current log record
           * is in corrupted block. If true, consider the page corrupted.
           */
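          /* Illustrative example (hypothetical offsets, block_size = 4K): a record whose header sits at page offset
           * 3000 and whose forward record starts at offset 5000 spans two 4K blocks of the page; if the second block
           * still holds only LOG_PAGE_INIT_VALUE bytes (the memcmp against null_block below), the tail of the record
           * was never flushed and the page is treated as corrupted. */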
          LOG_LSA temp_log_lsa;
          LSA_COPY (&temp_log_lsa, &log_rec->forw_lsa);
          temp_log_lsa.offset--;
          assert (log_lsa.offset >= 0 && temp_log_lsa.offset > log_lsa.offset);

          start_record_block = ((int) log_lsa.offset + sizeof (LOG_HDRPAGE)) / block_size;
          assert (start_record_block >= 0 && start_record_block < max_num_blocks);
          end_record_block = ((int) temp_log_lsa.offset + sizeof (LOG_HDRPAGE)) / block_size;
          assert (end_record_block >= 0 && end_record_block < max_num_blocks);

          if (start_record_block != end_record_block)
            {
              assert (start_record_block < end_record_block);
              if (memcmp (((char *) log_page_p) + (end_record_block * block_size), null_block, block_size) == 0)
            {
              /* The current record is corrupted - ends into a corrupted block. */
              LSA_COPY (&first_corrupted_rec_lsa, &log_lsa);
              is_log_page_corrupted = true;

              er_log_debug (ARG_FILE_LINE,
                    "log_recovery_analysis: ** WARNING: An end of the log record was not found."
                    "Latest log record at lsa = %lld|%d, first_corrupted_lsa = %lld|%d\n",
                    log_lsa.pageid, log_lsa.offset, first_corrupted_rec_lsa.pageid,
                    first_corrupted_rec_lsa.offset);
            }
            }
        }
        }

      if (is_log_page_corrupted && !LSA_ISNULL (&first_corrupted_rec_lsa))
        {
          /* If the record is corrupted, i.e. it resides in a corrupted block, then
           * reset the append lsa to the last valid address and stop.
           */
          if (LSA_GT (&log_lsa, &first_corrupted_rec_lsa))
        {
          LOG_RESET_APPEND_LSA (end_redo_lsa);
          LSA_SET_NULL (&lsa);
          break;
        }
          else
        {
          LOG_LSA temp_log_lsa;
          bool is_log_lsa_corrupted = false;

          if (LSA_EQ (&log_lsa, &first_corrupted_rec_lsa)
              || LSA_GT (&log_rec->forw_lsa, &first_corrupted_rec_lsa))
            {
              /* When log_lsa = first_corrupted_rec_lsa, forw_lsa may be NULL. */
              is_log_lsa_corrupted = true;
            }
          else
            {
              /* Check correctness of information from log header. */
              LSA_COPY (&temp_log_lsa, &log_lsa);
              temp_log_lsa.offset += sizeof (LOG_RECORD_HEADER);
              temp_log_lsa.offset = DB_ALIGN (temp_log_lsa.offset, DOUBLE_ALIGNMENT);

              if ((temp_log_lsa.offset > (int) LOGAREA_SIZE)
              || (LSA_GT (&temp_log_lsa, &first_corrupted_rec_lsa)))
            {
              is_log_lsa_corrupted = true;
            }
            }

          if (is_log_lsa_corrupted)
            {
              if (prm_get_bool_value (PRM_ID_LOGPB_LOGGING_DEBUG))
            {
              _er_log_debug (ARG_FILE_LINE,
                     "%s: Partial page flush - first corrupted log record LSA = (%lld, %d)\n",
                     __func__, (long long int) log_lsa.pageid, log_lsa.offset);
            }
              LOG_RESET_APPEND_LSA (&log_lsa);
              LSA_SET_NULL (&lsa);
              break;
            }
        }
        }

      tran_id = log_rec->trid;
      log_rtype = log_rec->type;

      /*
       * Save the address of last redo log record.
       * Get the address of next log record to scan
       */

      LSA_COPY (end_redo_lsa, &lsa);
      LSA_COPY (&lsa, &log_rec->forw_lsa);

      if ((is_log_page_corrupted) && (log_rtype != LOG_END_OF_LOG) && (lsa.pageid != log_lsa.pageid))
        {
          /* The page is corrupted, do not allow to advance to the next page. */
          LSA_SET_NULL (&lsa);
        }

      /*
       * If the next page is NULL_PAGEID and the current page is an archive
       * page, this is not the end of the log. This situation happens when an
       * incomplete log record is archived. Thus, its forward address is NULL.
       * Note that we have to set lsa.pageid here since the log_lsa.pageid value
       * can be changed (e.g., the log record is stored in two pages: an
       * archive page and an active page). Later, we try to modify it whenever
       * possible.
       */

      if (LSA_ISNULL (&lsa) && logpb_is_page_in_archive (log_lsa.pageid))
        {
          lsa.pageid = log_lsa.pageid + 1;
        }

      if (!LSA_ISNULL (&lsa) && log_lsa.pageid != NULL_PAGEID
          && (lsa.pageid < log_lsa.pageid || (lsa.pageid == log_lsa.pageid && lsa.offset <= log_lsa.offset)))
        {
          /* It seems to be a system error. Maybe a loop in the log */
          er_log_debug (ARG_FILE_LINE,
                "log_recovery_analysis: ** System error: It seems to be a loop in the log\n."
                " Current log_rec at %lld|%d. Next log_rec at %lld|%d\n", (long long int) log_lsa.pageid,
                log_lsa.offset, (long long int) lsa.pageid, lsa.offset);
          logpb_fatal_error (thread_p, true, ARG_FILE_LINE, "log_recovery_analysis");
          LSA_SET_NULL (&lsa);
          break;
        }

      if (LSA_ISNULL (&lsa) && log_rtype != LOG_END_OF_LOG && *did_incom_recovery == false)
        {
          LOG_RESET_APPEND_LSA (end_redo_lsa);
          if (log_startof_nxrec (thread_p, &log_Gl.hdr.append_lsa, true) == NULL)
        {
          /* We may destroy a record */
          LOG_RESET_APPEND_LSA (end_redo_lsa);
        }
          else
        {
          LOG_RESET_APPEND_LSA (&log_Gl.hdr.append_lsa);

          /*
           * Reset the forward address of current record to next record,
           * and then flush the page.
           */
          LSA_COPY (&log_rec->forw_lsa, &log_Gl.hdr.append_lsa);

          assert (log_lsa.pageid == log_page_p->hdr.logical_pageid);
          logpb_write_page_to_disk (thread_p, log_page_p, log_lsa.pageid);
        }
          er_log_debug (ARG_FILE_LINE,
                "log_recovery_analysis: ** WARNING: An end of the log record was not found."
                " Will Assume = %lld|%d and Next Trid = %d\n",
                (long long int) log_Gl.hdr.append_lsa.pageid, log_Gl.hdr.append_lsa.offset, tran_id);
          log_Gl.hdr.next_trid = tran_id;
        }

      if (num_redo_log_records)
        {
          switch (log_rtype)
        {
          /* count redo log */
        case LOG_REDO_DATA:
        case LOG_UNDOREDO_DATA:
        case LOG_DIFF_UNDOREDO_DATA:
        case LOG_DBEXTERN_REDO_DATA:
        case LOG_MVCC_REDO_DATA:
        case LOG_MVCC_UNDOREDO_DATA:
        case LOG_MVCC_DIFF_UNDOREDO_DATA:
        case LOG_RUN_POSTPONE:
        case LOG_COMPENSATE:
        case LOG_2PC_PREPARE:
        case LOG_2PC_START:
        case LOG_2PC_RECV_ACK:
          (*num_redo_log_records)++;
          break;
        default:
          break;
        }
        }

      log_rv_analysis_record (thread_p, log_rtype, tran_id, &log_lsa, log_page_p, &checkpoint_lsa, &prev_lsa,
                  start_lsa, start_redo_lsa, is_media_crash, stop_at, did_incom_recovery,
                  &may_use_checkpoint, &may_need_synch_checkpoint_2pc);
      if (*did_incom_recovery == true)
        {
          /* The end_redo_lsa needs to be reverted to the prev_lsa value.
           * The log record pointed to by end_redo_lsa is not a target for redo
           */
          LSA_COPY (end_redo_lsa, &prev_lsa);

          LSA_SET_NULL (&lsa);
          break;
        }
      if (LSA_EQ (end_redo_lsa, &lsa))
        {
          assert_release (!LSA_EQ (end_redo_lsa, &lsa));
          LSA_SET_NULL (&lsa);
          break;
        }
      if ((is_log_page_corrupted) && (log_rtype == LOG_END_OF_LOG))
        {
          /* The page is corrupted. Stop if the end of log was found in the page. In this case,
           * the remaining data in the log page is corrupted. If the end of log is not found,
           * then we will advance up to the NULL LSA. It is important to initialize the page with -1.
           * Another option may be to store the previous LSA in the header page,
           * or to use a checksum on log records, but this may slow down the system.
           */
          LSA_SET_NULL (&lsa);
          break;
        }

      LSA_COPY (&prev_lsa, end_redo_lsa);
      prev_prev_lsa = prev_lsa;

      /*
       * We can fix the lsa.pageid in the case of log_records without forward
       * address at this moment.
       */
      if (lsa.offset == NULL_OFFSET && lsa.pageid != NULL_PAGEID && lsa.pageid < log_lsa.pageid)
        {
          lsa.pageid = log_lsa.pageid;
        }
    }
    }

  if (may_need_synch_checkpoint_2pc == true)
    {
      /*
       * We may need to obtain 2PC information of distributed transactions that
       * were in 2PC at the time of the checkpoint and were still in 2PC
       * at the time of the crash.
       * Get the checkpoint log record information one more time.
       */
      log_lsa.pageid = checkpoint_lsa.pageid;
      log_lsa.offset = checkpoint_lsa.offset;

      log_page_p = (LOG_PAGE *) aligned_log_pgbuf;

      if (logpb_fetch_page (thread_p, &log_lsa, LOG_CS_FORCE_USE, log_page_p) != NO_ERROR)
    {
      /*
       * There is a problem. We have just read this page a little while ago
       */
      logpb_fatal_error (thread_p, true, ARG_FILE_LINE, "log_recovery_analysis");
      return;
    }

      log_rec = LOG_GET_LOG_RECORD_HEADER (log_page_p, &log_lsa);

      /* Read the DATA HEADER */
      LOG_READ_ADD_ALIGN (thread_p, sizeof (LOG_RECORD_HEADER), &log_lsa, log_page_p);
      LOG_READ_ADVANCE_WHEN_DOESNT_FIT (thread_p, sizeof (LOG_REC_CHKPT), &log_lsa, log_page_p);
      tmp_chkpt = (LOG_REC_CHKPT *) ((char *) log_page_p->area + log_lsa.offset);
      chkpt = *tmp_chkpt;

      /* GET THE CHECKPOINT TRANSACTION INFORMATION */
      LOG_READ_ADD_ALIGN (thread_p, sizeof (LOG_REC_CHKPT), &log_lsa, log_page_p);

      /* Now get the data of active transactions */
      area = NULL;
      size = sizeof (LOG_INFO_CHKPT_TRANS) * chkpt.ntrans;
      if (log_lsa.offset + size < (int) LOGAREA_SIZE)
    {
      chkpt_trans = (LOG_INFO_CHKPT_TRANS *) ((char *) log_page_p->area + log_lsa.offset);
    }
      else
    {
      /* Need to copy the data into a contiguous area */
      area = malloc (size);
      if (area == NULL)
        {
          logpb_fatal_error (thread_p, true, ARG_FILE_LINE, "log_recovery_analysis");
          return;
        }
      /* Copy the data */
      logpb_copy_from_log (thread_p, (char *) area, size, &log_lsa, log_page_p);
      chkpt_trans = (LOG_INFO_CHKPT_TRANS *) area;
    }

      /* Add the transactions to the transaction table */
      for (i = 0; i < chkpt.ntrans; i++)
    {
      tran_index = logtb_find_tran_index (thread_p, chkpt_trans[i].trid);
      if (tran_index != NULL_TRAN_INDEX)
        {
          tdes = LOG_FIND_TDES (tran_index);
          if (tdes != NULL && LOG_ISTRAN_2PC (tdes))
        {
          log_2pc_recovery_analysis_info (thread_p, tdes, &chkpt_trans[i].tail_lsa);
        }
        }
    }
      if (area != NULL)
    {
      free_and_init (area);
    }
    }

  log_Gl.mvcc_table.reset_start_mvccid ();

  if (prm_get_bool_value (PRM_ID_LOGPB_LOGGING_DEBUG))
    {
      _er_log_debug (ARG_FILE_LINE, "log_recovery_analysis: end of analysis phase, append_lsa = (%lld|%d) \n",
             (long long int) log_Gl.hdr.append_lsa.pageid, log_Gl.hdr.append_lsa.offset);
    }

  return;
}

/*
 * log_recovery_needs_skip_logical_redo - Check whether we need to skip logical redo.
 *
 * return: true if skip logical redo, false otherwise
 *
 *   thread_p(in): Thread entry
 *   tran_id(in) : Transaction id.
 *   log_rtype(in): Log record type
 *   rcv_index(in): Recovery index
 *   lsa(in) : lsa to check
 *
 * NOTE: When logical redo logging is applied and the system crashes repeatedly, we need to
 *       skip logical redo records that were already applied. This function checks whether the logical redo must be skipped.
 */
static bool
log_recovery_needs_skip_logical_redo (THREAD_ENTRY * thread_p, TRANID tran_id, LOG_RECTYPE log_rtype,
                      LOG_RCVINDEX rcv_index, const LOG_LSA * lsa)
{
  int tran_index;
  LOG_TDES *tdes = NULL;    /* Transaction descriptor */

  assert (lsa != NULL);

  if (log_rtype != LOG_DBEXTERN_REDO_DATA)
    {
      return false;
    }

  tran_index = logtb_find_tran_index (thread_p, tran_id);
  if (tran_index == NULL_TRAN_INDEX)
    {
      return false;
    }

  tdes = LOG_FIND_TDES (tran_index);
  if (tdes == NULL)
    {
      return false;
    }

  /* logical redo logging */
  // analysis_last_aborted_sysop_start_lsa < lsa < analysis_last_aborted_sysop_lsa
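  // Illustrative example with hypothetical LSAs: if analysis recorded an aborted sysop spanning
  // start_lsa = 50|0 .. end_lsa = 70|0 and this LOG_DBEXTERN_REDO_DATA record sits at 60|0, the record falls inside
  // that range, its logical redo is considered already applied, and it is skipped.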
  if (LSA_LT (&tdes->rcv.analysis_last_aborted_sysop_start_lsa, lsa)
      && LSA_LT (lsa, &tdes->rcv.analysis_last_aborted_sysop_lsa))
    {
      /* Logical redo already applied. */
      er_log_debug (ARG_FILE_LINE, "log_recovery_needs_skip_logical_redo: LSA = %lld|%d, Rv_index = %s, "
            "analysis_last_aborted_sysop_lsa = %lld|%d, analysis_last_aborted_sysop_start_lsa = %lld|%d\n",
            LSA_AS_ARGS (lsa), rv_rcvindex_string (rcv_index),
            LSA_AS_ARGS (&tdes->rcv.analysis_last_aborted_sysop_lsa),
            LSA_AS_ARGS (&tdes->rcv.analysis_last_aborted_sysop_start_lsa));
      return true;
    }

  return false;
}

#if defined (SERVER_MODE)
static int
log_recovery_get_redo_parallel_count ()
{
  // *INDENT-OFF*
  const int num_cpus = cubthread::system_core_count ();
  // *INDENT-ON*

  /* 'minimum_threads_to_redo' was determined experimentally.
   * The redo performance was measured for a set of experiments involving
   * multiple single record insertions (redo log records: 6,971,675).
   * The best performance was observed when the number of redo threads was set to 16
   * under 1, 2, 4, and 8 core conditions. Therefore, the minimum number of threads was set to 16.
   * If a more appropriate value is found through further experiments with diverse sets, this value can be adjusted accordingly.
   */
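  /* For example (informative only): num_cpus = 4 yields MAX (16, 4) = 16 redo threads; num_cpus = 32 yields 32. */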
  const int minimum_threads_to_redo = 16;

  return MAX (minimum_threads_to_redo, num_cpus);
}
#endif

/*
 * log_recovery_redo - SCAN FORWARD REDOING DATA
 *
 * return: nothing
 *
 *   start_redolsa(in): Starting address for recovery redo phase
 *   end_redo_lsa(in):
 *
 * NOTE: In the redo phase, updates that are not reflected in the
 *              database are repeated not only for the committed transactions
 *              but also for all aborted transactions and the transactions
 *              that were in progress at the time of the failure. This phase
 *              reestablishes the state of the database as of the time of the
 *              failure. The redo phase starts by scanning the log records
 *              from the redo LSA address determined in the analysis phase.
 *              When a redoable record is found, a check is done to find out
 *              if the redo action is already reflected in the page. If it is
 *              not, then the redo actions are executed. A redo action can be
 *              skipped if the LSA of the affected page is greater than or
 *              equal to the LSA of the log record. Any postpone actions
 *              (after commit actions) of committed transactions that have not
 *              been executed are done. Loose ends of client actions that have
 *              not been done are postponed until the client is restarted.
 *              At the end of the recovery phase, all dirty data pages are
 *              flushed.
 *              The redo of aborted transactions is undone by executing their
 *              respective compensating log records.
 */
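
/* Illustrative example of the LSA-based skip rule above (hypothetical values): a log record at LSA 120|300 modified
 * a data page; if the page's recorded LSA is already 125|40, the page was flushed after that update and the redo is
 * skipped; if the page's LSA is still 110|0, the update is missing from the page and the redo must be applied.
 */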
#if defined(SERVER_MODE)
// *INDENT-OFF*
REGISTER_WORKERPOOL (parallel_recovery_redo, []() { return log_recovery_get_redo_parallel_count (); });
// *INDENT-ON*
#endif

static void
log_recovery_redo (THREAD_ENTRY * thread_p, const LOG_LSA * start_redolsa, const LOG_LSA * end_redo_lsa)
{
  LOG_LSA lsa;          /* LSA of log record to redo */

  log_rv_redo_context redo_context (*end_redo_lsa, log_reader::fetch_mode::NORMAL);

  volatile TRANID tran_id;
  volatile LOG_RECTYPE log_rtype;

  TSC_TICKS info_logging_start_time, info_logging_check_time;
  TSCTIMEVAL info_logging_elapsed_time;
  int info_logging_interval_in_secs = 0;

  assert (end_redo_lsa != nullptr && !end_redo_lsa->is_null ());

  // *INDENT-OFF*
  const auto time_start_setting_up = std::chrono::system_clock::now ();
  // *INDENT-ON*

  /* depending on the compilation mode and on a system parameter, initialize the
   * infrastructure for parallel log recovery;
   * if the infrastructure is not initialized, the dependent code below works sequentially
   */
  LOG_CS_EXIT (thread_p);

  cublog::reusable_jobs_stack reusable_jobs;
  // *INDENT-OFF*
  std::unique_ptr<cublog::redo_parallel> parallel_recovery_redo;
  // *INDENT-ON*

#if defined(SERVER_MODE)
  const int log_recovery_redo_parallel_count = log_recovery_get_redo_parallel_count ();

  reusable_jobs.initialize (log_recovery_redo_parallel_count);
  // *INDENT-OFF*
  parallel_recovery_redo.reset (new cublog::redo_parallel (log_recovery_redo_parallel_count, false, MAX_LSA, redo_context));
  // *INDENT-ON*
#endif

  // *INDENT-OFF*
  const auto time_start_main = std::chrono::system_clock::now ();
  /* 'setting_up' measures the time taken to initialize the parallel log recovery redo infrastructure */
  const auto duration_setting_up_ms
      = std::chrono::duration_cast <std::chrono::milliseconds>(time_start_main - time_start_setting_up);
  // *INDENT-ON*

  // statistics data initialize
  //
  cubperf::statset_definition rcv_redo_perf_stat_definition (cublog::perf_stats_main_definition_init_list);
  cublog::perf_stats rcv_redo_perf_stat (cublog::perf_stats_is_active_for_main (), rcv_redo_perf_stat_definition);

  /*
   * GO FORWARD, redoing records of all transactions including aborted ones.
   *
   * Compensating records undo the redo of already executed undo records.
   * Transactions that were active at the time of the crash are aborted
   * during the log_recovery_undo phase
   */

  LSA_COPY (&lsa, start_redolsa);

  /* Defense for illegal start_redolsa */
  if ((lsa.offset + (int) sizeof (LOG_RECORD_HEADER)) >= LOGAREA_SIZE)
    {
      assert (false);
      /* move to the first record of the next page */
      lsa.pageid++;
      lsa.offset = NULL_OFFSET;
    }

  info_logging_interval_in_secs = prm_get_integer_value (PRM_ID_RECOVERY_PROGRESS_LOGGING_INTERVAL);
  if (info_logging_interval_in_secs > 0 && info_logging_interval_in_secs < 5)
    {
      info_logging_interval_in_secs = 5;
    }
  if (info_logging_interval_in_secs > 0)
    {
      tsc_start_time_usec (&info_logging_start_time);
      tsc_start_time_usec (&info_logging_check_time);
    }

  while (!LSA_ISNULL (&lsa))
    {
      /* Fetch the page where the log record to redo is located */
      if (redo_context.m_reader.set_lsa_and_fetch_page (lsa) != NO_ERROR)
    {
      if (lsa > redo_context.m_end_redo_lsa)
        {
          goto exit;
        }
      else
        {
          LSA_SET_NULL (&log_Gl.unique_stats_table.curr_rcv_rec_lsa);
          logpb_fatal_error (thread_p, true, ARG_FILE_LINE, "log_recovery_redo");
          return;
        }
    }

      /* PRM_ID_RECOVERY_PROGRESS_LOGGING_INTERVAL > 0 */
      if (info_logging_interval_in_secs > 0)
    {
      tsc_end_time_usec (&info_logging_elapsed_time, info_logging_check_time);
      if (info_logging_elapsed_time.tv_sec >= info_logging_interval_in_secs)
        {
          UINT64 done_page_cnt = lsa.pageid - start_redolsa->pageid;
          UINT64 total_page_cnt = log_cnt_pages_containing_lsa (start_redolsa, end_redo_lsa);

          double elapsed_time;
          double progress = double (done_page_cnt) / (total_page_cnt);

          tsc_start_time_usec (&info_logging_check_time);
          tsc_end_time_usec (&info_logging_elapsed_time, info_logging_start_time);

          elapsed_time = info_logging_elapsed_time.tv_sec + (info_logging_elapsed_time.tv_usec / 1000000.0);

          er_set (ER_NOTIFICATION_SEVERITY, ARG_FILE_LINE, ER_LOG_RECOVERY_PROGRESS, 6, "REDO",
              done_page_cnt, total_page_cnt, progress * 100, elapsed_time,
              done_page_cnt == 0 ? -1.0 : (elapsed_time / done_page_cnt) * (total_page_cnt - done_page_cnt));
        }
    }
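      /* Worked example of the estimate reported above (illustrative numbers): with
       * total_page_cnt = 1000, done_page_cnt = 200 and elapsed_time = 10s, progress is
       * 200/1000 = 20% and the remaining-time estimate is (10/200) * (1000 - 200) = 40s. */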

      /* Check all log records in this phase */
      while (lsa.pageid == redo_context.m_reader.get_pageid ())
    {
      /*
       * Do we want to stop the recovery redo process at this time ?
       */
      if (LSA_GT (&lsa, end_redo_lsa))
        {
          LSA_SET_NULL (&lsa);
          break;
        }

      /*
       * If the offset is missing, it is because we archived an incomplete
       * log record. This log record was completed later. Thus, we have to
       * find the offset by searching for the next log record in the page
       */
      if (lsa.offset == NULL_OFFSET)
        {
          lsa.offset = redo_context.m_reader.get_page_header ().offset;
          if (lsa.offset == NULL_OFFSET)
        {
          /* Continue with next pageid */
          const auto log_lsa_pageid = redo_context.m_reader.get_pageid ();
          if (logpb_is_page_in_archive (log_lsa_pageid))
            {
              lsa.pageid = log_lsa_pageid + 1;
            }
          else
            {
              lsa.pageid = NULL_PAGEID;
            }
          continue;
        }
        }

      LSA_COPY (&log_Gl.unique_stats_table.curr_rcv_rec_lsa, &lsa);

      /* both the page id and the offset might have changed; the page id is changed at the end of the loop */
      redo_context.m_reader.set_lsa_and_fetch_page (lsa);
      rcv_redo_perf_stat.time_and_increment (cublog::PERF_STAT_ID_FETCH_PAGE);

      {
        /* Pointer to log record */
        // *INDENT-OFF*
        const LOG_RECORD_HEADER *log_rec_header = redo_context.m_reader.reinterpret_cptr<LOG_RECORD_HEADER> ();
        // *INDENT-ON*

        tran_id = log_rec_header->trid;
        log_rtype = log_rec_header->type;

        /* Get the address of next log record to scan */
        LSA_COPY (&lsa, &log_rec_header->forw_lsa);
      }

      /*
       * If the next page is NULL_PAGEID and the current page is an archive
       * page, this is not the end; this situation happens when an incomplete
       * log record is archived, so its forward address is NULL.
       * Note that we have to set lsa.pageid here since the log_lsa.pageid value
       * can be changed (e.g., the log record is stored both in an archive page
       * and in an active page). Later, we try to modify it whenever possible.
       */
      {
        const auto log_lsa_pageid = redo_context.m_reader.get_pageid ();
        const auto log_lsa_offset = redo_context.m_reader.get_offset ();
        if (LSA_ISNULL (&lsa) && logpb_is_page_in_archive (log_lsa_pageid))
          {
        lsa.pageid = log_lsa_pageid + 1;
          }

        if (!LSA_ISNULL (&lsa) && log_lsa_pageid != NULL_PAGEID
        && (lsa.pageid < log_lsa_pageid || (lsa.pageid == log_lsa_pageid && lsa.offset <= log_lsa_offset)))
          {
        /* It seems to be a system error. Maybe a loop in the log */

        LSA_SET_NULL (&log_Gl.unique_stats_table.curr_rcv_rec_lsa);

        er_log_debug (ARG_FILE_LINE,
                  "log_recovery_redo: ** System error: It seems to be a loop in the log\n."
                  " Current log_rec at %lld|%d. Next log_rec at %lld|%d\n",
                  (long long int) log_lsa_pageid, log_lsa_offset, (long long int) lsa.pageid, lsa.offset);
        logpb_fatal_error (thread_p, true, ARG_FILE_LINE, "log_recovery_redo");
        LSA_SET_NULL (&lsa);
        break;
          }
      }

      /* Read the log record information from the log and, if redo is required, apply it */

      const LOG_LSA rcv_lsa = redo_context.m_reader.get_lsa ();

      /* skip the log record HEADER */
      redo_context.m_reader.add_align (sizeof (LOG_RECORD_HEADER));

// *INDENT-OFF*
#define BUILD_RECORD_INFO(TEMPLATE_TYPE) \
  log_rv_redo_rec_info<TEMPLATE_TYPE> \
    (rcv_lsa, log_rtype, redo_context.m_reader.reinterpret_copy_and_add_align<TEMPLATE_TYPE> ())
#define INVOKE_REDO_RECORD(TEMPLATE_TYPE, record_info) \
  log_rv_redo_record_sync_or_dispatch_async<TEMPLATE_TYPE> \
    (thread_p, redo_context, record_info, parallel_recovery_redo, reusable_jobs, rcv_redo_perf_stat)
// *INDENT-ON*
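      /* For example, for a LOG_UNDOREDO_DATA record, BUILD_RECORD_INFO (LOG_REC_UNDOREDO) reads the
       * LOG_REC_UNDOREDO structure through the log reader and packs it, together with rcv_lsa and
       * log_rtype, into a log_rv_redo_rec_info<LOG_REC_UNDOREDO>; INVOKE_REDO_RECORD then either applies
       * it synchronously or dispatches it to the parallel redo infrastructure, depending on whether
       * parallel_recovery_redo was initialized. */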

      switch (log_rtype)
        {
        case LOG_MVCC_UNDOREDO_DATA:
        case LOG_MVCC_DIFF_UNDOREDO_DATA:
          {
        redo_context.m_reader.advance_when_does_not_fit (sizeof (LOG_REC_MVCC_UNDOREDO));

        /* MVCC op undo/redo log record */
        const auto rec_info_mvcc_undoredo = BUILD_RECORD_INFO (LOG_REC_MVCC_UNDOREDO);

        // *INDENT-OFF*
                const MVCCID mvccid =
                  log_rv_get_log_rec_mvccid<LOG_REC_MVCC_UNDOREDO> (rec_info_mvcc_undoredo.m_logrec);
        // *INDENT-ON*

        /* Check if MVCC next ID must be updated */
        if (!MVCC_ID_PRECEDES (mvccid, log_Gl.hdr.mvcc_next_id))
          {
            log_Gl.hdr.mvcc_next_id = mvccid;
            MVCCID_FORWARD (log_Gl.hdr.mvcc_next_id);
          }
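        /* Example: if log_Gl.hdr.mvcc_next_id is 100 and the record carries mvccid 105, the header's
         * next id is set to 105 and then advanced past it by MVCCID_FORWARD, so MVCCIDs assigned after
         * recovery cannot collide with ids already present in the log. */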

        /* Save last MVCC operation LOG_LSA. */
        LSA_COPY (&log_Gl.hdr.mvcc_op_log_lsa, &rcv_lsa);

        rcv_redo_perf_stat.time_and_increment (cublog::PERF_STAT_ID_READ_LOG);
        INVOKE_REDO_RECORD (LOG_REC_MVCC_UNDOREDO, rec_info_mvcc_undoredo);
          }
          break;

        case LOG_UNDOREDO_DATA:
        case LOG_DIFF_UNDOREDO_DATA:
          {
        redo_context.m_reader.advance_when_does_not_fit (sizeof (LOG_REC_UNDOREDO));

        /* Get undoredo structure */
        const auto rec_info_undoredo = BUILD_RECORD_INFO (LOG_REC_UNDOREDO);

        rcv_redo_perf_stat.time_and_increment (cublog::PERF_STAT_ID_READ_LOG);
        INVOKE_REDO_RECORD (LOG_REC_UNDOREDO, rec_info_undoredo);
          }
          break;

        case LOG_MVCC_REDO_DATA:
          {
        redo_context.m_reader.advance_when_does_not_fit (sizeof (LOG_REC_MVCC_REDO));

        /* MVCC op redo log record */
        const auto rec_info_mvcc_redo = BUILD_RECORD_INFO (LOG_REC_MVCC_REDO);

        // *INDENT-OFF*
        const MVCCID mvccid = log_rv_get_log_rec_mvccid<LOG_REC_MVCC_REDO> (rec_info_mvcc_redo.m_logrec);
        // *INDENT-ON*

        /* Check if MVCC next ID must be updated */
        if (!MVCC_ID_PRECEDES (mvccid, log_Gl.hdr.mvcc_next_id))
          {
            log_Gl.hdr.mvcc_next_id = mvccid;
            MVCCID_FORWARD (log_Gl.hdr.mvcc_next_id);
          }

        /* NOTE: do not update rcv_lsa on the global bookkeeping that is
         * relevant for vacuum as vacuum only processes undo data */

        rcv_redo_perf_stat.time_and_increment (cublog::PERF_STAT_ID_READ_LOG);
        INVOKE_REDO_RECORD (LOG_REC_MVCC_REDO, rec_info_mvcc_redo);
          }
          break;

        case LOG_REDO_DATA:
          {
        redo_context.m_reader.advance_when_does_not_fit (sizeof (LOG_REC_REDO));

        const auto rec_info_redo = BUILD_RECORD_INFO (LOG_REC_REDO);

        if (rec_info_redo.m_logrec.data.rcvindex == RVVAC_COMPLETE)
          {
            /* Reset log header MVCC info */
            logpb_vacuum_reset_log_header_cache (thread_p, &log_Gl.hdr);
          }

        rcv_redo_perf_stat.time_and_increment (cublog::PERF_STAT_ID_READ_LOG);
        INVOKE_REDO_RECORD (LOG_REC_REDO, rec_info_redo);
          }
          break;

        case LOG_DBEXTERN_REDO_DATA:
          {
        LOG_RCV rcv;    // = LOG_RCV_INITIALIZER;  /* Recovery structure */

        redo_context.m_reader.advance_when_does_not_fit (sizeof (LOG_REC_DBOUT_REDO));
        /* An external redo log record */
        // *INDENT-OFF*
        const LOG_REC_DBOUT_REDO *dbout_redo = redo_context.m_reader.reinterpret_cptr<LOG_REC_DBOUT_REDO> ();
        // *INDENT-ON*

        const LOG_RCVINDEX rcvindex = dbout_redo->rcvindex; /* Recovery index function */

        rcv.offset = -1;
        rcv.pgptr = NULL;
        rcv.length = dbout_redo->length;

        /* GET AFTER DATA */
        redo_context.m_reader.add_align (sizeof (LOG_REC_DBOUT_REDO));

#if !defined(NDEBUG)
        if (prm_get_bool_value (PRM_ID_LOG_TRACE_DEBUG))
          {
            fprintf (stdout, "TRACE EXT REDOING[3]: LSA = %lld|%d, Rv_index = %s\n", LSA_AS_ARGS (&rcv_lsa),
                 rv_rcvindex_string (rcvindex));
            fflush (stdout);
          }
#endif /* !NDEBUG */

        if (!log_recovery_needs_skip_logical_redo (thread_p, tran_id, log_rtype, rcvindex, &rcv_lsa))
          {
            rcv_redo_perf_stat.time_and_increment (cublog::PERF_STAT_ID_READ_LOG);
            log_rv_redo_record (thread_p, redo_context.m_reader, RV_fun[rcvindex].redofun, &rcv,
                    &rcv_lsa, 0, nullptr, redo_context.m_redo_zip);
            /* unzip_ptr is used here only as a buffer for the underlying logic; the structure's buffer
             * will be reallocated downstream if needed */
          }
          }
          break;

        case LOG_RUN_POSTPONE:
          {
        redo_context.m_reader.advance_when_does_not_fit (sizeof (LOG_REC_RUN_POSTPONE));
        /* A run postpone action */
        const auto rec_info_run_posp = BUILD_RECORD_INFO (LOG_REC_RUN_POSTPONE);

        rcv_redo_perf_stat.time_and_increment (cublog::PERF_STAT_ID_READ_LOG);
        INVOKE_REDO_RECORD (LOG_REC_RUN_POSTPONE, rec_info_run_posp);
          }
          break;

        case LOG_COMPENSATE:
          {
        redo_context.m_reader.advance_when_does_not_fit (sizeof (LOG_REC_COMPENSATE));

        const auto rec_info_compensate = BUILD_RECORD_INFO (LOG_REC_COMPENSATE);

        rcv_redo_perf_stat.time_and_increment (cublog::PERF_STAT_ID_READ_LOG);
        INVOKE_REDO_RECORD (LOG_REC_COMPENSATE, rec_info_compensate);
          }
          break;

        case LOG_2PC_PREPARE:
          {
        const int tran_index = logtb_find_tran_index (thread_p, tran_id);
        if (tran_index == NULL_TRAN_INDEX)
          {
            break;
          }
        LOG_TDES *const tdes = LOG_FIND_TDES (tran_index);
        if (tdes == NULL)
          {
            break;
          }
        /*
         * The transaction was still alive at the time of the crash; therefore,
         * the decision of the 2PC is not known. Reacquire the locks
         * acquired at the time of the crash.
         */

        if (tdes->state == TRAN_UNACTIVE_2PC_PREPARE)
          {
            /* The transaction was still in prepared_to_commit state at the time of the crash. So, read the global
             * transaction identifier and the list of locks from the log record, and acquire all of the locks. */

            log_2pc_read_prepare (thread_p, LOG_2PC_OBTAIN_LOCKS, tdes, redo_context.m_reader);
          }
        else
          {
            /* The transaction was no longer in prepared_to_commit state at the time of the crash. So, there is no
             * need to read the list of locks from the log record. Read only the global transaction identifier from
             * the log record. */
            log_2pc_read_prepare (thread_p, LOG_2PC_DONT_OBTAIN_LOCKS, tdes, redo_context.m_reader);
          }
          }
          break;

        case LOG_2PC_START:
          {
        const int tran_index = logtb_find_tran_index (thread_p, tran_id);
        if (tran_index != NULL_TRAN_INDEX)
          {
            /* The transaction was still alive at the time of crash. */
            LOG_TDES *const tdes = LOG_FIND_TDES (tran_index);
            if (tdes != NULL && LOG_ISTRAN_2PC (tdes))
              {
            /* The transaction was still alive at the time of crash. So, copy the coordinator information
             * from the log record to the transaction descriptor. */

            redo_context.m_reader.advance_when_does_not_fit (sizeof (LOG_REC_2PC_START));
            /* Start 2PC commit log record */
            // *INDENT-OFF*
            const LOG_REC_2PC_START *start_2pc = redo_context.m_reader.reinterpret_cptr<LOG_REC_2PC_START> ();
            // *INDENT-ON*

            /*
             * Obtain the participant information
             */
            tdes->client.set_system_internal_with_user (start_2pc->user_name);
            tdes->gtrid = start_2pc->gtrid;

            const int num_particps = start_2pc->num_particps;   /* Number of participating sites */
            const int particp_id_length = start_2pc->particp_id_length; /* Length of particp_ids block */
            void *block_particps_ids = malloc (particp_id_length * num_particps);   /* A block of participant ids */
            if (block_particps_ids == NULL)
              {
                /* Out of memory */
                LSA_SET_NULL (&log_Gl.unique_stats_table.curr_rcv_rec_lsa);
                logpb_fatal_error (thread_p, true, ARG_FILE_LINE, "log_recovery_redo");
                break;
              }

            redo_context.m_reader.add_align (sizeof (LOG_REC_2PC_START));
            redo_context.m_reader.align ();

            /* Read in the participants info. block from the log */
            redo_context.m_reader.copy_from_log ((char *) block_particps_ids,
                                 particp_id_length * num_particps);

            /* Initialize the coordinator information */
            if (log_2pc_alloc_coord_info (tdes, num_particps, particp_id_length, block_particps_ids) ==
                NULL)
              {
                LSA_SET_NULL (&log_Gl.unique_stats_table.curr_rcv_rec_lsa);
                logpb_fatal_error (thread_p, true, ARG_FILE_LINE, "log_recovery_redo");
                break;
              }
#ifdef LOG_2PC_ACK_RECV_REQUIRED
            /* Initialize the acknowledgment vector to 0 since we do not know what acknowledgments have
             * already been received. We need to continue reading the log. */

            if ((tdes->coord->ack_received =
                 (bool *) malloc (sizeof (bool) * tdes->coord->num_particps)) == NULL)
              {
                /* Out of memory */
                LSA_SET_NULL (&log_Gl.unique_stats_table.curr_rcv_rec_lsa);
                logpb_fatal_error (thread_p, true, ARG_FILE_LINE, "log_recovery_redo");
                break;
              }
            for (int i = 0; i < tdes->coord->num_particps; i++)
              {
                tdes->coord->ack_received[i] = false;
              }
#endif
              }
          }
          }
          break;
#ifdef LOG_2PC_ACK_RECV_REQUIRED
        case LOG_2PC_RECV_ACK:
          {
        const int tran_index = logtb_find_tran_index (thread_p, tran_id);
        if (tran_index != NULL_TRAN_INDEX)
          {
            /* The transaction was still alive at the time of crash. */

            LOG_TDES *const tdes = LOG_FIND_TDES (tran_index);
            if (tdes != NULL && LOG_ISTRAN_2PC (tdes))
              {
            /*
             * The 2PC_START record should have already been read by this
             * time, otherwise, there is an error in the recovery analysis
             * phase.
             */
#if defined(CUBRID_DEBUG)
            if (tdes->coord == NULL || tdes->coord->ack_received == NULL)
              {
                er_log_debug (ARG_FILE_LINE,
                      "log_recovery_redo: SYSTEM ERROR There is likely an error in the recovery"
                      " analysis phase since coordinator information"
                      " has not been allocated for transaction = %d with state = %s", tdes->trid,
                      log_state_string (tdes->state));
                break;
              }
#endif /* CUBRID_DEBUG */

            /* The transaction was still alive at the time of crash. So, read the participant index from the
             * log record and set the acknowledgement flag of that participant. */

            /* Get the DATA HEADER */
            redo_context.m_reader.add_align (sizeof (LOG_RECORD_HEADER));
            redo_context.m_reader.advance_when_does_not_fit (sizeof (LOG_REC_2PC_PARTICP_ACK));
            /* A 2PC participant ack */
            // *INDENT-OFF*
            const LOG_REC_2PC_PARTICP_ACK *received_ack = redo_context.m_reader.reinterpret_cptr<LOG_REC_2PC_PARTICP_ACK> ();
            // *INDENT-ON*

            tdes->coord->ack_received[received_ack->particp_index] = true;
              }
          }
          }
          break;
#endif
        case LOG_COMMIT:
        case LOG_ABORT:
          {
        rcv_redo_perf_stat.time_and_increment (cublog::PERF_STAT_ID_READ_LOG);

        /* This is a check to verify that completed transactions no longer exist in the transaction table.
         * In the analysis phase, completed transactions are cleared from the transaction table.
         * However, system transactions are not cleared, so they are excluded from the assert condition.
         */
        assert (tran_id == LOG_SYSTEM_TRANID ||
            (tran_id > LOG_SYSTEM_TRANID && logtb_find_tran_index (thread_p, tran_id) == NULL_TRAN_INDEX));

        rcv_redo_perf_stat.time_and_increment (cublog::PERF_STAT_ID_COMMIT_ABORT);
          }
          break;

        case LOG_MVCC_UNDO_DATA:
          {
        /* Must detect MVCC operations and recover vacuum data buffer. The found operation is not actually
         * redone/undone, but it has information that can be used for vacuum. */

        redo_context.m_reader.advance_when_does_not_fit (sizeof (LOG_REC_MVCC_UNDO));
        /* MVCC op undo log record */
        // *INDENT-OFF*
        const LOG_REC_MVCC_UNDO *mvcc_undo = redo_context.m_reader.reinterpret_cptr<LOG_REC_MVCC_UNDO> ();
        // *INDENT-ON*
        const MVCCID mvccid = mvcc_undo->mvccid;

        /* Check if MVCC next ID must be updated */
        if (!MVCC_ID_PRECEDES (mvccid, log_Gl.hdr.mvcc_next_id))
          {
            log_Gl.hdr.mvcc_next_id = mvccid;
            MVCCID_FORWARD (log_Gl.hdr.mvcc_next_id);
          }

        /* Save last MVCC operation LOG_LSA. */
        LSA_COPY (&log_Gl.hdr.mvcc_op_log_lsa, &rcv_lsa);
          }
          break;

        case LOG_UNDO_DATA:
        case LOG_DUMMY_HEAD_POSTPONE:
        case LOG_POSTPONE:
        case LOG_COMMIT_WITH_POSTPONE:
        case LOG_COMMIT_WITH_POSTPONE_OBSOLETE:
        case LOG_SYSOP_START_POSTPONE:
        case LOG_START_CHKPT:
        case LOG_END_CHKPT:
        case LOG_SAVEPOINT:
        case LOG_2PC_COMMIT_DECISION:
        case LOG_2PC_ABORT_DECISION:
        case LOG_2PC_COMMIT_INFORM_PARTICPS:
        case LOG_2PC_ABORT_INFORM_PARTICPS:
        case LOG_DUMMY_CRASH_RECOVERY:
        case LOG_REPLICATION_DATA:
        case LOG_REPLICATION_STATEMENT:
        case LOG_DUMMY_HA_SERVER_STATE:
        case LOG_DUMMY_OVF_RECORD:
        case LOG_DUMMY_GENERIC:
        case LOG_SUPPLEMENTAL_INFO:
        case LOG_END_OF_LOG:
        case LOG_SYSOP_ATOMIC_START:
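          /* none of these record types requires a redo action in this phase */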
          break;

        case LOG_SYSOP_END:
          {
        redo_context.m_reader.advance_when_does_not_fit (sizeof (LOG_REC_SYSOP_END));
        /* Result of top system op */
        // *INDENT-OFF*
        const LOG_REC_SYSOP_END *sysop_end = redo_context.m_reader.reinterpret_cptr<LOG_REC_SYSOP_END> ();
        // *INDENT-ON*

        if (sysop_end->type == LOG_SYSOP_END_LOGICAL_MVCC_UNDO)
          {
            LSA_COPY (&log_Gl.hdr.mvcc_op_log_lsa, &rcv_lsa);
          }
          }
          break;

        case LOG_SMALLER_LOGREC_TYPE:
        case LOG_LARGER_LOGREC_TYPE:
        default:
#if defined(CUBRID_DEBUG)
          er_log_debug (ARG_FILE_LINE,
                "log_recovery_redo: Unknown record type = %d (%s)... May be a system error", log_rtype,
                log_to_string (log_rtype));
#endif /* CUBRID_DEBUG */
          er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_LOG_PAGE_CORRUPTED, 1, redo_context.m_reader.get_pageid ());
          if (lsa == redo_context.m_reader.get_lsa ())
        {
          LSA_SET_NULL (&lsa);
        }
          break;
        }

      /*
       * At this point we can fix lsa.pageid for log records that do not have a
       * forward address.
       */
      if (lsa.offset == NULL_OFFSET && lsa.pageid != NULL_PAGEID
          && lsa.pageid < redo_context.m_reader.get_pageid ())
        {
          lsa.pageid = redo_context.m_reader.get_pageid ();
        }
    }
    }

#undef BUILD_RECORD_INFO
#undef INVOKE_REDO_RECORD

#if defined(SERVER_MODE)
  {
    // *INDENT-OFF*
    /* 'main' measures the time taken by the main thread to execute sync log records and dispatch async ones */
    const auto duration_main_ms = std::chrono::duration_cast <std::chrono::milliseconds> (
          std::chrono::system_clock::now () - time_start_main);
    if (parallel_recovery_redo != nullptr)
      {
    parallel_recovery_redo->wait_for_termination_and_stop_execution ();
    rcv_redo_perf_stat.time_and_increment (cublog::PERF_STAT_ID_WAIT_FOR_PARALLEL);
      }
    /* 'async' measures the time taken by the main thread to execute sync log records, dispatch async ones
     * and wait for the async infrastructure to finish */
    const auto duration_async_ms = std::chrono::duration_cast <std::chrono::milliseconds> (
          std::chrono::system_clock::now () - time_start_main);
    _er_log_debug (ARG_FILE_LINE,
           "log_recovery_redo_perf:  parallel_count=%2d  reusable_jobs_count=%6d  flush_back_count=%4d\n"
           "log_recovery_redo_perf:  setting_up=%6lld  main=%6lld  async=%6lld (ms)\n",
           log_recovery_redo_parallel_count,
           cublog::PARALLEL_REDO_REUSABLE_JOBS_COUNT,
           cublog::PARALLEL_REDO_REUSABLE_JOBS_FLUSH_BACK_COUNT,
           (long long) duration_setting_up_ms.count (),
           (long long) duration_main_ms.count (),
           (long long) duration_async_ms.count ());
    // *INDENT-ON*
  }
#endif
  LOG_CS_ENTER (thread_p);

  log_Gl.mvcc_table.reset_start_mvccid ();

  er_set (ER_NOTIFICATION_SEVERITY, ARG_FILE_LINE, ER_LOG_RECOVERY_PHASE_FINISHING_UP, 1, "REDO");

  /* Abort all atomic system operations that were open when server crashed */
  log_recovery_abort_all_atomic_sysops (thread_p);

  /* Now finish all postpone operations */
  log_recovery_finish_all_postpone (thread_p);

  /* Flush all dirty pages */
  logpb_flush_pages_direct (thread_p);

  logpb_flush_header (thread_p);
  (void) pgbuf_flush_all (thread_p, NULL_VOLID);

exit:
  LSA_SET_NULL (&log_Gl.unique_stats_table.curr_rcv_rec_lsa);

#if !defined(NDEBUG)
  log_Gl_recovery_redo_consistency_check.cleanup ();
#endif

  rcv_redo_perf_stat.time_and_increment (cublog::PERF_STAT_ID_FINALIZE);

  // statistics data collect & report
  //
  rcv_redo_perf_stat.log ("Log recovery redo main thread perf stats");
#if defined(SERVER_MODE)
  if (parallel_recovery_redo != nullptr)
    {
      parallel_recovery_redo->log_perf_stats ();
    }
#endif

  return;
}

/*
 * log_recovery_abort_interrupted_sysop () - find and abort interrupted system operations during postpone.
 *
 * return                  : void
 * thread_p (in)           : thread entry
 * tdes (in)               : transaction descriptor
 * postpone_start_lsa (in) : LSA of start postpone (system op or transaction)
 */
static void
log_recovery_abort_interrupted_sysop (THREAD_ENTRY * thread_p, LOG_TDES * tdes, const LOG_LSA * postpone_start_lsa)
{
  LOG_LSA iter_lsa, prev_lsa;
  LOG_RECORD_HEADER logrec_head;
  LOG_PAGE *log_page = NULL;
  char buffer_log_page[IO_MAX_PAGE_SIZE + MAX_ALIGNMENT];
  LOG_LSA last_parent_lsa = LSA_INITIALIZER;

  assert (tdes->state == TRAN_UNACTIVE_TOPOPE_COMMITTED_WITH_POSTPONE
      || tdes->state == TRAN_UNACTIVE_COMMITTED_WITH_POSTPONE);
  assert (!LSA_ISNULL (postpone_start_lsa));

  if (LSA_ISNULL (&tdes->undo_nxlsa) || LSA_LE (&tdes->undo_nxlsa, postpone_start_lsa))
    {
      /* nothing to abort */
      return;
    }

  log_page = (LOG_PAGE *) PTR_ALIGN (buffer_log_page, MAX_ALIGNMENT);

  /* how it works:
   * we can have so-called logical run postpone system operations for some complex cases (e.g. file destroy or
   * deallocate). if these operations are interrupted by a crash, we must abort them first before finishing the
   * postpone phase of the system operation or transaction.
   *
   * normally, all records during postpone execution are run postpones - physical ones or logical system operations.
   * so, to roll back a logical system op during postpone, we have to stop at the previous run postpone (or at the
   * start of postpone if this system op is the first one). we need to search for it manually.
   */
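  /* in short, the loop below walks backwards from tdes->undo_nxlsa and stops at the first
   * LOG_RUN_POSTPONE record, at a LOG_SYSOP_END record of type LOG_SYSOP_END_LOGICAL_RUN_POSTPONE, or,
   * if neither is found, at postpone_start_lsa itself; that LSA then becomes the parent of the
   * simulated system operation aborted at the end of this function. */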

  for (iter_lsa = tdes->undo_nxlsa; LSA_GT (&iter_lsa, postpone_start_lsa); iter_lsa = prev_lsa)
    {
      if (logpb_fetch_page (thread_p, &iter_lsa, LOG_CS_FORCE_USE, log_page) != NO_ERROR)
    {
      logpb_fatal_error (thread_p, true, ARG_FILE_LINE, "log_recovery_abort_interrupted_sysop");
      return;
    }
      logrec_head = *(LOG_RECORD_HEADER *) (log_page->area + iter_lsa.offset);
      assert (logrec_head.trid == tdes->trid);

      if (logrec_head.type == LOG_RUN_POSTPONE)
    {
      /* found run postpone, stop */
      last_parent_lsa = iter_lsa;
      break;
    }
      else if (logrec_head.type == LOG_SYSOP_END)
    {
      /* we need to see what type of system op this is. */
      LOG_LSA read_lsa = iter_lsa;
      LOG_REC_SYSOP_END *sysop_end;

      /* skip header */
      LOG_READ_ADD_ALIGN (thread_p, sizeof (logrec_head), &read_lsa, log_page);
      LOG_READ_ADVANCE_WHEN_DOESNT_FIT (thread_p, sizeof (*sysop_end), &read_lsa, log_page);

      sysop_end = (LOG_REC_SYSOP_END *) (log_page->area + read_lsa.offset);
      if (sysop_end->type == LOG_SYSOP_END_LOGICAL_RUN_POSTPONE)
        {
          /* found run postpone, stop */
          last_parent_lsa = iter_lsa;
          break;
        }
      else
        {
          /* go to last parent */
          prev_lsa = sysop_end->lastparent_lsa;
        }
    }
      else
    {
      /* safe-guard: we do not expect postpone starts */
      assert (logrec_head.type != LOG_COMMIT_WITH_POSTPONE && logrec_head.type != LOG_COMMIT_WITH_POSTPONE_OBSOLETE
          && logrec_head.type != LOG_SYSOP_START_POSTPONE);

      /* move to previous */
      prev_lsa = logrec_head.prev_tranlsa;
    }
      assert (!LSA_ISNULL (&prev_lsa) && !LSA_EQ (&prev_lsa, &iter_lsa));
    }

  if (LSA_ISNULL (&last_parent_lsa))
    {
      /* no run postpones before system op. stop at start postpone. */
      assert (LSA_EQ (&iter_lsa, postpone_start_lsa));
      last_parent_lsa = *postpone_start_lsa;
    }
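  /* the simulated system operation below rolls back everything between tdes->undo_nxlsa and
   * last_parent_lsa: log_sysop_start opens a new top operation, its last-parent LSA is forced to
   * last_parent_lsa, and log_sysop_abort undoes down to that point. */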

  /* simulate system op */
  log_sysop_start (thread_p);
  /* hack last parent lsa */
  tdes->topops.stack[tdes->topops.last].lastparent_lsa = last_parent_lsa;
  /* rollback */
  log_sysop_abort (thread_p);
}

/*
 * log_recovery_finish_sysop_postpone () - Finish postpone during recovery for one system operation.
 *
 * return        : void
 * thread_p (in) : thread entry
 * tdes (in)     : transaction descriptor
 */
static void
log_recovery_finish_sysop_postpone (THREAD_ENTRY * thread_p, LOG_TDES * tdes)
{
  LOG_LSA first_postpone_to_apply;

  if (tdes->state == TRAN_UNACTIVE_TOPOPE_COMMITTED_WITH_POSTPONE)
    {
      /* We need to read the log record for system op start postpone */
      LOG_LSA sysop_start_postpone_lsa = tdes->rcv.sysop_start_postpone_lsa;
      char log_page_buffer[IO_MAX_PAGE_SIZE + MAX_ALIGNMENT];
      LOG_PAGE *log_page = (LOG_PAGE *) PTR_ALIGN (log_page_buffer, MAX_ALIGNMENT);
      LOG_REC_SYSOP_START_POSTPONE sysop_start_postpone;
      int undo_buffer_size = 0, undo_data_size = 0;
      char *undo_buffer = NULL, *undo_data = NULL;

      assert (tdes->topops.last == 0);
      assert (!LSA_ISNULL (&sysop_start_postpone_lsa));
      LSA_SET_NULL (&first_postpone_to_apply);

      /* first verify it didn't crash in the middle of a run postpone system op */
      log_recovery_abort_interrupted_sysop (thread_p, tdes, &sysop_start_postpone_lsa);
      assert (tdes->topops.last == 0);

      /* find first postpone */
      log_recovery_find_first_postpone (thread_p, &first_postpone_to_apply,
                    &tdes->topops.stack[tdes->topops.last].posp_lsa, tdes);

      if (!LSA_ISNULL (&first_postpone_to_apply))
    {
      log_do_postpone (thread_p, tdes, &first_postpone_to_apply);
    }
      LSA_SET_NULL (&tdes->topops.stack[tdes->topops.last].posp_lsa);

      /* simulate system op commit */
      /* we need the data from system op start postpone */
      log_page->hdr.logical_pageid = NULL_PAGEID;
      if (log_read_sysop_start_postpone (thread_p, &sysop_start_postpone_lsa, log_page, true, &sysop_start_postpone,
                     &undo_buffer_size, &undo_buffer, &undo_data_size, &undo_data) != NO_ERROR)
    {
      /* give up */
      assert_release (false);
      return;
    }
      /* check this is not a system op postpone during system op postpone */
      assert (sysop_start_postpone.sysop_end.type != LOG_SYSOP_END_LOGICAL_RUN_POSTPONE
          || !sysop_start_postpone.sysop_end.run_postpone.is_sysop_postpone);

      log_sysop_end_recovery_postpone (thread_p, &sysop_start_postpone.sysop_end, undo_data_size, undo_data);
      if (undo_buffer != NULL)
    {
      /* no longer needed */
      db_private_free (thread_p, undo_buffer);
    }

      assert (sysop_start_postpone.sysop_end.type != LOG_SYSOP_END_ABORT);
      if (sysop_start_postpone.sysop_end.type == LOG_SYSOP_END_LOGICAL_RUN_POSTPONE)
    {
      if (sysop_start_postpone.sysop_end.run_postpone.is_sysop_postpone)
        {
          /* this is a system op postpone during system op postpone? should not happen! */
          assert (false);
          tdes->state = TRAN_UNACTIVE_UNILATERALLY_ABORTED;
          tdes->undo_nxlsa = tdes->tail_lsa;
        }
      else
        {
          /* logical run postpone during transaction postpone. */
          tdes->state = TRAN_UNACTIVE_COMMITTED_WITH_POSTPONE;
          LSA_SET_NULL (&tdes->undo_nxlsa);

          /* we just finished the run postpone. update tdes->posp_nxlsa */
          tdes->posp_nxlsa = sysop_start_postpone.sysop_end.run_postpone.postpone_lsa;
        }
    }
      else if (!LSA_ISNULL (&tdes->rcv.tran_start_postpone_lsa))
    {
      /* this must be after start postpone */
      assert (LSA_LE (&tdes->rcv.tran_start_postpone_lsa, &sysop_start_postpone.sysop_end.lastparent_lsa));
      tdes->state = TRAN_UNACTIVE_COMMITTED_WITH_POSTPONE;

      /* note: this is for nested system operations inside a transaction logical run postpone. this does not really
       * cover all cases correctly; however, it should work for the file_tracker_unregister case.
       *
       * to have a robust recovery in the future, no matter how complicated the nesting of operations,
       * we should consider saving previous states.
       */
    }
      else
    {
      tdes->state = TRAN_UNACTIVE_UNILATERALLY_ABORTED;
      tdes->undo_nxlsa = tdes->tail_lsa;
    }

      if (tdes->topops.last >= 0)
    {
      assert_release (false);
      tdes->topops.last = -1;
    }
    }
  assert (tdes->state != TRAN_UNACTIVE_TOPOPE_COMMITTED_WITH_POSTPONE);
}

/*
 * log_recovery_finish_postpone () - Finish postpone during recovery for one
 *                   transaction descriptor.
 *
 * return    : Void.
 * thread_p (in) : Thread entry.
 * tdes (in)     : Transaction descriptor.
 */
static void
log_recovery_finish_postpone (THREAD_ENTRY * thread_p, LOG_TDES * tdes)
{
  LOG_LSA first_postpone_to_apply;

  if (tdes == NULL || tdes->trid == NULL_TRANID)
    {
      /* Nothing to do */
      return;
    }

  /* first finish system op postpone (if the case). */
  log_recovery_finish_sysop_postpone (thread_p, tdes);

  if (tdes->state == TRAN_UNACTIVE_WILL_COMMIT || tdes->state == TRAN_UNACTIVE_COMMITTED_WITH_POSTPONE)
    {
      LSA_SET_NULL (&first_postpone_to_apply);

      assert (tdes->is_active_worker_transaction ());
      assert (!LSA_ISNULL (&tdes->rcv.tran_start_postpone_lsa));

      /*
       * The transaction was the one that was committing
       */
      if (tdes->state == TRAN_UNACTIVE_COMMITTED_WITH_POSTPONE)
    {
      /* make sure to abort interrupted logical postpone. */
      assert (tdes->topops.last == -1);
      log_recovery_abort_interrupted_sysop (thread_p, tdes, &tdes->rcv.tran_start_postpone_lsa);
      assert (tdes->topops.last == -1);
      /* no more undo */
      LSA_SET_NULL (&tdes->undo_nxlsa);
    }

      log_recovery_find_first_postpone (thread_p, &first_postpone_to_apply, &tdes->posp_nxlsa, tdes);
      if (!LSA_ISNULL (&first_postpone_to_apply))
    {
      log_do_postpone (thread_p, tdes, &first_postpone_to_apply);
    }

      if (tdes->coord == NULL)
    {           /* If this is a local transaction */
      (void) log_complete (thread_p, tdes, LOG_COMMIT, LOG_DONT_NEED_NEWTRID, LOG_NEED_TO_WRITE_EOT_LOG);
      logtb_free_tran_index (thread_p, tdes->tran_index);
    }
    }
  else if (tdes->state == TRAN_UNACTIVE_COMMITTED)
    {
      if (tdes->coord == NULL)
    {
      (void) log_complete (thread_p, tdes, LOG_COMMIT, LOG_DONT_NEED_NEWTRID, LOG_NEED_TO_WRITE_EOT_LOG);
      logtb_free_tran_index (thread_p, tdes->tran_index);
    }
    }
}

/*
 * log_recovery_finish_all_postpone - FINISH COMMITTING TRANSACTIONS WITH
 *                                   UNFINISHED POSTPONE ACTIONS
 *
 * return: nothing
 *
 * NOTE:Finish the committing of transactions which have been declared
 *              as committed, but whose postpone actions are not all done.
 *              This happens when there is a crash between
 *              log_commit_with_postpone and log_commit.
 *              This function should be called after the log_recovery_redo
 *              function.
 */
static void
log_recovery_finish_all_postpone (THREAD_ENTRY * thread_p)
{
  // *INDENT-OFF*
  int i;
  LOG_TDES *tdes_it = NULL; /* Transaction descriptor */

  /* Finish committing transactions with unfinished postpone actions */
  thread_p = thread_p != NULL ? thread_p : thread_get_thread_entry_info ();
  assert (thread_p->tran_index == LOG_SYSTEM_TRAN_INDEX);

  auto finish_sys_postpone = [&] (LOG_TDES & tdes)
    {
      log_rv_simulate_runtime_worker (thread_p, &tdes);
      log_recovery_finish_postpone (thread_p, &tdes);
      log_rv_end_simulation (thread_p);
    };
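  /* the loop below iterates worker transaction descriptors starting at index 1; index 0 is assumed to
   * be the system transaction index (see the LOG_SYSTEM_TRAN_INDEX assert above), while system worker
   * transactions are handled separately by map_all_tdes below. */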

  for (i = 1; i < log_Gl.trantable.num_total_indices; i++)
    {
      tdes_it = LOG_FIND_TDES (i);
      if (tdes_it == NULL || tdes_it->trid == NULL_TRANID)
        {
          continue;
        }
      finish_sys_postpone (*tdes_it);
    }
  log_system_tdes::map_all_tdes (finish_sys_postpone);
  // *INDENT-ON*
}

/*
 * log_recovery_abort_all_atomic_sysops () - abort all atomic system operation opened at the moment of the crash
 *
 * return        : void
 * thread_p (in) : thread entry
 */
static void
log_recovery_abort_all_atomic_sysops (THREAD_ENTRY * thread_p)
{
  // *INDENT-OFF*
  int i;
  LOG_TDES *tdes_it = NULL; /* Transaction descriptor */

  /* Abort atomic system operations that were open at the moment of the crash */
  thread_p = thread_p != NULL ? thread_p : thread_get_thread_entry_info ();
  assert (thread_p->tran_index == LOG_SYSTEM_TRAN_INDEX);
  auto abort_atomic_func = [&] (LOG_TDES & tdes)
    {
      log_rv_simulate_runtime_worker (thread_p, &tdes);
      log_recovery_abort_atomic_sysop (thread_p, &tdes);
      log_rv_end_simulation (thread_p);
    };
  for (i = 1; i < log_Gl.trantable.num_total_indices; i++)
    {
      tdes_it = LOG_FIND_TDES (i);
      if (tdes_it == NULL || tdes_it->trid == NULL_TRANID)
        {
          continue;
        }
      abort_atomic_func (*tdes_it);
    }
  log_system_tdes::map_all_tdes (abort_atomic_func);
  // *INDENT-ON*
}

/*
 * log_recovery_abort_atomic_sysop () - abort all changes down to tdes->rcv.atomic_sysop_start_lsa (where atomic system
 *                                      operation starts).
 *
 * return        : void
 * thread_p (in) : thread entry
 * tdes (in)     : transaction descriptor
 */
static void
log_recovery_abort_atomic_sysop (THREAD_ENTRY * thread_p, LOG_TDES * tdes)
{
  char log_pgbuf[IO_MAX_PAGE_SIZE + MAX_ALIGNMENT], *aligned_log_pgbuf;
  LOG_PAGE *log_pgptr = NULL;
  LOG_RECORD_HEADER *log_rec;
  LOG_LSA prev_atomic_sysop_start_lsa;

  if (tdes == NULL || tdes->trid == NULL_TRANID)
    {
      /* Nothing to do */
      return;
    }

  if (LSA_ISNULL (&tdes->rcv.atomic_sysop_start_lsa))
    {
      /* no atomic system operation */
      return;
    }
  if (LSA_GE (&tdes->rcv.atomic_sysop_start_lsa, &tdes->undo_nxlsa))
    {
      /* nothing after tdes->rcv.atomic_sysop_start_lsa */
      assert (LSA_EQ (&tdes->rcv.atomic_sysop_start_lsa, &tdes->undo_nxlsa));
      LSA_SET_NULL (&tdes->rcv.atomic_sysop_start_lsa);
      er_log_debug (ARG_FILE_LINE, "(trid = %d) Nothing after atomic sysop (%lld|%d), nothing to rollback.\n",
            tdes->trid, LSA_AS_ARGS (&tdes->rcv.atomic_sysop_start_lsa));
      return;
    }
  assert (tdes->topops.last <= 0);

  if (tdes->state == TRAN_UNACTIVE_TOPOPE_COMMITTED_WITH_POSTPONE
      && LSA_GT (&tdes->rcv.sysop_start_postpone_lsa, &tdes->rcv.atomic_sysop_start_lsa))
    {
      /* we may have the following case:
       *
       * 1. atomic operation was started.
       * 2. nested system operation is started.
       * 3. nested operation has postpones.
       * 4. nested operation is committed with postpone.
       * 5. crash
       *
       * I am not sure this really happens. It might, and it might be risky to allow it. However, I assume this nested
       * operation will not do other complicated operations to mess with other logical operations. I hope at least. */
      /* finish postpone of nested system op. */
      er_log_debug (ARG_FILE_LINE,
            "(trid = %d) Nested sysop start pospone (%lld|%d) inside atomic sysop (%lld|%d). \n", tdes->trid,
            LSA_AS_ARGS (&tdes->rcv.sysop_start_postpone_lsa), LSA_AS_ARGS (&tdes->rcv.atomic_sysop_start_lsa));
      log_recovery_finish_sysop_postpone (thread_p, tdes);
    }
  else if (tdes->state == TRAN_UNACTIVE_TOPOPE_COMMITTED_WITH_POSTPONE)
    {
      /* this would be also a nested case, but the other way around:
       *
       * 1. system op starts.
       * 2. system op end with postpone.
       * 3. nested atomic system op starts.
       * 4. crash.
       *
       * we first have to abort atomic system op. postpone will be finished later. */
      er_log_debug (ARG_FILE_LINE,
            "(trid = %d) Nested atomic sysop  (%lld|%d) after sysop start postpone (%lld|%d). \n", tdes->trid,
            LSA_AS_ARGS (&tdes->rcv.sysop_start_postpone_lsa), LSA_AS_ARGS (&tdes->rcv.atomic_sysop_start_lsa));
    }
  else
    {
      er_log_debug (ARG_FILE_LINE, "(trid = %d) Atomic sysop (%lld|%d). Rollback. \n", tdes->trid,
            LSA_AS_ARGS (&tdes->rcv.atomic_sysop_start_lsa));
    }

  /* Get transaction lsa that precede atomic_sysop_start_lsa. */
  aligned_log_pgbuf = PTR_ALIGN (log_pgbuf, MAX_ALIGNMENT);
  log_pgptr = (LOG_PAGE *) aligned_log_pgbuf;
  if (logpb_fetch_page (thread_p, &tdes->rcv.atomic_sysop_start_lsa, LOG_CS_FORCE_USE, log_pgptr) != NO_ERROR)
    {
      logpb_fatal_error (thread_p, true, ARG_FILE_LINE, "log_recovery_abort_atomic_sysop");
      return;
    }
  log_rec = LOG_GET_LOG_RECORD_HEADER (log_pgptr, &tdes->rcv.atomic_sysop_start_lsa);
  LSA_COPY (&prev_atomic_sysop_start_lsa, &log_rec->prev_tranlsa);
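  /* As in log_recovery_abort_interrupted_sysop, simulate a system operation whose last parent is the
   * transaction record preceding the atomic sysop start, then abort it; the rollback stops at
   * prev_atomic_sysop_start_lsa. */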

  /* rollback. simulate a new system op */
  log_sysop_start (thread_p);

  /* hack last parent to stop at transaction lsa that precede atomic_sysop_start_lsa. */
  LSA_COPY (&tdes->topops.stack[tdes->topops.last].lastparent_lsa, &prev_atomic_sysop_start_lsa);

  /* rollback */
  log_sysop_abort (thread_p);

  assert (tdes->topops.last <= 0);

  /* this is it. reset tdes->rcv.atomic_sysop_start_lsa and we're done. */
  LSA_SET_NULL (&tdes->rcv.atomic_sysop_start_lsa);
}

/*
 * log_recovery_undo - SCAN BACKWARDS UNDOING DATA
 *
 * return: nothing
 *
 */
static void
log_recovery_undo (THREAD_ENTRY * thread_p)
{
  LOG_LSA max_undo_lsa = NULL_LSA;  /* LSA of log record to undo */
  char log_pgbuf[IO_MAX_PAGE_SIZE + MAX_ALIGNMENT], *aligned_log_pgbuf;
  LOG_PAGE *log_pgptr = NULL;   /* Log page pointer where LSA is located */
  LOG_LSA log_lsa;
  LOG_RECORD_HEADER *log_rec = NULL;    /* Pointer to log record */
  LOG_REC_UNDOREDO *undoredo = NULL;    /* Undo_redo log record */
  LOG_REC_UNDO *undo = NULL;    /* Undo log record */
  LOG_REC_MVCC_UNDOREDO *mvcc_undoredo = NULL;  /* MVCC op Undo_redo log record */
  LOG_REC_MVCC_UNDO *mvcc_undo = NULL;  /* MVCC op Undo log record */
  LOG_REC_COMPENSATE *compensate;   /* Compensating log record */
  LOG_REC_SYSOP_END *sysop_end; /* Result of top system op */
  LOG_RCVINDEX rcvindex;    /* Recovery index function */
  LOG_RCV rcv;          /* Recovery structure */
  VPID rcv_vpid;        /* VPID of data to recover */
  LOG_LSA rcv_lsa;      /* Address of the log record to undo */
  LOG_LSA prev_tranlsa;     /* prev LSA of transaction */
  LOG_TDES *tdes;       /* Transaction descriptor */
  int tran_index;
  int data_header_size = 0;
  LOG_ZIP *undo_unzip_ptr = NULL;
  int cnt_trans_to_undo = 0;
  LOG_LSA min_lsa = NULL_LSA;
  LOG_LSA max_lsa = NULL_LSA;
  bool is_mvcc_op;
  volatile TRANID tran_id;
  volatile LOG_RECTYPE log_rtype;
  TSC_TICKS info_logging_start_time, info_logging_check_time;
  TSCTIMEVAL info_logging_elapsed_time;
  int info_logging_interval_in_secs = 0;
  UINT64 total_page_cnt = 0, read_page_cnt = 0;

  aligned_log_pgbuf = PTR_ALIGN (log_pgbuf, MAX_ALIGNMENT);

  /*
   * Remove from the list of transactions to abort those that had finished
   * when the crash happened, so they do not remain dangling in the transaction
   * table.
   */

  for (tran_index = 1; tran_index < log_Gl.trantable.num_total_indices; tran_index++)
    {
      if ((tdes = LOG_FIND_TDES (tran_index)) != NULL && tdes->trid != NULL_TRANID
      && (tdes->state == TRAN_UNACTIVE_UNILATERALLY_ABORTED || tdes->state == TRAN_UNACTIVE_ABORTED)
      && LSA_ISNULL (&tdes->undo_nxlsa))
    {
      (void) log_complete (thread_p, tdes, LOG_ABORT, LOG_DONT_NEED_NEWTRID, LOG_NEED_TO_WRITE_EOT_LOG);
      logtb_free_tran_index (thread_p, tran_index);
    }
    }
  // *INDENT-OFF*
  auto delete_func = [] (const LOG_TDES & tdes)
    {
      return LSA_ISNULL (&tdes.undo_nxlsa);
    };
  log_system_tdes::rv_delete_all_tdes_if (delete_func);
  // *INDENT-ON*

  /*
   * GO BACKWARDS, undoing records
   */

  // *INDENT-OFF*
  auto max_undo_lsa_func = [&max_undo_lsa] (const log_tdes & tdes)
    {
      if (LSA_LT (&max_undo_lsa, &tdes.undo_nxlsa))
        {
          max_undo_lsa = tdes.undo_nxlsa;
        }
    };
  // *INDENT-ON*

  /* Find the largest LSA to undo */
  logtb_rv_read_only_map_undo_tdes (thread_p, max_undo_lsa_func);
  max_lsa = max_undo_lsa;
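  /* Undo proceeds backwards from the largest undo_nxlsa among all transactions that still need undo;
   * as each record is processed, the owning transaction's undo_nxlsa is moved to the record's
   * prev_tranlsa, so the scan keeps walking the log in reverse order. */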

  /* Print undo recovery information */
  // *INDENT-OFF*
  logtb_rv_read_only_map_undo_tdes (thread_p, [&cnt_trans_to_undo] (const LOG_TDES & tdes)
    {
      cnt_trans_to_undo++;
    });

  logtb_rv_read_only_map_undo_tdes (thread_p, [&min_lsa] (const LOG_TDES & tdes) {
    assert (!tdes.head_lsa.is_null());
    if (min_lsa.is_null () || LSA_LT (&tdes.head_lsa, &min_lsa))
      {
        min_lsa = tdes.head_lsa;
      }
  });
  // *INDENT-ON*

  total_page_cnt = max_lsa.is_null ()? 0 : log_cnt_pages_containing_lsa (&min_lsa, &max_lsa);

  er_set (ER_NOTIFICATION_SEVERITY, ARG_FILE_LINE, ER_LOG_RECOVERY_UNDO_STARTED, 2, total_page_cnt, cnt_trans_to_undo);

  log_pgptr = (LOG_PAGE *) aligned_log_pgbuf;

  undo_unzip_ptr = log_zip_alloc (LOGAREA_SIZE);
  if (undo_unzip_ptr == NULL)
    {
      logpb_fatal_error (thread_p, true, ARG_FILE_LINE, "log_recovery_undo");
      return;
    }

  info_logging_interval_in_secs = prm_get_integer_value (PRM_ID_RECOVERY_PROGRESS_LOGGING_INTERVAL);
  if (info_logging_interval_in_secs > 0 && info_logging_interval_in_secs < 5)
    {
      info_logging_interval_in_secs = 5;
    }
  if (info_logging_interval_in_secs > 0)
    {
      tsc_start_time_usec (&info_logging_start_time);
      tsc_start_time_usec (&info_logging_check_time);
    }

  LOG_CS_EXIT (thread_p);

  while (!LSA_ISNULL (&max_undo_lsa))
    {
      /* Fetch the page where the LSA record to undo is located */
      LSA_COPY (&log_lsa, &max_undo_lsa);
      if (logpb_fetch_page (thread_p, &log_lsa, LOG_CS_FORCE_USE, log_pgptr) != NO_ERROR)
    {
      log_zip_free (undo_unzip_ptr);

      logpb_fatal_error (thread_p, true, ARG_FILE_LINE, "log_recovery_undo");
      return;
    }

      /* PRM_ID_RECOVERY_PROGRESS_LOGGING_INTERVAL > 0 */
      if (info_logging_interval_in_secs > 0)
    {
      tsc_end_time_usec (&info_logging_elapsed_time, info_logging_check_time);
      if (info_logging_elapsed_time.tv_sec >= info_logging_interval_in_secs)
        {
          UINT64 done_page_cnt = max_lsa.pageid - log_lsa.pageid;
          double elapsed_time;
          double progress = double (done_page_cnt) / (total_page_cnt);

          tsc_start_time_usec (&info_logging_check_time);
          tsc_end_time_usec (&info_logging_elapsed_time, info_logging_start_time);

          elapsed_time = info_logging_elapsed_time.tv_sec + (info_logging_elapsed_time.tv_usec / 1000000.0);

          er_set (ER_NOTIFICATION_SEVERITY, ARG_FILE_LINE, ER_LOG_RECOVERY_PROGRESS, 6, "UNDO",
              done_page_cnt, total_page_cnt, progress * 100, elapsed_time,
              read_page_cnt == 0 ? -1.0 : (elapsed_time / read_page_cnt) * (total_page_cnt - done_page_cnt));
        }

      read_page_cnt++;
    }


      /* Check all log records in this phase */
      while (max_undo_lsa.pageid == log_lsa.pageid)
    {
      /* Find the log record */
      log_lsa.offset = max_undo_lsa.offset;
      log_rec = LOG_GET_LOG_RECORD_HEADER (log_pgptr, &log_lsa);

      tran_id = log_rec->trid;
      log_rtype = log_rec->type;

      LSA_COPY (&prev_tranlsa, &log_rec->prev_tranlsa);

      if (logtb_is_system_worker_tranid (tran_id))
        {
          // *INDENT-OFF*
          tdes = log_system_tdes::rv_get_tdes (tran_id);
          if (tdes == NULL)
        {
          assert (false);
        }
          // *INDENT-ON*
        }
      else
        {
          /* Active worker transaction */
          tran_index = logtb_find_tran_index (thread_p, tran_id);
          if (tran_index == NULL_TRAN_INDEX)
        {
#if defined(CUBRID_DEBUG)
          er_log_debug (ARG_FILE_LINE, "log_recovery_undo: SYSTEM ERROR for log located at %lld|%d\n",
                (long long int) log_lsa.pageid, log_lsa.offset);
#endif /* CUBRID_DEBUG */
          logtb_free_tran_index_with_undo_lsa (thread_p, &max_undo_lsa);
        }
          else
        {
          tdes = LOG_FIND_TDES (tran_index);
          if (tdes == NULL)
            {
              /* This looks like a system error in the analysis phase */
#if defined(CUBRID_DEBUG)
              er_log_debug (ARG_FILE_LINE, "log_recovery_undo: SYSTEM ERROR for log located at %lld|%d\n",
                    (long long int) log_lsa.pageid, log_lsa.offset);
#endif /* CUBRID_DEBUG */
              logtb_free_tran_index_with_undo_lsa (thread_p, &max_undo_lsa);
            }
        }
        }

      if (tran_index != NULL_TRAN_INDEX && tdes != NULL)
        {
          LSA_COPY (&tdes->undo_nxlsa, &prev_tranlsa);

          switch (log_rtype)
        {
        case LOG_MVCC_UNDOREDO_DATA:
        case LOG_MVCC_DIFF_UNDOREDO_DATA:
        case LOG_UNDOREDO_DATA:
        case LOG_DIFF_UNDOREDO_DATA:
          LSA_COPY (&rcv_lsa, &log_lsa);
          /*
           * The transaction was active at the time of the crash. The
           * transaction is unilaterally aborted by the system
           */

          if (log_rtype == LOG_MVCC_UNDOREDO_DATA || log_rtype == LOG_MVCC_DIFF_UNDOREDO_DATA)
            {
              is_mvcc_op = true;
            }
          else
            {
              is_mvcc_op = false;
            }

          /* Get the DATA HEADER */
          LOG_READ_ADD_ALIGN (thread_p, sizeof (LOG_RECORD_HEADER), &log_lsa, log_pgptr);

          if (is_mvcc_op)
            {
              data_header_size = sizeof (LOG_REC_MVCC_UNDOREDO);
              LOG_READ_ADVANCE_WHEN_DOESNT_FIT (thread_p, data_header_size, &log_lsa, log_pgptr);
              mvcc_undoredo = (LOG_REC_MVCC_UNDOREDO *) ((char *) log_pgptr->area + log_lsa.offset);

              /* Get undoredo info */
              undoredo = &mvcc_undoredo->undoredo;

              /* Save transaction MVCCID to recovery */
              rcv.mvcc_id = mvcc_undoredo->mvccid;
            }
          else
            {
              data_header_size = sizeof (LOG_REC_UNDOREDO);
              LOG_READ_ADVANCE_WHEN_DOESNT_FIT (thread_p, data_header_size, &log_lsa, log_pgptr);
              undoredo = (LOG_REC_UNDOREDO *) ((char *) log_pgptr->area + log_lsa.offset);

              rcv.mvcc_id = MVCCID_NULL;
            }

          rcvindex = undoredo->data.rcvindex;
          rcv.length = undoredo->ulength;
          rcv.offset = undoredo->data.offset;
          rcv_vpid.volid = undoredo->data.volid;
          rcv_vpid.pageid = undoredo->data.pageid;

          LOG_READ_ADD_ALIGN (thread_p, data_header_size, &log_lsa, log_pgptr);

#if !defined(NDEBUG)
          if (prm_get_bool_value (PRM_ID_LOG_TRACE_DEBUG))
            {
              fprintf (stdout,
                   "TRACE UNDOING[1]: LSA = %lld|%d, Rv_index = %s,\n"
                   "      volid = %d, pageid = %d, offset = %d,\n", (long long int) rcv_lsa.pageid,
                   (int) rcv_lsa.offset, rv_rcvindex_string (rcvindex), rcv_vpid.volid, rcv_vpid.pageid,
                   rcv.offset);
              fflush (stdout);
            }
#endif /* !NDEBUG */

          log_rv_undo_record (thread_p, &log_lsa, log_pgptr, rcvindex, &rcv_vpid, &rcv, &rcv_lsa, tdes,
                      undo_unzip_ptr);
          break;

        case LOG_MVCC_UNDO_DATA:
        case LOG_UNDO_DATA:
          /* Does the record belong to a MVCC op? */
          is_mvcc_op = log_rtype == LOG_MVCC_UNDO_DATA;

          LSA_COPY (&rcv_lsa, &log_lsa);
          /*
           * The transaction was active at the time of the crash. The
           * transaction is unilaterally aborted by the system
           */

          /* Get the DATA HEADER */
          LOG_READ_ADD_ALIGN (thread_p, sizeof (LOG_RECORD_HEADER), &log_lsa, log_pgptr);

          if (is_mvcc_op)
            {
              data_header_size = sizeof (LOG_REC_MVCC_UNDO);
              LOG_READ_ADVANCE_WHEN_DOESNT_FIT (thread_p, data_header_size, &log_lsa, log_pgptr);
              mvcc_undo = (LOG_REC_MVCC_UNDO *) ((char *) log_pgptr->area + log_lsa.offset);

              /* Get undo info */
              undo = &mvcc_undo->undo;

              /* Save transaction MVCCID to recovery */
              rcv.mvcc_id = mvcc_undo->mvccid;
            }
          else
            {
              data_header_size = sizeof (LOG_REC_UNDO);
              LOG_READ_ADVANCE_WHEN_DOESNT_FIT (thread_p, data_header_size, &log_lsa, log_pgptr);
              undo = (LOG_REC_UNDO *) ((char *) log_pgptr->area + log_lsa.offset);

              rcv.mvcc_id = MVCCID_NULL;
            }

          rcvindex = undo->data.rcvindex;
          rcv.length = undo->length;
          rcv.offset = undo->data.offset;
          rcv_vpid.volid = undo->data.volid;
          rcv_vpid.pageid = undo->data.pageid;

          LOG_READ_ADD_ALIGN (thread_p, data_header_size, &log_lsa, log_pgptr);

#if !defined(NDEBUG)
          if (prm_get_bool_value (PRM_ID_LOG_TRACE_DEBUG))
            {
              fprintf (stdout,
                   "TRACE UNDOING[2]: LSA = %lld|%d, Rv_index = %s,\n"
                   "      volid = %d, pageid = %d, offset = %hd,\n", LSA_AS_ARGS (&rcv_lsa),
                   rv_rcvindex_string (rcvindex), rcv_vpid.volid, rcv_vpid.pageid, rcv.offset);
              fflush (stdout);
            }
#endif /* !NDEBUG */
          log_rv_undo_record (thread_p, &log_lsa, log_pgptr, rcvindex, &rcv_vpid, &rcv, &rcv_lsa, tdes,
                      undo_unzip_ptr);
          break;

        case LOG_REDO_DATA:
        case LOG_MVCC_REDO_DATA:
        case LOG_DBEXTERN_REDO_DATA:
        case LOG_DUMMY_HEAD_POSTPONE:
        case LOG_POSTPONE:
        case LOG_SAVEPOINT:
        case LOG_REPLICATION_DATA:
        case LOG_REPLICATION_STATEMENT:
        case LOG_DUMMY_HA_SERVER_STATE:
        case LOG_DUMMY_OVF_RECORD:
        case LOG_DUMMY_GENERIC:
        case LOG_SUPPLEMENTAL_INFO:
        case LOG_SYSOP_ATOMIC_START:
          /* Not for UNDO ... */
          /* Break switch to go to previous record */
          break;

        case LOG_COMPENSATE:
          /* Only for REDO. Go to the next undo record; we need to read the compensating record to set the next
           * undo address. */

          /* Get the DATA HEADER */
          LOG_READ_ADD_ALIGN (thread_p, sizeof (LOG_RECORD_HEADER), &log_lsa, log_pgptr);
          LOG_READ_ADVANCE_WHEN_DOESNT_FIT (thread_p, sizeof (LOG_REC_COMPENSATE), &log_lsa, log_pgptr);
          compensate = (LOG_REC_COMPENSATE *) ((char *) log_pgptr->area + log_lsa.offset);
          LSA_COPY (&prev_tranlsa, &compensate->undo_nxlsa);
          break;

        case LOG_SYSOP_END:
          /*
           * We found a top system operation that should be skipped during
           * rollback
           */

          /* Read the DATA HEADER */
          LOG_READ_ADD_ALIGN (thread_p, sizeof (LOG_RECORD_HEADER), &log_lsa, log_pgptr);
          LOG_READ_ADVANCE_WHEN_DOESNT_FIT (thread_p, sizeof (LOG_REC_SYSOP_END), &log_lsa, log_pgptr);
          sysop_end = ((LOG_REC_SYSOP_END *) ((char *) log_pgptr->area + log_lsa.offset));

          if (sysop_end->type == LOG_SYSOP_END_LOGICAL_UNDO)
            {
              /* execute undo */
              rcvindex = sysop_end->undo.data.rcvindex;
              rcv.length = sysop_end->undo.length;
              rcv.offset = sysop_end->undo.data.offset;
              rcv_vpid.volid = sysop_end->undo.data.volid;
              rcv_vpid.pageid = sysop_end->undo.data.pageid;
              rcv.mvcc_id = MVCCID_NULL;

              /* will jump to parent LSA. save it now before advancing to undo data */
              LSA_COPY (&prev_tranlsa, &sysop_end->lastparent_lsa);
              LSA_COPY (&tdes->undo_nxlsa, &sysop_end->lastparent_lsa);

              LOG_READ_ADD_ALIGN (thread_p, sizeof (LOG_REC_SYSOP_END), &log_lsa, log_pgptr);
              log_rv_undo_record (thread_p, &log_lsa, log_pgptr, rcvindex, &rcv_vpid, &rcv, &rcv_lsa, tdes,
                      undo_unzip_ptr);
            }
          else if (sysop_end->type == LOG_SYSOP_END_LOGICAL_MVCC_UNDO)
            {
              /* execute undo */
              rcvindex = sysop_end->mvcc_undo.undo.data.rcvindex;
              rcv.length = sysop_end->mvcc_undo.undo.length;
              rcv.offset = sysop_end->mvcc_undo.undo.data.offset;
              rcv_vpid.volid = sysop_end->mvcc_undo.undo.data.volid;
              rcv_vpid.pageid = sysop_end->mvcc_undo.undo.data.pageid;
              rcv.mvcc_id = sysop_end->mvcc_undo.mvccid;

              /* will jump to parent LSA. save it now before advancing to undo data */
              LSA_COPY (&prev_tranlsa, &sysop_end->lastparent_lsa);
              LSA_COPY (&tdes->undo_nxlsa, &sysop_end->lastparent_lsa);
              LOG_READ_ADD_ALIGN (thread_p, sizeof (LOG_REC_SYSOP_END), &log_lsa, log_pgptr);
              log_rv_undo_record (thread_p, &log_lsa, log_pgptr, rcvindex, &rcv_vpid, &rcv, &rcv_lsa, tdes,
                      undo_unzip_ptr);
            }
          else if (sysop_end->type == LOG_SYSOP_END_LOGICAL_COMPENSATE)
            {
              /* compensate */
              LSA_COPY (&prev_tranlsa, &sysop_end->compensate_lsa);
            }
          else
            {
              /* should not find run postpones on undo recovery */
              assert (sysop_end->type != LOG_SYSOP_END_LOGICAL_RUN_POSTPONE);

              /* jump to parent LSA */
              LSA_COPY (&prev_tranlsa, &sysop_end->lastparent_lsa);
            }
          break;

        case LOG_RUN_POSTPONE:
        case LOG_COMMIT_WITH_POSTPONE:
        case LOG_COMMIT_WITH_POSTPONE_OBSOLETE:
        case LOG_COMMIT:
        case LOG_SYSOP_START_POSTPONE:
        case LOG_ABORT:
        case LOG_START_CHKPT:
        case LOG_END_CHKPT:
        case LOG_2PC_PREPARE:
        case LOG_2PC_START:
        case LOG_2PC_ABORT_DECISION:
        case LOG_2PC_COMMIT_DECISION:
        case LOG_2PC_ABORT_INFORM_PARTICPS:
        case LOG_2PC_COMMIT_INFORM_PARTICPS:
        case LOG_2PC_RECV_ACK:
        case LOG_DUMMY_CRASH_RECOVERY:
        case LOG_END_OF_LOG:
          /* This looks like a system error in the analysis phase */
#if defined(CUBRID_DEBUG)
          er_log_debug (ARG_FILE_LINE,
                "log_recovery_undo: SYSTEM ERROR for log located at %lld|%d,"
                " Bad log_rectype = %d\n (%s).\n", (long long int) log_lsa.pageid, log_lsa.offset,
                log_rtype, log_to_string (log_rtype));
#endif /* CUBRID_DEBUG */
          /* Remove the transaction from the recovery process */
          assert (false);

          /* Clear MVCCID */
          tdes->mvccinfo.id = MVCCID_NULL;

          if (logtb_is_system_worker_tranid (tran_id))
            {
              // *INDENT-OFF*
              log_system_tdes::rv_delete_tdes (tran_id);
              // *INDENT-ON*
            }
          else
            {
              (void) log_complete (thread_p, tdes, LOG_ABORT, LOG_DONT_NEED_NEWTRID, LOG_NEED_TO_WRITE_EOT_LOG);
              logtb_free_tran_index (thread_p, tran_index);
            }
          tdes = NULL;
          break;

        case LOG_SMALLER_LOGREC_TYPE:
        case LOG_LARGER_LOGREC_TYPE:
        default:
#if defined(CUBRID_DEBUG)
          er_log_debug (ARG_FILE_LINE,
                "log_recovery_undo: Unknown record type = %d (%s)\n ... May be a system error",
                log_rtype, log_to_string (log_rtype));
#endif /* CUBRID_DEBUG */
          er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_LOG_PAGE_CORRUPTED, 1, log_lsa.pageid);
          assert (false);

          /*
           * Remove the transaction from the recovery process
           */

          /* Clear MVCCID */
          tdes->mvccinfo.id = MVCCID_NULL;

          if (logtb_is_system_worker_tranid (tran_id))
            {
              // *INDENT-OFF*
              log_system_tdes::rv_delete_tdes (tran_id);
              // *INDENT-ON*
            }
          else
            {
              (void) log_complete (thread_p, tdes, LOG_ABORT, LOG_DONT_NEED_NEWTRID, LOG_NEED_TO_WRITE_EOT_LOG);
              logtb_free_tran_index (thread_p, tran_index);
            }
          tdes = NULL;
          break;
        }

          /* tdes may have been invalidated (set to NULL) by the cases above */
          if (tdes != NULL)
        {
          /* Is this the end of transaction? */
          if (LSA_ISNULL (&prev_tranlsa))
            {
              /* Clear MVCCID */
              tdes->mvccinfo.id = MVCCID_NULL;

              if (logtb_is_system_worker_tranid (tran_id))
            {
              // *INDENT-OFF*
              log_system_tdes::rv_delete_tdes (tran_id);
              // *INDENT-ON*
            }
              else
            {
#ifdef CCI_XA
              if (tdes->state == TRAN_UNACTIVE_2PC_PREPARE)
                {
                  /*
                   * In 2PC Prepare state, commit or abort should not be performed.
                   * Commit or abort should be performed by xa_tran_end from the coordinator.
                   */
                  ;
                }
              else
#endif
                {
                  (void) log_complete (thread_p, tdes, LOG_ABORT, LOG_DONT_NEED_NEWTRID,
                           LOG_NEED_TO_WRITE_EOT_LOG);
                  logtb_free_tran_index (thread_p, tran_index);
                  tdes = NULL;
                }
            }
            }
          else
            {
              /* Update transaction next undo LSA */
              LSA_COPY (&tdes->undo_nxlsa, &prev_tranlsa);
            }
        }
        }

      /* Find the next log record to undo */
      max_undo_lsa = NULL_LSA;
      logtb_rv_read_only_map_undo_tdes (thread_p, max_undo_lsa_func);
    }
    }

  log_zip_free (undo_unzip_ptr);

  er_set (ER_NOTIFICATION_SEVERITY, ARG_FILE_LINE, ER_LOG_RECOVERY_PHASE_FINISHING_UP, 1, "UNDO");

  /* Flush all dirty pages */

  LOG_CS_ENTER (thread_p);

  logpb_flush_pages_direct (thread_p);

  logpb_flush_header (thread_p);
  (void) pgbuf_flush_all (thread_p, NULL_VOLID);

  return;
}

/*
 * log_recovery_notpartof_archives - REMOVE ARCHIVES THAT ARE NOT PART OF
 *                                 DATABASE (USED for PARTIAL RECOVERY)
 *
 * return: nothing
 *
 *   start_arv_num(in): Start removing archives at this point.
 *   info_reason(in): Reason for removal
 *
 * NOTE: Remove archives that are no longer part of the database.
 *              This happens when we do partial recovery.
 */
static void
log_recovery_notpartof_archives (THREAD_ENTRY * thread_p, int start_arv_num, const char *info_reason)
{
  char logarv_name[PATH_MAX];   /* Archive name */
  char logarv_name_first[PATH_MAX]; /* Archive name */
  int i;
  const char *catmsg;
  int error_code;

  if (log_Gl.append.vdes != NULL_VOLDES)
    {
      /*
       * Trust the current log header to remove any archives that are not
       * needed due to partial recovery
       */
      for (i = start_arv_num; i <= log_Gl.hdr.nxarv_num - 1; i++)
    {
      fileio_make_log_archive_name (logarv_name, log_Archive_path, log_Prefix, i);
      fileio_unformat (thread_p, logarv_name);
    }
    }
  else
    {
      /*
       * We don't know where to stop. Stop when an archive is not in the OS.
       * This may not be good enough.
       */
      for (i = start_arv_num; i <= INT_MAX; i++)
    {
      fileio_make_log_archive_name (logarv_name, log_Archive_path, log_Prefix, i);
      if (fileio_is_volume_exist (logarv_name) == false)
        {
          if (i > start_arv_num)
        {
          fileio_make_log_archive_name (logarv_name, log_Archive_path, log_Prefix, i - 1);
        }
          break;
        }
      fileio_unformat (thread_p, logarv_name);
    }
    }

  if (info_reason != NULL)
    /*
     * Note that if start_arv_num == i, we broke out of the loop without removing
     * anything
     */
    if (start_arv_num != i)
      {
    if (start_arv_num == i - 1)
      {
        catmsg = msgcat_message (MSGCAT_CATALOG_CUBRID, MSGCAT_SET_LOG, MSGCAT_LOG_LOGINFO_REMOVE_REASON);
        if (catmsg == NULL)
          {
        catmsg = "REMOVE: %d %s to \n%d %s.\nREASON: %s\n";
          }
        error_code =
          log_dump_log_info (log_Name_info, true, catmsg, start_arv_num, logarv_name, start_arv_num, logarv_name,
                 info_reason);
        if (error_code != NO_ERROR && error_code != ER_LOG_MOUNT_FAIL)
          {
        return;
          }
      }
    else
      {
        fileio_make_log_archive_name (logarv_name_first, log_Archive_path, log_Prefix, start_arv_num);

        catmsg = msgcat_message (MSGCAT_CATALOG_CUBRID, MSGCAT_SET_LOG, MSGCAT_LOG_LOGINFO_REMOVE_REASON);
        if (catmsg == NULL)
          {
        catmsg = "REMOVE: %d %s to \n%d %s.\nREASON: %s\n";
          }
        error_code =
          log_dump_log_info (log_Name_info, true, catmsg, start_arv_num, logarv_name_first, i - 1, logarv_name,
                 info_reason);
        if (error_code != NO_ERROR && error_code != ER_LOG_MOUNT_FAIL)
          {
        return;
          }
      }
      }

  log_Gl.hdr.last_deleted_arv_num = (start_arv_num == i) ? i : i - 1;


  if (log_Gl.append.vdes != NULL_VOLDES)
    {
      logpb_flush_header (thread_p);    /* to get rid of the archives */
    }

}

/*
 * log_unformat_ahead_volumes - Unformat a mounted volume that is no longer
 *                              part of the database (used for partial recovery)
 *
 * return: false if the volume's buffered pages could not be invalidated,
 *         true otherwise
 *
 *   volid(in): Volume identifier of a mounted volume
 *   start_volid(in): First volume identifier that is no longer part of the database
 *
 * NOTE: Callback used with fileio_map_mounted to discard volumes at or beyond
 *              the partial recovery point.
 */
static bool
log_unformat_ahead_volumes (THREAD_ENTRY * thread_p, VOLID volid, VOLID * start_volid)
{
  bool result = true;

  if (volid != NULL_VOLID && volid >= *start_volid)
    {
      /* This volume is not part of the database any longer */
      if (pgbuf_invalidate_all (thread_p, volid) != NO_ERROR)
    {
      result = false;
    }
      else
    {
      char *vlabel = fileio_get_volume_label (volid, ALLOC_COPY);
      fileio_unformat (thread_p, vlabel);
      if (vlabel)
        {
          free (vlabel);
        }
    }
    }
  return result;
}

/*
 * log_recovery_notpartof_volumes - Remove volumes that are no longer part of
 *                                  the database (used for partial recovery)
 *
 * return: nothing
 *
 * NOTE: Remove both mounted volumes and volumes lying around on disk that are
 *              not part of the database any longer due to the partial recovery
 *              point, then recreate the volume information file.
 */
static void
log_recovery_notpartof_volumes (THREAD_ENTRY * thread_p)
{
  const char *ext_path;
  const char *ext_name;
  int vdes;
  VOLID start_volid;
  VOLID volid;
  char vol_fullname[PATH_MAX];
  INT64 vol_dbcreation;     /* Database creation time in volume */
  char *alloc_extpath = NULL;
  int ret = NO_ERROR;

  start_volid = boot_find_next_permanent_volid (thread_p);

  /*
   * FIRST: ASSUME VOLUME INFORMATION WAS AHEAD OF US.
   * Start removing mounted volumes that are not part of the database any
   * longer due to partial recovery point. Note that these volumes were
   * mounted before the recovery started.
   */

  (void) fileio_map_mounted (thread_p, (bool (*)(THREAD_ENTRY *, VOLID, void *)) log_unformat_ahead_volumes,
                 &start_volid);

  /*
   * SECOND: ASSUME RIGHT VOLUME INFORMATION.
   * Remove any volumes that are lying around on disk
   */

  /*
   * Get the name of the extension: ext_path|dbname|"ext"|volid
   */

  /* Use the directory where the primary volume is located */
  alloc_extpath = (char *) malloc (PATH_MAX);
  if (alloc_extpath != NULL)
    {
      ext_path = fileio_get_directory_path (alloc_extpath, log_Db_fullname);
      if (ext_path == NULL)
    {
      alloc_extpath[0] = '\0';
      ext_path = alloc_extpath;
    }
    }
  else
    {
      ext_path = "";        /* Pointer to a null terminated string */
    }

  ext_name = fileio_get_base_file_name (log_Db_fullname);
  /*
   * We don't know where to stop. Stop when a volume does not exist in the OS.
   */

  for (volid = start_volid; volid < LOG_MAX_DBVOLID; volid++)
    {
      fileio_make_volume_ext_name (vol_fullname, ext_path, ext_name, volid);
      if (fileio_is_volume_exist (vol_fullname) == false)
    {
      break;
    }

      vdes = fileio_mount (thread_p, log_Db_fullname, vol_fullname, volid, false, false);
      if (vdes != NULL_VOLDES)
    {
      ret = disk_get_db_creation (thread_p, volid, &vol_dbcreation);
      fileio_dismount (thread_p, vdes);
      if (difftime ((time_t) vol_dbcreation, (time_t) log_Gl.hdr.db_creation) != 0)
        {
          /* This volume does not belong to the given database */
          ;         /* NO-OP */
        }
      else
        {
          fileio_unformat (thread_p, vol_fullname);
        }
    }
    }

  if (alloc_extpath)
    {
      free_and_init (alloc_extpath);
    }

  (void) logpb_recreate_volume_info (thread_p);

}
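
/*
 * log_recovery_resetlog - Reset the active log to a new append location
 *
 * return: nothing
 *
 *   new_append_lsa(in): New append location for the active log
 *   new_prev_lsa(in): New value for the previous append LSA
 *
 * NOTE: Reset the log header so that appending continues at the given address.
 *              Log archives that are no longer part of the database are removed,
 *              and the active log volume is recreated when it is missing or when
 *              the append point is moved to the past.
 */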

static void
log_recovery_resetlog (THREAD_ENTRY * thread_p, const LOG_LSA * new_append_lsa, const LOG_LSA * new_prev_lsa)
{
  char newappend_pgbuf[IO_MAX_PAGE_SIZE + MAX_ALIGNMENT];
  char *aligned_newappend_pgbuf;
  LOG_PAGE *newappend_pgptr = NULL;
  int arv_num;
  const char *catmsg;
  char *catmsg_dup;
  int ret = NO_ERROR;

  assert (LOG_CS_OWN_WRITE_MODE (thread_p));
  assert (new_prev_lsa != NULL);

  aligned_newappend_pgbuf = PTR_ALIGN (newappend_pgbuf, MAX_ALIGNMENT);

  if (log_Gl.append.vdes != NULL_VOLDES && log_Gl.append.log_pgptr != NULL)
    {
      logpb_flush_pages_direct (thread_p);
      logpb_invalid_all_append_pages (thread_p);
    }

  if (LSA_ISNULL (new_append_lsa))
    {
      log_Gl.hdr.append_lsa.pageid = 0;
      log_Gl.hdr.append_lsa.offset = 0;
      LOG_RESET_APPEND_LSA (&log_Gl.hdr.append_lsa);
    }
  else
    {
      if (log_Gl.append.vdes == NULL_VOLDES
      || (log_Gl.hdr.fpageid > new_append_lsa->pageid && new_append_lsa->offset > 0))
    {
      /*
       * We are going to reset the header of the active log to the past.
       * Save the content of the new append page since it has to be
       * transferred to the new location. This is needed since we may not
       * start at location zero.
       *
       * We need to destroy any log archive created after this point
       */

      newappend_pgptr = (LOG_PAGE *) aligned_newappend_pgbuf;

      if ((logpb_fetch_page (thread_p, new_append_lsa, LOG_CS_FORCE_USE, newappend_pgptr)) != NO_ERROR)
        {
          newappend_pgptr = NULL;
        }
    }
      LOG_RESET_APPEND_LSA (new_append_lsa);
    }

  LSA_COPY (&log_Gl.hdr.chkpt_lsa, &log_Gl.hdr.append_lsa);
  log_Gl.hdr.is_shutdown = false;

  logpb_invalidate_pool (thread_p);

  if (log_Gl.append.vdes == NULL_VOLDES || log_Gl.hdr.fpageid > log_Gl.hdr.append_lsa.pageid)
    {
      LOG_PAGE *loghdr_pgptr = NULL;
      LOG_PAGE *append_pgptr = NULL;

      /*
       * We don't have the active log, or we are going to the past
       */
      arv_num = logpb_get_archive_number (thread_p, log_Gl.hdr.append_lsa.pageid - 1);
      if (arv_num == -1)
    {
      logpb_fatal_error (thread_p, true, ARG_FILE_LINE, "log_recovery_resetlog");
      return;
    }
      arv_num = arv_num + 1;

      catmsg = msgcat_message (MSGCAT_CATALOG_CUBRID, MSGCAT_SET_LOG, MSGCAT_LOG_RESETLOG_DUE_INCOMPLTE_MEDIA_RECOVERY);
      catmsg_dup = strdup (catmsg);
      if (catmsg_dup != NULL)
    {
      log_recovery_notpartof_archives (thread_p, arv_num, catmsg_dup);
      free_and_init (catmsg_dup);
    }
      else
    {
      /* NOTE: catmsg may get corrupted if the function calls the catalog */
      log_recovery_notpartof_archives (thread_p, arv_num, catmsg);
    }

      log_Gl.hdr.fpageid = log_Gl.hdr.append_lsa.pageid;
      log_Gl.hdr.nxarv_pageid = log_Gl.hdr.append_lsa.pageid;
      log_Gl.hdr.nxarv_phy_pageid = logpb_to_physical_pageid (log_Gl.hdr.nxarv_pageid);
      log_Gl.hdr.nxarv_num = arv_num;
      log_Gl.hdr.last_arv_num_for_syscrashes = -1;
      log_Gl.hdr.last_deleted_arv_num = -1;

      if (log_Gl.append.vdes == NULL_VOLDES)
    {
      /* Create the active log since we do not have one */
      ret = disk_get_db_creation (thread_p, LOG_DBFIRST_VOLID, &log_Gl.hdr.db_creation);
      if (ret != NO_ERROR)
        {
          logpb_fatal_error (thread_p, true, ARG_FILE_LINE, "log_recovery_resetlog");
          return;
        }

      log_Gl.append.vdes =
        fileio_format (thread_p, log_Db_fullname, log_Name_active, LOG_DBLOG_ACTIVE_VOLID,
               log_get_num_pages_for_creation (-1), false, true, false, LOG_PAGESIZE, 0, false);

      if (log_Gl.append.vdes != NULL_VOLDES)
        {
          loghdr_pgptr = logpb_create_header_page (thread_p);
        }

      if (log_Gl.append.vdes == NULL_VOLDES || loghdr_pgptr == NULL)
        {
          logpb_fatal_error (thread_p, true, ARG_FILE_LINE, "log_recovery_resetlog");
          return;
        }

      /*
       * Flush the header page and first append page so that we can record
       * the header page on it
       */
      if (logpb_flush_page (thread_p, loghdr_pgptr) != NO_ERROR)
        {
          logpb_fatal_error (thread_p, false, ARG_FILE_LINE, "log_recovery_resetlog");
        }
    }

      append_pgptr = logpb_create_page (thread_p, log_Gl.hdr.fpageid);
      if (append_pgptr == NULL)
    {
      logpb_fatal_error (thread_p, true, ARG_FILE_LINE, "log_recovery_resetlog");
      return;
    }
      if (logpb_flush_page (thread_p, append_pgptr) != NO_ERROR)
    {
      logpb_fatal_error (thread_p, false, ARG_FILE_LINE, "log_recovery_resetlog");
      return;
    }
    }
  else
    {
      /*
       * There is already an active log and the new append location is within the
       * current range. Leave the log as it is; just reset the append location.
       */
      if (log_Gl.hdr.nxarv_pageid >= log_Gl.hdr.append_lsa.pageid)
    {
      log_Gl.hdr.nxarv_pageid = log_Gl.hdr.append_lsa.pageid;
      log_Gl.hdr.nxarv_phy_pageid = logpb_to_physical_pageid (log_Gl.hdr.nxarv_pageid);
    }
    }

  /*
   * Fetch the append page and write it with an end-of-log mark.
   * Then free the page; do the same for the header page.
   */

  if (logpb_fetch_start_append_page (thread_p) == NO_ERROR)
    {
      if (newappend_pgptr != NULL && log_Gl.append.log_pgptr != NULL)
    {
      memcpy ((char *) log_Gl.append.log_pgptr, (char *) newappend_pgptr, LOG_PAGESIZE);
      logpb_set_dirty (thread_p, log_Gl.append.log_pgptr);
    }
      logpb_flush_pages_direct (thread_p);
    }

  LOG_RESET_PREV_LSA (new_prev_lsa);

  log_Gl.hdr.mvcc_op_log_lsa.set_null ();
  log_Gl.hdr.vacuum_last_blockid = 0;

  // set a flag that active log was reset; some operations may be affected
  log_Gl.hdr.was_active_log_reset = true;

  logpb_flush_header (thread_p);
  logpb_decache_archive_info (thread_p);

  return;
}

/*
 * log_startof_nxrec - FIND START OF NEXT RECORD (USED FOR PARTIAL RECOVERY)
 *
 * return: lsa or NULL in case of error
 *
 *   lsa(in/out): Starting address. Set as a side effect to the next address
 *   canuse_forwaddr(in): Use forward address if available
 *
 * NOTE: Find the start address of the next record either by using the forward
 *              address or by scanning the current record.
 */
LOG_LSA *
log_startof_nxrec (THREAD_ENTRY * thread_p, LOG_LSA * lsa, bool canuse_forwaddr)
{
  char log_pgbuf[IO_MAX_PAGE_SIZE + MAX_ALIGNMENT], *aligned_log_pgbuf;
  LOG_PAGE *log_pgptr = NULL;   /* Log page pointer where LSA is located */
  LOG_LSA log_lsa;
  LOG_RECTYPE type;     /* Log record type */
  LOG_RECORD_HEADER *log_rec;   /* Pointer to log record */
  LOG_REC_UNDOREDO *undoredo;   /* Undo_redo log record */
  LOG_REC_UNDO *undo;       /* Undo log record */
  LOG_REC_REDO *redo;       /* Redo log record */
  LOG_REC_MVCC_UNDOREDO *mvcc_undoredo; /* MVCC op undo_redo log record */
  LOG_REC_MVCC_UNDO *mvcc_undo; /* MVCC op undo log record */
  LOG_REC_MVCC_REDO *mvcc_redo; /* MVCC op redo log record */
  LOG_REC_DBOUT_REDO *dbout_redo;   /* An external redo log record */
  LOG_REC_SAVEPT *savept;   /* A savepoint log record */
  LOG_REC_COMPENSATE *compensate;   /* Compensating log record */
  LOG_REC_RUN_POSTPONE *run_posp;   /* A run postpone action */
  LOG_REC_CHKPT *chkpt;     /* Checkpoint log record */
  LOG_REC_2PC_START *start_2pc; /* A 2PC start log record */
  LOG_REC_2PC_PREPCOMMIT *prepared; /* A 2PC prepare to commit */
  LOG_REC_REPLICATION *repl_log;
  LOG_REC_SUPPLEMENT *supplement;

  int undo_length;      /* Undo length */
  int redo_length;      /* Redo length */
  unsigned int nobj_locks;
  int repl_log_length;
  size_t size;

  aligned_log_pgbuf = PTR_ALIGN (log_pgbuf, MAX_ALIGNMENT);

  if (LSA_ISNULL (lsa))
    {
      return NULL;
    }

  log_pgptr = (LOG_PAGE *) aligned_log_pgbuf;

  if (logpb_fetch_page (thread_p, lsa, LOG_CS_FORCE_USE, log_pgptr) != NO_ERROR)
    {
      fprintf (stdout, " Error reading page %lld... Quit\n", (long long int) lsa->pageid);
      goto error;
    }

  /*
   * If the offset is missing, it is because we archived an incomplete
   * log record or we started dumping the log from a page other than its
   * first one. We have to find the offset by searching for the next log record in the page.
   */
  if (lsa->offset == NULL_OFFSET)
    {
      lsa->offset = log_pgptr->hdr.offset;
      if (lsa->offset == NULL_OFFSET)
    {
      goto error;
    }
    }

  LSA_COPY (&log_lsa, lsa);
  log_rec = LOG_GET_LOG_RECORD_HEADER (log_pgptr, &log_lsa);
  type = log_rec->type;

  if (canuse_forwaddr == true)
    {
      /*
       * Use forward address of current log record
       */
      LSA_COPY (lsa, &log_rec->forw_lsa);
      if (LSA_ISNULL (lsa) && logpb_is_page_in_archive (log_lsa.pageid))
    {
      lsa->pageid = log_lsa.pageid + 1;
    }

      if (!LSA_ISNULL (lsa))
    {
      return lsa;
    }
    }

  /* Advance the pointer to log_rec data */

  LOG_READ_ADD_ALIGN (thread_p, sizeof (LOG_RECORD_HEADER), &log_lsa, log_pgptr);
  switch (type)
    {
    case LOG_UNDOREDO_DATA:
    case LOG_DIFF_UNDOREDO_DATA:
      /* Read the DATA HEADER */
      LOG_READ_ADVANCE_WHEN_DOESNT_FIT (thread_p, sizeof (LOG_REC_UNDOREDO), &log_lsa, log_pgptr);
      undoredo = (LOG_REC_UNDOREDO *) ((char *) log_pgptr->area + log_lsa.offset);

      undo_length = (int) GET_ZIP_LEN (undoredo->ulength);
      redo_length = (int) GET_ZIP_LEN (undoredo->rlength);

      LOG_READ_ADD_ALIGN (thread_p, sizeof (LOG_REC_UNDOREDO), &log_lsa, log_pgptr);
      LOG_READ_ADD_ALIGN (thread_p, undo_length, &log_lsa, log_pgptr);
      LOG_READ_ADD_ALIGN (thread_p, redo_length, &log_lsa, log_pgptr);
      break;

    case LOG_MVCC_UNDOREDO_DATA:
    case LOG_MVCC_DIFF_UNDOREDO_DATA:
      /* Read the DATA HEADER */
      LOG_READ_ADVANCE_WHEN_DOESNT_FIT (thread_p, sizeof (LOG_REC_MVCC_UNDOREDO), &log_lsa, log_pgptr);
      mvcc_undoredo = (LOG_REC_MVCC_UNDOREDO *) ((char *) log_pgptr->area + log_lsa.offset);

      undo_length = (int) GET_ZIP_LEN (mvcc_undoredo->undoredo.ulength);
      redo_length = (int) GET_ZIP_LEN (mvcc_undoredo->undoredo.rlength);

      LOG_READ_ADD_ALIGN (thread_p, sizeof (LOG_REC_MVCC_UNDOREDO), &log_lsa, log_pgptr);
      LOG_READ_ADD_ALIGN (thread_p, undo_length, &log_lsa, log_pgptr);
      LOG_READ_ADD_ALIGN (thread_p, redo_length, &log_lsa, log_pgptr);
      break;

    case LOG_UNDO_DATA:
      /* Read the DATA HEADER */
      LOG_READ_ADVANCE_WHEN_DOESNT_FIT (thread_p, sizeof (LOG_REC_UNDO), &log_lsa, log_pgptr);
      undo = (LOG_REC_UNDO *) ((char *) log_pgptr->area + log_lsa.offset);

      undo_length = (int) GET_ZIP_LEN (undo->length);

      LOG_READ_ADD_ALIGN (thread_p, sizeof (LOG_REC_UNDO), &log_lsa, log_pgptr);
      LOG_READ_ADD_ALIGN (thread_p, undo_length, &log_lsa, log_pgptr);
      break;

    case LOG_MVCC_UNDO_DATA:
      /* Read the DATA HEADER */
      LOG_READ_ADVANCE_WHEN_DOESNT_FIT (thread_p, sizeof (LOG_REC_MVCC_UNDO), &log_lsa, log_pgptr);
      mvcc_undo = (LOG_REC_MVCC_UNDO *) ((char *) log_pgptr->area + log_lsa.offset);

      undo_length = (int) GET_ZIP_LEN (mvcc_undo->undo.length);

      LOG_READ_ADD_ALIGN (thread_p, sizeof (LOG_REC_MVCC_UNDO), &log_lsa, log_pgptr);
      LOG_READ_ADD_ALIGN (thread_p, undo_length, &log_lsa, log_pgptr);
      break;

    case LOG_MVCC_REDO_DATA:
      /* Read the DATA HEADER */
      LOG_READ_ADVANCE_WHEN_DOESNT_FIT (thread_p, sizeof (LOG_REC_MVCC_REDO), &log_lsa, log_pgptr);
      mvcc_redo = (LOG_REC_MVCC_REDO *) ((char *) log_pgptr->area + log_lsa.offset);
      redo_length = (int) GET_ZIP_LEN (mvcc_redo->redo.length);

      LOG_READ_ADD_ALIGN (thread_p, sizeof (LOG_REC_MVCC_REDO), &log_lsa, log_pgptr);
      LOG_READ_ADD_ALIGN (thread_p, redo_length, &log_lsa, log_pgptr);
      break;

    case LOG_REDO_DATA:
    case LOG_POSTPONE:
      /* Read the DATA HEADER */
      LOG_READ_ADVANCE_WHEN_DOESNT_FIT (thread_p, sizeof (LOG_REC_REDO), &log_lsa, log_pgptr);
      redo = (LOG_REC_REDO *) ((char *) log_pgptr->area + log_lsa.offset);
      redo_length = (int) GET_ZIP_LEN (redo->length);

      LOG_READ_ADD_ALIGN (thread_p, sizeof (LOG_REC_REDO), &log_lsa, log_pgptr);
      LOG_READ_ADD_ALIGN (thread_p, redo_length, &log_lsa, log_pgptr);
      break;

    case LOG_RUN_POSTPONE:
      /* Read the DATA HEADER */
      LOG_READ_ADVANCE_WHEN_DOESNT_FIT (thread_p, sizeof (LOG_REC_RUN_POSTPONE), &log_lsa, log_pgptr);
      run_posp = (LOG_REC_RUN_POSTPONE *) ((char *) log_pgptr->area + log_lsa.offset);
      redo_length = run_posp->length;

      LOG_READ_ADD_ALIGN (thread_p, sizeof (LOG_REC_RUN_POSTPONE), &log_lsa, log_pgptr);
      LOG_READ_ADD_ALIGN (thread_p, redo_length, &log_lsa, log_pgptr);
      break;

    case LOG_DBEXTERN_REDO_DATA:
      /* Read the data header */
      LOG_READ_ADVANCE_WHEN_DOESNT_FIT (thread_p, sizeof (LOG_REC_DBOUT_REDO), &log_lsa, log_pgptr);
      dbout_redo = ((LOG_REC_DBOUT_REDO *) ((char *) log_pgptr->area + log_lsa.offset));
      redo_length = dbout_redo->length;

      LOG_READ_ADD_ALIGN (thread_p, sizeof (LOG_REC_DBOUT_REDO), &log_lsa, log_pgptr);
      LOG_READ_ADD_ALIGN (thread_p, redo_length, &log_lsa, log_pgptr);
      break;

    case LOG_COMPENSATE:
      /* Read the DATA HEADER */
      LOG_READ_ADVANCE_WHEN_DOESNT_FIT (thread_p, sizeof (LOG_REC_COMPENSATE), &log_lsa, log_pgptr);
      compensate = (LOG_REC_COMPENSATE *) ((char *) log_pgptr->area + log_lsa.offset);
      redo_length = compensate->length;

      LOG_READ_ADD_ALIGN (thread_p, sizeof (LOG_REC_COMPENSATE), &log_lsa, log_pgptr);
      LOG_READ_ADD_ALIGN (thread_p, redo_length, &log_lsa, log_pgptr);
      break;

    case LOG_COMMIT_WITH_POSTPONE:
      /* Read the DATA HEADER */
      LOG_READ_ADVANCE_WHEN_DOESNT_FIT (thread_p, sizeof (LOG_REC_START_POSTPONE), &log_lsa, log_pgptr);

      LOG_READ_ADD_ALIGN (thread_p, sizeof (LOG_REC_START_POSTPONE), &log_lsa, log_pgptr);
      break;

    case LOG_COMMIT_WITH_POSTPONE_OBSOLETE:
      /* Read the DATA HEADER */
      LOG_READ_ADVANCE_WHEN_DOESNT_FIT (thread_p, sizeof (LOG_REC_START_POSTPONE_OBSOLETE), &log_lsa, log_pgptr);

      LOG_READ_ADD_ALIGN (thread_p, sizeof (LOG_REC_START_POSTPONE_OBSOLETE), &log_lsa, log_pgptr);
      break;

    case LOG_COMMIT:
    case LOG_ABORT:
      /* Read the DATA HEADER */
      LOG_READ_ADVANCE_WHEN_DOESNT_FIT (thread_p, sizeof (LOG_REC_DONETIME), &log_lsa, log_pgptr);

      LOG_READ_ADD_ALIGN (thread_p, sizeof (LOG_REC_DONETIME), &log_lsa, log_pgptr);
      break;

    case LOG_SYSOP_START_POSTPONE:
      {
    LOG_REC_SYSOP_START_POSTPONE *sysop_start_postpone;
    int undo_size = 0;

    /* Read the DATA HEADER */
    LOG_READ_ADVANCE_WHEN_DOESNT_FIT (thread_p, sizeof (LOG_REC_SYSOP_START_POSTPONE), &log_lsa, log_pgptr);
    sysop_start_postpone = (LOG_REC_SYSOP_START_POSTPONE *) (log_pgptr->area + log_lsa.offset);
    if (sysop_start_postpone->sysop_end.type == LOG_SYSOP_END_LOGICAL_UNDO)
      {
        undo_size = sysop_start_postpone->sysop_end.undo.length;
      }
    else if (sysop_start_postpone->sysop_end.type == LOG_SYSOP_END_LOGICAL_MVCC_UNDO)
      {
        undo_size = sysop_start_postpone->sysop_end.mvcc_undo.undo.length;
      }
    LOG_READ_ADD_ALIGN (thread_p, sizeof (LOG_REC_SYSOP_START_POSTPONE), &log_lsa, log_pgptr);
    LOG_READ_ADD_ALIGN (thread_p, undo_size, &log_lsa, log_pgptr);
      }
      break;

    case LOG_SYSOP_END:
      {
    LOG_REC_SYSOP_END *sysop_end;
    int undo_size = 0;

    /* Read the DATA HEADER */
    LOG_READ_ADVANCE_WHEN_DOESNT_FIT (thread_p, sizeof (LOG_REC_SYSOP_END), &log_lsa, log_pgptr);

    sysop_end = (LOG_REC_SYSOP_END *) (log_pgptr->area + log_lsa.offset);

    if (sysop_end->type == LOG_SYSOP_END_LOGICAL_UNDO)
      {
        undo_size = sysop_end->undo.length;
      }
    else if (sysop_end->type == LOG_SYSOP_END_LOGICAL_MVCC_UNDO)
      {
        undo_size = sysop_end->mvcc_undo.undo.length;
      }
    LOG_READ_ADD_ALIGN (thread_p, sizeof (LOG_REC_SYSOP_END), &log_lsa, log_pgptr);
      }
      break;

    case LOG_END_CHKPT:
      /* Read the DATA HEADER */
      LOG_READ_ADVANCE_WHEN_DOESNT_FIT (thread_p, sizeof (LOG_REC_CHKPT), &log_lsa, log_pgptr);
      chkpt = (LOG_REC_CHKPT *) ((char *) log_pgptr->area + log_lsa.offset);
      undo_length = sizeof (LOG_INFO_CHKPT_TRANS) * chkpt->ntrans;
      redo_length = sizeof (LOG_INFO_CHKPT_SYSOP) * chkpt->ntops;

      LOG_READ_ADD_ALIGN (thread_p, sizeof (LOG_REC_CHKPT), &log_lsa, log_pgptr);
      LOG_READ_ADD_ALIGN (thread_p, undo_length, &log_lsa, log_pgptr);
      if (redo_length > 0)
    {
      LOG_READ_ADD_ALIGN (thread_p, redo_length, &log_lsa, log_pgptr);
    }
      break;

    case LOG_SAVEPOINT:
      /* Read the DATA HEADER */
      LOG_READ_ADVANCE_WHEN_DOESNT_FIT (thread_p, sizeof (LOG_REC_SAVEPT), &log_lsa, log_pgptr);
      savept = (LOG_REC_SAVEPT *) ((char *) log_pgptr->area + log_lsa.offset);
      undo_length = savept->length;

      LOG_READ_ADD_ALIGN (thread_p, sizeof (LOG_REC_SAVEPT), &log_lsa, log_pgptr);
      LOG_READ_ADD_ALIGN (thread_p, undo_length, &log_lsa, log_pgptr);
      break;

    case LOG_2PC_PREPARE:
      /* Get the DATA HEADER */
      LOG_READ_ADVANCE_WHEN_DOESNT_FIT (thread_p, sizeof (LOG_REC_2PC_PREPCOMMIT), &log_lsa, log_pgptr);
      prepared = (LOG_REC_2PC_PREPCOMMIT *) ((char *) log_pgptr->area + log_lsa.offset);
      nobj_locks = prepared->num_object_locks;
      /* ignore npage_locks */

      LOG_READ_ADD_ALIGN (thread_p, sizeof (LOG_REC_2PC_PREPCOMMIT), &log_lsa, log_pgptr);

      if (prepared->gtrinfo_length > 0)
    {
      LOG_READ_ADD_ALIGN (thread_p, prepared->gtrinfo_length, &log_lsa, log_pgptr);
    }

      if (nobj_locks > 0)
    {
      size = nobj_locks * sizeof (LK_ACQOBJ_LOCK);
      LOG_READ_ADD_ALIGN (thread_p, (INT16) size, &log_lsa, log_pgptr);
    }
      break;

    case LOG_2PC_START:
      /* Get the DATA HEADER */
      LOG_READ_ADVANCE_WHEN_DOESNT_FIT (thread_p, sizeof (LOG_REC_2PC_START), &log_lsa, log_pgptr);
      start_2pc = (LOG_REC_2PC_START *) ((char *) log_pgptr->area + log_lsa.offset);

      LOG_READ_ADD_ALIGN (thread_p, sizeof (LOG_REC_2PC_START), &log_lsa, log_pgptr);
      LOG_READ_ADD_ALIGN (thread_p, (start_2pc->particp_id_length * start_2pc->num_particps), &log_lsa, log_pgptr);
      break;

    case LOG_2PC_RECV_ACK:
      /* Get the DATA HEADER */
      LOG_READ_ADVANCE_WHEN_DOESNT_FIT (thread_p, sizeof (LOG_REC_2PC_PARTICP_ACK), &log_lsa, log_pgptr);

      LOG_READ_ADD_ALIGN (thread_p, sizeof (LOG_REC_2PC_PARTICP_ACK), &log_lsa, log_pgptr);
      break;

    case LOG_SUPPLEMENTAL_INFO:
      LOG_READ_ADVANCE_WHEN_DOESNT_FIT (thread_p, sizeof (LOG_REC_SUPPLEMENT), &log_lsa, log_pgptr);
      supplement = (LOG_REC_SUPPLEMENT *) ((char *) log_pgptr->area + log_lsa.offset);
      LOG_READ_ADD_ALIGN (thread_p, sizeof (LOG_REC_SUPPLEMENT), &log_lsa, log_pgptr);
      LOG_READ_ADD_ALIGN (thread_p, supplement->length, &log_lsa, log_pgptr);
      break;

    case LOG_START_CHKPT:
    case LOG_2PC_COMMIT_DECISION:
    case LOG_2PC_ABORT_DECISION:
    case LOG_2PC_COMMIT_INFORM_PARTICPS:
    case LOG_2PC_ABORT_INFORM_PARTICPS:
    case LOG_DUMMY_HEAD_POSTPONE:
    case LOG_DUMMY_CRASH_RECOVERY:
    case LOG_DUMMY_OVF_RECORD:
    case LOG_DUMMY_GENERIC:
    case LOG_SYSOP_ATOMIC_START:
      break;

    case LOG_END_OF_LOG:
      assert (false);
      break;

    case LOG_REPLICATION_DATA:
    case LOG_REPLICATION_STATEMENT:
      LOG_READ_ADVANCE_WHEN_DOESNT_FIT (thread_p, sizeof (LOG_REC_REPLICATION), &log_lsa, log_pgptr);

      repl_log = (LOG_REC_REPLICATION *) ((char *) log_pgptr->area + log_lsa.offset);
      repl_log_length = (int) GET_ZIP_LEN (repl_log->length);

      LOG_READ_ADD_ALIGN (thread_p, sizeof (LOG_REC_REPLICATION), &log_lsa, log_pgptr);
      LOG_READ_ADD_ALIGN (thread_p, repl_log_length, &log_lsa, log_pgptr);
      break;

    case LOG_DUMMY_HA_SERVER_STATE:
      LOG_READ_ADVANCE_WHEN_DOESNT_FIT (thread_p, sizeof (LOG_REC_HA_SERVER_STATE), &log_lsa, log_pgptr);
      LOG_READ_ADD_ALIGN (thread_p, sizeof (LOG_REC_HA_SERVER_STATE), &log_lsa, log_pgptr);
      break;

    case LOG_SMALLER_LOGREC_TYPE:
    case LOG_LARGER_LOGREC_TYPE:
    default:
      break;
    }

  /* Make sure you point to beginning of a next record */
  LOG_READ_ADVANCE_WHEN_DOESNT_FIT (thread_p, sizeof (LOG_RECORD_HEADER), &log_lsa, log_pgptr);

  LSA_COPY (lsa, &log_lsa);

  return lsa;

error:

  return NULL;
}
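
/*
 * Usage sketch (illustrative only, not from the original source): the function
 * can be used to walk the log forward record by record, since it advances the
 * given LSA to the start of the next record and returns NULL when it cannot
 * continue.
 *
 *   LOG_LSA lsa = some_start_lsa;     // placeholder for the caller's start address
 *   while (log_startof_nxrec (thread_p, &lsa, false) != NULL)
 *     {
 *       // lsa now points to the start of the next log record
 *     }
 */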

/*
 * log_recovery_find_first_postpone -
 *      Find the first postpone log lsa to be applied.
 *
 * return: error code
 *
 *   ret_lsa(out): LSA of the first postpone record that still has to be applied
 *   start_postpone_lsa(in): Transaction's start postpone LSA
 *   tdes(in): Transaction descriptor
 *
 */
static int
log_recovery_find_first_postpone (THREAD_ENTRY * thread_p, LOG_LSA * ret_lsa, LOG_LSA * start_postpone_lsa,
                  LOG_TDES * tdes)
{
  LOG_LSA end_postpone_lsa;
  LOG_LSA start_seek_lsa;
  LOG_LSA *end_seek_lsa;
  LOG_LSA next_start_seek_lsa;
  LOG_LSA log_lsa;
  LOG_LSA forward_lsa;
  LOG_LSA next_postpone_lsa;
  LOG_LSA local_start_postpone_run_lsa;
  LOG_REC_RUN_POSTPONE *run_posp;

  char log_pgbuf[IO_MAX_PAGE_SIZE + MAX_ALIGNMENT];
  char *aligned_log_pgbuf;
  LOG_PAGE *log_pgptr = NULL;
  LOG_RECORD_HEADER *log_rec;
  bool isdone;

  LOG_TOPOP_RANGE nxtop_array[LOG_TOPOP_STACK_INIT_SIZE];
  LOG_TOPOP_RANGE *nxtop_stack = NULL;
  LOG_TOPOP_RANGE *nxtop_range = NULL;
  int nxtop_count = 0;
  bool start_postpone_lsa_wasapplied = false;

  assert (ret_lsa && start_postpone_lsa && tdes);

  LSA_SET_NULL (ret_lsa);

  if (log_is_in_crash_recovery () == false
      || (tdes->state != TRAN_UNACTIVE_WILL_COMMIT && tdes->state != TRAN_UNACTIVE_COMMITTED_WITH_POSTPONE
      && tdes->state != TRAN_UNACTIVE_TOPOPE_COMMITTED_WITH_POSTPONE))
    {
      assert (0);
      return ER_FAILED;
    }

  if (LSA_ISNULL (start_postpone_lsa))
    {
      return NO_ERROR;
    }

  LSA_SET_NULL (&next_postpone_lsa);

  aligned_log_pgbuf = PTR_ALIGN (log_pgbuf, MAX_ALIGNMENT);
  log_pgptr = (LOG_PAGE *) aligned_log_pgbuf;

  LSA_COPY (&end_postpone_lsa, &tdes->tail_lsa);
  LSA_COPY (&next_start_seek_lsa, start_postpone_lsa);

  nxtop_stack = nxtop_array;
  nxtop_count = log_get_next_nested_top (thread_p, tdes, start_postpone_lsa, &nxtop_stack);

  while (!LSA_ISNULL (&next_start_seek_lsa))
    {
      LSA_COPY (&start_seek_lsa, &next_start_seek_lsa);

      if (nxtop_count > 0)
    {
      nxtop_count--;
      nxtop_range = &(nxtop_stack[nxtop_count]);

      if (LSA_LT (&start_seek_lsa, &(nxtop_range->start_lsa)))
        {
          end_seek_lsa = &(nxtop_range->start_lsa);
          LSA_COPY (&next_start_seek_lsa, &(nxtop_range->end_lsa));
        }
      else if (LSA_EQ (&start_seek_lsa, &(nxtop_range->end_lsa)))
        {
          end_seek_lsa = &end_postpone_lsa;
          LSA_SET_NULL (&next_start_seek_lsa);
        }
      else
        {
          LSA_COPY (&next_start_seek_lsa, &(nxtop_range->end_lsa));
          continue;
        }
    }
      else
    {
      end_seek_lsa = &end_postpone_lsa;
      LSA_SET_NULL (&next_start_seek_lsa);
    }

      /*
       * Search this range for postpone and run postpone records
       */

      LSA_COPY (&forward_lsa, &start_seek_lsa);

      isdone = false;
      while (!LSA_ISNULL (&forward_lsa) && !isdone)
    {
      /* Fetch the page where the postpone LSA record is located */
      LSA_COPY (&log_lsa, &forward_lsa);
      if (logpb_fetch_page (thread_p, &log_lsa, LOG_CS_FORCE_USE, log_pgptr) != NO_ERROR)
        {
          logpb_fatal_error (thread_p, true, ARG_FILE_LINE, "log_recovery_find_first_postpone");
          goto end;
        }

      while (forward_lsa.pageid == log_lsa.pageid && !isdone)
        {
          if (LSA_GT (&forward_lsa, end_seek_lsa))
        {
          /* Finish at this point */
          isdone = true;
          break;
        }
          /*
           * If an offset is missing, it is because we archived an incomplete
           * log record. This log record was completed later.
           * Thus, we have to find the offset by searching
           * for the next log record in the page.
           */
          if (forward_lsa.offset == NULL_OFFSET)
        {
          forward_lsa.offset = log_pgptr->hdr.offset;
          if (forward_lsa.offset == NULL_OFFSET)
            {
              /* Continue at next pageid */
              if (logpb_is_page_in_archive (log_lsa.pageid))
            {
              forward_lsa.pageid = log_lsa.pageid + 1;
            }
              else
            {
              forward_lsa.pageid = NULL_PAGEID;
            }
              continue;
            }
        }

          log_lsa.offset = forward_lsa.offset;
          log_rec = LOG_GET_LOG_RECORD_HEADER (log_pgptr, &log_lsa);

          /* Find the next log record in the log */
          LSA_COPY (&forward_lsa, &log_rec->forw_lsa);

          if (forward_lsa.pageid == NULL_PAGEID && logpb_is_page_in_archive (log_lsa.pageid))
        {
          forward_lsa.pageid = log_lsa.pageid + 1;
        }

          if (log_rec->trid == tdes->trid)
        {
          switch (log_rec->type)
            {
            case LOG_RUN_POSTPONE:
              LSA_COPY (&local_start_postpone_run_lsa, &log_lsa);
              LOG_READ_ADD_ALIGN (thread_p, sizeof (LOG_RECORD_HEADER), &log_lsa, log_pgptr);

              LOG_READ_ADVANCE_WHEN_DOESNT_FIT (thread_p, sizeof (LOG_REC_RUN_POSTPONE), &log_lsa, log_pgptr);

              run_posp = (LOG_REC_RUN_POSTPONE *) ((char *) log_pgptr->area + log_lsa.offset);

              if (LSA_EQ (start_postpone_lsa, &run_posp->ref_lsa))
            {
              /* The run postpone log of start_postpone_lsa was found; next_postpone_lsa is the first postpone to be
               * applied. */

              start_postpone_lsa_wasapplied = true;
              isdone = true;
            }
              break;

            case LOG_SYSOP_END:
              {
            LOG_REC_SYSOP_END *sysop_end = NULL;

            LOG_READ_ADD_ALIGN (thread_p, sizeof (LOG_RECORD_HEADER), &log_lsa, log_pgptr);
            LOG_READ_ADVANCE_WHEN_DOESNT_FIT (thread_p, sizeof (LOG_REC_SYSOP_END), &log_lsa, log_pgptr);

            sysop_end = (LOG_REC_SYSOP_END *) (log_pgptr->area + log_lsa.offset);
            if (sysop_end->type == LOG_SYSOP_END_LOGICAL_RUN_POSTPONE)
              {
                LSA_COPY (&local_start_postpone_run_lsa, &log_lsa);

                if (LSA_EQ (start_postpone_lsa, &sysop_end->run_postpone.postpone_lsa))
                  {
                start_postpone_lsa_wasapplied = true;
                isdone = true;
                  }
              }
              }
              break;

            case LOG_POSTPONE:
              if (LSA_ISNULL (&next_postpone_lsa) && !LSA_EQ (start_postpone_lsa, &log_lsa))
            {
              /* remember next postpone_lsa */
              LSA_COPY (&next_postpone_lsa, &log_lsa);
            }
              break;

            case LOG_END_OF_LOG:
              if (forward_lsa.pageid == NULL_PAGEID && logpb_is_page_in_archive (log_lsa.pageid))
            {
              forward_lsa.pageid = log_lsa.pageid + 1;
            }
              break;

            default:
              break;
            }
        }

          /*
           * At this point we can fix the lsa.pageid for log records that
           * have no forward address.
           */

          if (forward_lsa.offset == NULL_OFFSET && forward_lsa.pageid != NULL_PAGEID
          && forward_lsa.pageid < log_lsa.pageid)
        {
          forward_lsa.pageid = log_lsa.pageid;
        }
        }
    }
    }

end:
  if (nxtop_stack != nxtop_array && nxtop_stack != NULL)
    {
      free_and_init (nxtop_stack);
    }

  if (start_postpone_lsa_wasapplied == false)
    {
      LSA_COPY (ret_lsa, start_postpone_lsa);
    }
  else
    {
      LSA_COPY (ret_lsa, &next_postpone_lsa);
    }

  return NO_ERROR;
}

/*
 * log_rv_undoredo_partial_changes_recursive () - Parse log data recursively
 *                        and apply changes.
 *
 * return    : Error code.
 * thread_p (in) : Thread entry.
 * rcv_buf (in)  : Buffer to process log data.
 * record (in)   : Record being modified.
 * is_undo (in)  : True for undo, false for redo.
 *
 * NOTE: The recursive function is applied for both undo and redo data.
 *   Changes are logged in the order they are made during runtime.
 *   Redo will apply all changes in the same order, but undo will have
 *   to apply them in reversed order.
 */
static int
log_rv_undoredo_partial_changes_recursive (THREAD_ENTRY * thread_p, OR_BUF * rcv_buf, RECDES * record, bool is_undo)
{
  int error_code = NO_ERROR;    /* Error code. */
  int offset_to_data;       /* Offset to data being modified. */
  int old_data_size;        /* Size of old data. */
  int new_data_size;        /* Size of new data. */
  char *new_data = NULL;    /* New data. */

  if (rcv_buf->ptr == rcv_buf->endptr)
    {
      /* Finished. */
      return NO_ERROR;
    }

  /* At least offset_to_data, old_data_size and new_data_size should be stored. */
  if (rcv_buf->ptr + OR_SHORT_SIZE + 2 * OR_BYTE_SIZE > rcv_buf->endptr)
    {
      assert_release (false);
      return ER_TF_BUFFER_OVERFLOW;
    }

  /* Get offset_to_data. */
  offset_to_data = (int) or_get_short (rcv_buf, &error_code);
  if (error_code != NO_ERROR)
    {
      assert_release (false);
      return error_code;
    }

  /* Get old_data_size */
  old_data_size = (int) or_get_byte (rcv_buf, &error_code);
  if (error_code != NO_ERROR)
    {
      assert_release (false);
      return error_code;
    }

  /* Get new_data_size */
  new_data_size = (int) or_get_byte (rcv_buf, &error_code);
  if (error_code != NO_ERROR)
    {
      assert_release (false);
      return error_code;
    }

  if (new_data_size > 0)
    {
      /* Get new data. */
      new_data = rcv_buf->ptr;
      error_code = or_advance (rcv_buf, new_data_size);
      if (error_code != NO_ERROR)
    {
      assert_release (false);
      return error_code;
    }
    }
  else
    {
      /* No new data. */
      new_data = NULL;
    }

  /* Align buffer to expected alignment. */
  or_align (rcv_buf, INT_ALIGNMENT);

  if (!is_undo)
    {
      /* Changes must be applied in the same order they are logged. Change record and then advance to next changes. */
      RECORD_REPLACE_DATA (record, offset_to_data, old_data_size, new_data_size, new_data);
    }
  error_code = log_rv_undoredo_partial_changes_recursive (thread_p, rcv_buf, record, is_undo);
  if (error_code != NO_ERROR)
    {
      assert_release (false);
      return error_code;
    }
  if (is_undo)
    {
      /* Changes must be made in reversed order. Change record after advancing to next changes. */
      RECORD_REPLACE_DATA (record, offset_to_data, old_data_size, new_data_size, new_data);
    }
  return NO_ERROR;
}
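
/*
 * Illustration (not part of the original source): each packed change entry in
 * rcv_buf has the layout below, aligned to INT_ALIGNMENT, using the names as
 * the parser above interprets them:
 *
 *   [short offset_to_data][byte old_data_size][byte new_data_size][new_data of new_data_size bytes][align]
 *
 * If two changes C1 and C2 were logged in that order, redo (is_undo == false)
 * applies C1 and then recurses into C2, while undo (is_undo == true) first
 * recurses to the end of the buffer and applies C2 before C1 on the way back,
 * restoring the record in reverse order of the original modifications.
 */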

/*
 * log_rv_undoredo_record_partial_changes () - Undoredo record data changes.
 *
 * return       : Error code.
 * thread_p (in)    : Thread entry.
 * rcv_data (in)    : Recovery data pointer.
 * rcv_data_length (in) : Recovery data length.
 * record (in)      : Record being modified.
 * is_undo (in)     : True for undo, false for redo.
 *
 * TODO: Extend this to undo and undoredo.
 */
int
log_rv_undoredo_record_partial_changes (THREAD_ENTRY * thread_p, char *rcv_data, int rcv_data_length,
                    RECDES * record, bool is_undo)
{
  OR_BUF rcv_buf;       /* Buffer used to process recovery data. */

  /* Assert expected arguments. */
  assert (rcv_data != NULL);
  assert (rcv_data_length > 0);
  assert (record != NULL);

  /* Prepare buffer. */
  or_init (&rcv_buf, rcv_data, rcv_data_length);

  return log_rv_undoredo_partial_changes_recursive (thread_p, &rcv_buf, record, is_undo);
}

/*
 * log_rv_redo_record_modify () - Modify one record of database slotted page.
 *                The change can be one of:
 *                1. New record is inserted.
 *                2. Existing record is removed.
 *                3. Existing record is entirely updated.
 *                4. Existing record is partially updated.
 *
 * return    : Error code.
 * thread_p (in) : Thread entry.
 * rcv (in)  : Recovery data.
 */
int
log_rv_redo_record_modify (THREAD_ENTRY * thread_p, LOG_RCV * rcv)
{
  return log_rv_record_modify_internal (thread_p, rcv, false);
}

/*
 * log_rv_undo_record_modify () - Modify one record of database slotted page.
 *                The change can be one of:
 *                1. New record is inserted.
 *                2. Existing record is removed.
 *                3. Existing record is entirely updated.
 *                4. Existing record is partially updated.
 *
 * return    : Error code.
 * thread_p (in) : Thread entry.
 * rcv (in)  : Recovery data.
 */
int
log_rv_undo_record_modify (THREAD_ENTRY * thread_p, LOG_RCV * rcv)
{
  return log_rv_record_modify_internal (thread_p, rcv, true);
}

/*
 * log_rv_record_modify_internal () - Modify one record of database slotted page.
 *                The change can be one of:
 *                1. New record is inserted.
 *                2. Existing record is removed.
 *                3. Existing record is entirely updated.
 *                4. Existing record is partially updated.
 *
 * return    : Error code.
 * thread_p (in) : Thread entry.
 * rcv (in)  : Recovery data.
 * is_undo (in)  : True if undo recovery, false if redo recovery.
 */
static int
log_rv_record_modify_internal (THREAD_ENTRY * thread_p, LOG_RCV * rcv, bool is_undo)
{
  INT16 flags = rcv->offset & LOG_RV_RECORD_MODIFY_MASK;
  PGSLOTID slotid = rcv->offset & (~LOG_RV_RECORD_MODIFY_MASK);
  RECDES record;
  char data_buffer[IO_MAX_PAGE_SIZE + MAX_ALIGNMENT];
  char *ptr = NULL;
  int error_code = NO_ERROR;

  if ((!is_undo && LOG_RV_RECORD_IS_INSERT (flags)) || (is_undo && LOG_RV_RECORD_IS_DELETE (flags)))
    {
      /* Insert new record. */
      ptr = (char *) rcv->data;
      /* Get record type. */
      record.type = OR_GET_BYTE (ptr);
      ptr += OR_BYTE_SIZE;
      /* Get record data. */
      record.data = ptr;
      record.length = rcv->length - CAST_BUFLEN (ptr - rcv->data);
      if (spage_insert_at (thread_p, rcv->pgptr, slotid, &record) != SP_SUCCESS)
    {
      /* Unexpected. */
      assert_release (false);
      return ER_FAILED;
    }
      /* Success. */
    }
  else if ((!is_undo && LOG_RV_RECORD_IS_DELETE (flags)) || (is_undo && LOG_RV_RECORD_IS_INSERT (flags)))
    {
      if (spage_delete (thread_p, rcv->pgptr, slotid) != slotid)
    {
      assert_release (false);
      return ER_FAILED;
    }
      /* Success. */
    }
  else if (LOG_RV_RECORD_IS_UPDATE_ALL (flags))
    {
      ptr = (char *) rcv->data;
      /* Get record type. */
      record.type = OR_GET_BYTE (ptr);
      ptr += OR_BYTE_SIZE;
      /* Get record data. */
      record.data = ptr;
      record.length = rcv->length - CAST_BUFLEN (ptr - rcv->data);
      if (spage_update (thread_p, rcv->pgptr, slotid, &record) != SP_SUCCESS)
    {
      assert_release (false);
      return ER_FAILED;
    }
      /* Success */
    }
  else
    {
      assert (LOG_RV_RECORD_IS_UPDATE_PARTIAL (flags));
      /* Only limited changes are done to the record, so updating it entirely would not be an efficient use of log
       * space. Only the changes are logged (change location, old data size, new data size, new data). */
      /* Copy existing record. */
      record.data = PTR_ALIGN (data_buffer, MAX_ALIGNMENT);
      record.area_size = DB_PAGESIZE;
      if (spage_get_record (thread_p, rcv->pgptr, slotid, &record, COPY) != S_SUCCESS)
    {
      /* Unexpected failure. */
      assert_release (false);
      return ER_FAILED;
    }
      /* Make recorded changes. */
      error_code = log_rv_undoredo_record_partial_changes (thread_p, (char *) rcv->data, rcv->length, &record, is_undo);
      if (error_code != NO_ERROR)
    {
      assert_release (false);
      return error_code;
    }

      /* Update in page. */
      if (spage_update (thread_p, rcv->pgptr, slotid, &record) != SP_SUCCESS)
    {
      /* Unexpected. */
      assert_release (false);
      return ER_FAILED;
    }
      /* Success. */
    }
  /* Page was successfully modified. */
  pgbuf_set_dirty (thread_p, rcv->pgptr, DONT_FREE);
  return NO_ERROR;
}
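
/*
 * The record change mode is packed into rcv->offset together with the slot
 * identifier: the bits under LOG_RV_RECORD_MODIFY_MASK select insert, delete,
 * full update or partial update, and the remaining bits are the slotid.
 * Redo of an insert and undo of a delete both insert the logged record image;
 * redo of a delete and undo of an insert both delete the slot; full and partial
 * updates are symmetric, with log_rv_undoredo_record_partial_changes choosing
 * the application order based on is_undo.
 */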

/*
 * log_rv_pack_redo_record_changes () - Pack recovery data for redo record
 *                  change.
 *
 * return          : Pointer to the position in the buffer after the packed data.
 * ptr (in)        : Where recovery data is packed.
 * offset_to_data (in) : Offset to data being modified.
 * old_data_size (in)  : Old data size.
 * new_data_size (in)  : New data size.
 * new_data (in)       : New data.
 */
char *
log_rv_pack_redo_record_changes (char *ptr, int offset_to_data, int old_data_size, int new_data_size, char *new_data)
{
  /* Assert expected arguments. */
  assert (ptr != NULL);
  assert (offset_to_data >= 0 && offset_to_data <= 0x8FFF);
  assert (old_data_size >= 0 && new_data_size >= 0);
  assert (old_data_size <= 255 && new_data_size <= 255);
  assert (new_data_size == 0 || new_data != NULL);

  ptr = PTR_ALIGN (ptr, INT_ALIGNMENT);

  OR_PUT_SHORT (ptr, (short) offset_to_data);
  ptr += OR_SHORT_SIZE;

  OR_PUT_BYTE (ptr, (INT16) old_data_size);
  ptr += OR_BYTE_SIZE;

  OR_PUT_BYTE (ptr, (INT16) new_data_size);
  ptr += OR_BYTE_SIZE;

  if (new_data_size > 0)
    {
      memcpy (ptr, new_data, new_data_size);
      ptr += new_data_size;
    }
  ptr = PTR_ALIGN (ptr, INT_ALIGNMENT);

  return ptr;
}
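
/*
 * Usage sketch (illustrative only, not from the original source): a caller that
 * overwrites one byte at offset 8 of a record could build the redo data like
 * this, with the recovery data length being the distance from the aligned start
 * of the buffer to the returned pointer:
 *
 *   char buf[32 + MAX_ALIGNMENT];
 *   char *start = PTR_ALIGN (buf, INT_ALIGNMENT);
 *   char new_byte = 0x01;
 *   char *end = log_rv_pack_redo_record_changes (start, 8, 1, 1, &new_byte);
 *   int redo_length = CAST_BUFLEN (end - start);
 *
 * Several changes can be packed back to back by passing the returned pointer to
 * the next call; log_rv_undoredo_record_partial_changes then replays them.
 */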

/*
 * log_rv_pack_undo_record_changes () - Pack recovery data for undo record
 *                  change.
 *
 * return          : Pointer to the position in the buffer after the packed data.
 * ptr (in)        : Where recovery data is packed.
 * offset_to_data (in) : Offset to data being modified.
 * old_data_size (in)  : Old data size.
 * new_data_size (in)  : New data size.
 * old_data (in)       : Old data.
 */
char *
log_rv_pack_undo_record_changes (char *ptr, int offset_to_data, int old_data_size, int new_data_size, char *old_data)
{
  /* Assert expected arguments. */
  assert (ptr != NULL);
  assert (offset_to_data >= 0 && offset_to_data <= 0x8FFF);
  assert (old_data_size >= 0 && new_data_size >= 0);
  assert (old_data_size <= 255 && new_data_size <= 255);
  assert (old_data_size == 0 || old_data != NULL);

  ptr = PTR_ALIGN (ptr, INT_ALIGNMENT);

  OR_PUT_SHORT (ptr, (short) offset_to_data);
  ptr += OR_SHORT_SIZE;

  OR_PUT_BYTE (ptr, (INT16) new_data_size);
  ptr += OR_BYTE_SIZE;

  OR_PUT_BYTE (ptr, (INT16) old_data_size);
  ptr += OR_BYTE_SIZE;

  if (old_data_size > 0)
    {
      memcpy (ptr, old_data, old_data_size);
      ptr += old_data_size;
    }
  ptr = PTR_ALIGN (ptr, INT_ALIGNMENT);

  return ptr;
}
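
/*
 * Note that the undo variant writes new_data_size before old_data_size and then
 * stores the old data, which mirrors the redo layout above. When
 * log_rv_undoredo_partial_changes_recursive parses an undo buffer, what it reads
 * as "old_data_size" is therefore the size currently in the record and what it
 * reads as "new_data" is the saved old data, so applying the parsed change
 * restores the record to its previous contents.
 */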

/*
 * log_rv_redo_fix_page () - fix page for recovery without upfront reservation check
 *
 * return        : fixed page or NULL
 * thread_p (in) : thread entry
 * vpid_rcv (in) : page identifier
 */
STATIC_INLINE PAGE_PTR
log_rv_redo_fix_page (THREAD_ENTRY * thread_p, const VPID * vpid_rcv)
{
  PAGE_PTR page = NULL;

  assert (vpid_rcv != NULL && !VPID_ISNULL (vpid_rcv));

  // during recovery, we don't care if a page is deallocated or not, apply the changes regardless. since changes to
  // sector reservation table are applied in parallel with the changes in pages, at times the page may appear to be
  // deallocated (part of an unreserved sector). but the changes were done while the sector was reserved and must be
  // re-applied to get a correct end result.
  //
  // moreover, the sector reservation check is very expensive. running this check on every page fix costs much more
  // than any time gained by skipping redoing changes on deallocated pages.
  //
  // therefore, fix page using RECOVERY_PAGE mode. pgbuf_fix will know to accept even new or deallocated pages.
  //
  page = pgbuf_fix (thread_p, vpid_rcv, RECOVERY_PAGE, PGBUF_LATCH_WRITE, PGBUF_UNCONDITIONAL_LATCH);
  if (page == NULL)
    {
      // this is terrible, because it makes recovery impossible
      assert_release (false);
      return NULL;
    }
  return page;
}
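
/*
 * Usage sketch (illustrative, assuming the standard page buffer API): redo
 * callers fix the page in RECOVERY_PAGE mode, apply the change and release it.
 * Here rcv_vpid stands for the page identifier taken from the recovery data.
 *
 *   PAGE_PTR pgptr = log_rv_redo_fix_page (thread_p, &rcv_vpid);
 *   if (pgptr != NULL)
 *     {
 *       // ... apply the redo function on pgptr ...
 *       pgbuf_unfix (thread_p, pgptr);
 *     }
 */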

static void
log_rv_simulate_runtime_worker (THREAD_ENTRY * thread_p, LOG_TDES * tdes)
{
  assert (thread_p != NULL);
  if (tdes->is_active_worker_transaction ())
    {
      thread_p->tran_index = tdes->tran_index;
#if defined (SA_MODE)
      LOG_SET_CURRENT_TRAN_INDEX (thread_p, tdes->tran_index);
#endif // SA_MODE
    }
  else if (tdes->is_system_worker_transaction ())
    {
      log_system_tdes::rv_simulate_system_tdes (tdes->trid);
    }
  else
    {
      assert (false);
    }
}

static void
log_rv_end_simulation (THREAD_ENTRY * thread_p)
{
  assert (thread_p != NULL);
  thread_p->reset_system_tdes ();
  thread_p->tran_index = LOG_SYSTEM_TRAN_INDEX;
#if defined (SA_MODE)
  LOG_SET_CURRENT_TRAN_INDEX (thread_p, LOG_SYSTEM_TRAN_INDEX);
#endif // SA_MODE
}
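
/*
 * These two helpers are meant to be paired (a sketch of the intended flow):
 * before applying recovery on behalf of a transaction, the worker simulates the
 * transaction's runtime context, and it returns to the system transaction
 * context afterwards.
 *
 *   log_rv_simulate_runtime_worker (thread_p, tdes);
 *   // ... execute undo/redo on behalf of this transaction ...
 *   log_rv_end_simulation (thread_p);
 */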

static UINT64
log_cnt_pages_containing_lsa (const log_lsa * from_lsa, const log_lsa * to_lsa)
{
  if (*to_lsa == *from_lsa)
    {
      return 0;
    }
  else
    {
      return to_lsa->pageid - from_lsa->pageid + 1;
    }
}
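
/*
 * For example, with from_lsa = 100|0 and to_lsa = 102|512 the function returns
 * 3 (pages 100, 101 and 102 all contain addresses in the range), while equal
 * from/to addresses yield 0 because the range is empty.
 */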

/*
 * log_find_unilaterally_largest_undo_lsa - find maximum lsa address to undo
 *
 * return: nothing; the result is returned through max_undo_lsa
 *
 *   max_undo_lsa(out): Maximum undo_nxlsa among aborted transactions and system workers
 *
 * Note: Find the maximum log sequence address to undo during the undo
 *              crash recovery phase.
 */
void
log_find_unilaterally_largest_undo_lsa (THREAD_ENTRY * thread_p, LOG_LSA & max_undo_lsa)
{
  // *INDENT-OFF*
  int i;
  LOG_TDES *tdes;       /* Transaction descriptor */

  TR_TABLE_CS_ENTER_READ_MODE (thread_p);

  LSA_SET_NULL (&max_undo_lsa);

  auto max_undo_lsa_func = [&] (log_tdes & tdes)
    {
      if (LSA_LT (&max_undo_lsa, &tdes.undo_nxlsa))
        {
          max_undo_lsa = tdes.undo_nxlsa;
        }
    };

  /* Check active transactions. */
  for (i = 0; i < log_Gl.trantable.num_total_indices; i++)
    {
      if (i != LOG_SYSTEM_TRAN_INDEX)
        {
          tdes = log_Gl.trantable.all_tdes[i];
          if (tdes != NULL && tdes->trid != NULL_TRANID
              && (tdes->state == TRAN_UNACTIVE_UNILATERALLY_ABORTED || tdes->state == TRAN_UNACTIVE_ABORTED))
            {
              max_undo_lsa_func (*tdes);
            }
        }
    }
  /* Check system worker transactions. */
  log_system_tdes::map_all_tdes (max_undo_lsa_func);

  TR_TABLE_CS_EXIT (thread_p);
  // *INDENT-ON*
}