31 #ifdef UNSTABLE_TDE_FOR_REPLICATION_LOG 32 #include "sys/socket.h" 50 #ifdef UNSTABLE_TDE_FOR_REPLICATION_LOG 53 #if defined(SERVER_MODE) 66 #define LOGWR_THREAD_SUSPEND_TIMEOUT 10 68 #define LOGWR_COPY_LOG_BUFFER_NPAGES LOGPB_BUFFER_NPAGES_LOWER 92 #define logwr_er_log(...) if (prm_get_bool_value (PRM_ID_DEBUG_LOGWR)) _er_log_debug (ARG_FILE_LINE, __VA_ARGS__) 98 init_cs_logwr_header ()
111 LOGWR_GLOBAL logwr_Gl = {
113 init_cs_logwr_header (),
172 static int logwr_read_log_header (
void);
173 static int logwr_read_bgarv_log_header (
void);
174 static int logwr_initialize (
const char *
db_name,
const char *log_path,
int mode,
LOG_PAGEID start_pageid);
175 static void logwr_finalize (
void);
177 static int logwr_flush_all_append_pages (
void);
178 static int logwr_archive_active_log (
void);
179 static int logwr_flush_bgarv_header_page (
void);
180 static void logwr_reinit_copylog (
void);
181 #ifdef UNSTABLE_TDE_FOR_REPLICATION_LOG 182 static int logwr_load_tde (
void);
205 tmp_pageid = logical_pageid - logwr_Gl.hdr.fpageid;
207 if (tmp_pageid >= logwr_Gl.hdr.npages)
209 tmp_pageid %= logwr_Gl.hdr.npages;
211 else if (tmp_pageid < 0)
213 tmp_pageid = (logwr_Gl.hdr.npages - ((-tmp_pageid) % logwr_Gl.hdr.npages));
216 if (tmp_pageid > logwr_Gl.hdr.npages)
218 tmp_pageid %= logwr_Gl.hdr.npages;
286 logwr_read_log_header (
void)
289 char *aligned_log_pgbuf;
294 log_pgptr = (
LOG_PAGE *) aligned_log_pgbuf;
305 logwr_Gl.append_vdes =
321 logwr_Gl.active_name);
351 logwr_read_bgarv_log_header (
void)
354 char *aligned_log_pgbuf;
360 bg_arv_info = &logwr_Gl.bg_archive_info;
363 log_pgptr = (
LOG_PAGE *) aligned_log_pgbuf;
367 error = logwr_fetch_header_page (log_pgptr, bg_arv_info->
vdes);
401 logwr_shutdown_by_signal (
int)
426 logwr_initialize (
const char *
db_name,
const char *log_path,
int mode,
LOG_PAGEID start_pageid)
430 char *at_char =
NULL;
444 strncpy (logwr_Gl.db_name, db_name, PATH_MAX - 1);
445 if ((at_char = strchr (logwr_Gl.db_name,
'@')) !=
NULL)
448 logwr_Gl.hostname = at_char + 1;
450 strncpy (logwr_Gl.log_path, log_path, PATH_MAX - 1);
462 if (logwr_Gl.logpg_area ==
NULL)
465 logwr_Gl.logpg_area = (
char *) malloc (logwr_Gl.logpg_area_size);
466 if (logwr_Gl.logpg_area ==
NULL)
469 logwr_Gl.logpg_area_size = 0;
474 if (logwr_Gl.toflush ==
NULL)
478 logwr_Gl.max_toflush = log_nbuffers - 1;
479 logwr_Gl.toflush = (
LOG_PAGE **) calloc (logwr_Gl.max_toflush, sizeof (logwr_Gl.toflush));
480 if (logwr_Gl.toflush ==
NULL)
483 logwr_Gl.max_toflush * sizeof (logwr_Gl.toflush));
484 logwr_Gl.max_toflush = 0;
487 for (i = 0; i < logwr_Gl.max_toflush; i++)
489 logwr_Gl.toflush[
i] =
NULL;
493 error = logwr_read_log_header ();
499 logwr_Gl.start_pageid = start_pageid;
506 logwr_Gl.action = LOGWR_ACTION_NONE;
508 logwr_Gl.last_arv_fpageid = logwr_Gl.hdr.nxarv_pageid;
509 logwr_Gl.last_arv_num = logwr_Gl.hdr.nxarv_num;
511 logwr_Gl.force_flush =
false;
512 logwr_Gl.last_flush_time.tv_sec = 0;
513 logwr_Gl.last_flush_time.tv_usec = 0;
520 bg_arv_info = &logwr_Gl.bg_archive_info;
532 error = logwr_read_bgarv_log_header ();
557 logwr_Gl.hdr.npages + 1,
false,
false,
false, LOG_PAGESIZE, 0,
false);
561 "Unable to create temporary archive log");
569 error = logwr_flush_bgarv_header_page ();
587 logwr_finalize (
void)
589 if (logwr_Gl.logpg_area !=
NULL)
592 logwr_Gl.logpg_area_size = 0;
593 logwr_Gl.logpg_fill_size = 0;
594 logwr_Gl.loghdr_pgptr =
NULL;
596 if (logwr_Gl.toflush !=
NULL)
599 logwr_Gl.max_toflush = 0;
600 logwr_Gl.num_toflush = 0;
609 logwr_Gl.action = LOGWR_ACTION_NONE;
611 logwr_Gl.force_flush =
false;
612 logwr_Gl.last_flush_time.tv_sec = 0;
613 logwr_Gl.last_flush_time.tv_usec = 0;
616 logwr_Gl.start_pageid = -2;
627 logwr_Gl.reinit_copylog =
false;
637 logwr_set_hdr_and_flush_info (
void)
645 while (p < (logwr_Gl.logpg_area + logwr_Gl.logpg_fill_size))
648 logwr_Gl.toflush[num_toflush++] = log_pgptr;
652 last_pgptr = log_pgptr;
653 logwr_Gl.num_toflush = num_toflush;
656 logwr_Gl.ori_nxarv_pageid = logwr_Gl.
hdr.nxarv_pageid;
661 log_pgptr = (
LOG_PAGE *) logwr_Gl.logpg_area;
663 logwr_Gl.loghdr_pgptr = log_pgptr;
666 if (logwr_Gl.last_arv_fpageid ==
NULL_PAGEID || logwr_Gl.last_arv_num < 0)
668 logwr_Gl.last_arv_fpageid = logwr_Gl.
hdr.nxarv_pageid;
669 logwr_Gl.last_arv_num = logwr_Gl.hdr.nxarv_num;
672 logwr_Gl.bg_archive_info.start_page_id = logwr_Gl.last_arv_fpageid;
677 if (((logwr_Gl.last_arv_num + 1 < logwr_Gl.hdr.nxarv_num)
679 && (logwr_Gl.last_arv_fpageid <= logwr_Gl.last_recv_pageid))
682 logwr_Gl.action = (LOGWR_ACTION) (logwr_Gl.action | LOGWR_ACTION_ARCHIVING);
683 logwr_Gl.last_arv_lpageid = logwr_Gl.last_recv_pageid;
685 else if ((logwr_Gl.last_arv_num + 1 == logwr_Gl.hdr.nxarv_num)
686 && (last_pgptr->hdr.logical_pageid >= logwr_Gl.hdr.nxarv_pageid))
688 logwr_Gl.action = (LOGWR_ACTION) (logwr_Gl.action | LOGWR_ACTION_ARCHIVING);
689 logwr_Gl.last_arv_lpageid = logwr_Gl.hdr.nxarv_pageid - 1;
692 if (last_pgptr !=
NULL && last_pgptr->hdr.logical_pageid < logwr_Gl.hdr.eof_lsa.pageid)
696 logwr_Gl.action = (LOGWR_ACTION) (logwr_Gl.action | LOGWR_ACTION_DELAYED_WRITE);
700 logwr_Gl.last_recv_pageid = logwr_Gl.hdr.eof_lsa.pageid;
702 if (logwr_Gl.action & LOGWR_ACTION_DELAYED_WRITE)
705 logwr_Gl.action = (LOGWR_ACTION) (logwr_Gl.action & ~LOGWR_ACTION_DELAYED_WRITE);
713 log_pgptr = (
LOG_PAGE *) logwr_Gl.logpg_area;
721 logwr_Gl.reinit_copylog =
true;
723 logwr_Gl.loghdr_pgptr = (
LOG_PAGE *) logwr_Gl.logpg_area;
738 logwr_Gl.last_recv_pageid = logwr_Gl.hdr.append_lsa.pageid - 1;
743 logwr_Gl.last_recv_pageid = logwr_Gl.hdr.eof_lsa.pageid - 1;
749 logwr_Gl.hdr.append_lsa.pageid = logwr_Gl.last_recv_pageid;
767 logwr_copy_necessary_log (
LOG_PAGEID to_pageid)
770 char *aligned_log_pgbuf =
NULL;
778 bg_arv_info = &logwr_Gl.bg_archive_info;
789 log_pgptr = (
LOG_PAGE *) aligned_log_pgbuf;
791 assert (logwr_Gl.last_arv_fpageid <= pageid && pageid <= logwr_Gl.hdr.append_lsa.pageid);
792 assert (logwr_Gl.last_arv_fpageid <= to_pageid && to_pageid <= logwr_Gl.hdr.append_lsa.pageid);
794 for (; pageid < to_pageid; pageid += num_pages, ar_phy_pageid += num_pages)
798 num_pages = MIN (num_pages, logwr_Gl.hdr.npages - phy_pageid + 1);
800 if (
fileio_read_pages (
NULL, logwr_Gl.append_vdes, (
char *) log_pgptr, phy_pageid, num_pages, LOG_PAGESIZE) ==
819 logwr_Gl.bg_archive_name);
843 #ifdef UNSTABLE_TDE_FOR_REPLICATION_LOG 850 int tde_load_retries = 3;
852 #if !defined (CS_MODE) 856 #ifdef UNSTABLE_TDE_FOR_REPLICATION_LOG 869 bg_arv_info = &logwr_Gl.bg_archive_info;
882 if (logwr_copy_necessary_log (fpageid) !=
NO_ERROR)
890 for (i = 0; i < npages; i++)
892 log_pgptr = to_flush[
i];
893 #ifdef UNSTABLE_TDE_FOR_REPLICATION_LOG 896 logwr_set_tde_algorithm (
NULL, log_pgptr, tde_algo);
900 error = logwr_load_tde ();
904 if (tde_load_retries-- > 0)
919 log_pgptr = buf_pgptr;
927 logwr_Gl.bg_archive_name, logwr_Gl.hdr.db_logpagesize);
932 logwr_Gl.bg_archive_name);
940 logwr_er_log (
"background archiving current_page_id[%lld], fpageid[%lld], npages[%d]",
943 error = logwr_flush_bgarv_header_page ();
951 tde_load_retries = 3;
955 for (i = 0; i < npages; i++)
957 log_pgptr = to_flush[
i];
958 #ifdef UNSTABLE_TDE_FOR_REPLICATION_LOG 961 logwr_set_tde_algorithm (
NULL, log_pgptr, tde_algo);
965 error = logwr_load_tde ();
969 if (tde_load_retries-- > 0)
984 log_pgptr = buf_pgptr;
987 if (
fileio_write (
NULL, logwr_Gl.append_vdes, log_pgptr, phy_pageid + i, LOG_PAGESIZE, write_mode) ==
NULL)
992 logwr_Gl.active_name, logwr_Gl.hdr.db_logpagesize);
997 logwr_Gl.active_name);
1014 logwr_flush_all_append_pages (
void)
1020 int flush_page_count;
1026 flush_page_count = 0;
1028 for (i = 0; i < logwr_Gl.num_toflush; i++)
1030 pgptr = logwr_Gl.toflush[
i];
1032 if (idxflush != -1 && prv_pgptr !=
NULL)
1042 if ((pageid != prv_pageid + 1)
1050 if (logwr_writev_append_pages (&logwr_Gl.toflush[idxflush], i - idxflush) ==
NULL)
1063 flush_page_count += i - idxflush;
1088 int page_toflush = logwr_Gl.num_toflush - idxflush;
1091 if (logwr_writev_append_pages (&logwr_Gl.toflush[idxflush], page_toflush) ==
NULL)
1099 flush_page_count += page_toflush;
1100 pgptr = logwr_Gl.toflush[idxflush + page_toflush - 1];
1108 if (need_sync ==
true 1127 for (i = 0; i < logwr_Gl.num_toflush; i++)
1129 logwr_Gl.toflush[
i] =
NULL;
1131 logwr_Gl.num_toflush = 0;
1133 logwr_er_log (
"logwr_write_log_pages, flush_page_count(%d)\n", flush_page_count);
1145 logwr_flush_bgarv_header_page (
void)
1148 char *aligned_log_pgbuf;
1156 bg_arv_info = &logwr_Gl.bg_archive_info;
1162 log_pgptr = (
LOG_PAGE *) aligned_log_pgbuf;
1186 logwr_Gl.bg_archive_name, LOG_PAGESIZE);
1192 logwr_Gl.bg_archive_name);
1207 logwr_flush_header_page (
void)
1213 if (logwr_Gl.loghdr_pgptr ==
NULL)
1218 logwr_Gl.hdr.is_shutdown =
true;
1221 logwr_Gl.hdr.nxarv_num = logwr_Gl.last_arv_num;
1222 logwr_Gl.hdr.nxarv_pageid = logwr_Gl.last_arv_fpageid;
1225 memcpy (logwr_Gl.loghdr_pgptr->area, &logwr_Gl.hdr, sizeof (logwr_Gl.hdr));
1231 if (
fileio_write (
NULL, logwr_Gl.append_vdes, logwr_Gl.loghdr_pgptr, phy_pageid, LOG_PAGESIZE,
1238 nbytes = logwr_Gl.hdr.db_logpagesize;
1240 logwr_Gl.active_name, nbytes);
1245 logwr_Gl.active_name);
1250 logwr_Gl.last_chkpt_pageid = logwr_Gl.hdr.chkpt_lsa.pageid;
1255 error_msg (
"change the state of HA server (%s@%s) from '%s' to '%s'", logwr_Gl.db_name,
1256 (logwr_Gl.hostname !=
NULL) ? logwr_Gl.hostname :
"unknown",
1261 prev_ha_server_state = logwr_Gl.hdr.ha_server_state;
1263 logwr_er_log (
"logwr_flush_header_page, ha_server_state=%s, ha_file_status=%s\n",
1275 logwr_archive_active_log (
void)
1277 char archive_name[PATH_MAX] = {
'\0' };
1280 char *aligned_log_pgbuf;
1290 char buffer[LINE_MAX];
1296 malloc_arv_hdr_pgptr = (
LOG_PAGE *) malloc (LOG_PAGESIZE);
1297 if (malloc_arv_hdr_pgptr ==
NULL)
1312 arvhdr->
fpageid = logwr_Gl.last_arv_fpageid;
1313 arvhdr->
arv_num = logwr_Gl.last_arv_num;
1320 snprintf (buffer,
sizeof (buffer),
"log archiving started for archive %03d", arvhdr->
arv_num);
1324 bg_arv_info = &logwr_Gl.bg_archive_info;
1331 "invalid temporary archive log file");
1334 vdes = bg_arv_info->
vdes;
1351 false,
false, LOG_PAGESIZE, 0,
false);
1379 pageid = logwr_Gl.last_arv_fpageid;
1383 log_pgptr = (
LOG_PAGE *) aligned_log_pgbuf;
1386 for (; pageid <= logwr_Gl.last_arv_lpageid; pageid += num_pages, ar_phy_pageid += num_pages)
1392 num_pages = MIN (
LOGPB_IO_NPAGES, (
int) (logwr_Gl.last_arv_lpageid - pageid + 1));
1395 num_pages = MIN (num_pages, logwr_Gl.hdr.npages - phy_pageid + 1);
1397 if (
fileio_read_pages (
NULL, logwr_Gl.append_vdes, (
char *) log_pgptr, phy_pageid, num_pages, LOG_PAGESIZE) ==
1436 logwr_Gl.hdr.npages,
false,
false,
false, LOG_PAGESIZE, 0,
false);
1447 error_code = logwr_flush_bgarv_header_page ();
1455 logwr_Gl.last_arv_num++;
1456 logwr_Gl.last_arv_fpageid = logwr_Gl.last_arv_lpageid + 1;
1460 LSA_COPY (&saved_append_lsa, &logwr_Gl.hdr.append_lsa);
1461 logwr_Gl.hdr.append_lsa.pageid = logwr_Gl.last_arv_lpageid;
1465 logwr_flush_header_page ();
1468 LSA_COPY (&logwr_Gl.hdr.append_lsa, &saved_append_lsa);
1476 catmsg =
"ARCHIVE: %d %s %lld %lld\n";
1481 logwr_er_log (
"logwr_archive_active_log, arv_num(%d), fpageid(%lld) lpageid(%lld)\n", arvhdr->
arv_num,
1490 if (malloc_arv_hdr_pgptr !=
NULL)
1511 logwr_write_log_pages (
void)
1514 struct timeval curtime;
1517 if (logwr_Gl.num_toflush <= 0)
1522 gettimeofday (&curtime,
NULL);
1524 (((curtime.tv_sec - logwr_Gl.last_flush_time.tv_sec) * 1000) +
1525 ((curtime.tv_usec - logwr_Gl.last_flush_time.tv_usec) / 1000));
1527 if (logwr_Gl.force_flush ==
false && !LOGWR_AT_SERVER_ARCHIVING ()
1528 && (logwr_Gl.hdr.eof_lsa.pageid <= logwr_Gl.toflush[0]->hdr.logical_pageid) && (diff_msec < 1000))
1533 logwr_Gl.force_flush =
false;
1539 logwr_Gl.append_vdes =
1554 if (logwr_Gl.action & LOGWR_ACTION_ARCHIVING)
1556 error = logwr_archive_active_log ();
1563 error = logwr_flush_all_append_pages ();
1569 logwr_flush_header_page ();
1571 gettimeofday (&logwr_Gl.last_flush_time,
NULL);
1576 #if !defined(WINDOWS) 1586 logwr_copy_log_header_check (
const char *db_name,
bool verbose,
LOG_LSA * master_eof_lsa)
1592 char *request, *reply;
1594 char *logpg_area =
NULL;
1599 atchar = (
char *) strchr (db_name, '@');
1615 (
char **) &logpg_area, verbose);
1623 loghdr_pgptr = (
LOG_PAGE *) logpg_area;
1624 memcpy (&hdr, loghdr_pgptr->area, sizeof (
LOG_HEADER));
1626 *master_eof_lsa = hdr.
eof_lsa;
1628 printf (
"\n *** Active Info. *** \n");
1642 (
char **) &logpg_area, verbose);
1664 if ((error = logwr_initialize (db_name, log_path, mode, start_page_id)) !=
NO_ERROR)
1680 #if !defined(WINDOWS) 1683 "Encountered an unrecoverable error and will shut itself down",
"");
1687 if (logwr_Gl.reinit_copylog)
1689 char error_str[LINE_MAX];
1692 "Replication logs and catalog have been reinitialized " 1693 "due to rebuilt database on the peer node");
1697 logwr_reinit_copylog ();
1704 if (logwr_Gl.action & LOGWR_ACTION_ASYNC_WRITE)
1706 error = logwr_write_log_pages ();
1713 logwr_Gl.action = (LOGWR_ACTION) (logwr_Gl.action & LOGWR_ACTION_DELAYED_WRITE);
1716 #if !defined(WINDOWS) 1720 "Disconnected with the cub_master and will shut itself down",
"");
1729 logwr_Gl.force_flush =
true;
1730 logwr_write_log_pages ();
1756 logwr_reinit_copylog (
void)
1758 #if !defined(WINDOWS) 1761 char log_archive_path[2 * PATH_MAX];
1762 char archive_log_prefix[2 * PATH_MAX];
1763 int archive_log_prefix_len;
1767 logwr_Gl.hdr.mark_will_del =
true;
1768 logwr_flush_header_page ();
1775 bg_arv_info = &logwr_Gl.bg_archive_info;
1789 archive_log_prefix_len =
strlen (archive_log_prefix);
1790 dirp = opendir (logwr_Gl.log_path);
1800 dp = readdir (dirp);
1810 if (strncmp (archive_log_prefix, dp->d_name, archive_log_prefix_len) == 0)
1832 #ifdef UNSTABLE_TDE_FOR_REPLICATION_LOG 1834 logwr_load_tde (
void)
1838 char sock_path[PATH_MAX];
1844 struct sockaddr_un clientaddr;
1846 fileio_make_ha_sock_name (sock_path, logwr_Gl.log_path, TDE_HA_SOCK_NAME);
1848 client_sockfd = socket (AF_UNIX, SOCK_STREAM, 0);
1849 if (client_sockfd == -1)
1852 return ER_TDE_DK_SHARING_SOCK_OPEN;
1855 bzero (&clientaddr,
sizeof (clientaddr));
1856 clientaddr.sun_family = AF_UNIX;
1857 strcpy (clientaddr.sun_path, sock_path);
1858 client_len =
sizeof (clientaddr);
1860 if (connect (client_sockfd, (
struct sockaddr *) &clientaddr, client_len) < 0)
1863 return ER_TDE_DK_SHARING_SOCK_CONNECT;
1866 bufptr = logwr_Gl.log_path;
1870 nbytes = write (client_sockfd, bufptr, len);
1880 close (client_sockfd);
1882 return ER_TDE_DK_SHARING_SOCK_WRITE;
1891 bufptr = (
char *) &err_msg;
1892 len =
sizeof (err_msg);
1895 nbytes = read (client_sockfd, bufptr, len);
1905 close (client_sockfd);
1907 return ER_TDE_DK_SHARING_SOCK_READ;
1917 close (client_sockfd);
1923 bufptr = (
char *) &dks;
1927 nbytes = read (client_sockfd, bufptr, len);
1937 close (client_sockfd);
1939 return ER_TDE_DK_SHARING_SOCK_READ;
1953 close (client_sockfd);
1980 int error_code =
NO_ERROR, saved_checksum_crc32;
1981 const int block_size = 4096;
1983 const int sample_nbytes = 16;
1984 int sampling_offset;
1985 char buf[max_num_pages * sample_nbytes * 2];
1986 const int num_pages = LOG_PAGESIZE / block_size;
1987 const size_t sizeof_buf = num_pages * sample_nbytes * 2;
1994 if (saved_checksum_crc32 == 0)
2003 for (
int i = 0; i < num_pages; i++)
2006 sampling_offset = (i * block_size);
2007 memcpy (p, ((
char *) log_pgptr) + sampling_offset, sample_nbytes);
2011 sampling_offset = (i * block_size) + (block_size - sample_nbytes);
2012 memcpy (p, ((
char *) log_pgptr) + sampling_offset, sample_nbytes);
2016 crypt_crc32 ((
char *) buf, (
int) sizeof_buf, &checksum_crc32);
2021 if (checksum_crc32 != saved_checksum_crc32)
2024 "logwr_check_page_checksum: log page %lld has checksum = %d, computed checksum = %d\n",
2025 (
long long int) log_pgptr->
hdr.
logical_pageid, saved_checksum_crc32, checksum_crc32);
2050 return "SYNCHRONIZED";
2056 #ifdef UNSTABLE_TDE_FOR_REPLICATION_LOG 2058 logwr_get_tde_algorithm (
const LOG_PAGE * log_pgptr)
2099 #if defined(SERVER_MODE) 2101 int mode,
bool copy_from_first_phy_page);
2102 static bool logwr_unregister_writer_entry (
LOGWR_ENTRY * wr_entry,
int status);
2103 static int logwr_pack_log_pages (
THREAD_ENTRY * thread_p,
char *logpg_area,
int *logpg_used_size,
int *status,
2105 static void logwr_cs_exit (
THREAD_ENTRY * thread_p,
bool * check_cs_own);
2109 static void logwr_update_last_sent_eof_lsa (
LOGWR_ENTRY * entry);
2125 bool copy_from_first_phy_page)
2141 entry = entry->
next;
2181 *wr_entry_p = entry;
2197 logwr_unregister_writer_entry (
LOGWR_ENTRY * wr_entry,
int status)
2215 entry = entry->
next;
2218 is_all_done = (entry ==
NULL) ?
true :
false;
2226 if (entry == wr_entry)
2240 entry = entry->
next;
2263 logwr_pack_log_pages (
THREAD_ENTRY * thread_p,
char *logpg_area,
int *logpg_used_size,
int *status,
LOGWR_ENTRY * entry,
2264 bool copy_from_file)
2271 bool is_hdr_page_only;
2325 nxarv_phy_pageid = 1;
2326 nxarv_pageid = arvhdr.
fpageid;
2336 lpageid = nxarv_pageid;
2337 fpageid = nxarv_pageid;
2339 else if (!is_hdr_page_only)
2351 if (fpageid > nxio_lsa.
pageid)
2353 fpageid = nxio_lsa.
pageid;
2360 lpageid = eof_lsa.
pageid;
2374 if (fpageid == arvhdr.
fpageid)
2384 if (lpageid == eof_lsa.
pageid)
2395 num_logpgs = (is_hdr_page_only) ? 1 : (UINT64) ((lpageid - fpageid + 1) + 1);
2397 assert (lpageid >= fpageid);
2419 if (!is_hdr_page_only)
2421 for (pageid = fpageid; pageid >= 0 && pageid <= lpageid; pageid++)
2424 if (copy_from_file ==
true)
2441 if (pageid >= nxio_lsa.
pageid)
2456 *logpg_used_size = (int) (p - logpg_area);
2459 if (!is_hdr_page_only && (lpageid >= eof_lsa.
pageid))
2471 logwr_er_log (
"logwr_pack_log_pages, fpageid(%lld), lpageid(%lld), num_pages(%lld)," 2472 "\n status(%d)\n", fpageid, lpageid, num_logpgs, entry->
status);
2478 *logpg_used_size = 0;
2485 logwr_cs_exit (
THREAD_ENTRY * thread_p,
bool * check_cs_own)
2489 *check_cs_own =
false;
2501 INT64 saved_start_time;
2505 prev_status = entry->
status;
2508 if (entry !=
NULL && logwr_unregister_writer_entry (entry, status))
2512 assert (saved_start_time > 0);
2540 logwr_set_eof_lsa (thread_p, entry);
2550 logwr_update_last_sent_eof_lsa (
LOGWR_ENTRY * entry)
2575 int logpg_used_size;
2583 bool check_cs_own =
false;
2584 bool is_interrupted =
false;
2585 bool copy_from_file =
false;
2586 bool need_cs_exit_after_send =
true;
2589 bool copy_from_first_phy_page =
false;
2591 logpg_used_size = 0;
2593 if (logpg_area ==
NULL)
2598 if (thread_p->conn_entry)
2607 copy_from_first_phy_page =
true;
2611 copy_from_first_phy_page =
false;
2613 mode = (
LOGWR_MODE) (mode & ~LOGWR_COPY_FROM_FIRST_PHY_PAGE_MASK);
2616 orig_mode = MAX (mode, orig_mode);
2618 logwr_er_log (
"[tid:%ld] xlogwr_get_log_pages, fpageid(%lld), mode(%s)\n",
2619 thread_p->get_posix_id (), first_pageid,
2624 error_code = logwr_register_writer_entry (&entry, thread_p, first_pageid, mode, copy_from_first_phy_page);
2634 bool continue_checking =
true;
2639 to.tv_sec = time (
NULL) + timeout;
2645 to.tv_sec = to.tv_nsec = 0;
2658 pthread_cond_signal (&writer_info->flush_end_cond);
2686 if (logwr_is_delayed (thread_p, entry))
2688 is_interrupted =
true;
2709 check_cs_own =
true;
2714 logwr_set_eof_lsa (thread_p, entry);
2715 is_interrupted =
true;
2718 copy_from_file = (is_interrupted) ?
true :
false;
2720 error_code = logwr_pack_log_pages (thread_p, logpg_area, &logpg_used_size, &status, entry, copy_from_file);
2735 rv = pthread_cond_wait (&writer_info->flush_wait_cond, &writer_info->flush_wait_mutex);
2757 logwr_cs_exit (thread_p, &check_cs_own);
2758 logwr_write_end (thread_p, writer_info, entry, status);
2759 need_cs_exit_after_send =
false;
2770 if (need_cs_exit_after_send ==
true)
2786 logwr_update_last_sent_eof_lsa (entry);
2789 if (need_cs_exit_after_send)
2791 logwr_cs_exit (thread_p, &check_cs_own);
2792 logwr_write_end (thread_p, writer_info, entry, status);
2796 first_pageid = next_fpageid;
2798 need_cs_exit_after_send =
true;
2800 if (mode & LOGWR_COPY_FROM_FIRST_PHY_PAGE_MASK)
2819 logwr_er_log (
"[tid:%ld] xlogwr_get_log_pages, error(%d)\n", thread_p->get_posix_id (), error_code);
2821 logwr_cs_exit (thread_p, &check_cs_own);
2822 logwr_write_end (thread_p, writer_info, entry, status);
2837 logwr_get_min_copied_fpageid (
void)
2841 int num_entries = 0;
2850 if (min_fpageid > entry->
fpageid)
2854 entry = entry->
next;
2865 return (min_fpageid);
#define difftime64(time1, time2)
void net_client_logwr_send_end_msg(int rc, int error)
pthread_mutex_t flush_end_mutex
LOG_LSA tmp_last_sent_eof_lsa
#define ER_CSS_PTHREAD_COND_TIMEDOUT
void LSA_COPY(log_lsa *plsa1, const log_lsa *plsa2)
#define LOG_IS_PAGE_TDE_ENCRYPTED(log_page_p)
void * fileio_read(THREAD_ENTRY *thread_p, int vol_fd, void *io_page_p, PAGEID page_id, size_t page_size)
#define ER_LOG_PAGE_CORRUPTED
int logpb_set_page_checksum(THREAD_ENTRY *thread_p, LOG_PAGE *log_pgptr)
INT64 last_writer_elapsed_time
LOG_PAGEID logpb_find_oldest_available_page_id(THREAD_ENTRY *thread_p)
void fileio_dismount_without_fsync(THREAD_ENTRY *thread_p, int vol_fd)
static bool logwr_need_shutdown
#define LOG_HDRPAGE_FLAG_ENCRYPTED_MASK
#define CUBRID_MAGIC_LOG_ARCHIVE
struct tde_data_key_set TDE_DATA_KEY_SET
#define pthread_mutex_unlock(a)
CLIENTIDS last_writer_client_info
bool dwb_is_created(void)
int hb_deregister_from_master(void)
void fileio_unformat(THREAD_ENTRY *thread_p, const char *vol_label_p)
const LOG_PAGEID LOGPB_HEADER_PAGE_ID
int xlog_send_log_pages_to_client(THREAD_ENTRY *thread_p, char *logpg_area, int area_size, LOGWR_MODE mode)
#define assert_release(e)
pthread_mutex_t wr_list_mutex
INT64 log_get_clock_msec(void)
void LOG_CS_ENTER(THREAD_ENTRY *thread_p)
#define logwr_er_log(...)
#define ER_CSS_PTHREAD_MUTEX_LOCK
const VOLID LOG_DBLOG_ACTIVE_VOLID
#define OR_ALIGNED_BUF(size)
LOG_PHY_PAGEID logwr_to_physical_pageid(LOG_PAGEID logical_pageid)
void fileio_make_log_archive_temp_name(char *log_archive_temp_name_p, const char *log_path_p, const char *db_name_p)
LOG_PAGE * logpb_fetch_from_archive(THREAD_ENTRY *thread_p, LOG_PAGEID pageid, LOG_PAGE *log_pgptr, int *ret_arv_num, LOG_ARV_HEADER *arv_hdr, bool is_fatal)
int logwr_get_log_pages(LOGWR_CONTEXT *ctx_ptr)
bool fileio_is_volume_exist(const char *vol_label_p)
#define OR_ALIGNED_BUF_SIZE(abuf)
int xlog_get_page_request_with_reply(THREAD_ENTRY *thread_p, LOG_PAGEID *fpageid_ptr, LOGWR_MODE *mode_ptr, int timeout)
#define PTR_ALIGN(addr, boundary)
void crypt_crc32(const char *src, int src_len, int *dest)
void _er_log_debug(const char *file_name, const int line_no, const char *fmt,...)
const char * fileio_rename(VOLID vol_id, const char *old_label_p, const char *new_label_p)
#define ER_CSS_PTHREAD_COND_WAIT
int fileio_mount(THREAD_ENTRY *thread_p, const char *db_full_name_p, const char *vol_label_p, VOLID vol_id, int lock_wait, bool is_do_sync)
void * fileio_read_pages(THREAD_ENTRY *thread_p, int vol_fd, char *io_pages_p, PAGEID page_id, int num_pages, size_t page_size)
#define ER_IO_WRITE_OUT_OF_SPACE
#define LOGWR_THREAD_SUSPEND_TIMEOUT
#define OR_ALIGNED_BUF_START(abuf)
int logtb_get_client_ids(int tran_index, CLIENTIDS *client_info)
char * or_pack_int64(char *ptr, INT64 number)
#define ER_IO_FORMAT_FAIL
#define MSGCAT_LOG_LOGINFO_ARCHIVE
void er_set(int severity, const char *file_name, const int line_no, int err_id, int num_args,...)
const VOLID LOG_DBLOG_ARCHIVE_VOLID
int logpb_copy_page_from_file(THREAD_ENTRY *thread_p, LOG_PAGEID pageid, LOG_PAGE *log_pgptr)
int prm_get_integer_value(PARAM_ID prm_id)
#define TDE_DATA_KEY_LENGTH
#define CUBRID_MAGIC_MAX_LENGTH
#define ER_OUT_OF_VIRTUAL_MEMORY
void LOG_CS_EXIT(THREAD_ENTRY *thread_p)
LOG_PAGEID current_page_id
enum logwr_status LOGWR_STATUS
#define ER_LOG_WRITE_OUT_OF_SPACE
const char * css_ha_server_state_string(HA_SERVER_STATE state)
#define ER_HA_GENERIC_ERROR
#define ER_HB_PROCESS_EVENT
LOG_PAGEID last_sync_pageid
LOG_PAGEID logical_pageid
#define LOGWR_COPY_LOG_BUFFER_NPAGES
static enum scanner_mode mode
unsigned char perm_key[TDE_DATA_KEY_LENGTH]
#define LOGWR_COPY_FROM_FIRST_PHY_PAGE_MASK
void fileio_dismount(THREAD_ENTRY *thread_p, int vol_fd)
LOGWR_ENTRY * writer_list
#define ER_HA_LW_STOPPED_BY_SIGNAL
#define LOG_HDRPAGE_FLAG_ENCRYPTED_AES
bool copy_from_first_phy_page
bool LSA_ISNULL(const log_lsa *lsa_ptr)
#define LOG_HDRPAGE_FLAG_ENCRYPTED_ARIA
#define db_private_free_and_init(thrd, ptr)
pthread_cond_t flush_end_cond
const char * get_buffer() const
#define db_private_alloc(thrd, size)
void er_set_with_oserror(int severity, const char *file_name, const int line_no, int err_id, int num_args,...)
#define MSGCAT_CATALOG_CUBRID
void fileio_make_log_active_name(char *log_active_name_p, const char *log_path_p, const char *db_name_p)
void fileio_make_log_info_name(char *log_info_name_p, const char *log_path_p, const char *db_name_p)
bool LSA_GE(const log_lsa *plsa1, const log_lsa *plsa2)
void fileio_make_log_archive_name(char *log_archive_name_p, const char *log_path_p, const char *db_name_p, int archive_number)
bool logtb_is_interrupted(THREAD_ENTRY *thread_p, bool clear, bool *continue_checking)
static void error(const char *msg)
#define FILEIO_SUFFIX_LOGARCHIVE
void * fileio_write(THREAD_ENTRY *thread_p, int vol_fd, void *io_page_p, PAGEID page_id, size_t page_size, FILEIO_WRITE_MODE write_mode)
char * or_pack_int(char *ptr, int number)
#define LOG_FIND_THREAD_TRAN_INDEX(thrd)
const char * logwr_log_ha_filestat_to_string(enum LOG_HA_FILESTAT val)
unsigned char temp_key[TDE_DATA_KEY_LENGTH]
int fileio_synchronize(THREAD_ENTRY *thread_p, int vol_fd, const char *vlabel, FILEIO_SYNC_OPTION sync_dwb)
#define ER_LOG_CREATE_LOGARCHIVE_FAIL
#define free_and_init(ptr)
void LSA_SET_NULL(log_lsa *lsa_ptr)
static int logwr_check_page_checksum(THREAD_ENTRY *thread_p, LOG_PAGE *log_pgptr)
#define ER_HA_LW_FAILED_GET_LOG_PAGE
int thread_suspend_with_other_mutex(cubthread::entry *thread_p, pthread_mutex_t *mutex_p, int timeout, struct timespec *to, thread_resume_suspend_status suspended_reason)
const VOLID LOG_DBLOG_BG_ARCHIVE_VOLID
#define ER_LOG_ARCHIVE_CREATED
bool logpb_is_page_in_archive(LOG_PAGEID pageid)
bool prm_get_bool_value(PARAM_ID prm_id)
#define ER_CSS_PTHREAD_MUTEX_UNLOCK
#define ER_LOG_INCOMPATIBLE_DATABASE
HA_SERVER_STATE css_ha_server_state(void)
enum logwr_mode LOGWR_MODE
int fileio_format(THREAD_ENTRY *thread_p, const char *db_full_name_p, const char *vol_label_p, VOLID vol_id, DKNPAGES npages, bool is_sweep_clean, bool is_do_lock, bool is_do_sync, size_t page_size, int kbytes_to_be_written_per_sec, bool reuse_file)
LOG_LSA last_sent_eof_lsa
char * msgcat_message(int cat_id, int set_id, int msg_id)
#define ER_LOG_DOESNT_CORRESPOND_TO_DATABASE
TDE_DATA_KEY_SET data_keys
bool logwr_force_shutdown(void)
void * fileio_write_pages(THREAD_ENTRY *thread_p, int vol_fd, char *io_pages_p, PAGEID page_id, int num_pages, size_t page_size, FILEIO_WRITE_MODE write_mode)
#define ER_DISK_INCONSISTENT_VOL_HEADER
int log_dump_log_info(const char *logname_info, bool also_stdout, const char *fmt,...)
#define FILEIO_PATH_SEPARATOR(path)
#define pthread_mutex_lock(a)
LOG_LSA get_nxio_lsa() const
int net_client_check_log_header(LOGWR_CONTEXT *ctx_ptr, char *argbuf, int argsize, char *replybuf, int replysize, char **logpg_area_buf, bool verbose)
unsigned char log_key[TDE_DATA_KEY_LENGTH]
enum ha_server_state HA_SERVER_STATE
static int prev_ha_server_state
#define ER_TDE_CIPHER_LOAD_FAIL
void la_print_log_header(const char *database_name, LOG_HEADER *hdr, bool verbose)
int logpb_copy_page_from_log_buffer(THREAD_ENTRY *thread_p, LOG_PAGEID pageid, LOG_PAGE *log_pgptr)
int logwr_copy_log_file(const char *db_name, const char *log_path, int mode, INT64 start_page_id)
const size_t LOGPB_IO_NPAGES
bool fileio_is_formatted_page(THREAD_ENTRY *thread_p, const char *io_page)
SIGNAL_HANDLER_FUNCTION os_set_signal_handler(const int sig_no, SIGNAL_HANDLER_FUNCTION sig_handler)
int tde_encrypt_log_page(const LOG_PAGE *logpage_plain, TDE_ALGORITHM tde_algo, LOG_PAGE *logpage_cipher)