42 #include "cci_applier.h" 50 #define SL_LOG_FILE_MAX_SIZE \ 51 (prm_get_integer_value (PRM_ID_HA_SQL_LOG_MAX_SIZE_IN_MB) * 1024 * 1024) 52 #define FILE_ID_FORMAT "%d" 53 #define SQL_ID_FORMAT "%010u" 54 #define CATALOG_FORMAT FILE_ID_FORMAT " | " SQL_ID_FORMAT 89 if (len < 2 || str[0] !=
'\'' || str[len - 1] !=
'\'')
100 strbuf (
"SELECT * FROM [%s] WHERE ",
sm_ch_name ((
MOBJ) sm_class));
141 FILE *read_catalog_fp;
146 if (read_catalog_fp ==
NULL)
151 if (fgets (info, LINE_MAX, read_catalog_fp) ==
NULL)
153 if (read_catalog_fp !=
NULL)
155 fclose (read_catalog_fp);
162 fclose (read_catalog_fp);
166 fclose (read_catalog_fp);
173 char tmp_log_path[PATH_MAX];
174 char basename_buf[PATH_MAX];
176 memset (&sl_Info, 0,
sizeof (sl_Info));
178 snprintf (tmp_log_path, PATH_MAX,
"%s/sql_log/", repl_log_path);
181 strcpy (basename_buf, repl_log_path);
183 snprintf (
sql_catalog_path, PATH_MAX,
"%s/%s_applylogdb.sql.info", repl_log_path, db_name);
239 if (num_assignments > 0)
241 strbuf (
"\"%s\"", assignments[0]->att->header.name);
243 for (
int i = 1;
i < num_assignments;
i++)
245 strbuf (
", \"%s\"", assignments[
i]->att->header.name);
254 if (num_assignments > 0)
259 for (
int i = 1;
i < num_assignments;
i++)
273 int prev_i_index = 0;
274 char *prev_i_ptr =
NULL;
285 strbuf (
"\"%s\"=", attributes[
i]->header.name);
295 for (
int i = 0;
i < num_assignments;
i++)
297 if (!strcmp (att_name, assignments[
i]->att->header.name))
323 for (
int i = 0;
i < num_assignments;
i++)
325 strbuf (
"\"%s\"=", assignments[
i]->att->header.name);
327 if (
i != num_assignments - 1)
341 insert_strbuf (
") VALUES (");
343 insert_strbuf (
");");
372 update_strbuf (
" WHERE ");
394 if (cur_value ==
NULL || incr_value ==
NULL)
415 alter_strbuf (
"ALTER SERIAL [%s] START WITH %s;", serial_name,
427 delete_strbuf (
"DELETE FROM [%s] WHERE ", class_name);
448 char default_ha_prm[LINE_MAX];
452 statement_strbuf (
"%s;", stmt_text);
454 if (ha_sys_prm !=
NULL)
464 setprm_strbuf (
"%s SET SYSTEM PARAMETERS '%s';", CA_MARK_TRAN_START, ha_sys_prm);
475 setprm_strbuf.
clear ();
476 setprm_strbuf (
"%s SET SYSTEM PARAMETERS '%s';", CA_MARK_TRAN_END, default_ha_prm);
494 statement_strbuf.
clear ();
495 statement_strbuf (
"GRANT ALL PRIVILEGES ON %s TO %s;", class_name, db_user);
522 curr_time = time (
NULL);
523 strftime (time_buf,
sizeof (time_buf),
"%Y-%m-%d %H:%M:%S", localtime (&curr_time));
527 (select ==
NULL) ? 0 : select->
len (), query.
len ());
546 fseek (
log_fp, 0, SEEK_END);
558 char cur_sql_log_path[PATH_MAX];
567 fp = fopen (cur_sql_log_path,
"r+");
570 fseek (fp, 0, SEEK_END);
578 fp = fopen (cur_sql_log_path,
"w");
593 char new_file_path[PATH_MAX];
605 new_fp = fopen (new_file_path,
"w");
619 char *
p, path[PATH_MAX];
626 strcpy (path, new_dir);
642 if (access (path, F_OK) < 0)
644 if (mkdir (path, 0777) < 0)
static void sl_print_insert_att_values(string_buffer &strbuf, OBJ_TEMPASSIGN **assignments, int num_assignments)
static int sl_print_pk(string_buffer &strbuf, SM_CLASS *sm_class, DB_VALUE *key)
DB_MIDXKEY * db_get_midxkey(const DB_VALUE *value)
static int sl_write_sql(string_buffer &query, string_buffer *select)
int sl_init(const char *db_name, const char *repl_log_path)
SM_CLASS_CONSTRAINT * classobj_find_class_primary_key(SM_CLASS *class_)
static int create_dir(const char *new_dir)
int sysprm_set_error(SYSPRM_ERR rc, const char *data)
int sl_write_update_sql(DB_OTMPL *inst_tp, DB_VALUE *key)
OBJ_TEMPASSIGN ** assignments
static FILE * sl_open_next_file(FILE *old_fp)
static int sl_write_catalog(void)
#define er_log_debug(...)
static void sl_print_att_value(string_buffer &strbuf, const char *att_name, OBJ_TEMPASSIGN **assignments, int num_assignments)
static void sl_print_update_att_set(string_buffer &strbuf, OBJ_TEMPASSIGN **assignments, int num_assignments)
int sl_write_statement_sql(char *class_name, char *db_user, int item_type, const char *stmt_text, char *ha_sys_prm)
#define SL_LOG_FILE_MAX_SIZE
void describe_value(const db_value *value)
static DB_VALUE * sl_find_att_value(const char *att_name, OBJ_TEMPASSIGN **assignments, int num_assignments)
const char * sm_ch_name(const MOBJ clobj)
int sl_write_insert_sql(DB_OTMPL *inst_tp, DB_VALUE *key)
static int sl_print_select(string_buffer &strbuf, SM_CLASS *sm_class, DB_VALUE *key)
#define NUMERIC_MAX_STRING_SIZE
int sl_write_delete_sql(char *class_name, MOBJ mclass, DB_VALUE *key)
const char * get_buffer() const
int pr_midxkey_get_element_nocopy(const DB_MIDXKEY *midxkey, int index, DB_VALUE *value, int *prev_indexp, char **prev_ptrp)
static char sql_log_base_path[PATH_MAX]
static char sql_catalog_path[PATH_MAX]
SYSPRM_ERR sysprm_make_default_values(const char *data, char *default_val_buf, const int buf_size)
static void error(const char *msg)
char * numeric_db_value_print(const DB_VALUE *val, char *buf)
unsigned int last_inserted_sql_id
static FILE * sl_log_open(void)
static void sl_print_insert_att_names(string_buffer &strbuf, OBJ_TEMPASSIGN **assignments, int num_assignments)
static void sl_print_midxkey(string_buffer &strbuf, SM_ATTRIBUTE **attributes, const DB_MIDXKEY *midxkey)
SM_ATTRIBUTE ** attributes
char * basename(const char *path)
#define DB_VALUE_TYPE(value)
int numeric_db_value_add(const DB_VALUE *dbv1, const DB_VALUE *dbv2, DB_VALUE *answer)
static char * trim_single_quote(char *str, size_t len)
static int sl_read_catalog(void)