File flashback.c¶
File List > cubrid > src > transaction > flashback.c
Go to the documentation of this file
/*
* Copyright 2008 Search Solution Corporation
* Copyright 2016 CUBRID Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
/*
* flashback.c -
*/
#ident "$Id$"
#include <stdio.h>
#include <string.h>
#include <stddef.h>
#include <stdlib.h>
#include <time.h>
#if defined(SOLARIS)
#include <netdb.h>
#endif /* SOLARIS */
#include <sys/stat.h>
#include <assert.h>
#if defined(WINDOWS)
#include <io.h>
#endif /* WINDOWS */
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <cstdint>
#include "flashback.h"
#include "log_impl.h"
#include "log_reader.hpp"
#include "message_catalog.h"
#include "msgcat_set_log.hpp"
#include "memory_alloc.h"
#include "intl_support.h"
#include "log_record.hpp"
#include "heap_file.h"
#include "thread_entry.hpp"
#include "storage_common.h"
#include "system_parameter.h"
#include "object_representation.h"
// XXX: SHOULD BE THE LAST INCLUDE HEADER
#include "memory_wrapper.hpp"
static volatile LOG_PAGEID flashback_Min_log_pageid = NULL_LOG_PAGEID; // Minumun log pageid to keep archive log volume from being removed
static CSS_CONN_ENTRY *flashback_Current_conn = NULL; // the connection entry for a flashback request
static pthread_mutex_t flashback_Conn_lock = PTHREAD_MUTEX_INITIALIZER;
/*
* flashback_is_duplicated_request - check if the caller is duplicated request for flashback
*
* need_reset (in) : if connection is lost and need_reset is true, then it reset the flashback variables
*
* return : duplicated or not
*/
static bool
flashback_is_in_progress ()
{
/* flashback_Current_conn indicates conn_entry in thread_p, and conn_entry can be reused by request handler.
* So, status in flashback_Current_conn can be overwritten. */
if (flashback_Current_conn == NULL)
{
/* previous flashback set flashback_Current_conn to NULL properly. (exited well) */
return false;
}
else
{
if (flashback_Current_conn->in_flashback == true)
{
/* previous flashback is still in progress */
return true;
}
else
{
/* - flashback_Current_conn is overwritten with new connection, so in_flashback value is initialized to false.
* - previous flashback connection has been exited abnormally. */
return false;
}
}
}
/*
* flashback_initialize - check request if is duplicated,
* then initialize variables and connection when flashback request is started
*/
int
flashback_initialize (THREAD_ENTRY * thread_p)
{
/* If multiple requests come at the same time,
* they all can be treated as non-duplicate requests.
* So, latch for check and set flashback connection is required */
pthread_mutex_lock (&flashback_Conn_lock);
if (flashback_is_in_progress ())
{
pthread_mutex_unlock (&flashback_Conn_lock);
er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_FLASHBACK_DUPLICATED_REQUEST, 0);
return ER_FLASHBACK_DUPLICATED_REQUEST;
}
if (flashback_Current_conn != NULL)
{
flashback_Min_log_pageid = NULL_LOG_PAGEID;
flashback_Current_conn->in_flashback = false;
flashback_Current_conn = NULL;
}
flashback_Current_conn = thread_p->conn_entry;
flashback_Current_conn->in_flashback = true;
pthread_mutex_unlock (&flashback_Conn_lock);
flashback_Min_log_pageid = NULL_LOG_PAGEID;
return NO_ERROR;
}
/*
* flashback_set_min_log_pageid_to_keep - set flashback_Min_log_pageid
*/
void
flashback_set_min_log_pageid_to_keep (LOG_LSA * lsa)
{
assert (lsa != NULL);
flashback_Min_log_pageid = lsa->pageid;
}
/*
* flashback_min_log_pageid_to_keep - returns minimum log pageid to keep
*
* return : minimum pageid that flashback have to keep
*/
LOG_PAGEID
flashback_min_log_pageid_to_keep ()
{
return flashback_Min_log_pageid;
}
/*
* flashback_is_needed_to_keep_archive - check if archive log volume is required to be kept
*
* return : true or false
*/
bool
flashback_is_needed_to_keep_archive ()
{
bool is_needed = false;
if (flashback_is_in_progress ())
{
is_needed = true;
}
else
{
is_needed = false;
}
return is_needed;
}
/*
* flashback_reset - reset flashback global variables
*
*/
void
flashback_reset ()
{
flashback_Min_log_pageid = NULL_LOG_PAGEID;
pthread_mutex_lock (&flashback_Conn_lock);
flashback_Current_conn->in_flashback = false;
flashback_Current_conn = NULL;
pthread_mutex_unlock (&flashback_Conn_lock);
}
/*
* flashback_is_loginfo_generation_finished - check if it is finished to generate log infos
*
* return : true or false
*/
bool
flashback_is_loginfo_generation_finished (LOG_LSA * start_lsa, LOG_LSA * end_lsa)
{
/* The way to know whether log info generation is done depends on the direction of traversing the log record
* forward : start_lsa increases as much as log infos are generated for every request.
* So, when start_lsa becomes the same as end_lsa, it is judged to be done.
* backward : end_lsa decreases as much as log infos are generated for every request.
* So, when end_lsa meets the prev_tranlsa == NULL, then it is judged to be done.
*/
return LSA_ISNULL (end_lsa) || LSA_GE (start_lsa, end_lsa);
}
/*
* flashback_fill_dml_summary - fill the dml information into summary entry
*
* thread_p (in) : thread entry
* summary_entry (in/out) : flashback summary entry
* classoid (in) : OID of DML target class
* rec_type(in) : record type that can be classified as DML types
*/
static void
flashback_fill_dml_summary (FLASHBACK_SUMMARY_ENTRY * summary_entry, OID classoid, SUPPLEMENT_REC_TYPE rec_type)
{
assert (summary_entry != NULL);
bool found = false;
int i = 0;
/* fill in the summary_entry info entry
* 1. number of class and put the classoid into the classlist
* 2. number of DML operation */
if (summary_entry->classoid_set.find (classoid) == summary_entry->classoid_set.end ())
{
summary_entry->classoid_set.emplace (classoid);
}
switch (rec_type)
{
case LOG_SUPPLEMENT_INSERT:
case LOG_SUPPLEMENT_TRIGGER_INSERT:
summary_entry->num_insert++;
break;
case LOG_SUPPLEMENT_UPDATE:
case LOG_SUPPLEMENT_TRIGGER_UPDATE:
summary_entry->num_update++;
break;
case LOG_SUPPLEMENT_DELETE:
case LOG_SUPPLEMENT_TRIGGER_DELETE:
summary_entry->num_delete++;
break;
default:
assert (false);
break;
}
return;
}
/*
* flashback_make_summary - read log volumes within the range of start_lsa and end_lsa then make summary information
*
* return : error code
* thread_p (in) : thread entry
* context (in/out) : flashback summary context that contains range of travering logs, filter (user, class), and summary list
*/
int
flashback_make_summary_list (THREAD_ENTRY * thread_p, FLASHBACK_SUMMARY_CONTEXT * context)
{
LOG_PAGE *log_page_p = NULL;
char log_pgbuf[IO_MAX_PAGE_SIZE + MAX_ALIGNMENT];
LOG_LSA cur_log_rec_lsa = LSA_INITIALIZER;
LOG_LSA next_log_rec_lsa = LSA_INITIALIZER;
LOG_LSA process_lsa = LSA_INITIALIZER;
LOG_RECORD_HEADER *log_rec_header;
int trid;
time_t current_time = 0;
FLASHBACK_SUMMARY_ENTRY *summary_entry;
bool found = false;
int error = NO_ERROR;
LOG_RECTYPE log_type;
char *supplement_data = NULL;
int supplement_alloc_length = 0;
log_page_p = (LOG_PAGE *) PTR_ALIGN (log_pgbuf, MAX_ALIGNMENT);
LOG_TDES *tdes = LOG_FIND_CURRENT_TDES (thread_p);
assert (!LSA_ISNULL (&context->end_lsa));
LSA_COPY (&process_lsa, &(context->end_lsa));
/*fetch log page */
error = logpb_fetch_page (thread_p, &process_lsa, LOG_CS_SAFE_READER, log_page_p);
if (error != NO_ERROR)
{
logpb_fatal_error (thread_p, false, ARG_FILE_LINE, "flashback_make_summary_list");
goto exit;
}
while (LSA_GE (&process_lsa, &context->start_lsa))
{
if (tdes->interrupt == true)
{
error = ER_INTERRUPTED;
goto exit;
}
LSA_COPY (&cur_log_rec_lsa, &process_lsa);
log_rec_header = LOG_GET_LOG_RECORD_HEADER (log_page_p, &process_lsa);
log_type = log_rec_header->type;
trid = log_rec_header->trid;
LSA_COPY (&next_log_rec_lsa, &log_rec_header->back_lsa);
if (LSA_ISNULL (&log_rec_header->prev_tranlsa))
{
FLASHBACK_CHECK_AND_GET_SUMMARY (context->summary_list, trid, summary_entry);
if (summary_entry != NULL)
{
/* current time is updated whenever a log record with a time value is encountered.
* this function reads the log records in reverse time order,
* so the current_time represents a time in the future than the log being read now
* Since LOG_DUMMY_HA_SERVER_STATE is logged every second, start_time is set to 1 second before the current_time */
summary_entry->start_time = current_time - 1;
LSA_COPY (&summary_entry->start_lsa, &cur_log_rec_lsa);
}
}
LOG_READ_ADD_ALIGN (thread_p, sizeof (*log_rec_header), &process_lsa, log_page_p);
switch (log_type)
{
case LOG_COMMIT:
{
LOG_REC_DONETIME *donetime;
FLASHBACK_SUMMARY_ENTRY tmp_summary_entry = {
-1, "\0", 0, 0, 0, 0, 0, LSA_INITIALIZER, LSA_INITIALIZER, std::unordered_set < OID > {}
};
if (context->num_summary == FLASHBACK_MAX_NUM_TRAN_TO_SUMMARY)
{
er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_FLASHBACK_EXCEED_MAX_NUM_TRAN_TO_SUMMARY, 1,
FLASHBACK_MAX_NUM_TRAN_TO_SUMMARY);
error = ER_FLASHBACK_EXCEED_MAX_NUM_TRAN_TO_SUMMARY;
goto exit;
}
LOG_READ_ADVANCE_WHEN_DOESNT_FIT (thread_p, sizeof (*donetime), &process_lsa, log_page_p);
donetime = (LOG_REC_DONETIME *) (log_page_p->area + process_lsa.offset);
tmp_summary_entry.trid = trid;
LSA_COPY (&tmp_summary_entry.end_lsa, &cur_log_rec_lsa);
tmp_summary_entry.end_time = donetime->at_time;
context->summary_list.emplace (trid, tmp_summary_entry);
current_time = donetime->at_time;
context->num_summary++;
break;
}
case LOG_DUMMY_HA_SERVER_STATE:
{
LOG_REC_HA_SERVER_STATE *ha_dummy;
LOG_READ_ADVANCE_WHEN_DOESNT_FIT (thread_p, sizeof (*ha_dummy), &process_lsa, log_page_p);
ha_dummy = (LOG_REC_HA_SERVER_STATE *) (log_page_p->area + process_lsa.offset);
current_time = ha_dummy->at_time;
break;
}
case LOG_SUPPLEMENTAL_INFO:
{
LOG_REC_SUPPLEMENT *supplement;
int supplement_length;
SUPPLEMENT_REC_TYPE rec_type;
RECDES supp_recdes = RECDES_INITIALIZER;
OID classoid;
FLASHBACK_CHECK_AND_GET_SUMMARY (context->summary_list, trid, summary_entry);
if (summary_entry == NULL)
{
break;
}
LOG_READ_ADVANCE_WHEN_DOESNT_FIT (thread_p, sizeof (*supplement), &process_lsa, log_page_p);
supplement = (LOG_REC_SUPPLEMENT *) (log_page_p->area + process_lsa.offset);
supplement_length = supplement->length;
rec_type = supplement->rec_type;
LOG_READ_ADD_ALIGN (thread_p, sizeof (*supplement), &process_lsa, log_page_p);
if (cdc_get_undo_record (thread_p, log_page_p, cur_log_rec_lsa, &supp_recdes) != S_SUCCESS)
{
/* er_set */
error = ER_FAILED;
goto exit;
}
supplement_length = sizeof (supp_recdes.type) + supp_recdes.length;
if (supplement_length > supplement_alloc_length)
{
char *tmp = (char *) db_private_realloc (thread_p, supplement_data, supplement_length);
if (tmp == NULL)
{
er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_OUT_OF_VIRTUAL_MEMORY, 1, supplement_length);
error = ER_OUT_OF_VIRTUAL_MEMORY;
goto exit;
}
else
{
supplement_data = tmp;
supplement_alloc_length = supplement_length;
}
}
CDC_MAKE_SUPPLEMENT_DATA (supplement_data, supp_recdes);
free_and_init (supp_recdes.data);
switch (rec_type)
{
case LOG_SUPPLEMENT_TRAN_USER:
{
/* user filtering */
if (strlen (context->user) != 0
&& intl_identifier_ncasecmp (context->user, supplement_data, supplement_length) != 0)
{
context->summary_list.erase (trid);
context->num_summary--;
}
else
{
if (strlen (summary_entry->user) == 0)
{
strncpy (summary_entry->user, supplement_data, supplement_length);
}
}
break;
}
case LOG_SUPPLEMENT_INSERT:
case LOG_SUPPLEMENT_TRIGGER_INSERT:
case LOG_SUPPLEMENT_UPDATE:
case LOG_SUPPLEMENT_TRIGGER_UPDATE:
case LOG_SUPPLEMENT_DELETE:
case LOG_SUPPLEMENT_TRIGGER_DELETE:
{
memcpy (&classoid, supplement_data, sizeof (OID));
assert (oid_is_system_class (&classoid) == false);
// *INDENT-OFF*
if (std::find (context->classoids.begin(), context->classoids.end(), classoid) == context->classoids.end())
{
break;
}
// *INDENT-ON*
flashback_fill_dml_summary (summary_entry, classoid, rec_type);
break;
}
default:
break;
}
}
default:
break;
}
LSA_COPY (&process_lsa, &next_log_rec_lsa);
if (process_lsa.pageid != log_page_p->hdr.logical_pageid)
{
error = logpb_fetch_page (thread_p, &process_lsa, LOG_CS_SAFE_READER, log_page_p);
if (error != NO_ERROR)
{
logpb_fatal_error (thread_p, false, ARG_FILE_LINE, "flashback_make_summary_list");
goto exit;
}
}
}
exit:
if (supplement_data != NULL)
{
db_private_free_and_init (thread_p, supplement_data);
}
return error;
}
/*
* flashback_verify_time () - verify the availablity of log records around the 'start_time' and 'end_time'
*
* return : error_code
* thread_p (in) : thread entry
* start_time (in) : start point of flashback.
* end_time (in) : end point of flashback
* start_lsa (out) : LSA near the 'start_time'
* end_lsa (out) : LSA near the 'end_time'
*/
int
flashback_verify_time (THREAD_ENTRY * thread_p, time_t * start_time, time_t * end_time, LOG_LSA * start_lsa,
LOG_LSA * end_lsa)
{
int error_code = NO_ERROR;
time_t ret_time = 0;
time_t current_time = time (NULL);
/* 1. Check time value statically */
if (*start_time > current_time || *end_time <= log_Gl.hdr.db_creation)
{
char start_date[20];
char db_creation_date[20];
char cur_date[20];
strftime (start_date, 20, "%d-%m-%Y:%H:%M:%S", localtime (start_time));
strftime (db_creation_date, 20, "%d-%m-%Y:%H:%M:%S", localtime (&log_Gl.hdr.db_creation));
strftime (cur_date, 20, "%d-%m-%Y:%H:%M:%S", localtime (¤t_time));
er_set (ER_NOTIFICATION_SEVERITY, ARG_FILE_LINE, ER_FLASHBACK_INVALID_TIME, 3, start_date, db_creation_date,
cur_date);
return ER_FLASHBACK_INVALID_TIME;
}
if (*start_time < log_Gl.hdr.db_creation)
{
/* If the start time received from the user is earlier than the db_creation,
* set the start time to log_Gl.hdr.db_creation.
* Since it was checked above that end_time must be greater than db_creation, even if start_time is set to db_creation,
* it can be confirmed that start_time is greater than end_time.
*/
*start_time = log_Gl.hdr.db_creation;
}
/* 2. Check if there is log record at start_time */
ret_time = *start_time;
error_code = cdc_find_lsa (thread_p, &ret_time, start_lsa);
if (error_code == NO_ERROR || error_code == ER_CDC_ADJUSTED_LSA)
{
/* find log record at the time
* ret_time : time of the commit record or dummy record that is found to be greater than or equal to the start_time at first */
if (ret_time >= *end_time)
{
char start_date[20];
char db_creation_date[20];
strftime (start_date, 20, "%d-%m-%Y:%H:%M:%S", localtime (start_time));
strftime (db_creation_date, 20, "%d-%m-%Y:%H:%M:%S", localtime (&log_Gl.hdr.db_creation));
/* out of range : start_time (ret_time) can not be greater than end_time */
er_set (ER_NOTIFICATION_SEVERITY, ARG_FILE_LINE, ER_FLASHBACK_INVALID_TIME, 3, start_date,
db_creation_date, start_date);
return ER_FLASHBACK_INVALID_TIME;
}
}
else
{
/* failed to find a log at the time due to failure while reading log page or volume (ER_FAILED, ER_LOG_READ) */
return error_code;
}
*start_time = ret_time;
/* 3. If start_time is valid, then get end_lsa with end_time */
error_code = cdc_find_lsa (thread_p, end_time, end_lsa);
if (!(error_code == NO_ERROR || error_code == ER_CDC_ADJUSTED_LSA))
{
/* failed to find a log record */
return error_code;
}
return NO_ERROR;
}
/*
* flashback_pack_summary_entry ()
*
* return : memory pointer after packing summary entries
* ptr (in) : memory pointer where to pack summary entries
* context (in) : context which contains summary entry list
*/
char *
flashback_pack_summary_entry (char *ptr, FLASHBACK_SUMMARY_CONTEXT context, int *num_summary)
{
FLASHBACK_SUMMARY_ENTRY entry;
// *INDENT-OFF*
for (auto iter : context.summary_list)
{
entry = iter.second;
if (entry.num_insert + entry.num_update + entry.num_delete > 0)
{
ptr = or_pack_int (ptr, entry.trid);
ptr = or_pack_string (ptr, entry.user);
ptr = or_pack_int64 (ptr, entry.start_time);
ptr = or_pack_int64 (ptr, entry.end_time);
ptr = or_pack_int (ptr, entry.num_insert);
ptr = or_pack_int (ptr, entry.num_update);
ptr = or_pack_int (ptr, entry.num_delete);
ptr = or_pack_log_lsa (ptr, &entry.start_lsa);
ptr = or_pack_log_lsa (ptr, &entry.end_lsa);
ptr = or_pack_int (ptr, entry.classoid_set.size());
for (auto item : entry.classoid_set)
{
ptr = or_pack_oid (ptr, &item);
}
(*num_summary)++;
}
}
// *INDENT-ON*
return ptr;
}
/*
* flashback_pack_loginfo () -
*
* return: memory pointer after putting log info
*
* thread_p(in):
* ptr(in): memory pointer where to pack sequence of log info
* context(in): context contains log infos to pack
*
*/
char *
flashback_pack_loginfo (THREAD_ENTRY * thread_p, char *ptr, FLASHBACK_LOGINFO_CONTEXT context)
{
CDC_LOGINFO_ENTRY *entry;
for (int i = 0; i < context.num_loginfo; i++)
{
ptr = PTR_ALIGN (ptr, MAX_ALIGNMENT);
// *INDENT-OFF*
entry = context.loginfo_queue.front ();
// *INDENT-ON*
memcpy (ptr, PTR_ALIGN (entry->log_info, MAX_ALIGNMENT), entry->length);
ptr = ptr + entry->length;
free_and_init (entry->log_info);
db_private_free_and_init (thread_p, entry);
// *INDENT-OFF*
context.loginfo_queue.pop ();
// *INDENT-ON*
}
return ptr;
}
static int
flashback_find_start_lsa (THREAD_ENTRY * thread_p, FLASHBACK_LOGINFO_CONTEXT * context)
{
LOG_PAGE *log_page_p = NULL;
char log_pgbuf[IO_MAX_PAGE_SIZE + MAX_ALIGNMENT];
int error = NO_ERROR;
LOG_LSA process_lsa = LSA_INITIALIZER;
LOG_LSA cur_log_rec_lsa = LSA_INITIALIZER;
LOG_RECORD_HEADER *log_rec_header;
log_page_p = (LOG_PAGE *) PTR_ALIGN (log_pgbuf, MAX_ALIGNMENT);
log_page_p->hdr.logical_pageid = NULL_LOG_PAGEID;
assert (!LSA_ISNULL (&context->end_lsa));
LSA_COPY (&process_lsa, &context->end_lsa);
while (!LSA_ISNULL (&process_lsa))
{
/* fetch page from archive or active, if previous page is needed */
if (log_page_p->hdr.logical_pageid != process_lsa.pageid)
{
if (logpb_is_page_in_archive (process_lsa.pageid))
{
LOG_CS_ENTER_READ_MODE (thread_p);
if (logpb_fetch_from_archive (thread_p, process_lsa.pageid, log_page_p, 0, NULL, false) == NULL)
{
/* archive log volume has been removed */
er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_FLASHBACK_LOG_NOT_EXIST, 2, LSA_AS_ARGS (&process_lsa));
error = ER_FLASHBACK_LOG_NOT_EXIST;
LOG_CS_EXIT (thread_p);
goto error;
}
LOG_CS_EXIT (thread_p);
}
else
{
error = logpb_fetch_page (thread_p, &process_lsa, LOG_CS_SAFE_READER, log_page_p);
if (error != NO_ERROR)
{
logpb_fatal_error (thread_p, false, ARG_FILE_LINE, "flashback_find_start_lsa");
goto error;
}
}
}
log_rec_header = LOG_GET_LOG_RECORD_HEADER (log_page_p, &process_lsa);
LSA_COPY (&cur_log_rec_lsa, &process_lsa);
LSA_COPY (&process_lsa, &log_rec_header->prev_tranlsa);
}
LSA_COPY (&context->start_lsa, &cur_log_rec_lsa);
return error;
error:
return error;
}
int
flashback_make_loginfo (THREAD_ENTRY * thread_p, FLASHBACK_LOGINFO_CONTEXT * context)
{
LOG_LSA cur_log_rec_lsa = LSA_INITIALIZER;
LOG_LSA next_log_rec_lsa = LSA_INITIALIZER;
LOG_LSA process_lsa = LSA_INITIALIZER;
LOG_PAGE *log_page_p = NULL;
char log_pgbuf[IO_MAX_PAGE_SIZE + MAX_ALIGNMENT];
LOG_RECORD_HEADER *log_rec_header;
int trid;
int error = NO_ERROR;
LOG_RECTYPE log_type;
char *supplement_data = NULL;
int supplement_alloc_length = 0;
RECDES supp_recdes = RECDES_INITIALIZER;
RECDES undo_recdes = RECDES_INITIALIZER;
RECDES redo_recdes = RECDES_INITIALIZER;
int num_loginfo = 0;
CDC_LOGINFO_ENTRY *log_info_entry = NULL;
OID classoid;
LOG_TDES *tdes = LOG_FIND_CURRENT_TDES (thread_p);
if (LSA_ISNULL (&context->start_lsa))
{
error = flashback_find_start_lsa (thread_p, context);
if (error != NO_ERROR)
{
goto error;
}
/* if start_lsa was NULL at the caller, flashback_min_log_pageid was not set at the caller */
flashback_set_min_log_pageid_to_keep (&context->start_lsa);
}
if (context->forward)
{
LSA_COPY (&process_lsa, &context->start_lsa);
}
else
{
LSA_COPY (&process_lsa, &context->end_lsa);
}
LSA_COPY (&cur_log_rec_lsa, &process_lsa);
log_page_p = (LOG_PAGE *) PTR_ALIGN (log_pgbuf, MAX_ALIGNMENT);
/* fetch log page */
error = logpb_fetch_page (thread_p, &process_lsa, LOG_CS_SAFE_READER, log_page_p);
if (error != NO_ERROR)
{
logpb_fatal_error (thread_p, false, ARG_FILE_LINE, "flashback_make_loginfo");
goto error;
}
/* backward : if prev_tranlsa is null, then finish the while loop (process_lsa = prev_tranlsa)
* forward : if forw_lsa is greater than context->end_lsa then quit (process_lsa = forw_lsa)
* num_loginfo will be generated less or equal than context->num_loginfo */
while (!LSA_ISNULL (&process_lsa) && (LSA_LE (&process_lsa, &context->end_lsa) || num_loginfo < context->num_loginfo))
{
if (tdes->interrupt == true)
{
error = ER_INTERRUPTED;
goto error;
}
if (log_page_p->hdr.logical_pageid != process_lsa.pageid)
{
error = logpb_fetch_page (thread_p, &process_lsa, LOG_CS_SAFE_READER, log_page_p);
if (error != NO_ERROR)
{
logpb_fatal_error (thread_p, false, ARG_FILE_LINE, "flashback_make_loginfo");
goto error;
}
if (process_lsa.offset == NULL_OFFSET && (process_lsa.offset = log_page_p->hdr.offset) == NULL_OFFSET)
{
/* can not find the first log record in the page
* goto next page, this occurs only when direction is forward */
process_lsa.pageid++;
continue;
}
}
log_rec_header = LOG_GET_LOG_RECORD_HEADER (log_page_p, &process_lsa);
log_type = log_rec_header->type;
trid = log_rec_header->trid;
if (context->forward)
{
LSA_COPY (&next_log_rec_lsa, &log_rec_header->forw_lsa);
if (LSA_ISNULL (&next_log_rec_lsa) && logpb_is_page_in_archive (cur_log_rec_lsa.pageid))
{
next_log_rec_lsa.pageid = cur_log_rec_lsa.pageid + 1;
}
}
else
{
LSA_COPY (&next_log_rec_lsa, &log_rec_header->prev_tranlsa);
}
LOG_READ_ADD_ALIGN (thread_p, sizeof (*log_rec_header), &process_lsa, log_page_p);
if (trid == context->trid && log_type == LOG_SUPPLEMENTAL_INFO)
{
LOG_REC_SUPPLEMENT *supplement;
int supplement_length;
SUPPLEMENT_REC_TYPE rec_type;
LOG_LSA undo_lsa, redo_lsa;
LOG_READ_ADVANCE_WHEN_DOESNT_FIT (thread_p, sizeof (*supplement), &process_lsa, log_page_p);
supplement = (LOG_REC_SUPPLEMENT *) (log_page_p->area + process_lsa.offset);
supplement_length = supplement->length;
rec_type = supplement->rec_type;
LOG_READ_ADD_ALIGN (thread_p, sizeof (*supplement), &process_lsa, log_page_p);
if (cdc_get_undo_record (thread_p, log_page_p, cur_log_rec_lsa, &supp_recdes) != S_SUCCESS)
{
/* TODO:
* after refactor cdc_get_undo_record, then remove error setting
* modify cdc_get_undo_record() to return error, not scan code
* */
error = ER_FAILED;
goto error;
}
supplement_length = sizeof (supp_recdes.type) + supp_recdes.length;
if (supplement_length > supplement_alloc_length)
{
char *tmp = (char *) db_private_realloc (thread_p, supplement_data, supplement_length);
if (tmp == NULL)
{
er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_OUT_OF_VIRTUAL_MEMORY, 1, supplement_length);
error = ER_OUT_OF_VIRTUAL_MEMORY;
goto error;
}
else
{
supplement_data = tmp;
supplement_alloc_length = supplement_length;
}
}
/* will be replaced with CDC_MAKE_SUPPLEMENT_DATA */
memcpy (supplement_data, &supp_recdes.type, sizeof (supp_recdes.type));
memcpy (supplement_data + sizeof (supp_recdes.type), supp_recdes.data, supp_recdes.length);
free_and_init (supp_recdes.data);
switch (rec_type)
{
case LOG_SUPPLEMENT_INSERT:
case LOG_SUPPLEMENT_TRIGGER_INSERT:
memcpy (&classoid, supplement_data, sizeof (OID));
/* classoid filter */
if (context->classoid_set.find (classoid) == context->classoid_set.end ())
{
break;
}
memcpy (&redo_lsa, supplement_data + sizeof (OID), sizeof (LOG_LSA));
error = cdc_get_recdes (thread_p, NULL, NULL, &redo_lsa, &redo_recdes, true);
if (error != NO_ERROR)
{
goto error;
}
log_info_entry = (CDC_LOGINFO_ENTRY *) db_private_alloc (thread_p, sizeof (CDC_LOGINFO_ENTRY));
if (log_info_entry == NULL)
{
er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_OUT_OF_VIRTUAL_MEMORY, 1, sizeof (CDC_LOGINFO_ENTRY));
error = ER_OUT_OF_VIRTUAL_MEMORY;
goto error;
}
error =
cdc_make_dml_loginfo (thread_p, trid, context->user,
rec_type == LOG_SUPPLEMENT_INSERT ? CDC_INSERT : CDC_TRIGGER_INSERT, classoid,
NULL, &redo_recdes, log_info_entry, true);
if (error != NO_ERROR)
{
goto error;
}
context->loginfo_queue.push (log_info_entry);
context->queue_size += log_info_entry->length;
num_loginfo++;
log_info_entry = NULL;
break;
case LOG_SUPPLEMENT_UPDATE:
case LOG_SUPPLEMENT_TRIGGER_UPDATE:
memcpy (&classoid, supplement_data, sizeof (OID));
/* classoid filter */
if (context->classoid_set.find (classoid) == context->classoid_set.end ())
{
break;
}
memcpy (&undo_lsa, supplement_data + sizeof (OID), sizeof (LOG_LSA));
memcpy (&redo_lsa, supplement_data + sizeof (OID) + sizeof (LOG_LSA), sizeof (LOG_LSA));
error = cdc_get_recdes (thread_p, &undo_lsa, &undo_recdes, &redo_lsa, &redo_recdes, true);
if (error != NO_ERROR)
{
goto error;
}
log_info_entry = (CDC_LOGINFO_ENTRY *) db_private_alloc (thread_p, sizeof (CDC_LOGINFO_ENTRY));
if (log_info_entry == NULL)
{
er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_OUT_OF_VIRTUAL_MEMORY, 1, sizeof (CDC_LOGINFO_ENTRY));
error = ER_OUT_OF_VIRTUAL_MEMORY;
goto error;
}
if (undo_recdes.type == REC_ASSIGN_ADDRESS)
{
/* This occurs when series of logs are appended like
* INSERT record for reserve OID (REC_ASSIGN_ADDRESS) then UPDATE to some record.
* And this is a sequence for INSERT a record with OID reservation.
* undo record with REC_ASSIGN_ADDRESS type has no undo image to extract, so this will be treated as INSERT
* CUBRID engine used to do INSERT a record like this way,
* for instance CREATE a class or INSERT a record by trigger execution */
assert (rec_type == LOG_SUPPLEMENT_TRIGGER_UPDATE);
error =
cdc_make_dml_loginfo (thread_p, trid, context->user, CDC_TRIGGER_INSERT, classoid, NULL,
&redo_recdes, log_info_entry, true);
}
else
{
error =
cdc_make_dml_loginfo (thread_p, trid, context->user,
rec_type == LOG_SUPPLEMENT_UPDATE ? CDC_UPDATE : CDC_TRIGGER_UPDATE, classoid,
&undo_recdes, &redo_recdes, log_info_entry, true);
}
if (error != NO_ERROR)
{
goto error;
}
context->loginfo_queue.push (log_info_entry);
context->queue_size += log_info_entry->length;
num_loginfo++;
log_info_entry = NULL;
break;
case LOG_SUPPLEMENT_DELETE:
case LOG_SUPPLEMENT_TRIGGER_DELETE:
memcpy (&classoid, supplement_data, sizeof (OID));
/* classoid filter */
if (context->classoid_set.find (classoid) == context->classoid_set.end ())
{
break;
}
memcpy (&undo_lsa, supplement_data + sizeof (OID), sizeof (LOG_LSA));
error = cdc_get_recdes (thread_p, &undo_lsa, &undo_recdes, NULL, NULL, true);
if (error != NO_ERROR)
{
goto error;
}
log_info_entry = (CDC_LOGINFO_ENTRY *) db_private_alloc (thread_p, sizeof (CDC_LOGINFO_ENTRY));
if (log_info_entry == NULL)
{
er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_OUT_OF_VIRTUAL_MEMORY, 1, sizeof (CDC_LOGINFO_ENTRY));
error = ER_OUT_OF_VIRTUAL_MEMORY;
goto error;
}
error =
cdc_make_dml_loginfo (thread_p, trid, context->user,
rec_type == LOG_SUPPLEMENT_DELETE ? CDC_DELETE : CDC_TRIGGER_DELETE, classoid,
&undo_recdes, NULL, log_info_entry, true);
if (error != NO_ERROR)
{
goto error;
}
context->loginfo_queue.push (log_info_entry);
context->queue_size += log_info_entry->length;
num_loginfo++;
log_info_entry = NULL;
break;
default:
break;
}
}
LSA_COPY (&process_lsa, &next_log_rec_lsa);
LSA_COPY (&cur_log_rec_lsa, &process_lsa);
if (undo_recdes.data != NULL)
{
free_and_init (undo_recdes.data);
}
if (redo_recdes.data != NULL)
{
free_and_init (redo_recdes.data);
}
}
/* end of making log info */
if (context->forward)
{
LSA_COPY (&context->start_lsa, &process_lsa);
}
else
{
LSA_COPY (&context->end_lsa, &process_lsa);
}
context->num_loginfo = num_loginfo;
if (supplement_data != NULL)
{
db_private_free_and_init (thread_p, supplement_data);
}
return error;
error:
if (supplement_data != NULL)
{
db_private_free_and_init (thread_p, supplement_data);
}
if (undo_recdes.data != NULL)
{
free_and_init (undo_recdes.data);
}
if (redo_recdes.data != NULL)
{
free_and_init (redo_recdes.data);
}
if (log_info_entry != NULL)
{
db_private_free_and_init (thread_p, log_info_entry);
}
if (error == ER_FLASHBACK_SCHEMA_CHANGED)
{
COPY_OID (&context->invalid_class, &classoid);
}
return error;
}