File log_applier_sql_log.c¶
File List > cubrid > src > transaction > log_applier_sql_log.c
Go to the documentation of this file
/*
* Copyright 2008 Search Solution Corporation
* Copyright 2016 CUBRID Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
/*
* log_applier_sql_log.c : SQL logging module for log applier
*/
#ident "$Id$"
#include <stdio.h>
#include <assert.h>
#include <time.h>
#include <libgen.h>
#include <errno.h>
#include <unistd.h>
#include "log_applier_sql_log.h"
#include "system_parameter.h"
#include "object_primitive.h"
#include "object_template.h"
#include "object_print.h"
#include "error_manager.h"
#include "parser.h"
#include "work_space.h"
#include "class_object.h"
#include "environment_variable.h"
#include "set_object.h"
#include "schema_manager.h"
#include "dbtype.h"
#include "file_io.h"
#include "schema_system_catalog_constants.h"
#include "db_value_printer.hpp"
#include "mem_block.hpp"
#include "string_buffer.hpp"
#define SL_LOG_FILE_MAX_SIZE \
(prm_get_integer_value (PRM_ID_HA_SQL_LOG_MAX_SIZE_IN_MB) * 1024 * 1024)
#define FILE_ID_FORMAT "%u"
#define SQL_ID_FORMAT "%010u"
#define CATALOG_FORMAT FILE_ID_FORMAT " | " SQL_ID_FORMAT
typedef struct sl_info SL_INFO;
struct sl_info
{
unsigned int curr_file_id;
unsigned int last_inserted_sql_id;
};
SL_INFO sl_Info;
static FILE *log_fp;
static FILE *catalog_fp;
static unsigned int sql_log_max_cnt = 0;
static char sql_log_base_path[PATH_MAX];
static char sql_catalog_path[PATH_MAX];
static int sl_write_sql (string_buffer & query, string_buffer * select);
static void sl_print_insert_att_names (string_buffer & strbuf, OBJ_TEMPASSIGN ** assignments, int num_assignments);
static void sl_print_insert_att_values (string_buffer & strbuf, OBJ_TEMPASSIGN ** assignments, int num_assignments);
static int sl_print_pk (string_buffer & strbuf, SM_CLASS * sm_class, DB_VALUE * key);
static void sl_print_midxkey (string_buffer & strbuf, SM_ATTRIBUTE ** attributes, const DB_MIDXKEY * midxkey);
static void sl_print_update_att_set (string_buffer & strbuf, OBJ_TEMPASSIGN ** assignments, int num_assignments);
static void sl_print_att_value (string_buffer & strbuf, const char *att_name, OBJ_TEMPASSIGN ** assignments,
int num_assignments);
static DB_VALUE *sl_find_att_value (const char *att_name, OBJ_TEMPASSIGN ** assignments, int num_assignments);
static FILE *sl_open_next_file (FILE * old_fp);
static FILE *sl_log_open (void);
static void sl_delete_oldest_file_if_needed (void);
static int sl_read_catalog (void);
static int sl_write_catalog (void);
static int sl_create_sql_log_dir (const char *repl_log_path, char *path_buf, int path_buf_size);
static char *
trim_single_quote (char *str, size_t len)
{
if (len < 2 || str[0] != '\'' || str[len - 1] != '\'')
{
return str;
}
str[len - 1] = '\0';
return str + 1;
}
static int
sl_print_select (string_buffer & strbuf, SM_CLASS * sm_class, DB_VALUE * key)
{
strbuf ("SELECT * FROM [%s] WHERE ", sm_ch_name ((MOBJ) sm_class));
if (sl_print_pk (strbuf, sm_class, key) != NO_ERROR)
{
return ER_FAILED;
}
strbuf (";");
return NO_ERROR;
}
static int
sl_write_catalog (void)
{
if (catalog_fp == NULL)
{
if ((catalog_fp = fopen (sql_catalog_path, "r+")) == NULL)
{
catalog_fp = fopen (sql_catalog_path, "w");
}
}
if (catalog_fp == NULL)
{
er_log_debug (ARG_FILE_LINE, "Cannot open SQL catalog file: %s", strerror (errno));
return ER_FAILED;
}
fseek (catalog_fp, 0, SEEK_SET);
fprintf (catalog_fp, CATALOG_FORMAT, sl_Info.curr_file_id, sl_Info.last_inserted_sql_id);
fflush (catalog_fp);
fsync (fileno (catalog_fp));
return NO_ERROR;
}
static int
sl_read_catalog (void)
{
FILE *read_catalog_fp;
char info[LINE_MAX];
read_catalog_fp = fopen (sql_catalog_path, "r");
if (read_catalog_fp == NULL)
{
return sl_write_catalog ();
}
if (fgets (info, LINE_MAX, read_catalog_fp) == NULL)
{
if (read_catalog_fp != NULL)
{
fclose (read_catalog_fp);
}
return ER_FAILED;
}
if (sscanf (info, CATALOG_FORMAT, &sl_Info.curr_file_id, &sl_Info.last_inserted_sql_id) != 2)
{
fclose (read_catalog_fp);
return ER_FAILED;
}
fclose (read_catalog_fp);
return NO_ERROR;
}
int
sl_init (const char *db_name, const char *repl_log_path)
{
char sql_log_path[PATH_MAX];
if (sl_create_sql_log_dir (repl_log_path, sql_log_path, sizeof (sql_log_path)) != NO_ERROR)
{
return ER_FAILED;
}
if (snprintf (sql_log_base_path, PATH_MAX, "%s/%s.sql.log", sql_log_path, basename ((char *) repl_log_path)) >=
PATH_MAX)
{
assert_release (false);
sql_log_base_path[PATH_MAX - 1] = '\0';
}
snprintf (sql_catalog_path, PATH_MAX, "%s/%s_applylogdb.sql.info", dirname (sql_log_path), db_name);
memset (&sl_Info, 0, sizeof (sl_Info));
sl_Info.curr_file_id = 0;
sl_Info.last_inserted_sql_id = 0;
if (log_fp != NULL)
{
fclose (log_fp);
log_fp = NULL;
}
if (catalog_fp != NULL)
{
fclose (catalog_fp);
catalog_fp = NULL;
}
if (sl_read_catalog () != NO_ERROR)
{
return ER_FAILED;
}
assert (prm_get_integer_value (PRM_ID_HA_SQL_LOG_MAX_COUNT) >= 0);
sql_log_max_cnt = (unsigned int) prm_get_integer_value (PRM_ID_HA_SQL_LOG_MAX_COUNT);
return NO_ERROR;
}
static int
sl_print_pk (string_buffer & strbuf, SM_CLASS * sm_class, DB_VALUE * key)
{
DB_MIDXKEY *midxkey;
SM_ATTRIBUTE *pk_att;
SM_CLASS_CONSTRAINT *pk_cons = classobj_find_class_primary_key (sm_class);
if (pk_cons == NULL || pk_cons->attributes == NULL || pk_cons->attributes[0] == NULL)
{
return ER_FAILED;
}
if (DB_VALUE_TYPE (key) == DB_TYPE_MIDXKEY)
{
midxkey = db_get_midxkey (key);
sl_print_midxkey (strbuf, pk_cons->attributes, midxkey);
}
else
{
pk_att = pk_cons->attributes[0];
strbuf ("\"%s\"=", pk_att->header.name);
db_value_printer printer (strbuf);
printer.describe_value (key);
}
return NO_ERROR;
}
static void
sl_print_insert_att_names (string_buffer & strbuf, OBJ_TEMPASSIGN ** assignments, int num_assignments)
{
if (num_assignments > 0)
{
strbuf ("\"%s\"", assignments[0]->att->header.name);
}
for (int i = 1; i < num_assignments; i++)
{
strbuf (", \"%s\"", assignments[i]->att->header.name);
}
}
static void
sl_print_insert_att_values (string_buffer & strbuf, OBJ_TEMPASSIGN ** assignments, int num_assignments)
{
db_value_printer printer (strbuf);
if (num_assignments > 0)
{
printer.describe_value (assignments[0]->variable);
}
for (int i = 1; i < num_assignments; i++)
{
strbuf += ',';
printer.describe_value (assignments[i]->variable);
}
}
/*
* sl_print_sql_midxkey - print midxkey in the following format.
* key1=value1 AND key2=value2 AND ...
*/
static void
sl_print_midxkey (string_buffer & strbuf, SM_ATTRIBUTE ** attributes, const DB_MIDXKEY * midxkey)
{
int prev_i_index = 0;
char *prev_i_ptr = NULL;
DB_VALUE value;
for (int i = 0; i < midxkey->ncolumns && attributes[i] != NULL; i++)
{
if (i > 0)
{
strbuf (" AND ");
}
pr_midxkey_get_element_nocopy (midxkey, i, &value, &prev_i_index, &prev_i_ptr);
strbuf ("\"%s\"=", attributes[i]->header.name);
db_value_printer printer (strbuf);
printer.describe_value (&value);
}
}
static DB_VALUE *
sl_find_att_value (const char *att_name, OBJ_TEMPASSIGN ** assignments, int num_assignments)
{
for (int i = 0; i < num_assignments; i++)
{
if (!strcmp (att_name, assignments[i]->att->header.name))
{
return assignments[i]->variable;
}
}
return NULL;
}
static void
sl_print_att_value (string_buffer & strbuf, const char *att_name, OBJ_TEMPASSIGN ** assignments, int num_assignments)
{
DB_VALUE *val = sl_find_att_value (att_name, assignments, num_assignments);
if (val != NULL)
{
db_value_printer printer (strbuf);
printer.describe_value (val);
}
}
static void
sl_print_update_att_set (string_buffer & strbuf, OBJ_TEMPASSIGN ** assignments, int num_assignments)
{
db_value_printer printer (strbuf);
for (int i = 0; i < num_assignments; i++)
{
strbuf ("\"%s\"=", assignments[i]->att->header.name);
printer.describe_value (assignments[i]->variable);
if (i != num_assignments - 1)
{
strbuf (", ");
}
}
}
int
sl_write_insert_sql (DB_OTMPL * inst_tp, DB_VALUE * key)
{
string_buffer insert_strbuf;
insert_strbuf ("INSERT INTO [%s](", sm_ch_name ((MOBJ) (inst_tp->class_)));
sl_print_insert_att_names (insert_strbuf, inst_tp->assignments, inst_tp->nassigns);
insert_strbuf (") VALUES (");
sl_print_insert_att_values (insert_strbuf, inst_tp->assignments, inst_tp->nassigns);
insert_strbuf (");");
string_buffer select_strbuf;
if (sl_print_select (select_strbuf, inst_tp->class_, key) != NO_ERROR)
{
return ER_FAILED;
}
if (sl_write_sql (insert_strbuf, &select_strbuf) != NO_ERROR)
{
return ER_FAILED;
}
return NO_ERROR;
}
int
sl_write_update_sql (DB_OTMPL * inst_tp, DB_VALUE * key)
{
int result;
if (strcmp (sm_ch_name ((MOBJ) (inst_tp->class_)), CT_SERIAL_NAME) != 0)
{
/* ordinary tables */
string_buffer update_strbuf;
update_strbuf ("UPDATE [%s] SET ", sm_ch_name ((MOBJ) (inst_tp->class_)));
sl_print_update_att_set (update_strbuf, inst_tp->assignments, inst_tp->nassigns);
update_strbuf (" WHERE ");
if (sl_print_pk (update_strbuf, inst_tp->class_, key) != NO_ERROR)
{
return ER_FAILED;
}
update_strbuf (";");
string_buffer select_strbuf;
if (sl_print_select (select_strbuf, inst_tp->class_, key) != NO_ERROR)
{
return ER_FAILED;
}
return sl_write_sql (update_strbuf, &select_strbuf);
}
else
{
/* _db_serial */
DB_VALUE *cur_value = sl_find_att_value ("current_val", inst_tp->assignments, inst_tp->nassigns);
DB_VALUE *incr_value = sl_find_att_value ("increment_val", inst_tp->assignments, inst_tp->nassigns);
if (cur_value == NULL || incr_value == NULL)
{
return ER_FAILED;
}
DB_VALUE next_value;
result = numeric_db_value_add (cur_value, incr_value, &next_value);
if (result != NO_ERROR)
{
return ER_FAILED;
}
string_buffer serial_name_strbuf;
sl_print_att_value (serial_name_strbuf, SERIAL_ATTR_UNIQUE_NAME, inst_tp->assignments, inst_tp->nassigns);
char *serial_name = trim_single_quote ((char *) serial_name_strbuf.get_buffer (), serial_name_strbuf.len ());
string_buffer alter_strbuf;
char str_next_value[NUMERIC_MAX_STRING_SIZE];
alter_strbuf ("ALTER SERIAL [%s] START WITH %s;", serial_name,
numeric_db_value_print (&next_value, str_next_value));
return sl_write_sql (alter_strbuf, NULL);
}
}
int
sl_write_delete_sql (char *class_name, MOBJ mclass, DB_VALUE * key)
{
string_buffer delete_strbuf;
delete_strbuf ("DELETE FROM [%s] WHERE ", class_name);
if (sl_print_pk (delete_strbuf, (SM_CLASS *) mclass, key) != NO_ERROR)
{
return ER_FAILED;
}
delete_strbuf (";");
string_buffer select_strbuf;
if (sl_print_select (select_strbuf, (SM_CLASS *) mclass, key) != NO_ERROR)
{
return ER_FAILED;
}
return sl_write_sql (delete_strbuf, &select_strbuf);
}
int
sl_write_statement_sql (char *class_name, char *db_user, int item_type, const char *stmt_text, char *ha_sys_prm)
{
int error = NO_ERROR;
char default_ha_prm[LINE_MAX];
SYSPRM_ERR rc;
string_buffer statement_strbuf;
statement_strbuf ("%s;", stmt_text);
if (ha_sys_prm != NULL)
{
rc = sysprm_make_default_values (ha_sys_prm, default_ha_prm, sizeof (default_ha_prm));
if (rc != PRM_ERR_NO_ERROR)
{
return sysprm_set_error (rc, ha_sys_prm);
}
string_buffer setprm_strbuf;
setprm_strbuf ("%s SET SYSTEM PARAMETERS '%s';", CA_MARK_TRAN_START, ha_sys_prm); //set param
if (sl_write_sql (setprm_strbuf, NULL) != NO_ERROR)
{
return ER_FAILED;
}
if (sl_write_sql (statement_strbuf, NULL) != NO_ERROR)
{
sl_write_sql (setprm_strbuf, NULL);
return ER_FAILED;
}
setprm_strbuf.clear ();
setprm_strbuf ("%s SET SYSTEM PARAMETERS '%s';", CA_MARK_TRAN_END, default_ha_prm); //restore param
if (sl_write_sql (setprm_strbuf, NULL) != NO_ERROR)
{
return ER_FAILED;
}
}
else
{
if (sl_write_sql (statement_strbuf, NULL) != NO_ERROR)
{
return ER_FAILED;
}
}
if (item_type == CUBRID_STMT_CREATE_CLASS)
{
if (db_user != NULL && strlen (db_user) > 0)
{
statement_strbuf.clear ();
statement_strbuf ("GRANT ALL PRIVILEGES ON %s TO %s;", class_name, db_user);
if (sl_write_sql (statement_strbuf, NULL) != NO_ERROR)
{
return ER_FAILED;
}
}
}
return NO_ERROR;
}
static int
sl_write_sql (string_buffer & query, string_buffer * select)
{
time_t curr_time;
char time_buf[20];
assert (query.get_buffer () != NULL);
if (log_fp == NULL)
{
if ((log_fp = sl_log_open ()) == NULL)
{
return ER_FAILED;
}
}
curr_time = time (NULL);
strftime (time_buf, sizeof (time_buf), "%Y-%m-%d %H:%M:%S", localtime (&curr_time));
/* -- datetime | sql_id | is_ddl | select length | query length */
fprintf (log_fp, "-- %s | %u | %zu | %zu\n", time_buf, ++sl_Info.last_inserted_sql_id,
(select == NULL) ? 0 : select->len (), query.len ());
/* print select for verifying data consistency */
if (select != NULL)
{
/* -- select_length select * from tbl_name */
fprintf (log_fp, "-- ");
fwrite (select->get_buffer (), sizeof (char), select->len (), log_fp);
fputc ('\n', log_fp);
}
/* print SQL query */
fwrite (query.get_buffer (), sizeof (char), query.len (), log_fp);
fputc ('\n', log_fp);
fflush (log_fp);
sl_write_catalog ();
fseek (log_fp, 0, SEEK_END);
if (ftell (log_fp) >= SL_LOG_FILE_MAX_SIZE)
{
log_fp = sl_open_next_file (log_fp);
sl_delete_oldest_file_if_needed ();
}
return NO_ERROR;
}
static FILE *
sl_log_open (void)
{
char cur_sql_log_path[PATH_MAX];
FILE *fp;
if (snprintf (cur_sql_log_path, PATH_MAX - 1, "%s.%u", sql_log_base_path, sl_Info.curr_file_id) < 0)
{
assert (false);
return NULL;
}
fp = fopen (cur_sql_log_path, "r+");
if (fp != NULL)
{
fseek (fp, 0, SEEK_END);
if (ftell (fp) >= SL_LOG_FILE_MAX_SIZE)
{
fp = sl_open_next_file (fp);
}
}
else
{
fp = fopen (cur_sql_log_path, "w");
}
if (fp == NULL)
{
er_log_debug (ARG_FILE_LINE, "Failed to open SQL log file (%s): %s", cur_sql_log_path, strerror (errno));
}
return fp;
}
static FILE *
sl_open_next_file (FILE * old_fp)
{
FILE *new_fp;
char new_file_path[PATH_MAX];
sl_Info.curr_file_id++;
sl_Info.last_inserted_sql_id = 0;
if (snprintf (new_file_path, PATH_MAX - 1, "%s.%u", sql_log_base_path, sl_Info.curr_file_id) < 0)
{
assert (false);
return NULL;
}
fclose (old_fp);
new_fp = fopen (new_file_path, "w");
if (sl_write_catalog () != NO_ERROR)
{
fclose (new_fp);
return NULL;
}
return new_fp;
}
/*
* sl_delete_oldest_file_if_needed() - Delete the oldest file only if the number of SQL log files exceeds the 'sql_log_max_cnt' value.
*
* Note:
* This function is related to the ha_sql_log_max_count system parameter.
* This system parameter can be set from 2 to 5 and only that number of sql log files are kept.
*/
static void
sl_delete_oldest_file_if_needed (void)
{
unsigned int oldest_file_id;
char oldest_file_path[PATH_MAX];
// step(1) : guess oldest file
if (sl_Info.curr_file_id < sql_log_max_cnt)
{
/*
* Cases :
* 1. 'curr_file_id' has never exceeded the maximum value of UINT_MAX, which means there is no oldest file to delete.
* 2. 'curr_file_id' exceeds UINT_MAX and wraps around to 0, which means there is oldest file (e.g. oldest file id == UINT_MAX) to delete.
*
* Instead of using a complicated process to decide between the two cases, always assume it’s case 2.
*/
oldest_file_id = UINT_MAX - sql_log_max_cnt + sl_Info.curr_file_id + 1;
}
else
{
oldest_file_id = sl_Info.curr_file_id - sql_log_max_cnt;
}
if (snprintf (oldest_file_path, PATH_MAX, "%s.%u", sql_log_base_path, oldest_file_id) >= PATH_MAX)
{
assert_release (false);
oldest_file_path[PATH_MAX - 1] = '\0';
}
// step(2) : delete the oldest file if it exists
unlink (oldest_file_path);
/*
* if (errno == EACCES), then this corresponds to case1 mentioned above.
* There isn't actually a file to delete, but it will attempt to delete the guessed 'oldest_file_path'.
* However, this situation is expected, and the unlink() function does not attempt to delete a file that does not exist,
* so even if the 'oldest_file_path' is guessed incorrectly, the issue is mitigated.
*/
}
/*
* sl_create_sql_log_dir() - verify and create the SQL log path
* return: NO_ERROR or ER_FAILED
* repl_log_path(in): log volume path for apply (default path)
* path_buf(out): SQL log path buffer
* path_buf_size(in): buffer size
*
* Note:
* This function is related to the ha_sql_log_path system parameter. The SQL log path can be changed by setting this.
*/
static int
sl_create_sql_log_dir (const char *repl_log_path, char *path_buf, int path_buf_size)
{
const char *log_path = NULL, *path_base_name = "sql_log";
char *p = NULL;
char tmp_log_path[PATH_MAX], er_msg[PATH_MAX + 60];
assert (repl_log_path != NULL && path_buf != NULL && path_buf_size >= PATH_MAX);
log_path = prm_get_string_value (PRM_ID_HA_SQL_LOG_PATH);
if (log_path != NULL && *log_path != '\0')
{
if (!IS_ABS_PATH (log_path))
{
snprintf (tmp_log_path, sizeof (tmp_log_path), "%s%s%s", repl_log_path, FILEIO_PATH_SEPARATOR (repl_log_path),
log_path);
log_path = tmp_log_path;
}
}
else
{
log_path = repl_log_path;
}
int n = snprintf (path_buf, path_buf_size, "%s%s%s", log_path, FILEIO_PATH_SEPARATOR (log_path), path_base_name);
if (n >= path_buf_size)
{
snprintf (er_msg, sizeof (er_msg), "Too long the SQL log path \'%s\'", log_path);
er_stack_push ();
er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_HA_GENERIC_ERROR, 1, er_msg);
er_stack_pop ();
return ER_FAILED;
}
p = path_buf;
if (*p == PATH_SEPARATOR)
{
p++;
}
while (p != NULL)
{
p = strchr (p, PATH_SEPARATOR);
if (p != NULL)
{
*p = '\0';
}
if (strcmp (basename (path_buf), ".") && strcmp (basename (path_buf), ".."))
{
if (access (path_buf, F_OK) < 0)
{
if (mkdir (path_buf, 0777) < 0)
{
snprintf (er_msg, sizeof (er_msg), "Failed to create SQL log directory \'%s\'", path_buf);
er_stack_push ();
er_set_with_oserror (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_HA_GENERIC_ERROR, 1, er_msg);
er_stack_pop ();
return ER_FAILED;
}
}
}
if (p != NULL)
{
*p = PATH_SEPARATOR;
p++;
}
}
return NO_ERROR;
}