File log_writer.c¶
File List > cubrid > src > transaction > log_writer.c
Go to the documentation of this file
/*
* Copyright 2008 Search Solution Corporation
* Copyright 2016 CUBRID Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
/*
* log_writer.c -
*/
#ident "$Id$"
#include "config.h"
#include <assert.h>
#include <errno.h>
#if !defined(WINDOWS)
#include <dirent.h>
#ifdef UNSTABLE_TDE_FOR_REPLICATION_LOG
#include "sys/socket.h"
#include "sys/un.h"
#endif /* UNSTABLE_TDE_FOR_REPLICATION_LOG */
#endif /* !WINDOWS */
#include <signal.h>
#include <time.h>
#include "log_writer.h"
#include "error_manager.h"
#include "message_catalog.h"
#include "msgcat_set_log.hpp"
#include "object_representation.h"
#include "system_parameter.h"
#include "connection_support.hpp"
#include "log_applier.h"
#include "log_storage.hpp"
#include "log_volids.hpp"
#include "crypt_opfunc.h"
#ifdef UNSTABLE_TDE_FOR_REPLICATION_LOG
#include "tde.h"
#endif /* UNSTABLE_TDE_FOR_REPLICATION_LOG */
#if defined(SERVER_MODE)
#include "log_append.hpp"
#include "log_manager.h"
#include "server_support.h"
#include "network_interface_sr.h"
#else /* !defined(SERVER_MODE) */
#include "network_interface_cl.h"
#endif /* !defined(SERVER_MODE) */
#if !defined(WINDOWS)
#include "heartbeat.h"
#endif
#include "string_buffer.hpp"
// XXX: SHOULD BE THE LAST INCLUDE HEADER
#include "memory_wrapper.hpp"
/* Timeout used when suspending a log writer thread. NOTE(review): unit is presumably seconds -- confirm at the use site. */
#define LOGWR_THREAD_SUSPEND_TIMEOUT 10
/* Number of log pages held by the copy buffer; logwr_initialize allocates one extra page for the log header. */
#define LOGWR_COPY_LOG_BUFFER_NPAGES LOGPB_BUFFER_NPAGES_LOWER
/* HA server state observed at the previous header flush; logwr_flush_header_page reports a change against it. */
static int prev_ha_server_state = HA_SERVER_STATE_NA;
/*
 * Shutdown request flag. It is written by the signal handler logwr_shutdown_by_signal and polled through
 * logwr_force_shutdown, so it has to be a volatile sig_atomic_t: that is the only plain object type a signal
 * handler may portably store to, and volatile keeps the polling reads from being cached by the compiler.
 */
static volatile sig_atomic_t logwr_need_shutdown = 0;
typedef struct log_bgarv_header LOG_BGARV_HEADER;
/*
 * Header record stored in the area of the header page of the background archive file.
 * It is written by logwr_flush_bgarv_header_page and read back by logwr_read_bgarv_log_header.
 */
struct log_bgarv_header
{ /* Background log archive header information */
/* magic string; compared against CUBRID_MAGIC_LOG_ARCHIVE when the header is read */
char magic[CUBRID_MAGIC_MAX_LENGTH];
/* not referenced by the code in this section; NOTE(review): presumably alignment padding -- confirm */
INT32 dummy;
/* copied from logwr_Gl.hdr.db_creation when the header is flushed */
INT64 db_creation;
/* the next three fields mirror the same-named fields of logwr_Gl.bg_archive_info */
LOG_PAGEID start_page_id;
LOG_PAGEID current_page_id;
LOG_PAGEID last_sync_pageid;
};
/*
 * Selects how logwr_fetch_header_page reacts to a header page whose logical page id is not the expected one.
 */
enum HEADER_FETCH_MODE
{
/* any mismatch is reported as ER_LOG_PAGE_CORRUPTED */
NORMAL_FETCH_MODE = 0,
/* a page that fileio_is_formatted_page recognizes yields ER_DISK_INCONSISTENT_VOL_HEADER without setting an error */
CHECK_FORMATTED_PAGE
};
/*
 * logwr_er_log - emit a debug trace only when the PRM_ID_DEBUG_LOGWR parameter is enabled.
 *
 * The expansion is wrapped in do { } while (0) so that it is exactly one statement: used as the body of an
 * unbraced `if', it can no longer capture the caller's `else'.
 */
#define logwr_er_log(...) \
  do \
    { \
      if (prm_get_bool_value (PRM_ID_DEBUG_LOGWR)) \
	{ \
	  _er_log_debug (ARG_FILE_LINE, __VA_ARGS__); \
	} \
    } \
  while (0)
/* Forward declaration; the definition is outside this section. logwr_writev_append_pages calls it on the first
 * page of each run and ignores the result. NOTE(review): presumably validates the page checksum -- confirm. */
static int logwr_check_page_checksum (THREAD_ENTRY * thread_p, LOG_PAGE * log_pgptr);
#if defined(CS_MODE)
/*
 * init_cs_logwr_header - build the log header value placed in logwr_Gl.hdr at static initialization
 *
 * return: a log_header with the transaction id, next-archive position and archive counters set to "none" values
 *
 * Note: only the fields assigned below are set here; every other field keeps whatever log_header's own
 *       default initialization provides.
 */
static log_header
init_cs_logwr_header ()
{
  log_header initial_hdr;

  /* archive counters: nothing archived or deleted yet */
  initial_hdr.last_deleted_arv_num = -1;
  initial_hdr.last_arv_num_for_syscrashes = -1;
  initial_hdr.nxarv_num = -1;

  /* next archive position is unknown */
  initial_hdr.nxarv_phy_pageid = NULL_PAGEID;
  initial_hdr.nxarv_pageid = NULL_PAGEID;

  initial_hdr.next_trid = NULL_TRANID;

  return initial_hdr;
}
/*
 * logwr_Gl - global state of the log writer (compiled only when CS_MODE is defined).
 *
 * The initializer is positional, so the order below must follow the member order of LOGWR_GLOBAL.
 * Name buffers start out as empty strings ('\0'); they are filled in by logwr_initialize.
 */
// *INDENT-OFF*
LOGWR_GLOBAL logwr_Gl = {
  /* log header */
  init_cs_logwr_header (),
  /* loghdr_pgptr */
  NULL,
  /* db_name */
  {'\0'},
  /* hostname */
  NULL,
  /* log_path */
  {'\0'},
  /* loginf_path */
  {'\0'},
  /* active_name */
  {'\0'},
  /* append_vdes */
  NULL_VOLDES,
  /* logpg_area */
  NULL,
  /* logpg_area_size */
  0,
  /* logpg_fill_size */
  0,
  /* toflush */
  NULL,
  /* max_toflush */
  0,
  /* num_toflush */
  0,
  /* mode */
  LOGWR_MODE_ASYNC,
  /* action */
  LOGWR_ACTION_NONE,
  /* last_chkpt_pageid */
  NULL_PAGEID,
  /* last_recv_pageid */
  NULL_PAGEID,
  /* last_arv_fpageid */
  NULL_PAGEID,
  /* last_arv_lpageid */
  NULL_PAGEID,
  /* last_arv_num */
  -1,
  /* force_flush */
  false,
  /* last_flush_time */
  {0, 0},
  /* background archiving info */
  background_archiving_info (),
  /* bg_archive_name */
  {'\0'},
  /* ori_nxarv_pageid */
  NULL_PAGEID,
  /* start_pageid */
  -2,
  /* reinit_copylog */
  false
};
// *INDENT-ON*
/*
 * Forward declarations of the file-local helpers of the CS_MODE log writer.
 */
/* reads the header page of a mounted volume; mode defaults to NORMAL_FETCH_MODE */
static int logwr_fetch_header_page (LOG_PAGE * log_pgptr, int vol_fd, HEADER_FETCH_MODE mode = NORMAL_FETCH_MODE);
/* header readers for the local active log and for the background archive file */
static int logwr_read_log_header (void);
static int logwr_read_bgarv_log_header (void);
/* set-up and tear-down of logwr_Gl */
static int logwr_initialize (const char *db_name, const char *log_path, int mode, LOG_PAGEID start_pageid);
static void logwr_finalize (void);
/* page writing */
static LOG_PAGE **logwr_writev_append_pages (LOG_PAGE ** to_flush, DKNPAGES npages);
static int logwr_flush_all_append_pages (void);
/* archiving */
static int logwr_archive_active_log (void);
static int logwr_flush_bgarv_header_page (void);
/* definition is outside this section */
static void logwr_reinit_copylog (void);
#ifdef UNSTABLE_TDE_FOR_REPLICATION_LOG
/* definition is outside this section; called from logwr_writev_append_pages while tde_is_loaded () is false */
static int logwr_load_tde (void);
#endif /* UNSTABLE_TDE_FOR_REPLICATION_LOG */
/*
 * logwr_to_physical_pageid - map a logical log page id to its physical page in the active log
 *
 * return: physical page id
 *
 *   logical_pageid(in): logical page id, or LOGPB_HEADER_PAGE_ID for the header page
 *
 * Note: the header page is always physical page 0. Any other page is placed in the range
 *       1..logwr_Gl.hdr.npages, counted circularly from logwr_Gl.hdr.fpageid.
 */
LOG_PHY_PAGEID
logwr_to_physical_pageid (LOG_PAGEID logical_pageid)
{
  LOG_PAGEID slot;

  if (logical_pageid == LOGPB_HEADER_PAGE_ID)
    {
      return 0;
    }

  /* distance from the first page kept in the active log, folded into the circular area */
  slot = logical_pageid - logwr_Gl.hdr.fpageid;
  if (slot >= logwr_Gl.hdr.npages)
    {
      slot = slot % logwr_Gl.hdr.npages;
    }
  else if (slot < 0)
    {
      slot = logwr_Gl.hdr.npages - ((-slot) % logwr_Gl.hdr.npages);
    }

  /* physical page 0 is reserved for the header, so data pages start at 1 */
  slot += 1;
  if (slot > logwr_Gl.hdr.npages)
    {
      slot = slot % logwr_Gl.hdr.npages;
    }

  assert (slot <= PAGEID_MAX);

  return (LOG_PHY_PAGEID) slot;
}
/*
 * logwr_fetch_header_page - read the log header page of a mounted volume
 *
 * return: NO_ERROR, ER_LOG_READ, ER_LOG_PAGE_CORRUPTED or ER_DISK_INCONSISTENT_VOL_HEADER
 *
 *   log_pgptr(out): buffer that receives the page (LOG_PAGESIZE bytes are read into it)
 *   vol_fd(in): descriptor of the volume to read from
 *   mode(in): how to treat a page whose logical page id is not LOGPB_HEADER_PAGE_ID
 *
 * Note: when the page content is unexpected:
 *       - if mode is CHECK_FORMATTED_PAGE and the page is a just-formatted one, ER_DISK_INCONSISTENT_VOL_HEADER
 *         is returned and no error is set (the caller decides what to report);
 *       - otherwise ER_LOG_PAGE_CORRUPTED is set and returned.
 *       The ER_LOG_READ message always names logwr_Gl.active_name, whichever volume vol_fd refers to.
 */
static int
logwr_fetch_header_page (LOG_PAGE * log_pgptr, int vol_fd, HEADER_FETCH_MODE mode)
{
  LOG_PAGEID hdr_pageid = LOGPB_HEADER_PAGE_ID;
  LOG_PHY_PAGEID hdr_phy_pageid;

  assert (log_pgptr != NULL);

  /* locate the header page on disk and read it */
  hdr_phy_pageid = logwr_to_physical_pageid (hdr_pageid);
  if (fileio_read (NULL, vol_fd, log_pgptr, hdr_phy_pageid, LOG_PAGESIZE) == NULL)
    {
      er_set (ER_FATAL_ERROR_SEVERITY, ARG_FILE_LINE, ER_LOG_READ, 3, hdr_pageid, hdr_phy_pageid,
	      logwr_Gl.active_name);
      return ER_LOG_READ;
    }

  if (log_pgptr->hdr.logical_pageid == hdr_pageid)
    {
      return NO_ERROR;
    }

  if (mode == CHECK_FORMATTED_PAGE && fileio_is_formatted_page (NULL, (char *) log_pgptr))
    {
      /* the error is set outside this function */
      return ER_DISK_INCONSISTENT_VOL_HEADER;
    }

  er_set (ER_FATAL_ERROR_SEVERITY, ARG_FILE_LINE, ER_LOG_PAGE_CORRUPTED, 1, hdr_pageid);
  return ER_LOG_PAGE_CORRUPTED;
}
/*
 * logwr_read_log_header - load logwr_Gl.hdr from the local copy of the active log, if there is one
 *
 * return: NO_ERROR, ER_IO_MOUNT_FAIL, or the error of logwr_fetch_header_page
 *
 * Note: - if the active log file does not exist, nothing is done: the file is created later, once the header
 *         page received from the server tells how many pages it needs.
 *       - if the file exists but its header page is still in the just-formatted state, the file is dismounted
 *         and removed, and logwr_Gl.append_vdes is reset to NULL_VOLDES, so the caller sees the same state as
 *         in the "file does not exist" case.
 *       - otherwise the file stays mounted in logwr_Gl.append_vdes and its header is copied into logwr_Gl.hdr.
 */
static int
logwr_read_log_header (void)
{
  char log_pgbuf[IO_MAX_PAGE_SIZE + MAX_ALIGNMENT];
  LOG_PAGE *log_pgptr = (LOG_PAGE *) PTR_ALIGN (log_pgbuf, MAX_ALIGNMENT);
  int error = NO_ERROR;

  if (!fileio_is_volume_exist (logwr_Gl.active_name))
    {
      /* Delay creating the log file until the log header page arrives from the server, because npages is
       * needed to create it. */
      return NO_ERROR;
    }

  /* Mount the active log and read the log header */
  logwr_Gl.append_vdes =
    fileio_mount (NULL, logwr_Gl.db_name, logwr_Gl.active_name, LOG_DBLOG_ACTIVE_VOLID, true, false);
  if (logwr_Gl.append_vdes == NULL_VOLDES)
    {
      /* Unable to mount the active log */
      return ER_IO_MOUNT_FAIL;
    }

  error = logwr_fetch_header_page (log_pgptr, logwr_Gl.append_vdes, CHECK_FORMATTED_PAGE);
  if (error == ER_DISK_INCONSISTENT_VOL_HEADER)
    {
      /* The page appears to be just formatted (a previous log writer instance was stopped before it appended
       * the expected data). Notify, then drop the volume. */
      er_set (ER_NOTIFICATION_SEVERITY, ARG_FILE_LINE, ER_DISK_INCONSISTENT_VOL_HEADER, 1, logwr_Gl.active_name);

      /* Dismount the descriptor obtained from fileio_mount above (not the volume id constant), and forget it
       * so that no later code uses a descriptor of a removed file. */
      fileio_dismount_without_fsync (NULL, logwr_Gl.append_vdes);
      logwr_Gl.append_vdes = NULL_VOLDES;

      fileio_unformat (NULL, logwr_Gl.active_name);
      er_clear ();
      return NO_ERROR;
    }
  if (error != NO_ERROR)
    {
      return error;
    }

  memcpy (&logwr_Gl.hdr, log_pgptr->area, sizeof (LOG_HEADER));

  assert (log_pgptr->hdr.logical_pageid == LOGPB_HEADER_PAGE_ID);
  assert (log_pgptr->hdr.offset == NULL_OFFSET);

  return NO_ERROR;
}
/*
* logwr_read_bgarv_log_header -
*
* return:
* Note:
*/
static int
logwr_read_bgarv_log_header (void)
{
char log_pgbuf[IO_MAX_PAGE_SIZE + MAX_ALIGNMENT];
char *aligned_log_pgbuf;
LOG_PAGE *log_pgptr;
LOG_BGARV_HEADER *bgarv_header;
BACKGROUND_ARCHIVING_INFO *bg_arv_info;
int error = NO_ERROR;
bg_arv_info = &logwr_Gl.bg_archive_info;
aligned_log_pgbuf = PTR_ALIGN (log_pgbuf, MAX_ALIGNMENT);
log_pgptr = (LOG_PAGE *) aligned_log_pgbuf;
assert (bg_arv_info->vdes != NULL_VOLDES);
error = logwr_fetch_header_page (log_pgptr, bg_arv_info->vdes);
if (error != NO_ERROR)
{
return error;
}
assert (log_pgptr->hdr.logical_pageid == LOGPB_HEADER_PAGE_ID);
assert (log_pgptr->hdr.offset == NULL_OFFSET);
bgarv_header = (LOG_BGARV_HEADER *) log_pgptr->area;
if (strncmp (bgarv_header->magic, CUBRID_MAGIC_LOG_ARCHIVE, CUBRID_MAGIC_MAX_LENGTH) != 0)
{
/* magic is different */
return ER_LOG_INCOMPATIBLE_DATABASE;
}
bg_arv_info->start_page_id = bgarv_header->start_page_id;
bg_arv_info->current_page_id = bgarv_header->current_page_id;
bg_arv_info->last_sync_pageid = bgarv_header->last_sync_pageid;
return NO_ERROR;
}
/*
 * logwr_shutdown_by_signal() - signal handler that requests a shutdown of the log writer
 *   return: none
 *
 * Note:
 *   It only raises the "logwr_need_shutdown" flag; the flag is read through logwr_force_shutdown ().
 *   logwr_initialize installs this handler (for SIGTERM among other signals).
 */
static void
logwr_shutdown_by_signal (int)
{
  logwr_need_shutdown = true;
}
/*
 * logwr_force_shutdown - tell whether a shutdown has been requested by a signal
 *   return: true once logwr_shutdown_by_signal has run, false otherwise
 */
bool
logwr_force_shutdown (void)
{
  if (logwr_need_shutdown)
    {
      return true;
    }

  return false;
}
/*
 * logwr_initialize - prepare logwr_Gl for copying the log of a database
 *
 * return: NO_ERROR, ER_OUT_OF_VIRTUAL_MEMORY, ER_IO_MOUNT_FAIL, ER_HA_GENERIC_ERROR, or the error returned by
 *         logwr_read_log_header / logwr_read_bgarv_log_header / logwr_flush_bgarv_header_page
 *
 *   db_name(in): database name, optionally "name@host"; the part after '@' becomes logwr_Gl.hostname
 *   log_path(in): directory used to build the active log, log info and background archive file names
 *   mode(in): stored in logwr_Gl.mode (cast to LOGWR_MODE)
 *   start_pageid(in): stored in logwr_Gl.start_pageid; a value >= NULL_PAGEID is rejected when the local log
 *                     header already carries a next-archive page id
 *
 * Note: the page buffer and the flush array are allocated only when they are not allocated yet, so the
 *       function may be called again without leaking them. Resources acquired before a failure are left in
 *       logwr_Gl; logwr_finalize releases them.
 */
static int
logwr_initialize (const char *db_name, const char *log_path, int mode, LOG_PAGEID start_pageid)
{
  int log_nbuffers;
  int error;
  char *at_char = NULL;
  BACKGROUND_ARCHIVING_INFO *bg_arv_info;

  /* signal processing */
#if defined(WINDOWS)
  (void) os_set_signal_handler (SIGABRT, logwr_shutdown_by_signal);
  (void) os_set_signal_handler (SIGINT, logwr_shutdown_by_signal);
  (void) os_set_signal_handler (SIGTERM, logwr_shutdown_by_signal);
#else /* ! WINDOWS */
  /* NOTE(review): POSIX does not allow SIGSTOP to be caught, so this registration is expected to have no
   * effect; kept as is because the intended signal cannot be determined from here. */
  (void) os_set_signal_handler (SIGSTOP, logwr_shutdown_by_signal);
  (void) os_set_signal_handler (SIGTERM, logwr_shutdown_by_signal);
  (void) os_set_signal_handler (SIGPIPE, SIG_IGN);
#endif /* ! WINDOWS */

  /* set the db name and log path */
  strncpy (logwr_Gl.db_name, db_name, PATH_MAX - 1);

  /* Forget any host name left by a previous call: it points into db_name, which has just been overwritten. */
  logwr_Gl.hostname = NULL;
  at_char = strchr (logwr_Gl.db_name, '@');
  if (at_char != NULL)
    {
      /* split "name@host" in place */
      *at_char = '\0';
      logwr_Gl.hostname = at_char + 1;
    }

  strncpy (logwr_Gl.log_path, log_path, PATH_MAX - 1);

  /* set the mode */
  logwr_Gl.mode = (LOGWR_MODE) mode;

  /* set the active log file path */
  fileio_make_log_active_name (logwr_Gl.active_name, log_path, logwr_Gl.db_name);
  /* set the log info file path */
  fileio_make_log_info_name (logwr_Gl.loginf_path, log_path, logwr_Gl.db_name);
  /* background archive file path */
  fileio_make_log_archive_temp_name (logwr_Gl.bg_archive_name, log_path, logwr_Gl.db_name);

  /* one extra page: the first page of the buffer holds the log header page */
  log_nbuffers = LOGWR_COPY_LOG_BUFFER_NPAGES + 1;

  if (logwr_Gl.logpg_area == NULL)
    {
      logwr_Gl.logpg_area_size = log_nbuffers * LOG_PAGESIZE;
      logwr_Gl.logpg_area = (char *) malloc (logwr_Gl.logpg_area_size);
      if (logwr_Gl.logpg_area == NULL)
	{
	  er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_OUT_OF_VIRTUAL_MEMORY, 1, (size_t) logwr_Gl.logpg_area_size);
	  logwr_Gl.logpg_area_size = 0;
	  return ER_OUT_OF_VIRTUAL_MEMORY;
	}
    }

  if (logwr_Gl.toflush == NULL)
    {
      int i;

      logwr_Gl.max_toflush = log_nbuffers - 1;
      /* element size is that of one array entry (a LOG_PAGE pointer) */
      logwr_Gl.toflush = (LOG_PAGE **) calloc (logwr_Gl.max_toflush, sizeof (*logwr_Gl.toflush));
      if (logwr_Gl.toflush == NULL)
	{
	  er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_OUT_OF_VIRTUAL_MEMORY, 1,
		  logwr_Gl.max_toflush * sizeof (*logwr_Gl.toflush));
	  logwr_Gl.max_toflush = 0;
	  return ER_OUT_OF_VIRTUAL_MEMORY;
	}
      for (i = 0; i < logwr_Gl.max_toflush; i++)
	{
	  logwr_Gl.toflush[i] = NULL;
	}
    }

  error = logwr_read_log_header ();
  if (error != NO_ERROR)
    {
      return error;
    }

  logwr_Gl.start_pageid = start_pageid;
  if (logwr_Gl.start_pageid >= NULL_PAGEID && logwr_Gl.hdr.nxarv_pageid != NULL_PAGEID)
    {
      er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_HA_GENERIC_ERROR, 1, "Replication transaction log already exist");
      return ER_HA_GENERIC_ERROR;
    }

  logwr_Gl.action = LOGWR_ACTION_NONE;
  logwr_Gl.last_arv_fpageid = logwr_Gl.hdr.nxarv_pageid;
  logwr_Gl.last_arv_num = logwr_Gl.hdr.nxarv_num;
  logwr_Gl.force_flush = false;
  logwr_Gl.last_flush_time.tv_sec = 0;
  logwr_Gl.last_flush_time.tv_usec = 0;
  logwr_Gl.ori_nxarv_pageid = NULL_PAGEID;

  if (!prm_get_bool_value (PRM_ID_LOG_BACKGROUND_ARCHIVING))
    {
      return NO_ERROR;
    }

  /* background archiving: reuse the existing temporary archive file when it is usable, else create one */
  bg_arv_info = &logwr_Gl.bg_archive_info;

  if (fileio_is_volume_exist (logwr_Gl.bg_archive_name) == true)
    {
      bg_arv_info->vdes =
	fileio_mount (NULL, logwr_Gl.bg_archive_name, logwr_Gl.bg_archive_name, LOG_DBLOG_ARCHIVE_VOLID, true,
		      false);
      if (bg_arv_info->vdes == NULL_VOLDES)
	{
	  return ER_IO_MOUNT_FAIL;
	}

      error = logwr_read_bgarv_log_header ();
      if (error == ER_LOG_INCOMPATIBLE_DATABASE)
	{
	  /* Considering the case of upgrading: the existing bg archive file carries no (or another) magic, so
	   * drop it and make a new bg archive file below. */
	  fileio_dismount (NULL, bg_arv_info->vdes);
	  fileio_unformat (NULL, logwr_Gl.bg_archive_name);
	  bg_arv_info->vdes = NULL_VOLDES;
	}
      else if (error != NO_ERROR)
	{
	  return error;
	}
    }

  /* if the bg archive file does not exist or its magic differs, create a new bg archive file */
  if (bg_arv_info->vdes == NULL_VOLDES)
    {
      bg_arv_info->vdes =
	fileio_format (NULL, logwr_Gl.db_name, logwr_Gl.bg_archive_name, LOG_DBLOG_BG_ARCHIVE_VOLID,
		       logwr_Gl.hdr.npages + 1, false, false, false, LOG_PAGESIZE, 0, false);
      if (bg_arv_info->vdes == NULL_VOLDES)
	{
	  er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_HA_GENERIC_ERROR, 1,
		  "Unable to create temporary archive log");
	  return ER_HA_GENERIC_ERROR;
	}

      bg_arv_info->start_page_id = logwr_Gl.hdr.nxarv_pageid;
      bg_arv_info->current_page_id = NULL_PAGEID;
      bg_arv_info->last_sync_pageid = logwr_Gl.hdr.nxarv_pageid;

      error = logwr_flush_bgarv_header_page ();
      if (error != NO_ERROR)
	{
	  return error;
	}
    }

  return NO_ERROR;
}
/*
 * logwr_finalize - release what logwr_initialize acquired and reset logwr_Gl
 *
 * return: none
 *
 * Note: frees the page buffer and the flush array, dismounts the active log and (when background archiving is
 *       enabled) the background archive file, then puts the state fields back to their initial values.
 *       db_name, hostname, the path buffers, hdr and the last_arv_* / last_chkpt_pageid fields are not touched.
 */
static void
logwr_finalize (void)
{
  /* page buffer; the header page pointer refers into it, so it is dropped as well */
  if (logwr_Gl.logpg_area != NULL)
    {
      free_and_init (logwr_Gl.logpg_area);
      logwr_Gl.loghdr_pgptr = NULL;
      logwr_Gl.logpg_fill_size = 0;
      logwr_Gl.logpg_area_size = 0;
    }

  /* flush array */
  if (logwr_Gl.toflush != NULL)
    {
      free_and_init (logwr_Gl.toflush);
      logwr_Gl.num_toflush = 0;
      logwr_Gl.max_toflush = 0;
    }

  /* active log volume */
  if (logwr_Gl.append_vdes != NULL_VOLDES)
    {
      fileio_dismount (NULL, logwr_Gl.append_vdes);
      logwr_Gl.append_vdes = NULL_VOLDES;
    }

  /* plain state */
  logwr_Gl.last_recv_pageid = NULL_PAGEID;
  logwr_Gl.mode = LOGWR_MODE_ASYNC;
  logwr_Gl.action = LOGWR_ACTION_NONE;
  logwr_Gl.force_flush = false;
  logwr_Gl.last_flush_time.tv_sec = 0;
  logwr_Gl.last_flush_time.tv_usec = 0;
  logwr_Gl.ori_nxarv_pageid = NULL_PAGEID;
  logwr_Gl.start_pageid = -2;

  /* background archive volume */
  if (prm_get_bool_value (PRM_ID_LOG_BACKGROUND_ARCHIVING) && logwr_Gl.bg_archive_info.vdes != NULL_VOLDES)
    {
      fileio_dismount (NULL, logwr_Gl.bg_archive_info.vdes);
      logwr_Gl.bg_archive_info.vdes = NULL_VOLDES;
    }

  logwr_Gl.reinit_copylog = false;
}
/*
* logwr_set_hdr_and_flush_info -
*
* return:
* Note:
*/
int
logwr_set_hdr_and_flush_info (void)
{
LOG_PAGE *log_pgptr = NULL, *last_pgptr;
char *p;
int num_toflush = 0;
/* Set the flush information */
p = logwr_Gl.logpg_area + LOG_PAGESIZE;
while (p < (logwr_Gl.logpg_area + logwr_Gl.logpg_fill_size))
{
log_pgptr = (LOG_PAGE *) p;
logwr_Gl.toflush[num_toflush++] = log_pgptr;
p += LOG_PAGESIZE;
}
last_pgptr = log_pgptr;
logwr_Gl.num_toflush = num_toflush;
/* get original next archive pageid */
logwr_Gl.ori_nxarv_pageid = logwr_Gl.hdr.nxarv_pageid;
/* Set the header and action information */
if (num_toflush > 0)
{
log_pgptr = (LOG_PAGE *) logwr_Gl.logpg_area;
memcpy (&logwr_Gl.hdr, log_pgptr->area, sizeof (LOG_HEADER));
logwr_Gl.loghdr_pgptr = log_pgptr;
/* Initialize archive info if it is not set */
if (logwr_Gl.last_arv_fpageid == NULL_PAGEID || logwr_Gl.last_arv_num < 0)
{
logwr_Gl.last_arv_fpageid = logwr_Gl.hdr.nxarv_pageid;
logwr_Gl.last_arv_num = logwr_Gl.hdr.nxarv_num;
if (prm_get_bool_value (PRM_ID_LOG_BACKGROUND_ARCHIVING))
{
logwr_Gl.bg_archive_info.start_page_id = logwr_Gl.last_arv_fpageid;
}
}
/* Check if it need archiving */
if (((logwr_Gl.last_arv_num + 1 < logwr_Gl.hdr.nxarv_num)
&& (logwr_Gl.hdr.ha_file_status == LOG_HA_FILESTAT_ARCHIVED))
&& (logwr_Gl.last_arv_fpageid <= logwr_Gl.last_recv_pageid))
{
/* Do delayed archiving */
logwr_Gl.action = (LOGWR_ACTION) (logwr_Gl.action | LOGWR_ACTION_ARCHIVING);
logwr_Gl.last_arv_lpageid = logwr_Gl.last_recv_pageid;
}
else if ((logwr_Gl.last_arv_num + 1 == logwr_Gl.hdr.nxarv_num)
&& (last_pgptr->hdr.logical_pageid >= logwr_Gl.hdr.nxarv_pageid))
{
logwr_Gl.action = (LOGWR_ACTION) (logwr_Gl.action | LOGWR_ACTION_ARCHIVING);
logwr_Gl.last_arv_lpageid = logwr_Gl.hdr.nxarv_pageid - 1;
}
if (last_pgptr != NULL && last_pgptr->hdr.logical_pageid < logwr_Gl.hdr.eof_lsa.pageid)
{
/* There are left several pages to get from the server */
logwr_Gl.last_recv_pageid = last_pgptr->hdr.logical_pageid;
logwr_Gl.action = (LOGWR_ACTION) (logwr_Gl.action | LOGWR_ACTION_DELAYED_WRITE);
}
else
{
logwr_Gl.last_recv_pageid = logwr_Gl.hdr.eof_lsa.pageid;
if (logwr_Gl.action & LOGWR_ACTION_DELAYED_WRITE)
{
/* In case that it finishes delay write */
logwr_Gl.action = (LOGWR_ACTION) (logwr_Gl.action & ~LOGWR_ACTION_DELAYED_WRITE);
}
}
}
else
{
/* If it gets only the header page, compares both of the headers. There is no update for the header information */
LOG_HEADER hdr;
log_pgptr = (LOG_PAGE *) logwr_Gl.logpg_area;
memcpy (&hdr, log_pgptr->area, sizeof (LOG_HEADER));
if (hdr.ha_server_state != HA_SERVER_STATE_ACTIVE && hdr.ha_server_state != HA_SERVER_STATE_TO_BE_ACTIVE
&& hdr.ha_server_state != HA_SERVER_STATE_TO_BE_STANDBY
&& (hdr.ha_promotion_time == 0 || difftime64 (hdr.ha_promotion_time, logwr_Gl.hdr.ha_promotion_time) == 0)
&& difftime64 (hdr.db_restore_time, logwr_Gl.hdr.db_restore_time) != 0)
{
logwr_Gl.reinit_copylog = true;
logwr_Gl.loghdr_pgptr = (LOG_PAGE *) logwr_Gl.logpg_area;
er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_LOG_DOESNT_CORRESPOND_TO_DATABASE, 1, logwr_Gl.active_name);
return ER_LOG_DOESNT_CORRESPOND_TO_DATABASE;
}
if (difftime64 (hdr.db_creation, logwr_Gl.hdr.db_creation) != 0)
{
er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_LOG_DOESNT_CORRESPOND_TO_DATABASE, 1, logwr_Gl.active_name);
return ER_LOG_DOESNT_CORRESPOND_TO_DATABASE;
}
if (logwr_Gl.hdr.ha_file_status != LOG_HA_FILESTAT_SYNCHRONIZED)
{
/* In case of delayed write, get last_recv_pageid from the append lsa of the local log header */
logwr_Gl.last_recv_pageid = logwr_Gl.hdr.append_lsa.pageid - 1;
}
else
{
/* To get the last page again, decrease last pageid */
logwr_Gl.last_recv_pageid = logwr_Gl.hdr.eof_lsa.pageid - 1;
}
}
if (logwr_Gl.hdr.ha_file_status != LOG_HA_FILESTAT_SYNCHRONIZED)
{
/* In case of delayed write, save the append lsa of the log to be written locally */
logwr_Gl.hdr.append_lsa.pageid = logwr_Gl.last_recv_pageid;
logwr_Gl.hdr.append_lsa.offset = NULL_OFFSET;
}
return NO_ERROR;
}
/*
 * logwr_copy_necessary_log - copy pages from the active log into the background archive file
 *   return: NO_ERROR, ER_LOG_READ, ER_LOG_PAGE_CORRUPTED or ER_LOG_WRITE
 *   to_pageid(in): logical page id at which copying stops (this page itself is not copied)
 *
 * Note: copying starts at bg_archive_info.current_page_id, or at start_page_id when no current page is set.
 *       Pages are moved in chunks of at most LOGPB_IO_NPAGES, and a chunk never reads past the last physical
 *       page of the active log. Only the first page of each chunk is checked for the expected logical page id.
 */
static int
logwr_copy_necessary_log (LOG_PAGEID to_pageid)
{
  char io_buf[IO_MAX_PAGE_SIZE * LOGPB_IO_NPAGES + MAX_ALIGNMENT];
  BACKGROUND_ARCHIVING_INFO *bg_info = &logwr_Gl.bg_archive_info;
  LOG_PAGE *chunk = (LOG_PAGE *) PTR_ALIGN (io_buf, MAX_ALIGNMENT);
  LOG_PAGEID cur_pageid;
  LOG_PHY_PAGEID src_phy_pageid = NULL_PAGEID;
  LOG_PHY_PAGEID dst_phy_pageid = NULL_PAGEID;
  int chunk_npages = 0;

  cur_pageid = bg_info->current_page_id;
  if (cur_pageid == NULL_PAGEID)
    {
      cur_pageid = bg_info->start_page_id;
    }

  /* physical page 0 of the archive file is its header, hence the + 1 */
  dst_phy_pageid = (LOG_PHY_PAGEID) (cur_pageid - bg_info->start_page_id + 1);

  assert (logwr_Gl.last_arv_fpageid <= cur_pageid && cur_pageid <= logwr_Gl.hdr.append_lsa.pageid);
  assert (logwr_Gl.last_arv_fpageid <= to_pageid && to_pageid <= logwr_Gl.hdr.append_lsa.pageid);

  while (cur_pageid < to_pageid)
    {
      /* chunk size: bounded by the I/O buffer, by what is left, and by the end of the active log file */
      chunk_npages = MIN (LOGPB_IO_NPAGES, (int) (to_pageid - cur_pageid));
      src_phy_pageid = logwr_to_physical_pageid (cur_pageid);
      chunk_npages = MIN (chunk_npages, logwr_Gl.hdr.npages - src_phy_pageid + 1);

      if (fileio_read_pages (NULL, logwr_Gl.append_vdes, (char *) chunk, src_phy_pageid, chunk_npages,
			     LOG_PAGESIZE) == NULL)
	{
	  er_set (ER_FATAL_ERROR_SEVERITY, ARG_FILE_LINE, ER_LOG_READ, 3, cur_pageid, src_phy_pageid,
		  logwr_Gl.active_name);
	  return ER_LOG_READ;
	}

      if (chunk->hdr.logical_pageid != cur_pageid)
	{
	  er_set (ER_FATAL_ERROR_SEVERITY, ARG_FILE_LINE, ER_LOG_PAGE_CORRUPTED, 1, cur_pageid);
	  return ER_LOG_PAGE_CORRUPTED;
	}

      /* no need to encrypt, it is read as not decrypted (TDE) if encrypted */
      if (fileio_write_pages (NULL, bg_info->vdes, (char *) chunk, dst_phy_pageid, chunk_npages, LOG_PAGESIZE,
			      FILEIO_WRITE_DEFAULT_WRITE) == NULL)
	{
	  er_set (ER_FATAL_ERROR_SEVERITY, ARG_FILE_LINE, ER_LOG_WRITE, 3, cur_pageid, dst_phy_pageid,
		  logwr_Gl.bg_archive_name);
	  return ER_LOG_WRITE;
	}

      cur_pageid += chunk_npages;
      dst_phy_pageid += chunk_npages;
    }

  return NO_ERROR;
}
/*
 * logwr_writev_append_pages - write a run of contiguous log pages to disk
 *
 * return: to_flush on success (also when npages <= 0), NULL on failure
 *
 *   to_flush(in): array of pages to write; to_flush[0] gives the first logical page id of the run
 *   npages(in): number of pages in the run
 *
 * Note: when background archiving is enabled the run is written twice: first into the background archive
 *       file (after filling any gap with logwr_copy_necessary_log, and followed by a flush of the archive
 *       header page), then into the active log. Page i of the run goes to the physical page of the first page
 *       plus i, so the caller has to pass physically contiguous pages.
 *       Error messages always report the first page of the run, not the page that failed.
 *       With UNSTABLE_TDE_FOR_REPLICATION_LOG, TDE-encrypted pages are encrypted into a local buffer before
 *       each write; loading the TDE module is retried up to 3 times per phase with a 1 second sleep.
 */
static LOG_PAGE **
logwr_writev_append_pages (LOG_PAGE ** to_flush, DKNPAGES npages)
{
  char log_pgbuf[IO_MAX_PAGE_SIZE * LOGPB_IO_NPAGES + MAX_ALIGNMENT];
  LOG_PAGEID first_pageid;
  LOG_PHY_PAGEID first_phy_pageid;
  BACKGROUND_ARCHIVING_INFO *bg_info = NULL;
  LOG_PAGE *page = NULL;
#ifdef UNSTABLE_TDE_FOR_REPLICATION_LOG
  LOG_PAGE *buf_pgptr = NULL;
#endif /* UNSTABLE_TDE_FOR_REPLICATION_LOG */
  FILEIO_WRITE_MODE write_mode = FILEIO_WRITE_DEFAULT_WRITE;
  const TDE_ALGORITHM tde_algo = (TDE_ALGORITHM) prm_get_integer_value (PRM_ID_TDE_DEFAULT_ALGORITHM);
  int error = NO_ERROR;
  int idx;
  int tde_load_retries = 3;

#if !defined (CS_MODE)
  write_mode = dwb_is_created () == true ? FILEIO_WRITE_NO_COMPENSATE_WRITE : FILEIO_WRITE_DEFAULT_WRITE;
#endif

#ifdef UNSTABLE_TDE_FOR_REPLICATION_LOG
  buf_pgptr = (LOG_PAGE *) PTR_ALIGN (log_pgbuf, MAX_ALIGNMENT);
#endif /* UNSTABLE_TDE_FOR_REPLICATION_LOG */

  if (npages <= 0)
    {
      /* nothing to write */
      return to_flush;
    }

  first_pageid = to_flush[0]->hdr.logical_pageid;

  /* only the first page is checked, and the result is deliberately ignored */
  (void) logwr_check_page_checksum (NULL, *to_flush);

  /* 1. archive temp write */
  if (prm_get_bool_value (PRM_ID_LOG_BACKGROUND_ARCHIVING))
    {
      bg_info = &logwr_Gl.bg_archive_info;

      /* check archive temp descriptor */
      if (bg_info->vdes == NULL_VOLDES)
	{
	  er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_HA_GENERIC_ERROR, 1, "invalid temporary archive log file");
	  return NULL;
	}

      /* If there exist empty page between current_page_id and (first_pageid - 1), copy the missing logs to the
       * background archive log file */
      if (bg_info->current_page_id < first_pageid - 1 || bg_info->current_page_id == NULL_PAGEID)
	{
	  if (logwr_copy_necessary_log (first_pageid) != NO_ERROR)
	    {
	      return NULL;
	    }
	}

      first_phy_pageid = (LOG_PHY_PAGEID) (first_pageid - bg_info->start_page_id + 1);
      for (idx = 0; idx < npages; idx++)
	{
	  page = to_flush[idx];

#ifdef UNSTABLE_TDE_FOR_REPLICATION_LOG
	  if (LOG_IS_PAGE_TDE_ENCRYPTED (page))
	    {
	      logwr_set_tde_algorithm (NULL, page, tde_algo);

	      while (!tde_is_loaded ())
		{
		  error = logwr_load_tde ();
		  if (error != NO_ERROR)
		    {
		      ASSERT_ERROR ();
		      if (tde_load_retries-- > 0)
			{
			  er_set (ER_NOTIFICATION_SEVERITY, ARG_FILE_LINE, ER_TDE_CIPHER_LOAD_FAIL, 0);
			  sleep (1);
			  er_clear ();
			  continue;
			}
		      return NULL;
		    }
		}

	      if (tde_encrypt_log_page (page, tde_algo, buf_pgptr) != NO_ERROR)
		{
		  return NULL;
		}
	      page = buf_pgptr;
	    }
#endif /* UNSTABLE_TDE_FOR_REPLICATION_LOG */

	  if (fileio_write (NULL, bg_info->vdes, page, first_phy_pageid + idx, LOG_PAGESIZE, write_mode) == NULL)
	    {
	      if (er_errid () == ER_IO_WRITE_OUT_OF_SPACE)
		{
		  er_set (ER_FATAL_ERROR_SEVERITY, ARG_FILE_LINE, ER_LOG_WRITE_OUT_OF_SPACE, 4, first_pageid,
			  first_phy_pageid, logwr_Gl.bg_archive_name, logwr_Gl.hdr.db_logpagesize);
		}
	      else
		{
		  er_set_with_oserror (ER_FATAL_ERROR_SEVERITY, ARG_FILE_LINE, ER_LOG_WRITE, 3, first_pageid,
				       first_phy_pageid, logwr_Gl.bg_archive_name);
		}
	      return NULL;
	    }
	}

      bg_info->current_page_id = first_pageid + (npages - 1);

      logwr_er_log ("background archiving current_page_id[%lld], fpageid[%lld], npages[%d]",
		    bg_info->current_page_id, first_pageid, npages);

      error = logwr_flush_bgarv_header_page ();
      if (error != NO_ERROR)
	{
	  return NULL;
	}
    }

  /* the active write gets its own retry budget */
  tde_load_retries = 3;

  /* 2. active write */
  first_phy_pageid = logwr_to_physical_pageid (first_pageid);
  for (idx = 0; idx < npages; idx++)
    {
      page = to_flush[idx];

#ifdef UNSTABLE_TDE_FOR_REPLICATION_LOG
      if (LOG_IS_PAGE_TDE_ENCRYPTED (page))
	{
	  logwr_set_tde_algorithm (NULL, page, tde_algo);

	  while (!tde_is_loaded ())
	    {
	      error = logwr_load_tde ();
	      if (error != NO_ERROR)
		{
		  ASSERT_ERROR ();
		  if (tde_load_retries-- > 0)
		    {
		      er_set (ER_NOTIFICATION_SEVERITY, ARG_FILE_LINE, ER_TDE_CIPHER_LOAD_FAIL, 0);
		      sleep (1);
		      er_clear ();
		      continue;
		    }
		  return NULL;
		}
	    }

	  if (tde_encrypt_log_page (page, tde_algo, buf_pgptr) != NO_ERROR)
	    {
	      return NULL;
	    }
	  page = buf_pgptr;
	}
#endif /* UNSTABLE_TDE_FOR_REPLICATION_LOG */

      if (fileio_write (NULL, logwr_Gl.append_vdes, page, first_phy_pageid + idx, LOG_PAGESIZE, write_mode) == NULL)
	{
	  if (er_errid () == ER_IO_WRITE_OUT_OF_SPACE)
	    {
	      er_set (ER_FATAL_ERROR_SEVERITY, ARG_FILE_LINE, ER_LOG_WRITE_OUT_OF_SPACE, 4, first_pageid,
		      first_phy_pageid, logwr_Gl.active_name, logwr_Gl.hdr.db_logpagesize);
	    }
	  else
	    {
	      er_set_with_oserror (ER_FATAL_ERROR_SEVERITY, ARG_FILE_LINE, ER_LOG_WRITE, 3, first_pageid,
				   first_phy_pageid, logwr_Gl.active_name);
	    }
	  return NULL;
	}
    }

  return to_flush;
}
/*
* logwr_flush_all_append_pages -
*
* return:
* Note:
*/
static int
logwr_flush_all_append_pages (void)
{
LOG_PAGE *pgptr, *prv_pgptr;
LOG_PAGEID pageid, prv_pageid;
int idxflush;
bool need_sync;
int flush_page_count;
int i;
idxflush = -1;
prv_pgptr = NULL;
need_sync = false;
flush_page_count = 0;
for (i = 0; i < logwr_Gl.num_toflush; i++)
{
pgptr = logwr_Gl.toflush[i];
if (idxflush != -1 && prv_pgptr != NULL)
{
/*
* This append log page should be dirty and contiguous to previous
* append page. If it is not, we need to flush the accumulated pages
* up to this point, and then start accumulating pages again.
*/
pageid = pgptr->hdr.logical_pageid;
prv_pageid = prv_pgptr->hdr.logical_pageid;
if ((pageid != prv_pageid + 1)
|| (logwr_to_physical_pageid (pageid) != logwr_to_physical_pageid (prv_pageid) + 1))
{
/*
* This page is not contiguous.
*
* Flush the accumulated contiguous pages
*/
if (logwr_writev_append_pages (&logwr_Gl.toflush[idxflush], i - idxflush) == NULL)
{
assert (er_errid () != NO_ERROR);
return er_errid ();
}
else
{
need_sync = true;
/*
* Start over the accumulation of pages
*/
flush_page_count += i - idxflush;
idxflush = -1;
}
}
}
if (idxflush == -1)
{
/*
* This page should be included in the flush
*/
idxflush = i;
}
/* prv_pgptr was not pgptr's previous buffer. prv_pgptr was the first buffer to flush, so only 2 continous pages
* always were flushed together. */
prv_pgptr = pgptr;
}
/*
* If there are any accumulated pages, flush them at this point
*/
if (idxflush != -1)
{
int page_toflush = logwr_Gl.num_toflush - idxflush;
/* last countious pages */
if (logwr_writev_append_pages (&logwr_Gl.toflush[idxflush], page_toflush) == NULL)
{
assert (er_errid () != NO_ERROR);
return er_errid ();
}
else
{
need_sync = true;
flush_page_count += page_toflush;
pgptr = logwr_Gl.toflush[idxflush + page_toflush - 1];
}
}
/*
* Make sure that all of the above log writes are synchronized with any
* future log writes. That is, the pages should be stored on physical disk.
*/
if (need_sync == true && fileio_synchronize (NULL, logwr_Gl.append_vdes, logwr_Gl.active_name, false) == NULL_VOLDES)
{
assert (er_errid () != NO_ERROR);
return er_errid ();
}
/* It's for dual write. */
if (need_sync == true && prm_get_bool_value (PRM_ID_LOG_BACKGROUND_ARCHIVING))
{
if (fileio_synchronize (NULL, logwr_Gl.bg_archive_info.vdes, logwr_Gl.bg_archive_name, false) == NULL_VOLDES)
{
assert (er_errid () != NO_ERROR);
return er_errid ();
}
}
/* Initialize flush info */
for (i = 0; i < logwr_Gl.num_toflush; i++)
{
logwr_Gl.toflush[i] = NULL;
}
logwr_Gl.num_toflush = 0;
logwr_er_log ("logwr_write_log_pages, flush_page_count(%d)\n", flush_page_count);
return NO_ERROR;
}
/*
 * logwr_flush_bgarv_header_page - write the header page of the background archive file and sync it
 *
 * return: NO_ERROR, ER_LOG_WRITE_OUT_OF_SPACE or ER_LOG_WRITE
 *
 * Note: the header is rebuilt from scratch on every call: magic string, logwr_Gl.hdr.db_creation and the three
 *       page positions of logwr_Gl.bg_archive_info. The page buffer is zeroed first, so that the unused
 *       `dummy' field and the rest of the page reach the disk as zeros instead of stack garbage.
 *       Background archiving must be enabled and the archive file must be mounted.
 */
int
logwr_flush_bgarv_header_page (void)
{
  char log_pgbuf[IO_MAX_PAGE_SIZE + MAX_ALIGNMENT];
  LOG_PAGE *log_pgptr;
  BACKGROUND_ARCHIVING_INFO *bg_arv_info = &logwr_Gl.bg_archive_info;
  LOG_BGARV_HEADER *bgarvhdr = NULL;
  LOG_PAGEID logical_pageid = LOGPB_HEADER_PAGE_ID;
  LOG_PHY_PAGEID phy_pageid;
  int error_code = NO_ERROR;

  assert (prm_get_bool_value (PRM_ID_LOG_BACKGROUND_ARCHIVING));
  assert (bg_arv_info->vdes != NULL_VOLDES);

  /* a whole page is written below; never let uninitialized stack bytes reach the file */
  memset (log_pgbuf, 0, sizeof (log_pgbuf));

  log_pgptr = (LOG_PAGE *) PTR_ALIGN (log_pgbuf, MAX_ALIGNMENT);
  log_pgptr->hdr.logical_pageid = logical_pageid;
  log_pgptr->hdr.offset = NULL_OFFSET;

  /* Construct the bg archive log header */
  bgarvhdr = (LOG_BGARV_HEADER *) log_pgptr->area;
  strncpy (bgarvhdr->magic, CUBRID_MAGIC_LOG_ARCHIVE, CUBRID_MAGIC_MAX_LENGTH);
  bgarvhdr->db_creation = logwr_Gl.hdr.db_creation;
  bgarvhdr->start_page_id = bg_arv_info->start_page_id;
  bgarvhdr->current_page_id = bg_arv_info->current_page_id;
  bgarvhdr->last_sync_pageid = bg_arv_info->last_sync_pageid;

  phy_pageid = logwr_to_physical_pageid (log_pgptr->hdr.logical_pageid);

  if (fileio_write (NULL, bg_arv_info->vdes, log_pgptr, phy_pageid, LOG_PAGESIZE,
		    FILEIO_WRITE_NO_COMPENSATE_WRITE) == NULL
      || fileio_synchronize (NULL, bg_arv_info->vdes, logwr_Gl.bg_archive_name, false) == NULL_VOLDES)
    {
      if (er_errid () == ER_IO_WRITE_OUT_OF_SPACE)
	{
	  error_code = ER_LOG_WRITE_OUT_OF_SPACE;
	  er_set (ER_FATAL_ERROR_SEVERITY, ARG_FILE_LINE, ER_LOG_WRITE_OUT_OF_SPACE, 4, logical_pageid, phy_pageid,
		  logwr_Gl.bg_archive_name, LOG_PAGESIZE);
	}
      else
	{
	  error_code = ER_LOG_WRITE;
	  er_set_with_oserror (ER_FATAL_ERROR_SEVERITY, ARG_FILE_LINE, ER_LOG_WRITE, 3, logical_pageid, phy_pageid,
			       logwr_Gl.bg_archive_name);
	}
      return error_code;
    }

  return NO_ERROR;
}
/*
* logwr_flush_header_page -
*
* return:
* Note:
*/
void
logwr_flush_header_page (void)
{
LOG_PAGEID logical_pageid;
LOG_PHY_PAGEID phy_pageid;
int nbytes;
if (logwr_Gl.loghdr_pgptr == NULL)
{
return;
}
logwr_Gl.hdr.is_shutdown = true;
/* flush current archiving status */
logwr_Gl.hdr.nxarv_num = logwr_Gl.last_arv_num;
logwr_Gl.hdr.nxarv_pageid = logwr_Gl.last_arv_fpageid;
logwr_Gl.hdr.nxarv_phy_pageid = logwr_to_physical_pageid (logwr_Gl.last_arv_fpageid);
memcpy (logwr_Gl.loghdr_pgptr->area, &logwr_Gl.hdr, sizeof (logwr_Gl.hdr));
logical_pageid = LOGPB_HEADER_PAGE_ID;
phy_pageid = logwr_to_physical_pageid (logical_pageid);
/* logwr_Gl.append_vdes is only changed while starting or finishing or recovering server. So, log cs is not needed. */
if (fileio_write (NULL, logwr_Gl.append_vdes, logwr_Gl.loghdr_pgptr, phy_pageid, LOG_PAGESIZE,
FILEIO_WRITE_NO_COMPENSATE_WRITE) == NULL
|| fileio_synchronize (NULL, logwr_Gl.append_vdes, logwr_Gl.active_name, false) == NULL_VOLDES)
{
if (er_errid () == ER_IO_WRITE_OUT_OF_SPACE)
{
nbytes = logwr_Gl.hdr.db_logpagesize;
er_set (ER_FATAL_ERROR_SEVERITY, ARG_FILE_LINE, ER_LOG_WRITE_OUT_OF_SPACE, 4, logical_pageid, phy_pageid,
logwr_Gl.active_name, nbytes);
}
else
{
er_set_with_oserror (ER_FATAL_ERROR_SEVERITY, ARG_FILE_LINE, ER_LOG_WRITE, 3, logical_pageid, phy_pageid,
logwr_Gl.active_name);
}
}
/* save last checkpoint pageid */
logwr_Gl.last_chkpt_pageid = logwr_Gl.hdr.chkpt_lsa.pageid;
if (prev_ha_server_state != logwr_Gl.hdr.ha_server_state)
{
string_buffer error_msg;
error_msg ("change the state of HA server (%s@%s) from '%s' to '%s'", logwr_Gl.db_name,
(logwr_Gl.hostname != NULL) ? logwr_Gl.hostname : "unknown",
css_ha_server_state_string ((HA_SERVER_STATE) prev_ha_server_state),
css_ha_server_state_string ((HA_SERVER_STATE) logwr_Gl.hdr.ha_server_state));
er_set (ER_NOTIFICATION_SEVERITY, ARG_FILE_LINE, ER_HA_GENERIC_ERROR, 1, error_msg.get_buffer ());
}
prev_ha_server_state = logwr_Gl.hdr.ha_server_state;
logwr_er_log ("logwr_flush_header_page, ha_server_state=%s, ha_file_status=%s\n",
css_ha_server_state_string ((HA_SERVER_STATE) logwr_Gl.hdr.ha_server_state),
logwr_log_ha_filestat_to_string ((LOG_HA_FILESTAT) logwr_Gl.hdr.ha_file_status));
}
/*
* logwr_archive_active_log -
*
* return:
* Note:
*/
static int
logwr_archive_active_log (void)
{
char archive_name[PATH_MAX] = { '\0' };
LOG_ARV_HEADER *arvhdr;
char log_pgbuf[IO_MAX_PAGE_SIZE * LOGPB_IO_NPAGES + MAX_ALIGNMENT];
char *aligned_log_pgbuf;
LOG_PAGE *log_pgptr = NULL;
LOG_PAGE *malloc_arv_hdr_pgptr = NULL;
LOG_PAGEID pageid;
LOG_LSA saved_append_lsa;
LOG_PHY_PAGEID ar_phy_pageid = NULL_PAGEID, phy_pageid = NULL_PAGEID;
int vdes = NULL_VOLDES;
int error_code = NO_ERROR;
int num_pages = 0;
const char *catmsg;
char buffer[LINE_MAX];
BACKGROUND_ARCHIVING_INFO *bg_arv_info;
aligned_log_pgbuf = PTR_ALIGN (log_pgbuf, MAX_ALIGNMENT);
/* Create the archive header page */
malloc_arv_hdr_pgptr = (LOG_PAGE *) malloc (LOG_PAGESIZE);
if (malloc_arv_hdr_pgptr == NULL)
{
error_code = ER_OUT_OF_VIRTUAL_MEMORY;
er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_OUT_OF_VIRTUAL_MEMORY, 1, (size_t) LOG_PAGESIZE);
goto error;
}
malloc_arv_hdr_pgptr->hdr.logical_pageid = LOGPB_HEADER_PAGE_ID;
malloc_arv_hdr_pgptr->hdr.offset = NULL_OFFSET;
/* Construct the archive log header */
arvhdr = (LOG_ARV_HEADER *) malloc_arv_hdr_pgptr->area;
strncpy (arvhdr->magic, CUBRID_MAGIC_LOG_ARCHIVE, CUBRID_MAGIC_MAX_LENGTH);
arvhdr->db_creation = logwr_Gl.hdr.db_creation;
arvhdr->vol_creation = time (NULL);
arvhdr->next_trid = NULL_TRANID;
arvhdr->fpageid = logwr_Gl.last_arv_fpageid;
arvhdr->arv_num = logwr_Gl.last_arv_num;
arvhdr->npages = (DKNPAGES) (logwr_Gl.last_arv_lpageid - arvhdr->fpageid + 1);
/*
* Now create the archive and start copying pages
*/
snprintf (buffer, sizeof (buffer), "log archiving started for archive %03d", arvhdr->arv_num);
er_set (ER_NOTIFICATION_SEVERITY, ARG_FILE_LINE, ER_HA_GENERIC_ERROR, 1, buffer);
fileio_make_log_archive_name (archive_name, logwr_Gl.log_path, logwr_Gl.db_name, arvhdr->arv_num);
bg_arv_info = &logwr_Gl.bg_archive_info;
if (prm_get_bool_value (PRM_ID_LOG_BACKGROUND_ARCHIVING))
{
if (bg_arv_info->vdes == NULL_VOLDES)
{
error_code = ER_HA_GENERIC_ERROR;
er_set (ER_NOTIFICATION_SEVERITY, ARG_FILE_LINE, ER_HA_GENERIC_ERROR, 1,
"invalid temporary archive log file");
goto error;
}
vdes = bg_arv_info->vdes;
}
else
{
if (fileio_is_volume_exist (archive_name) == true)
{
vdes = fileio_mount (NULL, archive_name, archive_name, LOG_DBLOG_ARCHIVE_VOLID, true, false);
if (vdes == NULL_VOLDES)
{
error_code = ER_IO_MOUNT_FAIL;
goto error;
}
}
else
{
vdes =
fileio_format (NULL, logwr_Gl.db_name, archive_name, LOG_DBLOG_ARCHIVE_VOLID, arvhdr->npages + 1, false,
false, false, LOG_PAGESIZE, 0, false);
if (vdes == NULL_VOLDES)
{
/* Unable to create archive log to archive */
error_code = ER_LOG_CREATE_LOGARCHIVE_FAIL;
er_set (ER_FATAL_ERROR_SEVERITY, ARG_FILE_LINE, ER_LOG_CREATE_LOGARCHIVE_FAIL, 3, archive_name,
arvhdr->fpageid, arvhdr->fpageid + arvhdr->npages - 1);
goto error;
}
}
}
if (fileio_write (NULL, vdes, malloc_arv_hdr_pgptr, 0, LOG_PAGESIZE, FILEIO_WRITE_NO_COMPENSATE_WRITE) == NULL)
{
/* Error archiving header page into archive */
error_code = ER_LOG_WRITE;
er_set (ER_FATAL_ERROR_SEVERITY, ARG_FILE_LINE, ER_LOG_WRITE, 3, 0LL, 0LL, archive_name);
goto error;
}
if (prm_get_bool_value (PRM_ID_LOG_BACKGROUND_ARCHIVING) && bg_arv_info->vdes != NULL_VOLDES
&& logwr_Gl.last_arv_fpageid == bg_arv_info->start_page_id)
{
pageid = bg_arv_info->current_page_id;
ar_phy_pageid = (LOG_PHY_PAGEID) (bg_arv_info->current_page_id - bg_arv_info->start_page_id + 1);
}
else
{
pageid = logwr_Gl.last_arv_fpageid;
ar_phy_pageid = 1;
}
log_pgptr = (LOG_PAGE *) aligned_log_pgbuf;
/* Now start dumping the current active pages to archive */
for (; pageid <= logwr_Gl.last_arv_lpageid; pageid += num_pages, ar_phy_pageid += num_pages)
{
/*
* Page is contained in the active log.
* Find the corresponding physical page and read the page form disk.
*/
num_pages = MIN (LOGPB_IO_NPAGES, (int) (logwr_Gl.last_arv_lpageid - pageid + 1));
phy_pageid = logwr_to_physical_pageid (pageid);
num_pages = MIN (num_pages, logwr_Gl.hdr.npages - phy_pageid + 1);
if (fileio_read_pages (NULL, logwr_Gl.append_vdes, (char *) log_pgptr, phy_pageid, num_pages, LOG_PAGESIZE) ==
NULL)
{
error_code = ER_LOG_READ;
er_set (ER_FATAL_ERROR_SEVERITY, ARG_FILE_LINE, ER_LOG_READ, 3, pageid, phy_pageid, logwr_Gl.active_name);
goto error;
}
else
{
if (log_pgptr->hdr.logical_pageid != pageid)
{
/* Clean the buffer... since it may be corrupted */
error_code = ER_LOG_PAGE_CORRUPTED;
er_set (ER_FATAL_ERROR_SEVERITY, ARG_FILE_LINE, ER_LOG_PAGE_CORRUPTED, 1, pageid);
goto error;
}
}
/* no need to encrypt, it is read as not decrypted (TDE) if encrypted */
if (fileio_write_pages (NULL, vdes, (char *) log_pgptr, ar_phy_pageid, num_pages, LOG_PAGESIZE,
FILEIO_WRITE_DEFAULT_WRITE) == NULL)
{
error_code = ER_LOG_WRITE;
er_set (ER_FATAL_ERROR_SEVERITY, ARG_FILE_LINE, ER_LOG_WRITE, 3, pageid, ar_phy_pageid, archive_name);
goto error;
}
}
fileio_dismount (NULL, vdes);
vdes = NULL_VOLDES;
if (prm_get_bool_value (PRM_ID_LOG_BACKGROUND_ARCHIVING) && bg_arv_info->vdes != NULL_VOLDES)
{
bg_arv_info->vdes = NULL_VOLDES;
if (fileio_rename (NULL_VOLID, logwr_Gl.bg_archive_name, archive_name) == NULL)
{
goto error;
}
bg_arv_info->vdes =
fileio_format (NULL, logwr_Gl.db_name, logwr_Gl.bg_archive_name, LOG_DBLOG_BG_ARCHIVE_VOLID,
logwr_Gl.hdr.npages, false, false, false, LOG_PAGESIZE, 0, false);
if (bg_arv_info->vdes != NULL_VOLDES)
{
bg_arv_info->start_page_id = logwr_Gl.last_arv_lpageid + 1;
bg_arv_info->current_page_id = NULL_PAGEID;
}
else
{
er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_HA_GENERIC_ERROR, 1, "Unable to create temporary archive log");
return ER_HA_GENERIC_ERROR;
}
error_code = logwr_flush_bgarv_header_page ();
if (error_code != NO_ERROR)
{
goto error;
}
}
/* Update archive info */
logwr_Gl.last_arv_num++;
logwr_Gl.last_arv_fpageid = logwr_Gl.last_arv_lpageid + 1;
/* set append lsa as last archive logical pageid */
/* in order to prevent log applier reading an immature active log page. */
LSA_COPY (&saved_append_lsa, &logwr_Gl.hdr.append_lsa);
logwr_Gl.hdr.append_lsa.pageid = logwr_Gl.last_arv_lpageid;
logwr_Gl.hdr.append_lsa.offset = NULL_OFFSET;
/* Flush the log header to reflect the archive */
logwr_flush_header_page ();
/* restore append lsa */
LSA_COPY (&logwr_Gl.hdr.append_lsa, &saved_append_lsa);
er_set (ER_NOTIFICATION_SEVERITY, ARG_FILE_LINE, ER_LOG_ARCHIVE_CREATED, 3, archive_name, arvhdr->fpageid,
arvhdr->fpageid + arvhdr->npages - 1);
catmsg = msgcat_message (MSGCAT_CATALOG_CUBRID, MSGCAT_SET_LOG, MSGCAT_LOG_LOGINFO_ARCHIVE);
if (catmsg == NULL)
{
catmsg = "ARCHIVE: %d %s %lld %lld\n";
}
error_code =
log_dump_log_info (logwr_Gl.loginf_path, false, catmsg, arvhdr->arv_num, archive_name, arvhdr->fpageid,
arvhdr->fpageid + arvhdr->npages - 1);
logwr_er_log ("logwr_archive_active_log, arv_num(%d), fpageid(%lld) lpageid(%lld)\n", arvhdr->arv_num,
arvhdr->fpageid, arvhdr->fpageid + arvhdr->npages - 1);
free_and_init (malloc_arv_hdr_pgptr);
return NO_ERROR;
error:
if (malloc_arv_hdr_pgptr != NULL)
{
free_and_init (malloc_arv_hdr_pgptr);
}
if (vdes != NULL_VOLDES)
{
fileio_dismount (NULL, vdes);
fileio_unformat (NULL, archive_name);
}
return error_code;
}
/*
* logwr_write_log_pages -
*
* return:
* Note:
*/
int
logwr_write_log_pages (void)
{
int error;
struct timeval curtime;
int diff_msec;
if (logwr_Gl.num_toflush <= 0)
return NO_ERROR;
if (logwr_Gl.mode == LOGWR_MODE_SEMISYNC)
{
gettimeofday (&curtime, NULL);
diff_msec =
(((curtime.tv_sec - logwr_Gl.last_flush_time.tv_sec) * 1000) +
((curtime.tv_usec - logwr_Gl.last_flush_time.tv_usec) / 1000));
if (logwr_Gl.force_flush == false && !LOGWR_AT_SERVER_ARCHIVING ()
&& (logwr_Gl.hdr.eof_lsa.pageid <= logwr_Gl.toflush[0]->hdr.logical_pageid) && (diff_msec < 1000))
{
return NO_ERROR;
}
logwr_Gl.force_flush = false;
}
if (logwr_Gl.append_vdes == NULL_VOLDES && !fileio_is_volume_exist (logwr_Gl.active_name))
{
/* Create a new active log */
logwr_Gl.append_vdes =
fileio_format (NULL, logwr_Gl.db_name, logwr_Gl.active_name, LOG_DBLOG_ACTIVE_VOLID, (logwr_Gl.hdr.npages + 1),
prm_get_bool_value (PRM_ID_LOG_SWEEP_CLEAN), true, false, LOG_PAGESIZE, 0, false);
if (logwr_Gl.append_vdes == NULL_VOLDES)
{
/* Unable to create an active log */
return ER_IO_FORMAT_FAIL;
}
}
/*
* LWT sets the archiving flag at the time when it sends new active page
* after archiving finished, so that logwr_archive_active_log() should
* be executed before logwr_flush_all_append_pages().
*/
if (logwr_Gl.action & LOGWR_ACTION_ARCHIVING)
{
error = logwr_archive_active_log ();
if (error != NO_ERROR)
{
return error;
}
}
error = logwr_flush_all_append_pages ();
if (error != NO_ERROR)
{
return error;
}
logwr_flush_header_page ();
gettimeofday (&logwr_Gl.last_flush_time, NULL);
return NO_ERROR;
}
#if !defined(WINDOWS)
/*
 * logwr_copy_log_header_check - request the log header page and print it
 *
 * return: NO_ERROR, or the error returned by net_client_check_log_header ()
 *
 *   db_name(in/out): database name; if it contains '@', the string is cut at that character in place
 *   verbose(in): passed through to net_client_check_log_header () and la_print_log_header ()
 *   master_eof_lsa(out): eof_lsa of the received header, recorded to calculate replication delay
 *
 * Note: Two requests are sent with the same context: one for the header page, and a second one
 *       carrying ER_GENERIC_ERROR so that the LWT does not wait for more page requests.
 */
int
logwr_copy_log_header_check (const char *db_name, bool verbose, LOG_LSA * master_eof_lsa)
{
  int error = NO_ERROR;
  LOGWR_CONTEXT ctx = { -1, 0, false };
  OR_ALIGNED_BUF (OR_INT_SIZE * 2 + OR_INT64_SIZE) a_request;
  OR_ALIGNED_BUF (OR_INT_SIZE * 2) a_reply;
  char *request, *reply;
  char *ptr;
  char *logpg_area = NULL;
  LOG_PAGE *loghdr_pgptr;
  LOG_HEADER hdr;
  char *atchar;

  /* NOTE(review): this writes through a pointer declared const; callers must pass a writable buffer */
  atchar = (char *) strchr (db_name, '@');
  if (atchar)
    {
      *atchar = '\0';
    }

  request = OR_ALIGNED_BUF_START (a_request);
  reply = OR_ALIGNED_BUF_START (a_reply);

  /* HEADER PAGE REQUEST */
  ptr = or_pack_int64 (request, LOGPB_HEADER_PAGE_ID);
  ptr = or_pack_int (ptr, LOGWR_MODE_ASYNC);
  ptr = or_pack_int (ptr, NO_ERROR);

  error =
    net_client_check_log_header (&ctx, request, OR_ALIGNED_BUF_SIZE (a_request), reply,
				 OR_ALIGNED_BUF_SIZE (a_reply), (char **) &logpg_area, verbose);
  if (error != NO_ERROR)
    {
      return error;
    }
  else
    {
      loghdr_pgptr = (LOG_PAGE *) logpg_area;
      memcpy (&hdr, loghdr_pgptr->area, sizeof (LOG_HEADER));

      *master_eof_lsa = hdr.eof_lsa;

      la_print_log_header (db_name, &hdr, verbose);
      free_and_init (logpg_area);
    }

  /* END REQUEST */
  ptr = or_pack_int64 (request, LOGPB_HEADER_PAGE_ID);
  ptr = or_pack_int (ptr, LOGWR_MODE_ASYNC);
  /* send ER_GENERIC_ERROR to make LWT not wait for more page requests */
  ptr = or_pack_int (ptr, ER_GENERIC_ERROR);

  error =
    net_client_check_log_header (&ctx, request, OR_ALIGNED_BUF_SIZE (a_request), reply,
				 OR_ALIGNED_BUF_SIZE (a_reply), (char **) &logpg_area, verbose);
  if (error == NO_ERROR && logpg_area != NULL)
    {
      /* on success the buffer belongs to the caller (as for the first request); release it if the
       * end request handed one back, otherwise it would be leaked */
      free_and_init (logpg_area);
    }

  return error;
}
#endif /* !WINDOWS */
/*
 * logwr_copy_log_file - main loop of the log writer: fetch log pages and write them locally
 *
 * return: NO_ERROR if successful, error_code otherwise
 *
 *   db_name(in): database name to copy the log file
 *   log_path(in): file pathname to copy the log file
 *   mode(in): LOGWR_MODE_SYNC, LOGWR_MODE_ASYNC or LOGWR_MODE_SEMISYNC
 *   start_page_id(in): forwarded to logwr_initialize ()
 *
 * Note: The loop runs until the context requests shutdown or logwr_need_shutdown is raised.
 *       logwr_finalize () is called on every exit path.
 */
int
logwr_copy_log_file (const char *db_name, const char *log_path, int mode, INT64 start_page_id)
{
  LOGWR_CONTEXT ctx = { -1, 0, false };
  int error = NO_ERROR;
  int end_code;

  error = logwr_initialize (db_name, log_path, mode, start_page_id);
  if (error != NO_ERROR)
    {
      logwr_finalize ();
      return error;
    }

  er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_HA_LW_STARTED, 1, mode);

  while (!(ctx.shutdown || logwr_need_shutdown))
    {
      error = logwr_get_log_pages (&ctx);
      if (error == NO_ERROR)
	{
	  if (logwr_Gl.action & LOGWR_ACTION_ASYNC_WRITE)
	    {
	      error = logwr_write_log_pages ();
	      if (error != NO_ERROR)
		{
		  ctx.last_error = error;
		}
	    }
	}
      else
	{
	  ctx.last_error = error;

#if !defined(WINDOWS)
	  if (error == ER_HA_LW_FAILED_GET_LOG_PAGE)
	    {
	      hb_deregister_from_master ();
	      er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_HB_PROCESS_EVENT, 2,
		      "Encountered an unrecoverable error and will shut itself down", "");
	    }
#endif /* !WINDOWS */

	  if (logwr_Gl.reinit_copylog)
	    {
	      char error_str[LINE_MAX];

	      sprintf (error_str,
		       "Replication logs and catalog have been reinitialized "
		       "due to rebuilt database on the peer node");
	      er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_HA_GENERIC_ERROR, 1, error_str);
	      error = ER_HA_GENERIC_ERROR;

	      logwr_reinit_copylog ();
	      goto end;
	    }
	}

      /* keep only the delayed-write flag for the next round */
      logwr_Gl.action = (LOGWR_ACTION) (logwr_Gl.action & LOGWR_ACTION_DELAYED_WRITE);
    }

#if !defined(WINDOWS)
  if (hb_Proc_shutdown == true)
    {
      er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_HB_PROCESS_EVENT, 2,
	      "Disconnected with the cub_master and will shut itself down", "");
    }
#endif /* ! WINDOWS */

  /* SIGNAL caught and shutdown */
  if (logwr_need_shutdown)
    {
      if (logwr_Gl.mode == LOGWR_MODE_SEMISYNC)
	{
	  /* write out whatever semi-sync mode was still holding back */
	  logwr_Gl.force_flush = true;
	  logwr_write_log_pages ();
	}

      er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_HA_LW_STOPPED_BY_SIGNAL, 0);
      error = ER_HA_LW_STOPPED_BY_SIGNAL;
    }

end:
  if (ctx.rc > 0)
    {
      /* ER_FAILED is sent instead of NO_ERROR to shutdown log writer thread of cub_server */
      end_code = (logwr_Gl.start_pageid >= NULL_PAGEID && error == NO_ERROR) ? ER_FAILED : error;
      net_client_logwr_send_end_msg (ctx.rc, end_code);
    }

  logwr_finalize ();

  return error;
}
/*
 * logwr_reinit_copylog - remove the copied replication log files
 *
 * return: none
 *
 * Note: Marks the header with mark_will_del and flushes it, then removes the background archive
 *       (when background archiving is enabled), the log info file, every file in logwr_Gl.log_path
 *       whose name starts with "<db_name><archive suffix>", and finally the active log.
 *       If the log directory cannot be opened the function returns before removing the active log.
 *       Does nothing on WINDOWS.
 */
static void
logwr_reinit_copylog (void)
{
#if !defined(WINDOWS)
  DIR *dirp;
  struct dirent *dp;
  char log_archive_path[2 * PATH_MAX];
  char archive_log_prefix[2 * PATH_MAX];
  int archive_log_prefix_len;
  int path_len;
  BACKGROUND_ARCHIVING_INFO *bg_arv_info = NULL;

  /* mark will be deleted */
  logwr_Gl.hdr.mark_will_del = true;
  logwr_flush_header_page ();

  /* remove background archive */
  if (prm_get_bool_value (PRM_ID_LOG_BACKGROUND_ARCHIVING))
    {
      if (logwr_Gl.bg_archive_info.vdes != NULL_VOLDES)
	{
	  bg_arv_info = &logwr_Gl.bg_archive_info;
	  fileio_dismount (NULL, bg_arv_info->vdes);
	  bg_arv_info->vdes = NULL_VOLDES;
	}
      fileio_unformat (NULL, logwr_Gl.bg_archive_name);
    }

  /* remove log info */
  fileio_unformat (NULL, logwr_Gl.loginf_path);

  /* remove archive logs */
  snprintf (archive_log_prefix, sizeof (archive_log_prefix), "%s%s", logwr_Gl.db_name, FILEIO_SUFFIX_LOGARCHIVE);
  archive_log_prefix_len = (int) strlen (archive_log_prefix);

  dirp = opendir (logwr_Gl.log_path);
  if (dirp == NULL)
    {
      assert (false);
      return;
    }

  for (;;)
    {
      /* readdir () returns NULL both at end of directory and on error; errno tells them apart */
      errno = 0;
      dp = readdir (dirp);
      if (dp == NULL)
	{
	  if (errno != 0)
	    {
	      assert (false);
	    }
	  break;
	}

      if (strncmp (archive_log_prefix, dp->d_name, archive_log_prefix_len) != 0)
	{
	  continue;
	}

      path_len =
	snprintf (log_archive_path, sizeof (log_archive_path), "%s%s%s", logwr_Gl.log_path,
		  FILEIO_PATH_SEPARATOR (logwr_Gl.log_path), dp->d_name);
      if (path_len < 0 || (size_t) path_len >= sizeof (log_archive_path))
	{
	  /* never remove a file through a truncated path name */
	  continue;
	}

      fileio_unformat (NULL, log_archive_path);
    }

  closedir (dirp);

  /* remove active log */
  if (logwr_Gl.append_vdes != NULL_VOLDES)
    {
      fileio_dismount (NULL, logwr_Gl.append_vdes);
      logwr_Gl.append_vdes = NULL_VOLDES;
    }
  fileio_unformat (NULL, logwr_Gl.active_name);
#endif /* !WINDOWS */
  return;
}
#ifdef UNSTABLE_TDE_FOR_REPLICATION_LOG
/*
 * logwr_tde_sock_read_all - read exactly len bytes from sockfd into buf
 *
 * return: NO_ERROR, or ER_TDE_DK_SHARING_SOCK_READ (the error is set; the socket is left open)
 *
 *   sockfd(in): connected socket
 *   buf(out): destination buffer of at least len bytes
 *   len(in): number of bytes to read
 *
 * Note: EINTR/EAGAIN are retried. A return of 0 from read () means the peer closed the connection
 *       before everything arrived; it is reported as a read error instead of being retried forever.
 */
static int
logwr_tde_sock_read_all (int sockfd, char *buf, int len)
{
  int nbytes;

  while (len > 0)
    {
      nbytes = read (sockfd, buf, len);
      if (nbytes < 0)
	{
	  if (errno == EINTR || errno == EAGAIN)
	    {
	      continue;
	    }
	  er_set_with_oserror (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_TDE_DK_SHARING_SOCK_READ, 0);
	  return ER_TDE_DK_SHARING_SOCK_READ;
	}
      if (nbytes == 0)
	{
	  /* end of stream: errno carries no information here */
	  er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_TDE_DK_SHARING_SOCK_READ, 0);
	  return ER_TDE_DK_SHARING_SOCK_READ;
	}

      buf += nbytes;
      len -= nbytes;
    }

  return NO_ERROR;
}

/*
 * logwr_load_tde - fetch the TDE data keys over a unix domain socket and load them into tde_Cipher
 *
 * return: NO_ERROR, an ER_TDE_DK_SHARING_SOCK_* code for socket failures, or the error code
 *         received from the peer
 *
 * Note: Protocol as implemented here: PATH_MAX bytes of logwr_Gl.log_path are sent, then an int
 *       error code is read, then (if that is NO_ERROR) a TDE_DATA_KEY_SET is read.
 *       The socket is closed on every path after it has been opened.
 */
static int
logwr_load_tde (void)
{
  int client_len;
  int client_sockfd;
  char sock_path[PATH_MAX];
  TDE_DATA_KEY_SET dks;
  char *bufptr;
  int nbytes, len;
  int err_msg = NO_ERROR;
  int error_code;
  size_t sock_path_len;
  struct sockaddr_un clientaddr;

  fileio_make_ha_sock_name (sock_path, logwr_Gl.log_path, TDE_HA_SOCK_NAME);

  client_sockfd = socket (AF_UNIX, SOCK_STREAM, 0);
  if (client_sockfd == -1)
    {
      er_set_with_oserror (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_TDE_DK_SHARING_SOCK_OPEN, 1, sock_path);
      return ER_TDE_DK_SHARING_SOCK_OPEN;
    }

  bzero (&clientaddr, sizeof (clientaddr));
  clientaddr.sun_family = AF_UNIX;

  /* sun_path is much smaller than PATH_MAX; refuse a path that does not fit instead of overflowing */
  sock_path_len = strlen (sock_path);
  if (sock_path_len >= sizeof (clientaddr.sun_path))
    {
      close (client_sockfd);
      er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_TDE_DK_SHARING_SOCK_OPEN, 1, sock_path);
      return ER_TDE_DK_SHARING_SOCK_OPEN;
    }
  memcpy (clientaddr.sun_path, sock_path, sock_path_len + 1);
  client_len = sizeof (clientaddr);

  if (connect (client_sockfd, (struct sockaddr *) &clientaddr, client_len) < 0)
    {
      er_set_with_oserror (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_TDE_DK_SHARING_SOCK_CONNECT, 0);
      close (client_sockfd);
      return ER_TDE_DK_SHARING_SOCK_CONNECT;
    }

  /* send the log path (always PATH_MAX bytes) */
  bufptr = logwr_Gl.log_path;
  len = PATH_MAX;
  while (len > 0)
    {
      nbytes = write (client_sockfd, bufptr, len);
      if (nbytes < 0)
	{
	  if (errno == EINTR || errno == EAGAIN)
	    {
	      continue;
	    }
	  er_set_with_oserror (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_TDE_DK_SHARING_SOCK_WRITE, 0);
	  close (client_sockfd);
	  return ER_TDE_DK_SHARING_SOCK_WRITE;
	}
      bufptr += nbytes;
      len -= nbytes;
    }

  /* read error message */
  error_code = logwr_tde_sock_read_all (client_sockfd, (char *) &err_msg, (int) sizeof (err_msg));
  if (error_code != NO_ERROR)
    {
      close (client_sockfd);
      return error_code;
    }

  if (err_msg != NO_ERROR)
    {
      close (client_sockfd);
      return err_msg;
    }

  /* read data keys */
  error_code = logwr_tde_sock_read_all (client_sockfd, (char *) &dks, (int) sizeof (TDE_DATA_KEY_SET));
  if (error_code != NO_ERROR)
    {
      close (client_sockfd);
      return error_code;
    }

  memcpy (tde_Cipher.data_keys.perm_key, dks.perm_key, TDE_DATA_KEY_LENGTH);
  memcpy (tde_Cipher.data_keys.temp_key, dks.temp_key, TDE_DATA_KEY_LENGTH);
  memcpy (tde_Cipher.data_keys.log_key, dks.log_key, TDE_DATA_KEY_LENGTH);
  tde_Cipher.is_loaded = true;

  close (client_sockfd);

  return NO_ERROR;
}
#endif /* UNSTABLE_TDE_FOR_REPLICATION_LOG */
#else /* CS_MODE */
/*
 * logwr_copy_log_file - placeholder compiled in the alternative branch of the surrounding
 *                       conditional compilation
 *
 * return: always ER_FAILED
 *
 *   db_name(in): unused
 *   log_path(in): unused
 *   mode(in): unused
 *   start_page_id(in): unused
 */
int
logwr_copy_log_file (const char *db_name, const char *log_path, int mode, INT64 start_page_id)
{
return ER_FAILED;
}
#endif /* !CS_MODE */
/*
 * logwr_check_page_checksum - verify the checksum stored in a log page header
 *
 * return: NO_ERROR when the page carries no checksum (0) or the checksum matches,
 *         ER_FAILED when the recomputed checksum differs
 *
 *   thread_p(in): thread entry (not used by the body)
 *   log_pgptr(in): log page to verify; its header checksum is zeroed during the computation
 *                  and restored before returning
 *
 * Note: CRC32 is computed over a sample of the page: the first and the last 16 bytes of every
 *       4096-byte block.
 * Note: the sampling mirrors logpb_compute_page_checksum.
 */
static int
logwr_check_page_checksum (THREAD_ENTRY * thread_p, LOG_PAGE * log_pgptr)
{
  const int block_size = 4096;
  const int max_num_pages = IO_MAX_PAGE_SIZE / block_size;
  const int sample_nbytes = 16;
  const int num_pages = LOG_PAGESIZE / block_size;
  const size_t sizeof_buf = num_pages * sample_nbytes * 2;
  char buf[max_num_pages * sample_nbytes * 2];
  const char *page_bytes;
  char *cursor;
  int stored_crc32;
  int computed_crc32;
  int blk;

  assert (log_pgptr != NULL);

  /* Remember the checksum carried by the page; 0 means there is nothing to verify. */
  stored_crc32 = log_pgptr->hdr.checksum;
  if (stored_crc32 == 0)
    {
      return NO_ERROR;
    }

  /* The checksum field itself must not contribute to the computation. */
  log_pgptr->hdr.checksum = 0;

  page_bytes = (const char *) log_pgptr;
  cursor = buf;
  for (blk = 0; blk < num_pages; blk++)
    {
      const char *block_start = page_bytes + (blk * block_size);

      /* head of the block, then tail of the block */
      memcpy (cursor, block_start, sample_nbytes);
      memcpy (cursor + sample_nbytes, block_start + (block_size - sample_nbytes), sample_nbytes);
      cursor += 2 * sample_nbytes;
    }

  crypt_crc32 ((char *) buf, (int) sizeof_buf, &computed_crc32);

  /* Put the original checksum back. */
  log_pgptr->hdr.checksum = stored_crc32;

  if (computed_crc32 == stored_crc32)
    {
      return NO_ERROR;
    }

  _er_log_debug (ARG_FILE_LINE,
		 "logwr_check_page_checksum: log page %lld has checksum = %d, computed checksum = %d\n",
		 (long long int) log_pgptr->hdr.logical_pageid, stored_crc32, computed_crc32);
  assert (false);
  return ER_FAILED;
}
/*
 * logwr_log_ha_filestat_to_string() - map a LOG_HA_FILESTAT value to its printable name
 *
 * return: constant string; "UNKNOWN" for any value without a name
 *
 *   val(in): HA file status
 */
const char *
logwr_log_ha_filestat_to_string (enum LOG_HA_FILESTAT val)
{
  const char *alias = "UNKNOWN";

  if (val == LOG_HA_FILESTAT_CLEAR)
    {
      alias = "CLEAR";
    }
  else if (val == LOG_HA_FILESTAT_ARCHIVED)
    {
      alias = "ARCHIVED";
    }
  else if (val == LOG_HA_FILESTAT_SYNCHRONIZED)
    {
      alias = "SYNCHRONIZED";
    }

  return alias;
}
#ifdef UNSTABLE_TDE_FOR_REPLICATION_LOG
/*
 * logwr_get_tde_algorithm - derive the TDE algorithm from the encryption flags of a log page
 *
 * return: TDE_ALGORITHM_AES, TDE_ALGORITHM_ARIA or TDE_ALGORITHM_NONE
 *
 *   log_pgptr(in): log page whose header flags are inspected
 *
 * Note: the AES and ARIA flags are expected to be mutually exclusive (asserted); AES is tested first.
 */
TDE_ALGORITHM
logwr_get_tde_algorithm (const LOG_PAGE * log_pgptr)
{
  const bool is_aes = (log_pgptr->hdr.flags & LOG_HDRPAGE_FLAG_ENCRYPTED_AES) != 0;
  const bool is_aria = (log_pgptr->hdr.flags & LOG_HDRPAGE_FLAG_ENCRYPTED_ARIA) != 0;

  /* exclusive */
  assert (!(is_aes && is_aria));

  if (is_aes)
    {
      return TDE_ALGORITHM_AES;
    }

  return is_aria ? TDE_ALGORITHM_ARIA : TDE_ALGORITHM_NONE;
}
/*
 * logwr_set_tde_algorithm - record a TDE algorithm in the encryption flags of a log page
 *
 *   thread_p(in): thread entry (not used by the body)
 *   log_pgptr(in/out): log page whose header flags are updated
 *   tde_algo(in): algorithm to record; TDE_ALGORITHM_NONE leaves the flags cleared
 */
void
logwr_set_tde_algorithm (THREAD_ENTRY * thread_p, LOG_PAGE * log_pgptr, const TDE_ALGORITHM tde_algo)
{
  /* start from a clean state: drop whatever encryption flag was set before */
  log_pgptr->hdr.flags &= ~LOG_HDRPAGE_FLAG_ENCRYPTED_MASK;

  if (tde_algo == TDE_ALGORITHM_AES)
    {
      log_pgptr->hdr.flags |= LOG_HDRPAGE_FLAG_ENCRYPTED_AES;
    }
  else if (tde_algo == TDE_ALGORITHM_ARIA)
    {
      log_pgptr->hdr.flags |= LOG_HDRPAGE_FLAG_ENCRYPTED_ARIA;
    }
  /* TDE_ALGORITHM_NONE: nothing to set, the flags are already cleared */
}
#endif /* UNSTABLE_TDE_FOR_REPLICATION_LOG */
#if defined(SERVER_MODE)
/* Prototypes of the file-local helpers defined below in this SERVER_MODE section. */
static int logwr_register_writer_entry (LOGWR_ENTRY ** wr_entry_p, THREAD_ENTRY * thread_p, LOG_PAGEID fpageid,
int mode, bool copy_from_first_phy_page);
static bool logwr_unregister_writer_entry (LOGWR_ENTRY * wr_entry, int status);
static int logwr_pack_log_pages (THREAD_ENTRY * thread_p, char *logpg_area, int *logpg_used_size, int *status,
LOGWR_ENTRY * entry, bool copy_from_file);
static void logwr_cs_exit (THREAD_ENTRY * thread_p, bool * check_cs_own);
static void logwr_write_end (THREAD_ENTRY * thread_p, LOGWR_INFO * writer_info, LOGWR_ENTRY * entry, int status);
static void logwr_set_eof_lsa (THREAD_ENTRY * thread_p, LOGWR_ENTRY * entry);
static bool logwr_is_delayed (THREAD_ENTRY * thread_p, LOGWR_ENTRY * entry);
static void logwr_update_last_sent_eof_lsa (LOGWR_ENTRY * entry);
/*
 * logwr_register_writer_entry - find or create the writer entry that belongs to a thread
 *
 * return: NO_ERROR, or ER_OUT_OF_VIRTUAL_MEMORY when a new entry cannot be allocated
 *
 *   wr_entry_p(out): the registered entry; NULL on failure
 *   thread_p(in): thread that owns the entry
 *   fpageid(in): first page id requested
 *   mode(in): writer mode, stored as LOGWR_MODE
 *   copy_from_first_phy_page(in): stored in the entry
 *
 * Note: The writer list is searched and updated under wr_list_mutex.
 *       A new entry starts in LOGWR_STATUS_DELAY with NULL LSAs and is linked at the list head.
 *       An existing entry is refreshed; unless it is in LOGWR_STATUS_DELAY it is put into
 *       LOGWR_STATUS_WAIT and its start_copy_time is reset.
 */
static int
logwr_register_writer_entry (LOGWR_ENTRY ** wr_entry_p, THREAD_ENTRY * thread_p, LOG_PAGEID fpageid, int mode,
			     bool copy_from_first_phy_page)
{
  LOGWR_INFO *writer_info = log_Gl.writer_info;
  LOGWR_ENTRY *found;

  *wr_entry_p = NULL;

  (void) pthread_mutex_lock (&writer_info->wr_list_mutex);

  /* look for an entry already owned by this thread */
  for (found = writer_info->writer_list; found != NULL; found = found->next)
    {
      if (found->thread_p == thread_p)
	{
	  break;
	}
    }

  if (found != NULL)
    {
      found->fpageid = fpageid;
      found->mode = (LOGWR_MODE) mode;
      found->copy_from_first_phy_page = copy_from_first_phy_page;
      if (found->status != LOGWR_STATUS_DELAY)
	{
	  found->status = LOGWR_STATUS_WAIT;
	  found->start_copy_time = 0;
	}
    }
  else
    {
      found = (LOGWR_ENTRY *) malloc (sizeof (LOGWR_ENTRY));
      if (found == NULL)
	{
	  pthread_mutex_unlock (&writer_info->wr_list_mutex);
	  er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_OUT_OF_VIRTUAL_MEMORY, 1, sizeof (LOGWR_ENTRY));
	  return ER_OUT_OF_VIRTUAL_MEMORY;
	}

      found->thread_p = thread_p;
      found->fpageid = fpageid;
      found->mode = (LOGWR_MODE) mode;
      found->start_copy_time = 0;
      found->copy_from_first_phy_page = copy_from_first_phy_page;
      found->status = LOGWR_STATUS_DELAY;
      LSA_SET_NULL (&found->eof_lsa);
      LSA_SET_NULL (&found->last_sent_eof_lsa);
      LSA_SET_NULL (&found->tmp_last_sent_eof_lsa);

      /* push at the head of the writer list */
      found->next = writer_info->writer_list;
      writer_info->writer_list = found;
    }

  pthread_mutex_unlock (&writer_info->wr_list_mutex);

  *wr_entry_p = found;

  return NO_ERROR;
}
/*
 * logwr_unregister_writer_entry - record the final status of a writer entry
 *
 * return: true when, after the status update, no entry in the writer list is in
 *         LOGWR_STATUS_FETCH; false otherwise
 *
 *   wr_entry(in/out): entry to update; it is unlinked and freed when status is LOGWR_STATUS_ERROR
 *   status(in): new status, stored as LOGWR_STATUS
 *
 * Note: Runs under wr_list_mutex. The "all done" result is computed before the entry is removed.
 */
static bool
logwr_unregister_writer_entry (LOGWR_ENTRY * wr_entry, int status)
{
  LOGWR_INFO *writer_info = log_Gl.writer_info;
  LOGWR_ENTRY *scan;
  bool is_all_done = true;

  (void) pthread_mutex_lock (&writer_info->wr_list_mutex);

  wr_entry->status = (LOGWR_STATUS) status;

  /* is any writer still fetching? */
  for (scan = writer_info->writer_list; scan != NULL; scan = scan->next)
    {
      if (scan->status == LOGWR_STATUS_FETCH)
	{
	  is_all_done = false;
	  break;
	}
    }

  if (status == LOGWR_STATUS_ERROR)
    {
      LOGWR_ENTRY **link;

      /* walk the links themselves so that head and inner removal are the same operation */
      for (link = &writer_info->writer_list; *link != NULL; link = &(*link)->next)
	{
	  if (*link == wr_entry)
	    {
	      LOGWR_ENTRY *victim = *link;

	      *link = victim->next;
	      free_and_init (victim);
	      break;
	    }
	}
    }

  pthread_mutex_unlock (&writer_info->wr_list_mutex);

  return is_all_done;
}
/*
 * logwr_pack_log_pages - fill logpg_area with the log header page followed by the log pages to send
 *
 * return: NO_ERROR, or ER_FAILED (with *logpg_used_size = 0 and *status = LOGWR_STATUS_ERROR)
 *
 * thread_p(in):
 * logpg_area(out): buffer that receives the header page and then the data pages, LOG_PAGESIZE each
 * logpg_used_size(out): number of bytes written into logpg_area
 * status(out): LOGWR_STATUS_DONE, LOGWR_STATUS_DELAY or LOGWR_STATUS_ERROR
 * entry(in/out): writer entry; fpageid, eof_lsa and copy_from_first_phy_page are read,
 *                tmp_last_sent_eof_lsa is updated
 * copy_from_file(in): true to copy pages with logpb_copy_page_from_file (), false to copy them
 *                     with logpb_copy_page_from_log_buffer ()
 *
 * Note: log_Gl.hdr.ha_server_state and log_Gl.hdr.ha_file_status are updated as a side effect.
 */
static int
logwr_pack_log_pages (THREAD_ENTRY * thread_p, char *logpg_area, int *logpg_used_size, int *status, LOGWR_ENTRY * entry,
bool copy_from_file)
{
LOG_PAGEID fpageid, lpageid, pageid;
char *p;
LOG_PAGE *log_pgptr;
UINT64 num_logpgs;
LOG_LSA nxio_lsa = LSA_INITIALIZER;
bool is_hdr_page_only;
int ha_file_status;
int error_code;
LOG_ARV_HEADER arvhdr;
LOG_HEADER *hdr_ptr;
int nxarv_num = 0;
LOG_PAGEID nxarv_pageid = NULL_PAGEID, nxarv_phy_pageid = NULL_PAGEID;
LOG_LSA eof_lsa;
fpageid = NULL_PAGEID;
lpageid = NULL_PAGEID;
ha_file_status = LOG_HA_FILESTAT_CLEAR;
/* a request for the header page id means only the header page is sent */
is_hdr_page_only = (entry->fpageid == LOGPB_HEADER_PAGE_ID);
/* a header-only request combined with copy_from_first_phy_page is rejected */
if (is_hdr_page_only == true && entry->copy_from_first_phy_page == true)
{
assert_release (false);
error_code = ER_FAILED;
goto error;
}
/* use the EOF recorded in the entry when there is one, otherwise the current EOF of the log header */
if (LSA_ISNULL (&entry->eof_lsa))
{
LSA_COPY (&eof_lsa, &log_Gl.hdr.eof_lsa);
}
else
{
LSA_COPY (&eof_lsa, &entry->eof_lsa);
}
if (entry->copy_from_first_phy_page == true)
{
/* determine the nxarv_* values to report: from the archive header when the page is archived,
 * otherwise from the active log header */
fpageid = entry->fpageid;
if (fpageid == NULL_PAGEID)
{
fpageid = logpb_find_oldest_available_page_id (thread_p);
if (fpageid == NULL_PAGEID)
{
error_code = ER_FAILED;
goto error;
}
}
if (logpb_is_page_in_archive (fpageid) == true)
{
if (logpb_fetch_from_archive (thread_p, fpageid, NULL, NULL, &arvhdr, false) == NULL)
{
error_code = ER_FAILED;
goto error;
}
nxarv_phy_pageid = 1; /* first physical page id */
nxarv_pageid = arvhdr.fpageid;
nxarv_num = arvhdr.arv_num;
}
else
{
nxarv_phy_pageid = log_Gl.hdr.nxarv_phy_pageid;
nxarv_pageid = log_Gl.hdr.nxarv_pageid;
nxarv_num = log_Gl.hdr.nxarv_num;
}
/* exactly one data page is sent in this mode: the page nxarv_pageid */
lpageid = nxarv_pageid;
fpageid = nxarv_pageid;
}
else if (!is_hdr_page_only)
{
/* Find the first pageid to be packed */
fpageid = entry->fpageid;
if (fpageid == NULL_PAGEID)
{
/* In case of first request from the log writer, pack all active pages to be flushed until now */
fpageid = log_Gl.hdr.nxarv_pageid;
}
else
{
/* never start beyond the page of the next-IO LSA */
nxio_lsa = log_Gl.append.get_nxio_lsa ();
if (fpageid > nxio_lsa.pageid)
{
fpageid = nxio_lsa.pageid;
}
}
/* Find the last pageid which is bounded by several limitations */
if (!logpb_is_page_in_archive (fpageid))
{
lpageid = eof_lsa.pageid;
}
else
{
/* NOTE(review): this declaration shadows the function-level arvhdr */
LOG_ARV_HEADER arvhdr;
/* If the fpageid is in archive log, fetch the page and the header page in the archive */
if (logpb_fetch_from_archive (thread_p, fpageid, NULL, NULL, &arvhdr, false) == NULL)
{
error_code = ER_FAILED;
goto error;
}
/* Reset the lpageid with the last pageid in the archive */
lpageid = arvhdr.fpageid + arvhdr.npages - 1;
if (fpageid == arvhdr.fpageid)
{
ha_file_status = LOG_HA_FILESTAT_ARCHIVED;
}
}
/* Pack the pages which can be in the page area of Log Writer */
/* (one slot of the LOGWR_COPY_LOG_BUFFER_NPAGES is taken by the header page) */
if (((size_t) (lpageid - fpageid + 1)) > (LOGWR_COPY_LOG_BUFFER_NPAGES - 1))
{
lpageid = fpageid + (LOGWR_COPY_LOG_BUFFER_NPAGES - 1) - 1;
}
if (lpageid == eof_lsa.pageid)
{
ha_file_status = LOG_HA_FILESTAT_SYNCHRONIZED;
}
}
/* Set the server status on the header information */
log_Gl.hdr.ha_server_state = css_ha_server_state ();
log_Gl.hdr.ha_file_status = ha_file_status;
/* Allocate the log page area */
/* (no allocation happens here: the header page plus the data pages are counted) */
num_logpgs = (is_hdr_page_only) ? 1 : (UINT64) ((lpageid - fpageid + 1) + 1);
assert (lpageid >= fpageid);
assert (num_logpgs <= LOGWR_COPY_LOG_BUFFER_NPAGES);
p = logpg_area;
/* Fill the header page */
log_pgptr = (LOG_PAGE *) p;
log_pgptr->hdr = log_Gl.loghdr_pgptr->hdr;
memcpy (log_pgptr->area, &log_Gl.hdr, sizeof (log_Gl.hdr));
hdr_ptr = (LOG_HEADER *) (log_pgptr->area);
/* the copied header (not log_Gl.hdr) is patched with the values chosen above */
if (entry->copy_from_first_phy_page == true)
{
hdr_ptr->nxarv_phy_pageid = (LOG_PHY_PAGEID) nxarv_phy_pageid;
hdr_ptr->nxarv_pageid = nxarv_pageid;
hdr_ptr->nxarv_num = nxarv_num;
}
LSA_COPY (&hdr_ptr->eof_lsa, &eof_lsa);
p += LOG_PAGESIZE;
/* Fill the page array with the pages to send */
if (!is_hdr_page_only)
{
for (pageid = fpageid; pageid >= 0 && pageid <= lpageid; pageid++)
{
log_pgptr = (LOG_PAGE *) p;
if (copy_from_file == true)
{
if (logpb_copy_page_from_file (thread_p, pageid, log_pgptr) != NO_ERROR)
{
error_code = ER_FAILED;
goto error;
}
}
else
{
if (logpb_copy_page_from_log_buffer (thread_p, pageid, log_pgptr) != NO_ERROR)
{
error_code = ER_FAILED;
goto error;
}
}
/* NOTE(review): nxio_lsa is only fetched when entry->fpageid was given and
 * copy_from_first_phy_page is false; on the other paths it still holds LSA_INITIALIZER,
 * presumably making this comparison true for every page - confirm that this is intended */
if (pageid >= nxio_lsa.pageid)
{
/* page is not flushed yet, may be changed : update checksum before send */
(void) logpb_set_page_checksum (thread_p, log_pgptr);
}
else
{
/* the verification result is deliberately discarded here */
(void) logwr_check_page_checksum (thread_p, log_pgptr);
}
assert (pageid == (log_pgptr->hdr.logical_pageid));
p += LOG_PAGESIZE;
}
}
*logpg_used_size = (int) (p - logpg_area);
/* In case that EOL exists at lpageid */
if (!is_hdr_page_only && (lpageid >= eof_lsa.pageid))
{
*status = LOGWR_STATUS_DONE;
LSA_COPY (&entry->tmp_last_sent_eof_lsa, &eof_lsa);
}
else
{
*status = LOGWR_STATUS_DELAY;
entry->tmp_last_sent_eof_lsa.pageid = lpageid;
entry->tmp_last_sent_eof_lsa.offset = NULL_OFFSET;
}
/* NOTE(review): the value logged as status is entry->status, not the *status just computed - confirm */
logwr_er_log ("logwr_pack_log_pages, fpageid(%lld), lpageid(%lld), num_pages(%lld),"
"\n status(%d)\n", fpageid, lpageid, num_logpgs, entry->status);
return NO_ERROR;
error:
/* common failure exit: report an empty area and the error status */
*logpg_used_size = 0;
*status = LOGWR_STATUS_ERROR;
return error_code;
}
/*
 * logwr_cs_exit - leave the log critical section if the caller's ownership flag is set
 *
 *   thread_p(in): thread entry
 *   check_cs_own(in/out): ownership flag; cleared before LOG_CS_EXIT is invoked
 */
static void
logwr_cs_exit (THREAD_ENTRY * thread_p, bool * check_cs_own)
{
  if (!*check_cs_own)
    {
      return;
    }

  *check_cs_own = false;
  LOG_CS_EXIT (thread_p);
}
/*
 * logwr_write_end - finish a writer's round and wake the waiter on flush_end_cond when all are done
 *
 *   thread_p(in): thread entry
 *   writer_info(in/out): log writer info (flush_end mutex/condition, last writer statistics)
 *   entry(in): writer entry to unregister; NULL is tolerated and nothing is signalled then
 *   status(in): status to record in the entry
 *
 * Note: Runs under flush_end_mutex. When this writer was fetching and trace_last_writer is set,
 *       its elapsed time and client ids are recorded before the condition is signalled.
 */
static void
logwr_write_end (THREAD_ENTRY * thread_p, LOGWR_INFO * writer_info, LOGWR_ENTRY * entry, int status)
{
  int tran_index;
  int prev_status;
  INT64 saved_start_time;

  (void) pthread_mutex_lock (&writer_info->flush_end_mutex);

  if (entry != NULL)
    {
      /* read these before unregistering: the entry is freed when status is LOGWR_STATUS_ERROR */
      prev_status = entry->status;
      saved_start_time = entry->start_copy_time;

      if (logwr_unregister_writer_entry (entry, status))
	{
	  if (prev_status == LOGWR_STATUS_FETCH && writer_info->trace_last_writer == true)
	    {
	      assert (saved_start_time > 0);
	      writer_info->last_writer_elapsed_time = log_get_clock_msec () - saved_start_time;

	      tran_index = LOG_FIND_THREAD_TRAN_INDEX (thread_p);
	      logtb_get_client_ids (tran_index, &writer_info->last_writer_client_info);
	    }
	  pthread_cond_signal (&writer_info->flush_end_cond);
	}
    }

  pthread_mutex_unlock (&writer_info->flush_end_mutex);
  return;
}
/*
 * logwr_set_eof_lsa - give the entry an EOF LSA if it does not have one yet
 *
 *   thread_p(in): thread entry
 *   entry(in/out): writer entry; its eof_lsa is set from log_Gl.hdr.eof_lsa only when it is NULL
 *
 * Note: log_Gl.hdr.eof_lsa is copied between LOG_CS_ENTER and LOG_CS_EXIT.
 */
static void
logwr_set_eof_lsa (THREAD_ENTRY * thread_p, LOGWR_ENTRY * entry)
{
  if (!LSA_ISNULL (&entry->eof_lsa))
    {
      /* already set, keep it */
      return;
    }

  LOG_CS_ENTER (thread_p);
  LSA_COPY (&entry->eof_lsa, &log_Gl.hdr.eof_lsa);
  LOG_CS_EXIT (thread_p);
}
/*
 * logwr_is_delayed - tell whether the writer has not yet been sent everything up to its EOF LSA
 *
 * return: true when entry->last_sent_eof_lsa is set and is lower than entry->eof_lsa;
 *         false otherwise, including when entry is NULL
 *
 *   thread_p(in): thread entry
 *   entry(in/out): writer entry; its eof_lsa is initialized first if it is still NULL
 */
static bool
logwr_is_delayed (THREAD_ENTRY * thread_p, LOGWR_ENTRY * entry)
{
  /* test for NULL before logwr_set_eof_lsa (), which dereferences the entry */
  if (entry == NULL)
    {
      return false;
    }

  logwr_set_eof_lsa (thread_p, entry);

  if (LSA_ISNULL (&entry->last_sent_eof_lsa) || LSA_GE (&entry->last_sent_eof_lsa, &entry->eof_lsa))
    {
      return false;
    }
  return true;
}
/*
 * logwr_update_last_sent_eof_lsa - promote the tentative last-sent EOF LSA of an entry
 *
 *   entry(in/out): writer entry; tmp_last_sent_eof_lsa is copied into last_sent_eof_lsa.
 *                  NULL is accepted and ignored.
 */
static void
logwr_update_last_sent_eof_lsa (LOGWR_ENTRY * entry)
{
  if (entry == NULL)
    {
      return;
    }

  LSA_COPY (&entry->last_sent_eof_lsa, &entry->tmp_last_sent_eof_lsa);
}
/*
 * xlogwr_get_log_pages - serve the log page requests of one log writer client
 *
 * return: ER_OUT_OF_VIRTUAL_MEMORY if the page buffer cannot be allocated; otherwise the error
 *         code that ended the request loop (the loop is only left through the error label)
 *
 *   thread_p(in): thread entry handling the request
 *   first_pageid(in): page id of the first request; each follow-up request supplies its own
 *   mode(in): requested LOGWR_MODE, possibly OR'ed with LOGWR_COPY_FROM_FIRST_PHY_PAGE_MASK
 *
 * Note: every loop iteration registers a writer entry, waits until the thread is resumed (or
 *       calls LOG_CS_ENTER when the entry is in DELAY status), packs log pages into a local
 *       buffer, sends them to the client and then reads the client's next request.
 */
int
xlogwr_get_log_pages (THREAD_ENTRY * thread_p, LOG_PAGEID first_pageid, LOGWR_MODE mode)
{
LOGWR_ENTRY *entry;
char *logpg_area;
int logpg_used_size;
LOG_PAGEID next_fpageid;
LOGWR_MODE next_mode;
LOGWR_MODE orig_mode = LOGWR_MODE_ASYNC;
int status;
int timeout;
int rv;
int error_code;
bool check_cs_own = false;
bool is_interrupted = false;
bool copy_from_file = false;
bool need_cs_exit_after_send = true;
struct timespec to;
LOGWR_INFO *writer_info = log_Gl.writer_info;
bool copy_from_first_phy_page = false;
logpg_used_size = 0;
/* Buffer that receives the packed pages; it is released only at the error label. */
logpg_area = (char *) malloc (LOGWR_COPY_LOG_BUFFER_NPAGES * LOG_PAGESIZE);
if (logpg_area == NULL)
{
return ER_OUT_OF_VIRTUAL_MEMORY;
}
/* Record the stop phase on the connection entry, when there is one. */
if (thread_p->conn_entry)
{
thread_p->conn_entry->stop_phase = THREAD_STOP_LOGWR;
}
while (true)
{
/* Split the copy-from-first-physical-page flag off the mode before using it as a plain mode. */
if (mode & LOGWR_COPY_FROM_FIRST_PHY_PAGE_MASK)
{
copy_from_first_phy_page = true;
}
else
{
copy_from_first_phy_page = false;
}
mode = (LOGWR_MODE) (mode & ~LOGWR_COPY_FROM_FIRST_PHY_PAGE_MASK);
/* In case that a non-ASYNC mode client internally uses ASYNC mode: */
/* orig_mode keeps the largest mode value seen over all requests of this call. */
orig_mode = MAX (mode, orig_mode);
logwr_er_log ("[tid:%ld] xlogwr_get_log_pages, fpageid(%lld), mode(%s)\n",
thread_p->get_posix_id (), first_pageid,
(mode == LOGWR_MODE_SYNC ? "sync" : (mode == LOGWR_MODE_ASYNC ? "async" : "semisync")));
/* Register the writer at the list and wait until LFT start to work */
rv = pthread_mutex_lock (&writer_info->flush_start_mutex);
error_code = logwr_register_writer_entry (&entry, thread_p, first_pageid, mode, copy_from_first_phy_page);
if (error_code != NO_ERROR)
{
/* NOTE(review): the error label passes `entry' to logwr_write_end(); this relies on */
/* logwr_register_writer_entry() assigning *entry even when it fails - confirm. */
pthread_mutex_unlock (&writer_info->flush_start_mutex);
status = LOGWR_STATUS_ERROR;
goto error;
}
if (entry->status == LOGWR_STATUS_WAIT)
{
bool continue_checking = true;
/* ASYNC requests wait at most LOGWR_THREAD_SUSPEND_TIMEOUT seconds; other modes use INF_WAIT. */
if (mode == LOGWR_MODE_ASYNC)
{
timeout = LOGWR_THREAD_SUSPEND_TIMEOUT;
to.tv_sec = time (NULL) + timeout;
to.tv_nsec = 0;
}
else
{
timeout = INF_WAIT;
to.tv_sec = to.tv_nsec = 0;
}
/* NOTE(review): every branch below unlocks flush_start_mutex, so the suspend call is */
/* presumably expected to return with that mutex held in all cases - confirm. */
rv =
thread_suspend_with_other_mutex (thread_p, &writer_info->flush_start_mutex, timeout, &to,
THREAD_LOGWR_SUSPENDED);
if (rv == ER_CSS_PTHREAD_COND_TIMEDOUT)
{
/* Timed out: unregister the entry with DELAY status (signalling flush_end_cond when the */
/* unregister call returns true) and start the iteration again. */
pthread_mutex_unlock (&writer_info->flush_start_mutex);
rv = pthread_mutex_lock (&writer_info->flush_end_mutex);
if (logwr_unregister_writer_entry (entry, LOGWR_STATUS_DELAY))
{
pthread_cond_signal (&writer_info->flush_end_cond);
}
pthread_mutex_unlock (&writer_info->flush_end_mutex);
continue;
}
else if (rv == ER_CSS_PTHREAD_MUTEX_LOCK || rv == ER_CSS_PTHREAD_MUTEX_UNLOCK
|| rv == ER_CSS_PTHREAD_COND_WAIT)
{
/* The suspend call itself reported a mutex/condition failure. */
pthread_mutex_unlock (&writer_info->flush_start_mutex);
error_code = ER_FAILED;
status = LOGWR_STATUS_ERROR;
goto error;
}
pthread_mutex_unlock (&writer_info->flush_start_mutex);
if (logtb_is_interrupted (thread_p, false, &continue_checking))
{
/* interrupted, shutdown or connection has gone. */
error_code = ER_INTERRUPTED;
er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, error_code, 0);
status = LOGWR_STATUS_ERROR;
goto error;
}
else if (thread_p->resume_status == THREAD_RESUME_DUE_TO_INTERRUPT)
{
/* Resumed by an interrupt: a delayed entry is ended with DELAY status and retried; */
/* anything else ends the call with ER_INTERRUPTED. */
if (logwr_is_delayed (thread_p, entry))
{
/* is_interrupted is never cleared afterwards, so later iterations keep copy_from_file set. */
is_interrupted = true;
logwr_write_end (thread_p, writer_info, entry, LOGWR_STATUS_DELAY);
continue;
}
error_code = ER_INTERRUPTED;
status = LOGWR_STATUS_ERROR;
goto error;
}
else if (thread_p->resume_status != THREAD_LOGWR_RESUMED)
{
/* Any resume reason other than THREAD_LOGWR_RESUMED is treated as a failure. */
error_code = ER_FAILED;
status = LOGWR_STATUS_ERROR;
goto error;
}
}
else
{
/* The entry was not put into WAIT status: it must be DELAY; take the log CS and remember it. */
assert (entry->status == LOGWR_STATUS_DELAY);
pthread_mutex_unlock (&writer_info->flush_start_mutex);
LOG_CS_ENTER (thread_p);
check_cs_own = true;
}
/* The WAIT branch always leaves the iteration for this resume status, so this check can */
/* only fire after the DELAY branch above. */
if (thread_p->resume_status == THREAD_RESUME_DUE_TO_INTERRUPT)
{
logwr_set_eof_lsa (thread_p, entry);
is_interrupted = true;
}
copy_from_file = (is_interrupted) ? true : false;
/* Send the log pages to be flushed until now */
error_code = logwr_pack_log_pages (thread_p, logpg_area, &logpg_used_size, &status, entry, copy_from_file);
if (error_code != NO_ERROR)
{
/* The packing error is replaced by ER_HA_LW_FAILED_GET_LOG_PAGE for the caller. */
error_code = ER_HA_LW_FAILED_GET_LOG_PAGE;
er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, error_code, 1, first_pageid);
status = LOGWR_STATUS_ERROR;
goto error;
}
/* wait until LFT finishes flushing */
rv = pthread_mutex_lock (&writer_info->flush_wait_mutex);
if (entry->status == LOGWR_STATUS_FETCH && writer_info->flush_completed == false)
{
/* NOTE(review): single, non-looping wait; a spurious wakeup would reach the assert below */
/* with flush_completed still false - confirm this is acceptable. */
rv = pthread_cond_wait (&writer_info->flush_wait_cond, &writer_info->flush_wait_mutex);
assert_release (writer_info->flush_completed == true);
}
rv = pthread_mutex_unlock (&writer_info->flush_wait_mutex);
if (entry->status == LOGWR_STATUS_FETCH)
{
/* Record when copying started, under the writer list mutex. */
rv = pthread_mutex_lock (&writer_info->wr_list_mutex);
entry->start_copy_time = log_get_clock_msec ();
pthread_mutex_unlock (&writer_info->wr_list_mutex);
}
/* In case of async mode, unregister the writer and wakeup LFT to finish */
/*
 * The result mode is the following.
 *
 *  transition \ req mode |  req_sync   req_async
 *  -----------------------------------------
 *     delay -> delay     |  n/a        ASYNC
 *     delay -> done      |  n/a        SYNC
 *     wait -> delay      |  SYNC       ASYNC
 *     wait -> done       |  SYNC       ASYNC
 */
if (orig_mode == LOGWR_MODE_ASYNC
|| (mode == LOGWR_MODE_ASYNC && (entry->status != LOGWR_STATUS_DELAY || status != LOGWR_STATUS_DONE)))
{
/* Finish the entry before sending; the post-send cleanup below is then skipped. */
logwr_cs_exit (thread_p, &check_cs_own);
logwr_write_end (thread_p, writer_info, entry, status);
need_cs_exit_after_send = false;
}
/* the transmission is performed asynchronously, but waits for a response immediately below. */
/* so it behaves essentially the same as sync. therefore, the log page will not be overwritten. */
error_code = xlog_send_log_pages_to_client (thread_p, logpg_area, logpg_used_size, mode);
if (error_code != NO_ERROR)
{
status = LOGWR_STATUS_ERROR;
goto error;
}
/* Get the next request from the client and reset the arguments */
/* While the entry is still to be finished after the send, the reply wait uses */
/* PRM_ID_HA_COPY_LOG_TIMEOUT; otherwise -1 is passed as the timeout. */
if (need_cs_exit_after_send == true)
{
error_code =
xlog_get_page_request_with_reply (thread_p, &next_fpageid, &next_mode,
prm_get_integer_value (PRM_ID_HA_COPY_LOG_TIMEOUT));
}
else
{
error_code = xlog_get_page_request_with_reply (thread_p, &next_fpageid, &next_mode, -1);
}
if (error_code != NO_ERROR)
{
status = LOGWR_STATUS_ERROR;
goto error;
}
/* The client answered, so the EOF LSA recorded for this send becomes the last sent one. */
logwr_update_last_sent_eof_lsa (entry);
/* In case of sync mode, unregister the writer and wakeup LFT to finish */
if (need_cs_exit_after_send)
{
logwr_cs_exit (thread_p, &check_cs_own);
logwr_write_end (thread_p, writer_info, entry, status);
}
/* Reset the arguments for the next request */
first_pageid = next_fpageid;
mode = next_mode;
need_cs_exit_after_send = true;
/* Only the initial request may carry the copy-from-first-physical-page flag. */
if (mode & LOGWR_COPY_FROM_FIRST_PHY_PAGE_MASK)
{
assert_release (false);
er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_HA_GENERIC_ERROR, 1, "Unexpected copy mode from copylogdb");
error_code = ER_HA_GENERIC_ERROR;
status = LOGWR_STATUS_ERROR;
goto error;
}
}
/* Not reachable: the loop above has no break. */
assert_release (false);
return ER_FAILED;
error:
/* Common exit: log the error, finish the entry with the recorded status and free the buffer. */
logwr_er_log ("[tid:%ld] xlogwr_get_log_pages, error(%d)\n", thread_p->get_posix_id (), error_code);
logwr_cs_exit (thread_p, &check_cs_own);
logwr_write_end (thread_p, writer_info, entry, status);
free_and_init (logpg_area);
return error_code;
}
/*
 * logwr_get_min_copied_fpageid - find the smallest fpageid among the registered writer entries
 *
 * return: smallest entry->fpageid found in log_Gl.writer_info->writer_list, or NULL_PAGEID when
 *         no entry lowers the minimum below LOGPAGEID_MAX (e.g. the list is empty) or when the
 *         minimum equals LOGPB_HEADER_PAGE_ID
 *
 * Note: the list is walked while holding writer_info->wr_list_mutex.
 */
LOG_PAGEID
logwr_get_min_copied_fpageid (void)
{
  LOGWR_INFO *info = log_Gl.writer_info;
  LOG_PAGEID lowest = LOGPAGEID_MAX;
  LOGWR_ENTRY *cur;
  int rv;

  rv = pthread_mutex_lock (&info->wr_list_mutex);
  for (cur = info->writer_list; cur; cur = cur->next)
    {
      if (cur->fpageid < lowest)
        {
          lowest = cur->fpageid;
        }
    }
  pthread_mutex_unlock (&info->wr_list_mutex);

  /* Neither "nothing found" nor the header page id is reported as a copied page. */
  if (lowest == LOGPAGEID_MAX || lowest == LOGPB_HEADER_PAGE_ID)
    {
      lowest = NULL_PAGEID;
    }

  return lowest;
}
#endif /* SERVER_MODE */
#if defined(CS_MODE)
/*
 * logwr_dump_json_string_value - write a C string as a JSON string value
 *
 * return: void
 *
 *   out(in): destination stream
 *   str(in): NUL-terminated string; NULL is written as the JSON literal null
 *
 * Note: the double quote, the backslash and the control characters BS, FF, LF, CR and TAB are
 *       written as two-character escapes; any other byte below 0x20 is written as a \u00XX
 *       escape with lower-case hex digits. All remaining bytes are copied unchanged.
 */
void
logwr_dump_json_string_value (FILE * out, const char *str)
{
  const unsigned char *cursor;

  if (str == NULL)
    {
      fputs ("null", out);
      return;
    }

  fputc ('"', out);
  for (cursor = (const unsigned char *) str; *cursor != '\0'; cursor++)
    {
      const unsigned char ch = *cursor;
      const char *escape = NULL;

      /* Pick the short escape sequence, if this byte has one. */
      if (ch == '"')
        {
          escape = "\\\"";
        }
      else if (ch == '\\')
        {
          escape = "\\\\";
        }
      else if (ch == '\b')
        {
          escape = "\\b";
        }
      else if (ch == '\f')
        {
          escape = "\\f";
        }
      else if (ch == '\n')
        {
          escape = "\\n";
        }
      else if (ch == '\r')
        {
          escape = "\\r";
        }
      else if (ch == '\t')
        {
          escape = "\\t";
        }

      if (escape != NULL)
        {
          fputs (escape, out);
        }
      else if (ch < 0x20)
        {
          /* Remaining control characters have no short form. */
          fprintf (out, "\\u%04x", (unsigned int) ch);
        }
      else
        {
          fputc (ch, out);
        }
    }
  fputc ('"', out);
}
/*
 * logwr_dump_json_pointer_value - write a pointer as a JSON value
 *
 * return: void
 *
 *   out(in): destination stream
 *   ptr(in): pointer to print
 *
 * Note: a NULL pointer is written as the JSON literal null; any other pointer is written as
 *       the "%p" rendering of its value enclosed in double quotes.
 */
void
logwr_dump_json_pointer_value (FILE * out, const void *ptr)
{
  if (ptr != NULL)
    {
      fprintf (out, "\"%p\"", ptr);
    }
  else
    {
      fprintf (out, "null");
    }
}
/*
 * logwr_dump_json_string_field - write one `"name": "value"' JSON member on its own line
 *
 * return: void
 *
 *   out(in): destination stream
 *   indent(in): number of spaces written before the member name
 *   name(in): member name; written between quotes as is, without escaping
 *   value(in): member value, written through logwr_dump_json_string_value()
 *   has_comma(in): true to put a comma after the value
 */
static void
logwr_dump_json_string_field (FILE * out, int indent, const char *name, const char *value, bool has_comma)
{
  const char *terminator = has_comma ? "," : "";

  fprintf (out, "%*s\"%s\": ", indent, "", name);
  logwr_dump_json_string_value (out, value);
  fprintf (out, "%s\n", terminator);
}
/*
 * logwr_dump_logwr_gl - dump the global logwr_Gl state as one JSON document
 *
 * return: void
 *
 *   out(in): destination stream; NULL selects stdout
 *
 * Note: the document has a single member "logwr_global" holding the log header, the header of
 *       the page referenced by logwr_Gl.loghdr_pgptr (null when that pointer is NULL) and the
 *       remaining top-level fields. The stream is flushed before returning.
 */
void
logwr_dump_logwr_gl (FILE * out)
{
  const int outer_indent = 2;
  const int inner_indent = outer_indent + 2;
  LOG_PAGE *hdr_page = logwr_Gl.loghdr_pgptr;
  FILE *stream = (out != NULL) ? out : stdout;

  fprintf (stream, "{\n");
  fprintf (stream, "%*s\"logwr_global\": {\n", outer_indent, "");

  logwr_dump_log_header (stream, &logwr_Gl.hdr, inner_indent);
  fprintf (stream, ",\n\n");

  logwr_dump_log_page_hdr (stream, hdr_page != NULL ? &hdr_page->hdr : NULL, inner_indent);
  fprintf (stream, ",\n\n");

  logwr_dump_logwr_gl_topfields (stream, inner_indent);

  fprintf (stream, "%*s}\n", outer_indent, "");
  fprintf (stream, "}\n");
  fflush (stream);
}
/*
 * logwr_dump_log_lsa - write a LOG_LSA as a JSON object with "pageid" and "offset"
 *
 * return: void
 *
 *   out(in): destination stream
 *   lsa(in): LSA to print; NULL is written as the JSON literal null
 *   indent(in): number of spaces before the closing brace; members use indent + 2
 *
 * Note: nothing is written before the opening brace or after the closing brace, so the caller
 *       provides the member name and the separator.
 */
void
logwr_dump_log_lsa (FILE * out, const LOG_LSA * lsa, int indent)
{
  const int member_indent = indent + 2;
  long long pageid;
  long long offset;

  if (lsa == NULL)
    {
      fprintf (out, "null");
      return;
    }

  pageid = (long long) lsa->pageid;
  offset = (long long) lsa->offset;

  fprintf (out, "{\n");
  fprintf (out, "%*s\"pageid\": %lld,\n", member_indent, "", pageid);
  fprintf (out, "%*s\"offset\": %lld\n", member_indent, "", offset);
  fprintf (out, "%*s}", indent, "");
}
/*
 * logwr_dump_log_header - write a LOG_HEADER as the JSON member "log_header"
 *
 * return: void
 *
 *   out(in): destination stream
 *   hdr(in): header to print; NULL is written as `"log_header": null'
 *   indent(in): number of spaces before the member name and before the closing brace
 *
 * Note: nothing is written after the closing brace (or after null); the caller adds the
 *       separator. ha_file_status and ha_server_state are printed by symbolic name, with
 *       "UNKNOWN" for values that are not handled here.
 */
void
logwr_dump_log_header (FILE * out, const LOG_HEADER * hdr, int indent)
{
  const int field_indent = indent + 2;
  const int nested_indent = indent + 4;
  const char *file_status_name = "UNKNOWN";
  const char *server_state_name = "UNKNOWN";
  int level;

  if (hdr == NULL)
    {
      fprintf (out, "%*s\"log_header\": null", indent, "");
      return;
    }

  /* Translate the two HA fields into their symbolic names. */
  switch (hdr->ha_file_status)
    {
    case LOG_HA_FILESTAT_CLEAR:
      file_status_name = "LOG_HA_FILESTAT_CLEAR";
      break;
    case LOG_HA_FILESTAT_ARCHIVED:
      file_status_name = "LOG_HA_FILESTAT_ARCHIVED";
      break;
    case LOG_HA_FILESTAT_SYNCHRONIZED:
      file_status_name = "LOG_HA_FILESTAT_SYNCHRONIZED";
      break;
    default:
      break;
    }

  switch (hdr->ha_server_state)
    {
    case HA_SERVER_STATE_NA:
      server_state_name = "HA_SERVER_STATE_NA";
      break;
    case HA_SERVER_STATE_IDLE:
      server_state_name = "HA_SERVER_STATE_IDLE";
      break;
    case HA_SERVER_STATE_ACTIVE:
      server_state_name = "HA_SERVER_STATE_ACTIVE";
      break;
    case HA_SERVER_STATE_TO_BE_ACTIVE:
      server_state_name = "HA_SERVER_STATE_TO_BE_ACTIVE";
      break;
    case HA_SERVER_STATE_STANDBY:
      server_state_name = "HA_SERVER_STATE_STANDBY";
      break;
    case HA_SERVER_STATE_TO_BE_STANDBY:
      server_state_name = "HA_SERVER_STATE_TO_BE_STANDBY";
      break;
    case HA_SERVER_STATE_MAINTENANCE:
      server_state_name = "HA_SERVER_STATE_MAINTENANCE";
      break;
    case HA_SERVER_STATE_DEAD:
      server_state_name = "HA_SERVER_STATE_DEAD";
      break;
    default:
      break;
    }

  fprintf (out, "%*s\"log_header\": {\n", indent, "");

  /* identification and sizing */
  logwr_dump_json_string_field (out, field_indent, "magic", hdr->magic, true);
  fprintf (out, "%*s\"db_creation\": %lld,\n", field_indent, "", (long long) hdr->db_creation);
  fprintf (out, "%*s\"vol_creation\": %lld,\n", field_indent, "", (long long) hdr->vol_creation);
  logwr_dump_json_string_field (out, field_indent, "db_release", hdr->db_release, true);
  fprintf (out, "%*s\"db_compatibility\": %f,\n", field_indent, "", hdr->db_compatibility);
  fprintf (out, "%*s\"db_iopagesize\": %d,\n", field_indent, "", hdr->db_iopagesize);
  fprintf (out, "%*s\"db_logpagesize\": %d,\n", field_indent, "", hdr->db_logpagesize);
  fprintf (out, "%*s\"is_shutdown\": %s,\n", field_indent, "", hdr->is_shutdown ? "true" : "false");
  fprintf (out, "%*s\"next_trid\": %d,\n", field_indent, "", hdr->next_trid);
  fprintf (out, "%*s\"mvcc_next_id\": %llu,\n", field_indent, "", (unsigned long long) hdr->mvcc_next_id);
  fprintf (out, "%*s\"avg_ntrans\": %d,\n", field_indent, "", hdr->avg_ntrans);
  fprintf (out, "%*s\"avg_nlocks\": %d,\n", field_indent, "", hdr->avg_nlocks);
  fprintf (out, "%*s\"npages\": %d,\n", field_indent, "", hdr->npages);
  fprintf (out, "%*s\"db_charset\": %d,\n", field_indent, "", hdr->db_charset);
  fprintf (out, "%*s\"was_copied\": %s,\n", field_indent, "", hdr->was_copied ? "true" : "false");
  fprintf (out, "%*s\"fpageid\": %lld,\n", field_indent, "", (long long) hdr->fpageid);

  /* append / checkpoint positions */
  fprintf (out, "%*s\"append_lsa\": ", field_indent, "");
  logwr_dump_log_lsa (out, &hdr->append_lsa, nested_indent);
  fprintf (out, ",\n\n");
  fprintf (out, "%*s\"chkpt_lsa\": ", field_indent, "");
  logwr_dump_log_lsa (out, &hdr->chkpt_lsa, nested_indent);
  fprintf (out, ",\n\n");

  /* archive bookkeeping */
  fprintf (out, "%*s\"nxarv_pageid\": %lld,\n", field_indent, "", (long long) hdr->nxarv_pageid);
  fprintf (out, "%*s\"nxarv_phy_pageid\": %lld,\n", field_indent, "", (long long) hdr->nxarv_phy_pageid);
  fprintf (out, "%*s\"nxarv_num\": %d,\n", field_indent, "", hdr->nxarv_num);
  fprintf (out, "%*s\"last_arv_num_for_syscrashes\": %d,\n", field_indent, "", hdr->last_arv_num_for_syscrashes);
  fprintf (out, "%*s\"last_deleted_arv_num\": %d,\n", field_indent, "", hdr->last_deleted_arv_num);

  /* backup positions and per-level backup information */
  fprintf (out, "%*s\"bkup_level0_lsa\": ", field_indent, "");
  logwr_dump_log_lsa (out, &hdr->bkup_level0_lsa, nested_indent);
  fprintf (out, ",\n");
  fprintf (out, "%*s\"bkup_level1_lsa\": ", field_indent, "");
  logwr_dump_log_lsa (out, &hdr->bkup_level1_lsa, nested_indent);
  fprintf (out, ",\n");
  fprintf (out, "%*s\"bkup_level2_lsa\": ", field_indent, "");
  logwr_dump_log_lsa (out, &hdr->bkup_level2_lsa, nested_indent);
  fprintf (out, ",\n\n");

  fprintf (out, "%*s\"bkinfo\": [\n", field_indent, "");
  for (level = 0; level < FILEIO_BACKUP_UNDEFINED_LEVEL; level++)
    {
      /* Every array element but the last one is followed by a comma. */
      const char *separator = (level + 1 < FILEIO_BACKUP_UNDEFINED_LEVEL) ? "," : "";

      fprintf (out,
               "%*s{ \"bkup_attime\": %lld, \"io_baseln_time\": %lld, \"io_bkuptime\": %lld, "
               "\"ndirty_pages_post_bkup\": %d, \"io_numpages\": %d }%s\n",
               nested_indent, "",
               (long long) hdr->bkinfo[level].bkup_attime,
               (long long) hdr->bkinfo[level].io_baseln_time,
               (long long) hdr->bkinfo[level].io_bkuptime,
               hdr->bkinfo[level].ndirty_pages_post_bkup, hdr->bkinfo[level].io_numpages, separator);
    }
  fprintf (out, "%*s],\n\n", field_indent, "");

  /* miscellaneous state */
  logwr_dump_json_string_field (out, field_indent, "prefix_name", hdr->prefix_name, true);
  fprintf (out, "%*s\"has_logging_been_skipped\": %s,\n", field_indent, "",
           hdr->has_logging_been_skipped ? "true" : "false");
  fprintf (out, "%*s\"vacuum_last_blockid\": %lld,\n", field_indent, "", (long long) hdr->vacuum_last_blockid);
  fprintf (out, "%*s\"perm_status_obsolete\": %d,\n", field_indent, "", hdr->perm_status_obsolete);
  logwr_dump_json_string_field (out, field_indent, "ha_server_state", server_state_name, true);
  logwr_dump_json_string_field (out, field_indent, "ha_file_status", file_status_name, true);

  fprintf (out, "%*s\"eof_lsa\": ", field_indent, "");
  logwr_dump_log_lsa (out, &hdr->eof_lsa, nested_indent);
  fprintf (out, ",\n\n");
  fprintf (out, "%*s\"smallest_lsa_at_last_chkpt\": ", field_indent, "");
  logwr_dump_log_lsa (out, &hdr->smallest_lsa_at_last_chkpt, nested_indent);
  fprintf (out, ",\n\n");
  fprintf (out, "%*s\"mvcc_op_log_lsa\": ", field_indent, "");
  logwr_dump_log_lsa (out, &hdr->mvcc_op_log_lsa, nested_indent);
  fprintf (out, ",\n\n");

  fprintf (out, "%*s\"oldest_visible_mvccid\": %llu,\n", field_indent, "",
           (unsigned long long) hdr->oldest_visible_mvccid);
  fprintf (out, "%*s\"newest_block_mvccid\": %llu,\n", field_indent, "",
           (unsigned long long) hdr->newest_block_mvccid);
  fprintf (out, "%*s\"ha_promotion_time\": %lld,\n", field_indent, "", (long long) hdr->ha_promotion_time);
  fprintf (out, "%*s\"db_restore_time\": %lld,\n", field_indent, "", (long long) hdr->db_restore_time);
  fprintf (out, "%*s\"mark_will_del\": %s,\n", field_indent, "", hdr->mark_will_del ? "true" : "false");
  fprintf (out, "%*s\"does_block_need_vacuum\": %s,\n", field_indent, "",
           hdr->does_block_need_vacuum ? "true" : "false");
  /* last member: no trailing comma */
  fprintf (out, "%*s\"was_active_log_reset\": %s\n", field_indent, "",
           hdr->was_active_log_reset ? "true" : "false");

  fprintf (out, "%*s}", indent, "");
}
/*
 * logwr_dump_log_arv_header - write a LOG_ARV_HEADER as the JSON member "log_arv_header"
 *
 * return: void
 *
 *   out(in): destination stream
 *   arv_hdr(in): archive header to print; NULL is written as `"log_arv_header": null'
 *   indent(in): number of spaces before the member name and before the closing brace
 *
 * Note: nothing is written after the closing brace (or after null).
 */
void
logwr_dump_log_arv_header (FILE * out, const LOG_ARV_HEADER * arv_hdr, int indent)
{
  const int field_indent = indent + 2;

  if (arv_hdr == NULL)
    {
      fprintf (out, "%*s\"log_arv_header\": null", indent, "");
      return;
    }

  fprintf (out, "%*s\"log_arv_header\": {\n", indent, "");
  fprintf (out, "%*s\"db_creation\": %lld,\n", field_indent, "", (long long) arv_hdr->db_creation);
  fprintf (out, "%*s\"vol_creation\": %lld,\n", field_indent, "", (long long) arv_hdr->vol_creation);
  fprintf (out, "%*s\"next_trid\": %d,\n", field_indent, "", arv_hdr->next_trid);
  fprintf (out, "%*s\"npages\": %d,\n", field_indent, "", arv_hdr->npages);
  fprintf (out, "%*s\"fpageid\": %lld,\n", field_indent, "", (long long) arv_hdr->fpageid);
  fprintf (out, "%*s\"arv_num\": %d\n", field_indent, "", arv_hdr->arv_num);
  fprintf (out, "%*s}", indent, "");
}
/*
 * logwr_dump_log_page_hdr - write a LOG_HDRPAGE as the JSON member "log_page_hdr"
 *
 * return: void
 *
 *   out(in): destination stream
 *   p(in): page header to print; NULL is written as `"log_page_hdr": null'
 *   indent(in): number of spaces before the member name and before the closing brace
 *
 * Note: nothing is written after the closing brace (or after null).
 */
void
logwr_dump_log_page_hdr (FILE * out, const LOG_HDRPAGE * p, int indent)
{
  const int field_indent = indent + 2;

  if (p == NULL)
    {
      fprintf (out, "%*s\"log_page_hdr\": null", indent, "");
      return;
    }

  fprintf (out, "%*s\"log_page_hdr\": {\n", indent, "");
  fprintf (out, "%*s\"logical_pageid\": %lld,\n", field_indent, "", (long long) p->logical_pageid);
  fprintf (out, "%*s\"offset\": %d,\n", field_indent, "", p->offset);
  fprintf (out, "%*s\"flags\": %d,\n", field_indent, "", p->flags);
  fprintf (out, "%*s\"checksum\": %d\n", field_indent, "", p->checksum);
  fprintf (out, "%*s}", indent, "");
}
/*
 * logwr_dump_logwr_action_item - append one quoted action name to a JSON array being written
 *
 * return: void
 *
 *   out(in): destination stream
 *   action_string(in): name to write, through logwr_dump_json_string_value()
 *   need_separator(in/out): when true on entry, ", " is written first; always true on return
 */
static void
logwr_dump_logwr_action_item (FILE * out, const char *action_string, bool * need_separator)
{
  const bool follows_previous_item = *need_separator;

  if (follows_previous_item)
    {
      fprintf (out, ", ");
    }

  logwr_dump_json_string_value (out, action_string);
  *need_separator = true;
}
/*
 * logwr_dump_logwr_action - write a LOGWR_ACTION bit set as a JSON array of names
 *
 * return: void
 *
 *   out(in): destination stream
 *   action(in): action bits to describe
 *
 * Note: LOGWR_ACTION_NONE is written as the single element "LOGWR_ACTION_NONE". Otherwise one
 *       element is written per known bit that is set, in the order DELAYED_WRITE, ASYNC_WRITE,
 *       HDR_WRITE, ARCHIVING, followed by one "UNKNOWN_BITS(0x..)" element when bits outside
 *       that set remain.
 */
static void
logwr_dump_logwr_action (FILE * out, LOGWR_ACTION action)
{
  static const struct
  {
    int bit;
    const char *label;
  } known_bits[] = {
    {LOGWR_ACTION_DELAYED_WRITE, "LOGWR_ACTION_DELAYED_WRITE"},
    {LOGWR_ACTION_ASYNC_WRITE, "LOGWR_ACTION_ASYNC_WRITE"},
    {LOGWR_ACTION_HDR_WRITE, "LOGWR_ACTION_HDR_WRITE"},
    {LOGWR_ACTION_ARCHIVING, "LOGWR_ACTION_ARCHIVING"}
  };
  const size_t num_known = sizeof (known_bits) / sizeof (known_bits[0]);
  unsigned int leftover = (unsigned int) action;
  bool need_separator = false;
  size_t i;

  fprintf (out, "[");

  if (action == LOGWR_ACTION_NONE)
    {
      logwr_dump_logwr_action_item (out, "LOGWR_ACTION_NONE", &need_separator);
    }
  else
    {
      for (i = 0; i < num_known; i++)
        {
          if (action & known_bits[i].bit)
            {
              logwr_dump_logwr_action_item (out, known_bits[i].label, &need_separator);
            }
          /* Strip every known bit so that only unrecognized ones remain at the end. */
          leftover &= ~(unsigned int) known_bits[i].bit;
        }

      if (leftover != 0)
        {
          char unknown_action[64];

          snprintf (unknown_action, sizeof (unknown_action), "UNKNOWN_BITS(0x%x)", leftover);
          logwr_dump_logwr_action_item (out, unknown_action, &need_separator);
        }
    }

  fprintf (out, "]");
}
/*
 * logwr_dump_logwr_gl_topfields - write the scalar fields of logwr_Gl as JSON members
 *
 * return: void
 *
 *   out(in): destination stream
 *   indent(in): number of spaces before each top-level member; nested members use indent + 2
 *
 * Note: a "support_summary" object with derived values comes first:
 *         copy_gap_pages    = hdr.eof_lsa.pageid - last_recv_pageid, or -1 when either side is
 *                             NULL_PAGEID
 *         archive_gap_pages = hdr.nxarv_pageid - last_arv_fpageid, or -1 when either side is
 *                             NULL_PAGEID
 *       The "count" of "last_arv_page_range" is last_arv_lpageid - last_arv_fpageid + 1, or 0
 *       when last_arv_fpageid is NULL_PAGEID or greater than last_arv_lpageid.
 *       The last member ("reinit_copylog") is written without a trailing comma.
 */
void
logwr_dump_logwr_gl_topfields (FILE * out, int indent)
{
  const int nested_indent = indent + 2;
  const char *mode_name = "LOGWR_MODE_UNKNOWN";
  long long copy_gap_pages = -1;
  long long archive_gap_pages = -1;
  long long last_arv_page_count = 0;
  time_t now = time (NULL);

  switch (logwr_Gl.mode)
    {
    case LOGWR_MODE_ASYNC:
      mode_name = "LOGWR_MODE_ASYNC";
      break;
    case LOGWR_MODE_SEMISYNC:
      mode_name = "LOGWR_MODE_SEMISYNC";
      break;
    case LOGWR_MODE_SYNC:
      mode_name = "LOGWR_MODE_SYNC";
      break;
    default:
      break;
    }

  /* Derived values keep their "not available" defaults unless both inputs are usable. */
  if (logwr_Gl.hdr.eof_lsa.pageid != NULL_PAGEID && logwr_Gl.last_recv_pageid != NULL_PAGEID)
    {
      copy_gap_pages = (long long) (logwr_Gl.hdr.eof_lsa.pageid - logwr_Gl.last_recv_pageid);
    }
  if (logwr_Gl.hdr.nxarv_pageid != NULL_PAGEID && logwr_Gl.last_arv_fpageid != NULL_PAGEID)
    {
      archive_gap_pages = (long long) (logwr_Gl.hdr.nxarv_pageid - logwr_Gl.last_arv_fpageid);
    }
  if (logwr_Gl.last_arv_fpageid != NULL_PAGEID && logwr_Gl.last_arv_lpageid >= logwr_Gl.last_arv_fpageid)
    {
      last_arv_page_count = (long long) (logwr_Gl.last_arv_lpageid - logwr_Gl.last_arv_fpageid + 1);
    }

  /* support_summary */
  fprintf (out, "%*s\"support_summary\": {\n", indent, "");
  fprintf (out, "%*s\"dump_time\": %ld,\n", nested_indent, "", (long) now);
  fprintf (out, "%*s\"copy_gap_pages\": %lld,\n", nested_indent, "", copy_gap_pages);
  fprintf (out, "%*s\"pending_flush_pages\": %d,\n", nested_indent, "", logwr_Gl.num_toflush);
  fprintf (out, "%*s\"archive_gap_pages\": %lld,\n", nested_indent, "", archive_gap_pages);
  fprintf (out, "%*s\"action\": ", nested_indent, "");
  logwr_dump_logwr_action (out, logwr_Gl.action);
  fprintf (out, "\n");
  fprintf (out, "%*s},\n\n", indent, "");

  /* names and paths */
  logwr_dump_json_string_field (out, indent, "db_name", logwr_Gl.db_name, true);
  logwr_dump_json_string_field (out, indent, "hostname", logwr_Gl.hostname, true);
  logwr_dump_json_string_field (out, indent, "log_path", logwr_Gl.log_path, true);
  logwr_dump_json_string_field (out, indent, "loginf_path", logwr_Gl.loginf_path, true);
  logwr_dump_json_string_field (out, indent, "active_name", logwr_Gl.active_name, true);

  /* page buffer and flush state */
  fprintf (out, "%*s\"append_vdes\": %d,\n", indent, "", logwr_Gl.append_vdes);
  fprintf (out, "%*s\"logpg_area\": ", indent, "");
  logwr_dump_json_pointer_value (out, logwr_Gl.logpg_area);
  fprintf (out, ",\n");
  fprintf (out, "%*s\"logpg_area_size\": %d,\n", indent, "", logwr_Gl.logpg_area_size);
  fprintf (out, "%*s\"logpg_fill_size\": %d,\n", indent, "", logwr_Gl.logpg_fill_size);
  fprintf (out, "%*s\"max_toflush\": %d,\n", indent, "", logwr_Gl.max_toflush);
  fprintf (out, "%*s\"num_toflush\": %d,\n", indent, "", logwr_Gl.num_toflush);
  logwr_dump_json_string_field (out, indent, "mode", mode_name, true);
  fprintf (out, "%*s\"action\": ", indent, "");
  logwr_dump_logwr_action (out, logwr_Gl.action);
  fprintf (out, ",\n");
  fprintf (out, "%*s\"last_chkpt_pageid\": %lld,\n", indent, "", (long long) logwr_Gl.last_chkpt_pageid);
  fprintf (out, "%*s\"last_recv_pageid\": %lld,\n", indent, "", (long long) logwr_Gl.last_recv_pageid);

  /* last archived page range */
  fprintf (out, "%*s\"last_arv_page_range\": {\n", indent, "");
  fprintf (out, "%*s\"from\": %lld,\n", nested_indent, "", (long long) logwr_Gl.last_arv_fpageid);
  fprintf (out, "%*s\"to\": %lld,\n", nested_indent, "", (long long) logwr_Gl.last_arv_lpageid);
  fprintf (out, "%*s\"count\": %lld,\n", nested_indent, "", last_arv_page_count);
  fprintf (out, "%*s\"archive_num\": %d\n", nested_indent, "", logwr_Gl.last_arv_num);
  fprintf (out, "%*s},\n\n", indent, "");

  fprintf (out, "%*s\"force_flush\": %s,\n", indent, "", logwr_Gl.force_flush ? "true" : "false");
  fprintf (out, "%*s\"last_flush_time\": {\n", indent, "");
  fprintf (out, "%*s\"tv_sec\": %ld,\n", nested_indent, "", (long) logwr_Gl.last_flush_time.tv_sec);
  fprintf (out, "%*s\"tv_usec\": %ld\n", nested_indent, "", (long) logwr_Gl.last_flush_time.tv_usec);
  fprintf (out, "%*s},\n\n", indent, "");

  /* background archiving */
  fprintf (out, "%*s\"background_archiving_info\": {\n", indent, "");
  fprintf (out, "%*s\"start_page_id\": %lld,\n", nested_indent, "",
           (long long) logwr_Gl.bg_archive_info.start_page_id);
  fprintf (out, "%*s\"current_page_id\": %lld,\n", nested_indent, "",
           (long long) logwr_Gl.bg_archive_info.current_page_id);
  fprintf (out, "%*s\"last_sync_pageid\": %lld,\n", nested_indent, "",
           (long long) logwr_Gl.bg_archive_info.last_sync_pageid);
  fprintf (out, "%*s\"vdes\": %d\n", nested_indent, "", logwr_Gl.bg_archive_info.vdes);
  fprintf (out, "%*s},\n\n", indent, "");

  logwr_dump_json_string_field (out, indent, "bg_archive_name", logwr_Gl.bg_archive_name, true);
  fprintf (out, "%*s\"ori_nxarv_pageid\": %lld,\n", indent, "", (long long) logwr_Gl.ori_nxarv_pageid);
  fprintf (out, "%*s\"start_pageid\": %lld,\n", indent, "", (long long) logwr_Gl.start_pageid);
  /* last member: no trailing comma */
  fprintf (out, "%*s\"reinit_copylog\": %s\n", indent, "", logwr_Gl.reinit_copylog ? "true" : "false");
}
#endif