26 #if !defined (WINDOWS) 31 #if !defined (WINDOWS) 36 #include <sys/types.h> 38 #include <sys/socket.h> 68 #ifdef UNSTABLE_TDE_FOR_REPLICATION_LOG 79 #include <sys/types.h> 82 #define LA_DEFAULT_CACHE_BUFFER_SIZE 100 83 #define LA_MAX_REPL_ITEM_WITHOUT_RELEASE_PB 50 84 #define LA_MAX_UNFLUSHED_REPL_ITEMS 200 85 #define LA_DEFAULT_LOG_PAGE_SIZE 4096 86 #define LA_GET_PAGE_RETRY_COUNT 10 87 #define LA_REPL_LIST_COUNT 50 89 #define LA_PAGE_DOESNOT_EXIST 0 90 #define LA_PAGE_EXST_IN_ACTIVE_LOG 1 91 #define LA_PAGE_EXST_IN_ARCHIVE_LOG 2 93 #define LA_STATUS_BUSY 1 94 #define LA_STATUS_IDLE 0 96 #define LA_LOCK_SUFFIX "_lgla__lock" 98 #define LA_QUERY_BUF_SIZE 2048 100 #define LA_MAX_REPL_ITEMS 1000 103 #define LA_NUM_DELAY_HISTORY 10 104 #define LA_MAX_TOLERABLE_DELAY 2 105 #define LA_REINIT_COMMIT_INTERVAL 10 107 #define LA_WS_CULL_MOPS_PER_APPLY (100000) 108 #define LA_WS_CULL_MOPS_INTERVAL (180) 109 #define LA_WS_CULL_MOPS_PER_APPLY_MIN (100) 110 #define LA_WS_CULL_MOPS_INTERVAL_MIN (2) 112 #define LA_NUM_REPL_FILTER 50 114 #define LA_LOG_IS_IN_ARCHIVE(pageid) \ 115 ((pageid) < la_Info.act_log.log_hdr->nxarv_pageid) 117 #define SIZEOF_LA_CACHE_LOG_BUFFER(io_pagesize) \ 118 (offsetof(LA_CACHE_BUFFER, logpage) + (io_pagesize)) 120 #define LA_LOGAREA_SIZE (la_Info.act_log.db_logpagesize - SSIZEOF(LOG_HDRPAGE)) 121 #define LA_LOG_READ_ADVANCE_WHEN_DOESNT_FIT(result, length, offset, pageid, pgptr) \ 123 if ((offset)+(length) >= LA_LOGAREA_SIZE) { \ 124 if (((pgptr) = la_get_page(++(pageid))) == NULL) { \ 125 result = ER_IO_READ; \ 131 #define LA_LOG_READ_ALIGN(result, offset, pageid, log_pgptr) \ 133 (offset) = DB_ALIGN((offset), MAX_ALIGNMENT); \ 134 while ((offset) >= LA_LOGAREA_SIZE) { \ 135 if (((log_pgptr) = la_get_page(++(pageid))) == NULL) { \ 136 result = ER_IO_READ; \ 138 (offset) -= LA_LOGAREA_SIZE; \ 139 (offset) = DB_ALIGN((offset), MAX_ALIGNMENT); \ 143 #define LA_LOG_READ_ADD_ALIGN(result, add, offset, pageid, log_pgptr) \ 146 LA_LOG_READ_ALIGN(result, (offset), (pageid), (log_pgptr)); \ 149 #define LA_SLEEP(sec, usec) \ 151 struct timeval sleep_time_val; \ 152 sleep_time_val.tv_sec = (sec); \ 153 sleep_time_val.tv_usec = (usec); \ 154 select (0, 0, 0, 0, &sleep_time_val); \ 158 #define LA_MOVE_INSIDE_RECORD(rec, dest_offset, src_offset) \ 161 assert ((rec) != NULL && (dest_offset) >= 0 && (src_offset) >= 0); \ 162 assert (((rec)->length - (src_offset)) >= 0); \ 163 assert (((rec)->area_size <= 0) || ((rec)->area_size >= (rec)->length)); \ 164 assert (((rec)->area_size <= 0) \ 165 || (((rec)->length + ((dest_offset) - (src_offset))) \ 166 <= (rec)->area_size)); \ 167 if ((dest_offset) != (src_offset)) \ 169 memmove ((rec)->data + (dest_offset), (rec)->data + (src_offset), \ 170 (rec)->length - (src_offset)); \ 171 (rec)->length = (rec)->length + ((dest_offset) - (src_offset)); \ 282 char log_path[PATH_MAX];
283 char loginf_path[PATH_MAX];
349 #ifdef UNSTABLE_TDE_FOR_REPLICATION_LOG 350 int tde_sock_for_dks;
395 char copied_log_path[4096];
427 #if defined (WINDOWS) 471 static unsigned int log_pageid_hash (
const void *key,
unsigned int htsize);
502 static char *
la_get_zipped_data (
char *undo_data,
int undo_length,
bool is_diff,
bool is_undo_zip,
bool is_overflow,
503 char **rec_type,
char **data,
int *length);
505 char **undo_data,
int *undo_length);
507 unsigned int *rcvindex,
void **logs,
char **rec_type,
char **data,
int *d_length);
510 char **data,
int *d_length);
531 static void la_init (
const char *log_path,
const int max_mem_size);
540 int max_arv_count_to_delete);
554 static float la_get_avg (
int *array,
int size);
566 #ifdef UNSTABLE_TDE_FOR_REPLICATION_LOG 580 #if defined (WINDOWS) 637 else if (tmp_pageid < 0)
691 int remain_bytes = pagesize;
692 off64_t offset = ((off64_t) pagesize) * ((off64_t) pageid);
693 char *current_ptr = (
char *) io_pgptr;
695 if (lseek64 (vdes, offset, SEEK_SET) == -1)
701 while (remain_bytes > 0 && retries != 0)
703 retries = (retries > 0) ? retries - 1 : retries;
706 nbytes = read (vdes, current_ptr, remain_bytes);
732 remain_bytes -= nbytes;
733 current_ptr += nbytes;
736 if (remain_bytes > 0)
765 char arv_log_path[PATH_MAX];
769 if (hdr_page ==
NULL)
772 if (hdr_page ==
NULL)
784 arv_log_vdes =
fileio_open (arv_log_path, O_RDONLY, 0);
809 *npages = log_hdr->
npages;
830 char arv_log_path[PATH_MAX];
834 if (*arv_log_num == -1)
849 guess_num = *arv_log_num;
855 guess_num = MAX (guess_num, left);
874 if (pageid >= fpageid && pageid < fpageid + npages)
876 *arv_log_num = guess_num;
879 else if (pageid < fpageid)
881 right = guess_num - 1;
884 else if (pageid >= fpageid + npages)
886 left = guess_num + 1;
890 while (guess_num >= 0 && guess_num < la_Info.act_log.log_hdr->nxarv_num && left <= right);
907 bool need_guess =
true;
918 if (pageid >= fpageid && pageid < fpageid + npages)
965 (
long long int) pageid);
969 #if defined (LA_VERBOSE_DEBUG) 973 (
long long int) pageid);
1038 #ifdef UNSTABLE_TDE_FOR_REPLICATION_LOG 1106 #ifdef UNSTABLE_TDE_FOR_REPLICATION_LOG 1128 usleep (100 * 1000);
1135 while (error ==
NO_ERROR && --retry > 0);
1137 if (retry <= 0 || la_Info.act_log.log_hdr->append_lsa.pageid < pageid)
1139 #if defined (LA_VERBOSE_DEBUG) 1171 int i, size, bufid, total_buffers;
1186 memset (area, 0, size);
1190 if (slb_log_buffer ==
NULL)
1200 for (i = 0, bufid = cache_pb->
num_buffers; i < slb_cnt; i++, bufid++)
1227 int i, num_recently_free, found = -1;
1228 static unsigned int last = 0;
1232 num_recently_free = 0;
1243 num_recently_free++;
1255 if (cache_buffer->
pageid != 0)
1265 cache_buffer->
pageid = 0;
1269 return cache_buffer;
1272 if (num_recently_free > 0)
1296 if (cache_buffer ==
NULL)
1323 return cache_buffer;
1341 while (cache_buffer ==
NULL);
1343 return &cache_buffer->
logpage;
1361 if (cache_buffer !=
NULL)
1388 if (cache_buffer->
pageid == except_pageid)
1413 if (cache_buffer ==
NULL)
1418 if (cache_buffer->
pageid != 0)
1424 cache_buffer->
pageid = 0;
1439 || (cache_buffer->
pageid > to))
1448 cache_buffer->
pageid = 0;
1470 for (i = 0; i < la_Info.
cur_repl; i++)
1488 LSA_COPY (required_lsa, &lowest_lsa);
1507 #define LA_IN_VALUE_COUNT 2 1508 #define LA_OUT_VALUE_COUNT 23 1511 int in_value_idx, out_value_idx;
1526 snprintf (query_buf,
sizeof (query_buf),
"SELECT " 1527 " db_creation_time, " 1528 " committed_lsa_pageid, " 1529 " committed_lsa_offset, " 1530 " committed_rep_pageid, " 1531 " committed_rep_offset, " 1532 " append_lsa_pageid, " 1533 " append_lsa_offset, " 1536 " final_lsa_pageid, " 1537 " final_lsa_offset, " 1538 " required_lsa_pageid, " 1539 " required_lsa_offset, " 1540 " log_record_time, " 1541 " log_commit_time, " 1542 " last_access_time, " 1581 strncpy (ha_apply_info->
db_name, prefix_name, sizeof (ha_apply_info->
db_name) - 1);
1695 for (i = 0; i < in_value_idx; i++)
1702 #undef LA_IN_VALUE_COUNT 1703 #undef LA_OUT_VALUE_COUNT 1709 #define LA_IN_VALUE_COUNT 15 1722 snprintf (query_buf,
sizeof (query_buf),
"INSERT INTO %s " 1724 " db_creation_time, " 1725 " copied_log_path, " 1726 " committed_lsa_pageid, " 1727 " committed_lsa_offset, " 1728 " committed_rep_pageid, " 1729 " committed_rep_offset, " 1730 " append_lsa_pageid, " 1731 " append_lsa_offset, " 1734 " final_lsa_pageid, " 1735 " final_lsa_offset, " 1736 " required_lsa_pageid, " 1737 " required_lsa_offset, " 1738 " log_record_time, " 1739 " log_commit_time, " 1740 " last_access_time, " 1817 for (i = 0; i < in_value_idx; i++)
1831 msg =
"COMMENT: %s for database %s\n";
1847 #undef LA_IN_VALUE_COUNT 1853 #define LA_IN_VALUE_COUNT 2 1863 snprintf (query_buf,
sizeof (query_buf),
1864 "UPDATE %s SET start_time = SYS_DATETIME, status = 0 WHERE db_name = ? AND copied_log_path = ? ;",
1877 for (i = 0; i < in_value_idx; i++)
1884 #undef LA_IN_VALUE_COUNT 1890 #define LA_IN_VALUE_COUNT 3 1895 int i, in_value_idx = 0;
1899 snprintf (query_buf,
sizeof (query_buf),
"UPDATE %s " 1901 " log_record_time = ?, " 1902 " last_access_time = SYS_DATETIME " 1903 " WHERE db_name = ? AND copied_log_path = ? ;",
1940 for (i = 0; i < in_value_idx; i++)
1946 #undef LA_IN_VALUE_COUNT 1961 time_t log_db_creation;
1963 bool insert_apply_info =
false;
1964 char err_msg[LINE_MAX];
2003 insert_apply_info =
true;
2030 if (insert_apply_info ==
true)
2067 #define LA_IN_VALUE_COUNT 22 2072 int i, in_value_idx;
2076 snprintf (query_buf,
sizeof (query_buf),
"UPDATE %s " 2078 " committed_lsa_pageid = ?, " 2079 " committed_lsa_offset = ?, " 2080 " committed_rep_pageid = ?, " 2081 " committed_rep_offset = ?, " 2082 " append_lsa_pageid = ?, " 2083 " append_lsa_offset = ?, " 2084 " eof_lsa_pageid = ?, " 2085 " eof_lsa_offset = ?, " 2086 " final_lsa_pageid = ?, " 2087 " final_lsa_offset = ?, " 2088 " required_lsa_pageid = ?, " 2089 " required_lsa_offset = ?, " 2090 " log_record_time = IFNULL(?, log_record_time), " 2091 " log_commit_time = IFNULL(?, log_commit_time), " 2092 " last_access_time = SYS_DATETIME, " 2093 " insert_counter = ?, " 2094 " update_counter = ?, " 2095 " delete_counter = ?, " 2096 " schema_counter = ?, " 2097 " commit_counter = ?, " 2098 " fail_counter = ? " 2099 " WHERE db_name = ? AND copied_log_path = ? ;",
2199 for (i = 0; i < in_value_idx; i++)
2205 #undef LA_IN_VALUE_COUNT 2211 #define LA_IN_VALUE_COUNT 2 2221 snprintf (query_buf,
sizeof (query_buf),
"DELETE FROM %s WHERE db_name = ? AND copied_log_path = ?",
2234 for (i = 0; i < in_value_idx; i++)
2243 #undef LA_IN_VALUE_COUNT 2251 errid = abs (errid);
2291 errid = abs (errid);
2318 for (i = 0; i < la_recdes_pool.
num_recdes; i++)
2363 recdes->
data = (
char *) malloc (data_size);
2430 la_recdes_pool.
area = (
char *) malloc (page_size * num_recdes);
2444 p = la_recdes_pool.
area;
2445 for (i = 0; i < num_recdes; i++)
2483 if (cache_pb ==
NULL)
2659 "Active log file(%s) charset is not valid (%s), expecting %s.",
2748 if (log_buffer !=
NULL)
2782 if (need_realloc ==
false)
2804 for (i = j; i < la_Info.
repl_cnt; i++)
2824 for (j = 0; j <
i; j++)
2845 for (i = 0; i < la_Info.
cur_repl; i++)
2867 for (i = 0; i < la_Info.
cur_repl; i++)
2896 int free_index = -1;
2900 if (find_apply !=
NULL)
2906 for (i = 0; i < la_Info.
cur_repl; i++)
2917 if (free_index >= 0)
2967 int rec_length = (int)
sizeof (INT16);
2970 int area_offset = 0;
2978 while (rec_type !=
NULL && rec_length > 0)
2988 memcpy (rec_type + area_offset, (
char *) (pg)->area + log_offset, copy_length);
2989 rec_length -= copy_length;
2990 area_offset += copy_length;
2991 log_offset += copy_length;
2999 while (t_length > 0)
3008 memcpy (area + area_offset, (
char *) (pg)->area + log_offset, copy_length);
3009 t_length -= copy_length;
3010 area_offset += copy_length;
3011 log_offset += copy_length;
3110 repl_log_pgptr = log_pgptr;
3129 length = repl_log->
length;
3137 area = (
char *) malloc (length);
3319 for (; item; item = next_item)
3321 next_item = item->
next;
3347 for (item = apply->
head; (item !=
NULL) && (
LSA_LT (&item->
lsa, commit_lsa)); item = next_item)
3349 next_item = item->
next;
3400 for (i = 0; i < la_Info.
cur_repl; i++)
3491 commit->
type = type;
3563 int i, j, offset, offset2, pad;
3564 char *bits, *start, *v_start;
3582 vars[
i] = offset2 - offset;
3610 att->type->data_readval (buf, &value, att->
domain, -1,
true,
NULL, 0);
3627 pad = (int) (buf->
ptr - start);
3628 if (pad < sm_class->fixed_size)
3641 for (i = sm_class->
fixed_count, j = 0; i < sm_class->att_count && j < sm_class->variable_count;
3644 att->type->data_readval (buf, &value, att->
domain, vars[j],
true,
NULL, 0);
3675 int repid_and_flag_bits = 0;
3708 unsigned int repid_bits;
3720 status = setjmp (buf->
env);
3732 if (mvcc_flags == 0)
3764 error =
la_get_current (buf, sm_class, bound_bit_flag, def, key, offset_size);
3780 la_get_zipped_data (
char *undo_data,
int undo_length,
bool is_diff,
bool is_undo_zip,
bool is_overflow,
char **rec_type,
3781 char **data,
int *length)
3783 int redo_length = 0;
3805 (void)
log_diff (undo_length, undo_data, redo_length, redo_unzip_data->
log_data);
3816 *length = redo_length - rec_len;
3820 *length = redo_length;
3829 *data = (
char *) malloc (*length);
3841 memcpy (*data, (la_Info.
redo_unzip_ptr)->log_data + rec_len, *length);
3872 temp_offset = *offset;
3876 *is_undo_zip =
true;
3880 *undo_data = (
char *) malloc (*undo_length);
3881 if (*undo_data ==
NULL)
3890 if (*is_undo_zip && *undo_length > 0)
3892 if (!
log_unzip (undo_unzip_data, *undo_length, *undo_data))
3904 *pageid = temp_pageid;
3905 *offset = temp_offset;
3927 unsigned int *rcvindex,
void **logs,
char **rec_type,
char **data,
int *d_length)
3944 bool is_undo_zip =
false;
3946 int undo_length = 0;
3947 int temp_length = 0;
3948 char *undo_data =
NULL;
3950 bool is_overflow =
false;
3951 bool is_diff =
false;
3952 bool is_mvcc_log =
false;
3981 if (is_mvcc_log ==
true)
3994 if (is_mvcc_log ==
true)
3997 undoredo = &mvcc_undoredo->
undoredo;
4004 undo_length = undoredo->
ulength;
4005 temp_length = undoredo->
rlength;
4008 if (match_rcvindex == 0 || undoredo->
data.
rcvindex == match_rcvindex)
4016 *logs = (
void *) undoredo;
4021 *logs = (
void *)
NULL;
4032 if (undo_data !=
NULL)
4050 if (is_mvcc_log ==
true)
4062 if (is_mvcc_log ==
true)
4065 undo = &mvcc_undo->
undo;
4072 temp_length = undo->
length;
4075 if (match_rcvindex == 0 || undo->
data.
rcvindex == match_rcvindex)
4079 *logs = (
void *) undo;
4088 *logs = (
void *)
NULL;
4096 if (is_mvcc_log ==
true)
4108 if (is_mvcc_log ==
true)
4111 redo = &mvcc_redo->
redo;
4118 temp_length = redo->
length;
4121 if (match_rcvindex == 0 || redo->
data.
rcvindex == match_rcvindex)
4125 *logs = (
void *) redo;
4134 *logs = (
void *)
NULL;
4151 if (undo_data !=
NULL)
4162 *data = (
char *) malloc (length);
4168 if (undo_data !=
NULL)
4188 if (undo_data !=
NULL)
4197 *data =
la_get_zipped_data (undo_data, undo_length, is_diff, is_undo_zip, is_overflow, rec_type, data, &length);
4212 if (undo_data !=
NULL)
4262 if (ovf_list_data ==
NULL)
4265 while (ovf_list_head)
4267 ovf_list_data = ovf_list_head;
4268 ovf_list_head = ovf_list_head->
next;
4282 &ovf_list_data->
data, &ovf_list_data->
length);
4284 if (error ==
NO_ERROR && log_info && ovf_list_data->
data)
4287 if (ovf_list_head ==
NULL)
4289 ovf_list_head = ovf_list_tail = ovf_list_data;
4293 ovf_list_data->
next = ovf_list_head;
4294 ovf_list_head = ovf_list_data;
4297 length += ovf_list_data->
length;
4318 while (ovf_list_head)
4320 ovf_list_data = ovf_list_head;
4321 ovf_list_head = ovf_list_head->
next;
4331 while (ovf_list_head)
4333 ovf_list_data = ovf_list_head;
4334 ovf_list_head = ovf_list_head->
next;
4345 area_len = ovf_list_data->
length - area_offset;
4346 memcpy (recdes->
data + copyed_len, ovf_list_data->
data + area_offset, area_len);
4347 copyed_len += area_len;
4388 int temp_length = 0;
4389 int undo_length = 0;
4391 bool is_undo_zip =
false;
4392 bool is_mvcc_log =
false;
4394 char *undo_data =
NULL;
4397 bool is_diff =
false;
4428 is_mvcc_log =
false;
4438 if (is_mvcc_log ==
true)
4441 undoredo = &mvcc_undoredo->
undoredo;
4448 undo_length = undoredo->
ulength;
4449 temp_length = undoredo->
rlength;
4463 if (undo_data !=
NULL)
4482 if (!
log_unzip (redo_unzip_data, zip_len, *data))
4484 if (undo_data !=
NULL)
4494 la_get_zipped_data (undo_data, undo_length, is_diff, is_undo_zip,
false, rec_type, data,
4509 if (undo_data !=
NULL)
4536 unsigned int rcvindex;
4597 recdes->
type = *(INT16 *) (rec_type);
4629 recdes->
type = *(INT16 *) (rec_type);
4641 recdes->
type = *(INT16 *) (rec_type);
4691 const char *class_name =
"UNKNOWN CLASS";
4692 const char *server_err_msg =
"UNKOWN";
4711 if (flush_err ==
NULL)
4730 snprintf (pkey_str,
sizeof (pkey_str) - 1, sb.
get_buffer ());
4770 snprintf (buf,
sizeof (buf),
4771 "applylogdb will reconnect to server due to a failure in flushing changes. " 4772 "class: %s, key: %s, server error: %d, %s", class_name, pkey_str, flush_err->
error_code,
4826 bool has_index =
false;
4830 class_oid =
ws_oid (classop);
4873 operation, has_index);
4890 char sql_log_err[LINE_MAX];
4902 if (class_obj ==
NULL)
4918 snprintf (sql_log_err,
sizeof (sql_log_err),
"failed to write SQL log. class: %s, key: %s",
4939 #if defined (LA_VERBOSE_DEBUG) 4945 error,
"internal client error.");
4970 unsigned int rcvindex;
4977 char sql_log_err[LINE_MAX];
5022 if (class_obj ==
NULL)
5041 bool sql_logging_failed =
false;
5050 sql_logging_failed =
true;
5057 if (inst_tp ==
NULL)
5059 sql_logging_failed =
true;
5067 sql_logging_failed =
true;
5075 sql_logging_failed =
true;
5084 if (sql_logging_failed ==
true)
5088 snprintf (sql_log_err,
sizeof (sql_log_err),
"failed to write SQL log. class: %s, key: %s", item->
class_name,
5102 #if defined (LA_VERBOSE_DEBUG) 5108 error,
"internal client error.");
5158 unsigned int rcvindex;
5206 if (class_obj ==
NULL)
5221 bool sql_logging_failed =
false;
5223 char sql_log_err[LINE_MAX];
5232 sql_logging_failed =
true;
5239 if (inst_tp ==
NULL)
5241 sql_logging_failed =
true;
5250 sql_logging_failed =
true;
5257 sql_logging_failed =
true;
5267 if (sql_logging_failed ==
true)
5271 snprintf (sql_log_err,
sizeof (sql_log_err),
"failed to write SQL log. class: %s, key: %s", item->
class_name,
5285 #if defined (LA_VERBOSE_DEBUG) 5291 error,
"internal client error.");
5343 res =
db_execute (sql, &result, &query_error);
5415 const char *stmt_text =
NULL;
5417 const char *error_msg =
"";
5419 char sql_log_err[LINE_MAX];
5420 bool is_ddl =
false;
5529 snprintf (sql_log_err,
sizeof (sql_log_err),
"failed to write SQL log. class: %s, stmt: %s",
5545 snprintf (sql_log_err,
sizeof (sql_log_err),
"failed to change sys prm: %s", item->
ha_sys_prm);
5563 else if (is_ddl ==
true)
5592 if (save_user !=
NULL)
5612 #if defined (LA_VERBOSE_DEBUG) 5644 int apply_repl_log_cnt = 0;
5645 char error_string[1024];
5647 static unsigned int total_repl_items = 0;
5648 bool release_pb =
false;
5649 bool has_more_commit_items =
false;
5690 if (final_pageid !=
NULL_PAGEID && release_pb ==
true)
5733 apply_repl_log_cnt++;
5762 snprintf (buf,
sizeof (buf),
"attempts to try applying failed replication log again. (error:%d)",
5780 has_more_commit_items =
true;
5786 *total_rows += apply_repl_log_cnt;
5790 if (has_more_commit_items)
5882 for (commit = la_Info.
commit_head; commit; commit = commit_next)
5884 commit_next = commit->
next;
5886 if (commit->
tranid == tranid)
5937 return (item->
next);
5958 if (prev_repl_log_record ==
NULL)
5961 if (prev_repl_log_record ==
NULL)
5968 if (!
LSA_EQ (&curr_lsa, &prev_repl_lsa) && prev_repl_log_record->
trid == curr_log_record->
trid)
5989 if (prev_repl_log_record)
6062 #if defined (LA_VERBOSE_DEBUG) 6132 "cannot connect with server");
6180 snprintf (buffer,
sizeof (buffer),
"process log record (type:%d). skip this log record. LSA: %lld|%d",
6181 lrec->
type, (
long long int) final->pageid, (
int) final->offset);
6192 if (ha_server_state ==
NULL)
6233 snprintf (buffer,
sizeof (buffer),
"process last log record in archive. LSA: %lld|%d",
6234 (
long long int) final->pageid, (
int) final->offset);
6242 lrec->forw_lsa.pageid, lrec->forw_lsa.offset, lrec->back_lsa.pageid, lrec->back_lsa.offset, lrec->trid,
6243 lrec->prev_tranlsa.pageid, lrec->prev_tranlsa.offset, lrec->type);
6254 char *hostname =
NULL, *
p;
6256 if (log_path ==
NULL)
6262 p += (
strlen (log_path) - 1);
6280 if (hostname ==
NULL)
6286 if (*hostname !=
'_')
6291 return hostname + 1;
6405 snprintf (buffer,
sizeof (buffer),
"change log apply state from '%s' to '%s'. last committed LSA: %lld|%d",
6439 if (update_commit_time)
6478 static unsigned long 6481 unsigned long vsize = 0;
6485 fp = fopen (
"/proc/self/statm",
"r");
6488 fscanf (fp,
"%lu", &vsize);
6490 vsize *= (sysconf (_SC_PAGESIZE) /
ONE_K);
6494 struct procentry64 entry;
6495 pid_t
pid = getpid ();
6497 if (getprocs64 (&entry,
sizeof (entry),
NULL, 0, &pid, 1))
6499 vsize = (
unsigned long) entry.pi_dvm * (sysconf (_SC_PAGESIZE) /
ONE_K);
6511 unsigned long vsize;
6512 unsigned long max_vsize;
6529 if (vsize > max_vsize)
6548 static int last_time = 0;
6549 static unsigned long long last_applied_item = 0;
6552 unsigned long long curr_applied_item;
6553 unsigned long long diff_applied_item;
6554 unsigned long ws_cull_mops_interval;
6555 unsigned long ws_cull_mops_per_apply;
6560 last_time = time (
NULL);
6563 if (last_applied_item == 0)
6575 curr_time = time (
NULL);
6576 diff_time = curr_time - last_time;
6579 diff_applied_item = curr_applied_item - last_applied_item;
6593 if ((
long unsigned) diff_time >= ws_cull_mops_interval || diff_applied_item >= ws_cull_mops_per_apply)
6598 last_time = curr_time;
6599 last_applied_item = curr_applied_item;
6609 struct timeval curtime;
6611 bool need_commit =
false;
6616 gettimeofday (&curtime,
NULL);
6617 diff_msec = (curtime.tv_sec - time_commit->tv_sec) * 1000 + (curtime.tv_usec / 1000 - time_commit->tv_usec / 1000);
6623 if (threshold < (
unsigned int) diff_msec)
6625 gettimeofday (time_commit,
NULL);
6666 char lock_path[PATH_MAX];
6671 *lockf_vdes =
fileio_open (lock_path, O_RDWR | O_CREAT, 0644);
6688 #if defined (LA_VERBOSE_DEBUG) 6698 char arv_num_str[11];
6701 assert (last_deleted_arv_num >= -1);
6702 if (lockf_vdes ==
NULL_VOLDES || last_deleted_arv_num < -1)
6707 snprintf (arv_num_str,
sizeof (arv_num_str),
"%-10d", last_deleted_arv_num);
6709 if ((lseek (lockf_vdes, 0, SEEK_SET) != 0))
6714 len = write (lockf_vdes, arv_num_str,
sizeof (arv_num_str) - 1);
6715 if (len != (
sizeof (arv_num_str) - 1))
6781 la_init (
const char *log_path,
const int max_mem_size)
6783 static unsigned long start_vsize = 0;
6785 memset (&la_Info, 0,
sizeof (la_Info));
6787 strncpy (la_Info.
log_path, log_path, PATH_MAX - 1);
6837 #ifdef UNSTABLE_TDE_FOR_REPLICATION_LOG 6838 la_Info.tde_sock_for_dks = -1;
6869 bool clear_owner =
false;
6921 for (i = 0; i < la_Info.
repl_cnt; i++)
6972 printf (
"%-30s : %s\n",
"Magic", hdr->
magic);
6974 printf (
"%-30s : %s\n",
"DB name", database_name);
6975 printf (
"%-30s : %s (%ld)\n",
"DB creation time", timebuf, tloc);
6981 printf (
"%-30s : %s\n",
"CUBRID release", hdr->
db_release);
6982 printf (
"%-30s : %d\n",
"DB iopagesize", hdr->
db_iopagesize);
6984 printf (
"%-30s : %d\n",
"Is log shutdown", hdr->
is_shutdown);
6985 printf (
"%-30s : %d\n",
"Next transaction identifier", hdr->
next_trid);
6986 printf (
"%-30s : %d\n",
"Number of pages", hdr->
npages);
6987 printf (
"%-30s : %d (%s)\n",
"Charset", hdr->
db_charset,
6989 printf (
"%-30s : %lld\n",
"Logical pageid", (
long long int) hdr->
fpageid);
6991 printf (
"%-30s : %lld\n",
"Next archive pageid", (
long long int) hdr->
nxarv_pageid);
6992 printf (
"%-30s : %lld\n",
"Next archive physical pageid", (
long long int) hdr->
nxarv_phy_pageid);
6993 printf (
"%-30s : %d\n",
"Next archive number", hdr->
nxarv_num);
6994 printf (
"%-30s : %s\n",
"HA file status",
7002 printf (
"%-30s : %s (%ld)\n",
"HA promotion time", timebuf, tloc);
7007 printf (
"%-30s : %s (%ld)\n",
"DB restore time", timebuf, tloc);
7009 printf (
"%-30s : %s \n",
"Mark will be deleted", (hdr->
mark_will_del ==
true) ?
"true" :
"false");
7028 printf (
"%-30s : %s\n",
"DB name ", database_name);
7029 printf (
"%-30s : %s (%ld)\n",
"DB creation time ", timebuf, tloc);
7032 printf (
"%-30s : %d\n",
"Next transaction identifier", hdr->
next_trid);
7033 printf (
"%-30s : %d\n",
"Number of pages", hdr->
npages);
7034 printf (
"%-30s : %lld\n",
"Logical pageid", (
long long int) hdr->
fpageid);
7036 printf (
"%-30s : %d\n",
"Archive number", hdr->
arv_num);
7047 bool check_copied_info,
bool check_replica_info,
bool verbose,
LOG_LSA * copied_eof_lsa,
7053 char active_log_path[PATH_MAX];
7054 char *replica_time_bound_str;
7059 atchar = (
char *) strchr (database_name,
'@');
7068 if (check_applied_info)
7080 goto check_applied_info_end;
7083 *applied_final_lsa = ha_apply_info.
final_lsa;
7085 printf (
"\n *** Applied Info. *** \n");
7090 printf (
"%-30s : %s\n",
"DB creation time", timebuf);
7095 printf (
"%-30s : %lld | %d\n",
"Last EOF LSA",
LSA_AS_ARGS (&ha_apply_info.
eof_lsa));
7100 printf (
"%-30s : %s\n",
"Log record time", timebuf);
7103 printf (
"%-30s : %s\n",
"Log committed time", timebuf);
7106 printf (
"%-30s : %s\n",
"Last access time", timebuf);
7109 printf (
"%-30s : %ld\n",
"Insert count", ha_apply_info.
insert_counter);
7110 printf (
"%-30s : %ld\n",
"Update count", ha_apply_info.
update_counter);
7111 printf (
"%-30s : %ld\n",
"Delete count", ha_apply_info.
delete_counter);
7112 printf (
"%-30s : %ld\n",
"Schema count", ha_apply_info.
schema_counter);
7113 printf (
"%-30s : %ld\n",
"Commit count", ha_apply_info.
commit_counter);
7114 printf (
"%-30s : %ld\n",
"Fail count", ha_apply_info.
fail_counter);
7119 printf (
"%-30s : %s\n",
"Start time", timebuf);
7122 if (check_replica_info)
7127 printf (
"\n *** Replica-specific Info. *** \n");
7128 if (replica_time_bound_str ==
NULL)
7130 printf (
"%-30s : %d second(s)\n",
"Deliberate lag",
7134 printf (
"%-30s : %s\n",
"Last applied log record time", timebuf);
7135 if (replica_time_bound_str !=
NULL)
7137 printf (
"%-30s : %s\n",
"Will apply log records up to", replica_time_bound_str);
7141 check_applied_info_end:
7148 if (check_copied_info)
7151 memset (active_log_path, 0, PATH_MAX);
7157 goto check_copied_info_end;
7164 goto check_copied_info_end;
7170 printf (
"\n *** Copied Active Info. *** \n");
7174 if (check_copied_info && (page_num > 1))
7189 goto check_copied_info_end;
7197 printf (
"\n *** Copied Archive Info. *** \n");
7207 #ifdef UNSTABLE_TDE_FOR_REPLICATION_LOG 7213 goto check_copied_info_end;
7221 goto check_copied_info_end;
7232 printf (
"Log page %lld (phy: %lld pageid: %lld, offset %d)\n", (
long long int) page_num,
7240 goto check_copied_info_end;
7246 while (lsa.
pageid == page_num)
7250 printf (
"offset:%04ld (tid:%d bck p:%lld,o:%ld frw p:%lld,o:%ld type:%d)\n", lsa.
offset, lrec->
trid,
7259 check_copied_info_end:
7271 INT64 delayed_page_count = 0;
7272 INT64 estimated_delay = 0;
7274 delayed_page_count = target_lsa.
pageid - working_lsa.
pageid;
7276 if (process_rate != 0.0f)
7278 estimated_delay = (INT64) (delayed_page_count / process_rate);
7281 printf (
"%-30s : %lld\n",
"Delayed log page count", (
long long int) delayed_page_count);
7283 if (process_rate == 0.0f)
7285 printf (
"%-30s : - second(s)\n",
"Estimated Delay");
7289 printf (
"%-30s : %ld second(s)\n",
"Estimated Delay", estimated_delay);
7307 const char *info_reason, *catmsg;
7308 char archive_name[PATH_MAX] = {
'\0', }, archive_name_first[PATH_MAX];
7309 int first_arv_num_to_delete = -1;
7310 int last_arv_num_to_delete = -1;
7311 int required_arv_num = -1;
7313 int current_arv_count;
7321 return last_deleted_arv_num;
7323 max_arv_count = MAX (log_max_archives, nxarv_num - required_arv_num);
7327 max_arv_count = log_max_archives;
7330 current_arv_count = nxarv_num - last_deleted_arv_num - 1;
7331 if (current_arv_count > max_arv_count)
7333 first_arv_num_to_delete = last_deleted_arv_num + 1;
7334 last_arv_num_to_delete = nxarv_num - max_arv_count - 1;
7335 if ((last_arv_num_to_delete < 0) || (last_arv_num_to_delete < first_arv_num_to_delete))
7337 return last_deleted_arv_num;
7340 if (max_arv_count_to_delete < last_arv_num_to_delete - first_arv_num_to_delete + 1)
7342 last_arv_num_to_delete = first_arv_num_to_delete + max_arv_count_to_delete - 1;
7345 for (i = first_arv_num_to_delete; i <= last_arv_num_to_delete; i++)
7352 if (info_reason ==
NULL)
7354 info_reason =
"Number of active log archives has been exceeded the max desired number.";
7359 catmsg =
"REMOVE: %d %s to \n%d %s.\nREASON: %s\n";
7361 if (first_arv_num_to_delete == last_arv_num_to_delete)
7364 last_arv_num_to_delete, archive_name, info_reason);
7370 last_arv_num_to_delete, archive_name, info_reason);
7372 return last_arv_num_to_delete;
7375 return last_deleted_arv_num;
7382 char arv_log_path[PATH_MAX];
7386 while (arv_log_num >= 0)
7392 arv_log_vdes =
fileio_open (arv_log_path, O_RDONLY, 0);
7412 for (i = 0; i < size; i++)
7417 return (
float) total / size;
7431 int max_commit_interval;
7433 static int delay_hist_idx = 0;
7448 *time_commit_interval = max_commit_interval;
7450 else if (delay == 0)
7452 *time_commit_interval /= 2;
7455 else if (delay_hist[delay_hist_idx] >= 0)
7459 if (delay < avg_delay)
7461 *time_commit_interval /= 2;
7463 else if (delay > avg_delay)
7465 *time_commit_interval *= 2;
7467 if (*time_commit_interval == 0)
7471 else if (*time_commit_interval > max_commit_interval)
7473 *time_commit_interval = max_commit_interval;
7478 delay_hist[delay_hist_idx++] = delay;
7492 char buffer[LINE_MAX];
7498 last = buffer +
sizeof (buffer);
7509 p += snprintf (p, MAX ((last - p), 0),
"updates only on the following tables will be applied: ");
7513 p += snprintf (p, MAX ((last - p), 0),
"updates on the following tables will be ignored: ");
7516 p += snprintf (p, MAX ((last - p), 0),
"[%s]", filter->
list[0]);
7519 p += snprintf (p, MAX (last - p, 0),
", [%s]", filter->
list[i]);
7539 bool filter_found =
false;
7557 filter_found =
true;
7586 if (classname ==
NULL || classname[0] ==
'\0')
7628 char filter_file_real_path[PATH_MAX];
7629 char buffer[LINE_MAX];
7630 char error_msg[LINE_MAX];
7632 int classname_len = 0;
7646 if (filter_file ==
NULL || filter_file[0] ==
'\0')
7648 snprintf (error_msg, LINE_MAX,
"no replication filter file is specified");
7659 fp = fopen (filter_file,
"r");
7662 snprintf (error_msg, LINE_MAX,
"failed to open %s", filter_file);
7680 while (fgets ((
char *) buffer, LINE_MAX, fp) !=
NULL)
7683 classname_len =
strlen (buffer);
7684 if (classname_len > 0 && buffer[classname_len - 1] ==
'\n')
7686 buffer[classname_len - 1] =
'\0';
7690 if (classname_len <= 0)
7709 snprintf_dots_truncate (error_msg, LINE_MAX - 1,
"cannot find table [%s] listed in %s", buffer, filter_file);
7821 struct timeval time_commit;
7823 int last_nxarv_num = 0;
7825 int now = 0, last_eof_time = 0;
7827 int time_commit_interval;
7830 int remove_arv_interval_in_secs;
7831 int max_arv_count_to_delete = 0;
7839 #if defined(WINDOWS) 7867 la_init (log_path, max_mem_size);
7956 "failed to initialize replication filters");
7970 gettimeofday (&time_commit,
NULL);
7971 last_eof_time = time (
NULL);
7973 #ifdef UNSTABLE_TDE_FOR_REPLICATION_LOG 7974 error = tde_get_data_keys ();
7978 error = la_start_dk_sharing ();
7989 int retry_count = 0;
8016 if (last_nxarv_num == 0)
8021 if (remove_arv_interval_in_secs == 0)
8025 max_arv_count_to_delete = INT_MAX;
8030 max_arv_count_to_delete = 1;
8033 if (max_arv_count_to_delete > 0)
8045 max_arv_count_to_delete = 0;
8067 last_eof_time = time (
NULL);
8110 usleep (100 * 1000);
8118 if (log_buf ==
NULL)
8132 usleep (300 * 1000);
8139 "requested pageid (%lld) is greater than append_las.pageid (%lld) in log header",
8142 usleep (100 * 1000);
8152 usleep (300 * 1000 + (retry_count * 100));
8197 #if defined (LA_VERBOSE_DEBUG) 8203 usleep (100 * 1000);
8221 usleep (100 * 1000);
8243 #if defined (LA_VERBOSE_DEBUG) 8245 "this page is grater than eof_lsa. (%lld|%d) > eof (%lld|%d). appended (%lld|%d)",
8384 usleep (100 * 1000);
8393 char error_str[LINE_MAX];
8401 "Replication logs and catalog have been reinitialized due to rebuilt database on the peer node");
8411 #if !defined(WINDOWS) 8415 "Disconnected with the cub_master and will shut itself down",
"");
8436 static int replica_delay = -1;
8437 static time_t replica_time_bound = -1;
8438 static char *replica_time_bound_str = (
char *) -1;
8439 char buffer[LINE_MAX];
8446 if (replica_delay < 0)
8451 if (replica_time_bound_str == (
void *) -1)
8458 if (replica_time_bound_str !=
NULL)
8460 if (replica_time_bound == -1)
8463 assert (replica_time_bound != 0);
8466 if (eot_time >= replica_time_bound)
8474 snprintf (buffer,
sizeof (buffer),
8475 "applylogdb paused since it reached a log record committed on master at %s or later.\n" 8476 "Adjust or remove %s and restart applylogdb to resume", replica_time_bound_str,
8484 else if (replica_delay > 0)
8486 while ((time (
NULL) - eot_time) < replica_delay)
8496 #ifdef UNSTABLE_TDE_FOR_REPLICATION_LOG 8498 la_start_dk_sharing (
void)
8501 char sock_path[PATH_MAX] = { 0, };
8504 pthread_t processing_th;
8506 struct sockaddr_un serveraddr;
8508 fileio_make_ha_sock_name (sock_path, la_Info.
log_path, TDE_HA_SOCK_NAME);
8510 if (access (sock_path, F_OK) == 0)
8512 if (unlink (sock_path) < 0)
8515 return ER_TDE_DK_SHARING_SOCK_UNLINK;
8519 if ((server_sockfd = socket (AF_UNIX, SOCK_STREAM, 0)) < 0)
8522 return ER_TDE_DK_SHARING_SOCK_OPEN;
8525 bzero (&serveraddr,
sizeof (serveraddr));
8526 serveraddr.sun_family = AF_UNIX;
8527 strcpy (serveraddr.sun_path, sock_path);
8529 if (bind (server_sockfd, (
struct sockaddr *) &serveraddr,
sizeof (serveraddr)) < 0)
8532 return ER_TDE_DK_SHARING_SOCK_BIND;
8535 if (listen (server_sockfd, 1) < 0)
8538 return ER_TDE_DK_SHARING_SOCK_LISTEN;
8541 la_Info.tde_sock_for_dks = server_sockfd;
8542 if (pthread_create (&processing_th,
NULL, la_process_dk_request,
NULL) < 0)
8545 return ER_TDE_DK_SHARING_PTHREAD_CREATE;
8551 la_process_dk_request (
void *arg)
8553 int client_sockfd, server_sockfd;
8554 unsigned int client_len;
8555 struct sockaddr_un clientaddr;
8561 server_sockfd = la_Info.tde_sock_for_dks;
8563 client_len =
sizeof (clientaddr);
8567 client_sockfd = accept (server_sockfd, (
struct sockaddr *) &clientaddr, &client_len);
8568 if (client_sockfd < 0)
8576 nbytes = read (client_sockfd, bufptr, len);
8586 error = ER_TDE_DK_SHARING_SOCK_READ;
8604 if (memcmp (buf, la_Info.
log_path, PATH_MAX) != 0)
8607 error = ER_TDE_WRONG_DK_REQUEST;
8613 bufptr = (
char *) &error;
8614 len =
sizeof (
error);
8617 nbytes = write (client_sockfd, bufptr, len);
8627 error = ER_TDE_DK_SHARING_SOCK_WRITE;
8643 nbytes = write (client_sockfd, bufptr, len);
8653 error = ER_TDE_DK_SHARING_SOCK_WRITE;
8662 close (client_sockfd);
#define ER_HA_LA_FAILED_TO_APPLY_UPDATE
bool la_force_shutdown(void)
DB_OBJECT * db_find_class(const char *name)
static bool la_enable_sql_logging
#define ER_LK_UNILATERALLY_ABORTED
#define ER_IO_LZ4_DECOMPRESS_FAIL
static time_t la_retrieve_eot_time(LOG_PAGE *pgptr, LOG_LSA *lsa)
static int la_init_repl_lists(bool need_realloc)
static int la_fetch_log_hdr(LA_ACT_LOG *act_log)
int tp_domain_disk_size(TP_DOMAIN *domain)
int db_set_system_parameters_for_ha_repl(const char *data)
#define LA_IN_VALUE_COUNT
static int la_create_repl_filter(void)
int db_make_datetime(DB_VALUE *value, const DB_DATETIME *datetime)
static int la_add_repl_filter(const char *classname)
#define LANG_SYS_COLLATION
static LOG_PHY_PAGEID la_log_phypageid(LOG_PAGEID logical_pageid)
static int la_find_last_deleted_arv_num(void)
static void la_release_all_page_buffers(LOG_PAGEID except_pageid)
static void la_shutdown_by_signal(int)
#define ER_LC_FAILED_TO_FLUSH_REPL_ITEMS
static void la_clear_recdes_pool(void)
static int check_reinit_copylog(void)
char * or_unpack_string(char *ptr, char **string)
#define LOG_IS_UNDOREDO_RECORD_TYPE(type)
#define LA_REINIT_COMMIT_INTERVAL
int sl_init(const char *db_name, const char *repl_log_path)
MOP ws_mop(const OID *oid, MOP class_mop)
bool LSA_EQ(const log_lsa *plsa1, const log_lsa *plsa2)
int db_datetime_to_string2(char *buf, int bufsize, DB_DATETIME *datetime)
static void la_destroy_repl_filter(void)
void ws_clear_all_repl_objs(void)
static int la_apply_delete_log(LA_ITEM *item)
LOG_LSA last_committed_rep_lsa
void LSA_COPY(log_lsa *plsa1, const log_lsa *plsa2)
#define LOG_IS_PAGE_TDE_ENCRYPTED(log_page_p)
void ws_release_user_instance(MOP mop)
static void la_invalidate_page_buffer(LA_CACHE_BUFFER *cache_buffer)
static void la_free_repl_item(LA_APPLY *apply, LA_ITEM *item)
int boot_notify_ha_log_applier_state(HA_LOG_APPLIER_STATE state)
#define ER_LOG_PAGE_CORRUPTED
int db_make_bigint(DB_VALUE *value, const DB_BIGINT num)
int db_get_int(const DB_VALUE *value)
#define ER_HA_LA_FAILED_TO_APPLY_INSERT
static int la_repl_add_object(MOP classop, LA_ITEM *item, RECDES *recdes)
static int la_log_fetch(LOG_PAGEID pageid, LA_CACHE_BUFFER *cache_buffer)
static int la_get_range_of_archive(int arv_log_num, LOG_PAGEID *fpageid, DKNPAGES *npages)
#define ER_HA_LA_FAILED_TO_CHANGE_STATE
#define GET_ZIP_LEN(length)
#define LA_PAGE_DOESNOT_EXIST
#define CUBRID_MAGIC_LOG_ACTIVE
#define LOG_IS_MVCC_OP_RECORD_TYPE(type)
LOG_PHY_PAGEID phy_pageid
int db_make_varchar(DB_VALUE *value, const int max_char_length, DB_CONST_C_CHAR str, const int char_str_byte_size, const int codeset, const int collation_id)
static int la_update_last_deleted_arv_num(int lockf_vdes, int last_deleted_arv_num)
#define CUBRID_MAGIC_LOG_ARCHIVE
int ws_add_to_repl_obj_list(OID *class_oid, char *packed_pkey_value, int packed_pkey_value_length, RECDES *recdes, int operation, bool has_index)
LOG_LSA committed_rep_lsa
int mht_rem(MHT_TABLE *ht, const void *key, int(*rem_func)(const void *key, void *data, void *args), void *func_args)
#define LA_LOG_READ_ADVANCE_WHEN_DOESNT_FIT(result, length, offset, pageid, pgptr)
static int la_lock_dbname(int *lockf_vdes, char *db_name, char *log_path)
FILEIO_LOCKF_TYPE fileio_lock_la_log_path(const char *db_full_name_p, const char *lock_path_p, int vol_fd, int *last_deleted_arv_num)
static int la_delete_ha_apply_info(void)
void fileio_unformat(THREAD_ENTRY *thread_p, const char *vol_label_p)
unsigned long insert_counter
#define LOG_GET_LOG_RECORD_HEADER(log_page_p, lsa)
static LA_ITEM * la_make_repl_item(LOG_PAGE *log_pgptr, int log_type, int tranid, LOG_LSA *lsa)
static bool la_ignore_on_error(int errid)
#define LA_GET_PAGE_RETRY_COUNT
static int la_apply_commit_list(LOG_LSA *lsa, LOG_PAGEID final_pageid)
#define CUBRID_MAGIC_LOG_INFO
SM_ATTRIBUTE * attributes
const void * mht_put(MHT_TABLE *ht, const void *key, void *data)
const LOG_PAGEID LOGPB_HEADER_PAGE_ID
int db_query_end(DB_QUERY_RESULT *result)
#define OR_MVCC_FLAG_VALID_INSID
#define assert_release(e)
static LA_CACHE_BUFFER * la_get_page_buffer(LOG_PAGEID pageid)
#define ER_NET_CANT_CONNECT_SERVER
static bool la_applier_shutdown_by_signal
#define SM_MAX_IDENTIFIER_LENGTH
char copied_log_path[4096]
#define ER_HA_LA_INVALID_REPL_LOG_PAGEID_OFFSET
static int la_update_query_execute_with_values(const char *sql, int arg_count, DB_VALUE *vals, bool au_disable)
void ws_clear_all_repl_errors_of_error_link(void)
static void la_clear_all_repl_and_commit_list(void)
static void la_add_repl_item(LA_APPLY *apply, LA_ITEM *item)
int sl_write_update_sql(DB_OTMPL *inst_tp, DB_VALUE *key)
static int la_log_io_read_with_max_retries(char *vname, int vdes, void *io_pgptr, LOG_PHY_PAGEID pageid, int pagesize, int retries)
static LOG_PAGE * la_get_page(LOG_PAGEID pageid)
#define OR_MVCC_FLAG_VALID_DELID
static bool la_need_filter_out(LA_ITEM *item)
#define SIZEOF_LA_CACHE_LOG_BUFFER(io_pagesize)
static int la_apply_update_log(LA_ITEM *item)
struct sm_component * next
void sm_downcase_name(const char *name, char *buf, int maxlen)
#define ER_HA_LA_REPL_FILTER_GENERIC
#define OR_MVCC_FLAG_SHIFT_BITS
#define ER_LOG_MOUNT_FAIL
static void la_free_and_add_next_repl_item(LA_APPLY *apply, LA_ITEM *next_item, LOG_LSA *lsa)
MOP locator_find_class(const char *classname)
bool fileio_is_volume_exist(const char *vol_label_p)
#define ER_HA_LA_FAILED_TO_APPLY_STATEMENT
unsigned long fail_counter
bool classobj_class_has_indexes(SM_CLASS *class_)
#define LSA_AS_ARGS(lsa_ptr)
static void la_log_copy_fromlog(char *rec_type, char *area, int length, LOG_PAGEID log_pageid, PGLENGTH log_offset, LOG_PAGE *log_pgptr)
#define LA_SLEEP(sec, usec)
LOG_ZIP_SIZE_T data_length
#define OR_BOUND_BIT_BYTES(count)
void dbt_abort_object(DB_OTMPL *def)
#define PTR_ALIGN(addr, boundary)
static int la_commit_transaction(void)
int mht_compare_logpageids_are_equal(const void *key1, const void *key2)
bool LSA_LT(const log_lsa *plsa1, const log_lsa *plsa2)
#define er_log_debug(...)
static void la_release_page_buffer(LOG_PAGEID pageid)
LOG_ZIP * log_zip_alloc(LOG_ZIP_SIZE_T size)
DB_DATETIME last_access_time
int la_log_page_check(const char *database_name, const char *log_path, INT64 page_num, bool check_applied_info, bool check_copied_info, bool check_replica_info, bool verbose, LOG_LSA *copied_eof_lsa, LOG_LSA *copied_append_lsa, LOG_LSA *applied_final_lsa)
int sl_write_statement_sql(char *class_name, char *db_user, int item_type, const char *stmt_text, char *ha_sys_prm)
#define ER_OBJ_NO_CONNECT
#define LC_UPDATE_OPERATION_TYPE(p)
LA_CACHE_BUFFER_AREA * next
#define ER_HA_LA_FAILED_TO_APPLY_DELETE
bool is_apply_info_updated
static RECDES * la_assign_recdes_from_pool(void)
static int la_delay_replica(time_t eot_time)
int dbt_put_internal(DB_OTMPL *def, const char *name, DB_VALUE *value)
unsigned long update_counter
#define LA_WS_CULL_MOPS_INTERVAL
MOBJ locator_fetch_class(MOP class_mop, DB_FETCH_MODE purpose)
int db_make_string(DB_VALUE *value, DB_CONST_C_CHAR str)
unsigned long delete_counter
#define LA_DEFAULT_LOG_PAGE_SIZE
void mht_destroy(MHT_TABLE *ht)
INTL_CODESET lang_charset(void)
#define LA_REPL_LIST_COUNT
unsigned long schema_counter
static char * la_get_zipped_data(char *undo_data, int undo_length, bool is_diff, bool is_undo_zip, bool is_overflow, char **rec_type, char **data, int *length)
static void la_make_room_for_mvcc_insid(RECDES *recdes)
void er_set(int severity, const char *file_name, const int line_no, int err_id, int num_args,...)
LA_REPL_FILTER repl_filter
const char * sm_ch_name(const MOBJ clobj)
LA_CACHE_BUFFER_AREA * buffer_area
#define LA_WS_CULL_MOPS_INTERVAL_MIN
#define OR_MVCC_FLAG_VALID_PREV_VERSION
static int la_check_mem_size(void)
#define ER_LC_PARTIALLY_FAILED_TO_FLUSH
int sl_write_insert_sql(DB_OTMPL *inst_tp, DB_VALUE *key)
static void la_free_repl_items_by_tranid(int tranid)
#define LA_MAX_TOLERABLE_DELAY
static bool la_apply_pre(void)
#define OR_MVCC_FLAG_MASK
int prm_get_integer_value(PARAM_ID prm_id)
int au_fetch_class(MOP op, SM_CLASS **class_ptr, AU_FETCHMODE fetchmode, DB_AUTH type)
static int la_find_log_pagesize(LA_ACT_LOG *act_log, const char *logpath, const char *dbname, bool check_charset)
static char la_peer_host[CUB_MAXHOSTNAMELEN+1]
int or_mvcc_get_repid_and_flags(OR_BUF *buf, int *error)
void fileio_close(int vol_fd)
int db_query_first_tuple(DB_QUERY_RESULT *result)
#define CUBRID_MAGIC_MAX_LENGTH
#define ER_OUT_OF_VIRTUAL_MEMORY
#define LA_WS_CULL_MOPS_PER_APPLY
#define MSGCAT_LOG_LOGINFO_COMMENT
static int la_log_commit(bool update_commit_time)
bool LSA_LE(const log_lsa *plsa1, const log_lsa *plsa2)
static void la_print_repl_filter_info(void)
static void la_free_all_repl_items(LA_APPLY *apply)
unsigned long start_vsize
int la_apply_log_file(const char *database_name, const char *log_path, const int max_mem_size)
char loginf_path[PATH_MAX]
const char * css_ha_server_state_string(HA_SERVER_STATE state)
#define ER_HA_GENERIC_ERROR
#define ER_HB_PROCESS_EVENT
#define DB_MAX_IDENTIFIER_LENGTH
static LA_CACHE_BUFFER * la_cache_buffer_replace(LA_CACHE_PB *cache_pb, LOG_PAGEID pageid, int io_pagesize, int buffer_size)
LOG_PAGEID logical_pageid
void db_localdatetime(time_t *epoch_time, DB_DATETIME *datetime)
#define LA_NUM_REPL_FILTER
const char * db_error_string(int level)
static enum scanner_mode mode
static int la_add_node_into_la_commit_list(int tranid, LOG_LSA *lsa, int type, time_t eot_time)
#define LA_MAX_REPL_ITEM_WITHOUT_RELEASE_PB
LOG_LSA last_committed_lsa
enum ha_log_applier_state HA_LOG_APPLIER_STATE
int db_reset_system_parameters_from_assignments(const char *data)
static int la_apply_repl_log(int tranid, int rectype, LOG_LSA *commit_lsa, int *total_rows, LOG_PAGEID final_pageid)
static int la_apply_statement_log(LA_ITEM *item)
static int la_get_log_data(LOG_RECORD_HEADER *lrec, LOG_LSA *lsa, LOG_PAGE *pgptr, unsigned int match_rcvindex, unsigned int *rcvindex, void **logs, char **rec_type, char **data, int *d_length)
LOG_LSA committed_rep_lsa
static LA_ITEM * la_get_next_repl_item(LA_ITEM *item, bool is_long_trans, LOG_LSA *last_lsa)
static int la_log_fetch_from_archive(LOG_PAGEID pageid, char *data)
DB_DATETIME log_record_time
void * mht_get(MHT_TABLE *ht, const void *key)
static int la_get_undoredo_diff(LOG_PAGE **pgptr, LOG_PAGEID *pageid, PGLENGTH *offset, bool *is_undo_zip, char **undo_data, int *undo_length)
#define ER_HA_LA_EXCEED_MAX_MEM_SIZE
const char * er_msg(void)
int sl_write_delete_sql(char *class_name, MOBJ mclass, DB_VALUE *key)
int db_execute(const char *CSQL_query, DB_QUERY_RESULT **result, DB_QUERY_ERROR *query_error)
MOP au_find_user(const char *user_name)
static int la_get_relocation_recdes(LOG_RECORD_HEADER *lrec, LOG_PAGE *pgptr, unsigned int match_rcvindex, void **logs, char **rec_type, RECDES *recdes)
static int la_get_next_update_log(LOG_RECORD_HEADER *prev_lrec, LOG_PAGE *pgptr, void **logs, char **rec_type, char **data, int *d_length)
static int la_get_current(OR_BUF *buf, SM_CLASS *sm_class, int bound_bit_flag, DB_OTMPL *def, DB_VALUE *key, int offset_size)
DB_DATETIME log_commit_time
static int la_realloc_recdes_data(RECDES *recdes, int data_size)
static int la_change_state(void)
static int la_unlock_dbname(int *lockf_vdes, char *db_name, bool clear_owner)
bool LSA_ISNULL(const log_lsa *lsa_ptr)
#define LA_MAX_UNFLUSHED_REPL_ITEMS
#define db_private_free_and_init(thrd, ptr)
static int la_init_recdes_pool(int page_size, int num_recdes)
static LA_APPLY * la_find_apply_list(int tranid)
MHT_TABLE * mht_create(const char *name, int est_size, unsigned int(*hash_func)(const void *key, unsigned int ht_size), int(*cmp_func)(const void *key1, const void *key2))
const char * get_buffer() const
static int la_get_last_ha_applied_info(void)
char * or_unpack_int(char *ptr, int *number)
void or_init(OR_BUF *buf, char *data, int length)
LA_CACHE_BUFFER ** log_buffer
static struct timeval start_time
static bool la_retry_on_error(int errid)
need_clear_type need_clear
#define CEIL_PTVDIV(dividend, divisor)
#define ER_LOG_NOTIN_ARCHIVE
int sm_flush_objects(MOP obj)
#define HB_START_WAITING_TIME_IN_SECS
void er_set_with_oserror(int severity, const char *file_name, const int line_no, int err_id, int num_args,...)
int pr_clear_value(DB_VALUE *value)
int db_query_get_tuple_valuelist(DB_QUERY_RESULT *result, int size, DB_VALUE *value_list)
#define MSGCAT_CATALOG_CUBRID
DB_BIGINT db_get_bigint(const DB_VALUE *value)
static int la_update_ha_apply_info_start_time(void)
void fileio_make_log_active_name(char *log_active_name_p, const char *log_path_p, const char *db_name_p)
const char * lang_charset_cubrid_name(const INTL_CODESET codeset)
#define LA_PAGE_EXST_IN_ACTIVE_LOG
static float la_get_avg(int *array, int size)
static int la_find_archive_num(int *arv_log_num, LOG_PAGEID pageid)
char * or_unpack_mem_value(char *buf, DB_VALUE *value)
void fileio_make_log_info_name(char *log_info_name_p, const char *log_path_p, const char *db_name_p)
LOG_REC_UNDOREDO undoredo
bool last_is_end_of_record
int db_datetime_to_string(char *buf, int bufsize, DB_DATETIME *datetime)
void ws_init_repl_objs(void)
#define LA_RETRY_ON_ERROR(error)
bool LSA_GE(const log_lsa *plsa1, const log_lsa *plsa2)
void fileio_make_log_archive_name(char *log_archive_name_p, const char *log_path_p, const char *db_name_p, int archive_number)
int db_query_column_count(DB_QUERY_RESULT *result)
static char * la_get_hostname_from_log_path(char *log_path)
#define LA_LOG_IS_IN_ARCHIVE(pageid)
static int la_remove_archive_logs(const char *db_name, int last_deleted_arv_num, int nxarv_num, int max_arv_count_to_delete)
int or_get_offset_internal(OR_BUF *buf, int *error, int offset_size)
static int la_apply_insert_log(LA_ITEM *item)
#define MSGCAT_LOG_LOGINFO_REMOVE_REASON
static int la_init_cache_log_buffer(LA_CACHE_PB *cache_pb, int slb_cnt, int slb_size)
static void error(const char *msg)
int locator_repl_flush_all(void)
static char la_slave_db_name[DB_MAX_IDENTIFIER_LENGTH+1]
static void la_clear_applied_info(LA_APPLY *apply)
static int la_get_overflow_recdes(LOG_RECORD_HEADER *lrec, void *logs, RECDES *recdes, unsigned int rcvindex)
static unsigned int log_pageid_hash(const void *key, unsigned int htsize)
static LA_ITEM * la_get_next_repl_item_from_list(LA_ITEM *item)
DB_OTMPL * dbt_create_object_internal(DB_OBJECT *classobj)
int db_execute_with_values(const char *CSQL_query, DB_QUERY_RESULT **result, DB_QUERY_ERROR *query_error, int arg_count, DB_VALUE *vals)
FILEIO_LOCKF_TYPE fileio_unlock_la_dbname(int *lockf_vdes, char *db_name, bool clear_owner)
static char database_name[MAX_HA_DBINFO_LENGTH]
void db_sprint_value(const db_value *value, string_buffer &sb)
int sm_partitioned_class_type(DB_OBJECT *classop, int *partition_type, char *keyattr, MOP **partitions)
#define snprintf_dots_truncate(dest, max_len,...)
const char * logwr_log_ha_filestat_to_string(enum LOG_HA_FILESTAT val)
bool log_unzip(LOG_ZIP *log_unzip, LOG_ZIP_SIZE_T length, void *data)
time_t last_time_archive_deleted
#define MSGCAT_LOG_MAX_ARCHIVES_HAS_BEEN_EXCEEDED
#define LA_DEFAULT_CACHE_BUFFER_SIZE
#define LOG_IS_DIFF_UNDOREDO_TYPE(type)
static LA_ITEM * la_new_repl_item(LOG_LSA *lsa, LOG_LSA *target_lsa)
int db_get_client_type(void)
int packed_key_value_length
WS_REPL_FLUSH_ERR * ws_get_repl_error_from_error_link(void)
void log_zip_free(LOG_ZIP *log_zip)
static void la_free_all_repl_items_except_head(LA_APPLY *apply)
#define free_and_init(ptr)
static int la_expand_cache_log_buffer(LA_CACHE_PB *cache_pb, int slb_cnt, int slb_size)
char * prm_get_string_value(PARAM_ID prm_id)
void LSA_SET_NULL(log_lsa *lsa_ptr)
#define LA_MAX_REPL_ITEMS
bool LSA_GT(const log_lsa *plsa1, const log_lsa *plsa2)
static int la_update_query_execute(const char *sql, bool au_disable)
#define LOG_IS_REDO_RECORD_TYPE(type)
static LA_ITEM * la_get_next_repl_item_from_log(LA_ITEM *item, LOG_LSA *last_lsa)
FILEIO_LOCKF_TYPE fileio_lock_la_dbname(int *lockf_vdes, char *db_name, char *log_path)
#define DB_CURSOR_SUCCESS
enum intl_codeset INTL_CODESET
bool log_diff(LOG_ZIP_SIZE_T undo_length, const void *undo_data, LOG_ZIP_SIZE_T redo_length, void *redo_data)
static int la_flush_repl_items(bool immediate)
static int la_check_duplicated(const char *logpath, const char *dbname, int *lockf_vdes, int *last_deleted_arv_num)
LA_RECDES_POOL la_recdes_pool
bool prm_get_bool_value(PARAM_ID prm_id)
static LOG_REC_HA_SERVER_STATE * la_get_ha_server_state(LOG_PAGE *pgptr, LOG_LSA *lsa)
#define LC_IS_FLUSH_UPDATE(operation)
#define ZIP_CHECK(length)
#define AU_SAVE_AND_DISABLE(save)
static int la_disk_to_obj(MOBJ classobj, RECDES *record, DB_OTMPL *def, DB_VALUE *key)
#define ER_TM_SERVER_DOWN_UNILATERALLY_ABORTED
#define OR_GET_MVCC_REPID_AND_FLAG(ptr)
#define DB_VALUE_TYPE(value)
int db_make_null(DB_VALUE *value)
char * msgcat_message(int cat_id, int set_id, int msg_id)
TDE_DATA_KEY_SET data_keys
static int la_set_repl_log(LOG_PAGE *log_pgptr, int log_type, int tranid, LOG_LSA *lsa)
const char * prm_get_name(PARAM_ID prm_id)
static int la_check_time_commit(struct timeval *time, unsigned int threshold)
#define DB_IS_NULL(value)
#define OR_GET_OFFSET_SIZE(ptr)
#define ER_HA_LA_STOPPED_BY_SIGNAL
static int la_log_io_open(const char *vlabel, int flags, int mode)
static void la_init(const char *log_path, const int max_mem_size)
char * strdup(const char *str)
int log_dump_log_info(const char *logname_info, bool also_stdout, const char *fmt,...)
static LA_APPLY * la_add_apply_list(int tranid)
DB_DATETIME * db_get_datetime(const DB_VALUE *value)
#define FILEIO_PATH_SEPARATOR(path)
static int la_update_ha_apply_info_log_record_time(time_t new_time)
#define LA_MOVE_INSIDE_RECORD(rec, dest_offset, src_offset)
#define ER_HA_LA_INVALID_REPL_LOG_RECORD
#define LA_WS_CULL_MOPS_PER_APPLY_MIN
void ws_filter_dirty(void)
int db_value_clear(DB_VALUE *value)
void la_print_log_arv_header(const char *database_name, LOG_ARV_HEADER *hdr, bool verbose)
int db_make_int(DB_VALUE *value, const int num)
#define LA_QUERY_BUF_SIZE
static bool la_applier_need_shutdown
static int la_log_record_process(LOG_RECORD_HEADER *lrec, LOG_LSA *final, LOG_PAGE *pg_ptr)
#define LC_INSERT_OPERATION_TYPE(p)
LA_CACHE_BUFFER * buffer_area
enum ha_server_state HA_SERVER_STATE
#define OR_BOUND_BIT_FLAG
time_t util_str_to_time_since_epoch(char *str)
#define LA_NUM_DELAY_HISTORY
static int la_update_ha_last_applied_info(void)
#define LA_PAGE_EXST_IN_ARCHIVE_LOG
unsigned long commit_counter
static int la_get_ha_apply_info(const char *log_path, const char *prefix_name, LA_HA_APPLY_INFO *ha_apply_info)
static void la_init_ha_apply_info(LA_HA_APPLY_INFO *ha_apply_info)
static bool la_is_repl_lists_empty()
static void la_decache_page_buffers(LOG_PAGEID from, LOG_PAGEID to)
bool required_lsa_changed
#define CUB_MAXHOSTNAMELEN
static void la_unlink_repl_item(LA_APPLY *apply, LA_ITEM *item)
#define LA_LOG_READ_ALIGN(result, offset, pageid, log_pgptr)
#define LC_IS_FLUSH_INSERT(operation)
static void la_get_adaptive_time_commit_interval(int *time_commit_interval, int *delay_hist)
void la_print_log_header(const char *database_name, LOG_HEADER *hdr, bool verbose)
static bool la_restart_on_bulk_flush_error(int errid)
static void la_shutdown(void)
#define ER_TDE_CIPHER_IS_NOT_LOADED
int tde_decrypt_log_page(const LOG_PAGE *logpage_cipher, TDE_ALGORITHM tde_algo, LOG_PAGE *logpage_plain)
#define ER_NET_SERVER_COMM_ERROR
int last_deleted_archive_num
static int la_log_io_read(char *vname, int vdes, void *io_pgptr, LOG_PHY_PAGEID pageid, int pagesize)
bool sysprm_find_err_in_integer_list(PARAM_ID prm_id, int error_code)
static int la_get_recdes(LOG_LSA *lsa, LOG_PAGE *pgptr, RECDES *recdes, unsigned int *rcvindex, char *rec_type)
static int la_insert_ha_apply_info(DB_DATETIME *creation_time)
int fileio_open(const char *vol_label_p, int flags, int mode)
#define LA_LOG_READ_ADD_ALIGN(result, add, offset, pageid, log_pgptr)
int db_ping_server(int client_val, int *server_val)
void la_print_delay_info(LOG_LSA working_lsa, LOG_LSA target_lsa, float process_rate)
static int la_does_page_exist(LOG_PAGEID pageid)
DB_DATETIME creation_time
static int la_find_required_lsa(LOG_LSA *required_lsa)
SIGNAL_HANDLER_FUNCTION os_set_signal_handler(const int sig_no, SIGNAL_HANDLER_FUNCTION sig_handler)
DB_CONST_C_CHAR db_get_string(const DB_VALUE *value)
#define THREAD_CALLING_CONVENTION
int or_advance(OR_BUF *buf, int offset)
#define ER_HA_LA_UNEXPECTED_EOF_IN_ARCHIVE_LOG
const TRANID LOG_SYSTEM_TRANID
static unsigned long la_get_mem_size(void)
#define OR_GET_BOUND_BIT(bitptr, element)
int db_commit_transaction(void)
const char * css_ha_applier_state_string(HA_LOG_APPLIER_STATE state)
void ws_free_repl_flush_error(WS_REPL_FLUSH_ERR *flush_err)
char * envvar_confdir_file(char *path, size_t size, const char *filename)
static DB_VALUE * la_get_item_pk_value(LA_ITEM *item)
static LA_CACHE_PB * la_init_cache_pb(void)
#define LA_OUT_VALUE_COUNT