28 #if defined(SERVER_MODE) 56 #include <sys/types.h> 58 #if !defined(SERVER_MODE) 59 #define CSS_ENABLE_INTERRUPTS 67 char *(*sprintf_participant) (
void *particp_id);
69 int (*
send_prepare) (
int gtrid,
int num_particps,
void *block_particps_ids);
70 bool (*
send_commit) (
int gtrid,
int num_particps,
int *particp_indices,
void *block_particps_ids);
71 bool (*
send_abort) (
int gtrid,
int num_particps,
int *particp_indices,
void *block_particps_ids,
int collect);
88 int *ack_list,
int *ack_count);
93 LOG_LSA * log_lsa,
LOG_PAGE * log_page_p,
int **ack_list,
int *ack_count,
94 int *size_ack_list,
bool * search_2pc_prepare,
bool * search_2pc_start);
128 block_particps_ids =
NULL;
133 if (num_particps > 0)
135 *block_particps_ids = malloc (num_particps * *partid_len);
136 if (*block_particps_ids ==
NULL)
141 memcpy (*block_particps_ids, block, num_particps * *partid_len);
145 *block_particps_ids =
NULL;
221 return (*log_2pc_Userfun.
send_prepare) (gtrid, num_particps, block_particps_ids);
257 result = (*log_2pc_Userfun.
send_commit) (gtrid, num_particps, particps_indices, block_particps_ids);
301 result = (*log_2pc_Userfun.
send_abort) (gtrid, num_particps, particps_indices, block_particps_ids, collect);
313 #if defined (ENABLE_UNUSED_FUNCTION) 367 unsigned int unsig_gtrid;
376 ptr = (
unsigned char *) &value;
377 for (i = 0; i <
sizeof (value); i++)
379 hash = (hash << 5) - hash + *ptr++;
383 unsig_gtrid = unsig_gtrid + ((hash % UCHAR_MAX) & 0x7F);
386 value = (
unsigned int) getpid ();
388 ptr = (
unsigned char *) &value;
389 for (i = 0; i <
sizeof (value); i++)
391 hash = (hash << 5) - hash + *ptr++;
394 unsig_gtrid = (unsig_gtrid << 8) + (hash % UCHAR_MAX);
398 unsigned short ushort_one;
399 unsigned short ushort_two;
400 memcpy (&ushort_one, &tranid,
sizeof (
unsigned short));
401 memcpy (&ushort_two, (
char *) (&tranid) +
sizeof (
unsigned short),
sizeof (
unsigned short));
403 hash = (hash << 5) - hash + ushort_two;
406 unsig_gtrid = (unsig_gtrid << 16) + (hash % SHRT_MAX);
415 gtrid = (int) unsig_gtrid;
553 if (*decision ==
true)
978 gtrids[count++] = tdes->
gtrid;
1090 if (client_tdes ==
NULL)
1190 #if defined(CUBRID_DEBUG) 1192 "log_2pc_prepare: Transaction %d " "(index = %d) is not active for prepare to commit." 1207 #if defined(CUBRID_DEBUG) 1209 "log_2pc_prepare: May be a system error.\n" "Prepare to commit requested on the transaction = %d" 1210 " (index = %d) which has permanent operations attached" 1211 " to it.\n Will attach those system operations to the" " transaction\n", tdes->
trid,
1230 if (other_tdes ==
NULL)
1239 && other_tdes->
gtrid == gtrid)
1259 tdes->
gtrid = gtrid;
1266 if (decision ==
false)
1302 prepared->
gtrid = gtrid;
1386 if (acquire_locks !=
false)
1679 int *ack_list,
int *ack_count)
1682 void *block_particps_ids;
1684 int particp_id_length;
1701 block_particps_ids = malloc (particp_id_length * num_particps);
1702 if (block_particps_ids ==
NULL)
1712 logpb_copy_from_log (thread_p, (
char *) block_particps_ids, particp_id_length * num_particps, log_lsa, log_page_p);
1738 if (*ack_count > 0 && ack_list !=
NULL)
1745 for (i = 0; i < *ack_count; i++)
1780 if ((*ack_count + 1) > (*size_ack_list))
1783 if (*size_ack_list == 0)
1790 *size_ack_list = 10;
1792 size = (*size_ack_list) *
sizeof (int);
1793 ack_list = (
int *) malloc (size);
1794 if (ack_list ==
NULL)
1804 *size_ack_list = ((int) (((
float) (*size_ack_list) * 1.30) + 0.5));
1805 size = (*size_ack_list) *
sizeof (int);
1806 ack_list = (
int *) realloc (ack_list, size);
1807 if (ack_list ==
NULL)
1864 LOG_PAGE * log_page_p,
int **ack_list,
int *ack_count,
int *size_ack_list,
1865 bool * search_2pc_prepare,
bool * search_2pc_start)
1867 switch (record_type)
1870 if (*search_2pc_prepare)
1873 *search_2pc_prepare =
false;
1878 if (*search_2pc_start)
1882 *search_2pc_start =
false;
1940 if (*search_2pc_start ==
false)
1942 *search_2pc_prepare =
false;
1944 else if (*search_2pc_prepare ==
false)
1946 *search_2pc_start =
false;
1954 #if defined(CUBRID_DEBUG) 1995 bool search_2pc_prepare =
false;
1996 bool search_2pc_start =
false;
1998 int *ack_list =
NULL;
1999 int size_ack_list = 0;
2014 search_2pc_prepare =
true;
2022 search_2pc_start =
true;
2029 log_page_p = (
LOG_PAGE *) aligned_log_pgbuf;
2031 LSA_COPY (&prev_tranlsa, upto_chain_lsa);
2032 while (!
LSA_ISNULL (&prev_tranlsa) && (search_2pc_prepare || search_2pc_start))
2041 while (prev_tranlsa.
pageid == lsa.
pageid && (search_2pc_prepare || search_2pc_start))
2049 (thread_p, log_rec->
type, tdes, &lsa, log_page_p, &ack_list, &ack_count, &size_ack_list,
2050 &search_2pc_prepare, &search_2pc_start) !=
NO_ERROR)
2061 #if defined(CUBRID_DEBUG) 2063 "log_2pc_recovery_analysis_info:" " SYSTEM ERROR... Either the LOG_2PC_PREPARE/LOG_2PC_START\n" 2064 " log record was not found for participant of distributed" " trid = %d with state = %s", tdes->
trid,
2075 if (search_2pc_start)
2082 #if defined(CUBRID_DEBUG) 2084 "log_2pc_recovery_analysis_info:" " SYSTEM ERROR... The LOG_2PC_START log record was" 2085 " not found for coordinator of distributed trid = %d" " with state = %s", tdes->
trid,
2142 state = tdes->
state;
2149 tdes->
state = state;
2181 state = tdes->
state;
2185 tdes->
state = state;
2283 switch (tdes->
state)
2334 #if defined (ENABLE_UNUSED_FUNCTION) 2375 int num_particps = 0;
2376 int particp_id_length;
2377 void *block_particps_ids;
2385 if (num_particps > 0)
#define LOG_READ_ADD_ALIGN(thread_p, add, lsa, log_pgptr)
unsigned int num_object_locks
void log_2pc_recovery_analysis_info(THREAD_ENTRY *thread_p, log_tdes *tdes, LOG_LSA *upto_chain_lsa)
bool(* send_commit)(int gtrid, int num_particps, int *particp_indices, void *block_particps_ids)
TRAN_STATE log_commit_local(THREAD_ENTRY *thread_p, LOG_TDES *tdes, bool retain_lock, bool is_local_tran)
void log_2pc_dump_acqobj_locks(FILE *fp, int length, void *data)
TRAN_STATE log_abort(THREAD_ENTRY *thread_p, int tran_index)
char user_name[DB_MAX_USER_LENGTH+1]
void logpb_fatal_error(THREAD_ENTRY *thread_p, bool logexit, const char *file_name, const int lineno, const char *fmt,...)
void(* dump_participants)(FILE *fp, int block_length, void *block_particps_id)
static LOG_TDES * log_2pc_find_tran_descriptor(int gtrid)
void LSA_COPY(log_lsa *plsa1, const log_lsa *plsa2)
void set_system_internal_with_user(const char *db_user)
#define ER_LOG_PAGE_CORRUPTED
#define LOG_ISTRAN_2PC_IN_SECOND_PHASE(tdes)
LOG_2PC_COORDINATOR * coord
int logpb_fetch_page(THREAD_ENTRY *thread_p, const LOG_LSA *req_lsa, LOG_CS_ACCESS_MODE access_mode, LOG_PAGE *log_pgptr)
#define ER_LOG_CANNOT_SET_GTRINFO
const int LOG_SYSTEM_TRAN_INDEX
TRAN_STATE log_2pc_prepare(THREAD_ENTRY *thread_p)
#define ER_LOG_2PC_UNKNOWN_GTID
int lock_reacquire_crash_locks(THREAD_ENTRY *thread_p, LK_ACQUIRED_LOCKS *acqlocks, int tran_index)
#define LOG_GET_LOG_RECORD_HEADER(log_page_p, lsa)
int(* send_prepare)(int gtrid, int num_particps, void *block_particps_ids)
static void log_2pc_recovery_prepare(THREAD_ENTRY *thread_p, LOG_TDES *tdes, LOG_LSA *log_lsa, LOG_PAGE *log_page_p)
char user_name[DB_MAX_USER_LENGTH+1]
#define ER_LOG_2PC_CANNOT_START
const int LOG_2PC_NULL_GTRID
void lock_dump_acquired(FILE *fp, LK_ACQUIRED_LOCKS *acqlocks)
LOG_TDES * LOG_FIND_TDES(int tran_index)
static void log_2pc_recovery_abort_decision(THREAD_ENTRY *thread_p, LOG_TDES *tdes)
#define ER_LOG_2PC_CANNOT_ATTACH
bool log_2pc_clear_and_is_tran_distributed(log_tdes *tdes)
int(* lookup_participant)(void *particp_id, int num_particps, void *block_particps_ids)
LOG_PRIOR_NODE * prior_lsa_alloc_and_copy_data(THREAD_ENTRY *thread_p, LOG_RECTYPE rec_type, LOG_RCVINDEX rcvindex, LOG_DATA_ADDR *addr, int ulength, const char *udata, int rlength, const char *rdata)
bool log_2pc_send_prepare(int gtrid, int num_particps, void *block_particps_ids)
void log_2pc_dump_gtrinfo(FILE *fp, int length, void *data)
const char * log_state_string(TRAN_STATE state)
#define PTR_ALIGN(addr, boundary)
void * block_particps_ids
int log_2pc_attach_global_tran(THREAD_ENTRY *thread_p, int gtrid)
#define er_log_debug(...)
static void log_2pc_append_start(THREAD_ENTRY *thread_p, LOG_TDES *tdes)
struct lk_acqobj_lock LK_ACQOBJ_LOCK
static bool log_2pc_check_duplicate_global_tran_id(int gtrid)
static int * log_2pc_expand_ack_list(THREAD_ENTRY *thread_p, int *ack_list, int *ack_count, int *size_ack_list)
const bool LOG_2PC_OBTAIN_LOCKS
#define TR_TABLE_CS_ENTER(thread_p)
static int log_2pc_commit_first_phase(THREAD_ENTRY *thread_p, LOG_TDES *tdes, LOG_2PC_EXECUTE execute_2pc_type, bool *decision)
void log_2pc_free_coord_info(log_tdes *tdes)
LOG_LSA prior_lsa_next_record(THREAD_ENTRY *thread_p, LOG_PRIOR_NODE *node, log_tdes *tdes)
int log_2pc_set_global_tran_info(THREAD_ENTRY *thread_p, int gtrid, void *info, int size)
void er_set(int severity, const char *file_name, const int line_no, int err_id, int num_args,...)
char * log_2pc_sprintf_particp(void *particp_id)
TRAN_STATE log_2pc_commit(THREAD_ENTRY *thread_p, log_tdes *tdes, LOG_2PC_EXECUTE execute_2pc_type, bool *decision)
#define ER_LOG_2PC_NON_UNIQUE_GTID
#define ER_LOG_2PC_NOT_STARTED
TRAN_STATE log_complete_for_2pc(THREAD_ENTRY *thread_p, LOG_TDES *tdes, LOG_RECTYPE iscommitted, LOG_GETNEWTRID get_newtrid)
#define LOG_ISTRAN_ACTIVE(tdes)
#define ER_OUT_OF_VIRTUAL_MEMORY
static TRAN_STATE log_2pc_commit_second_phase(THREAD_ENTRY *thread_p, LOG_TDES *tdes, bool *decision)
enum log_rectype LOG_RECTYPE
bool(* send_abort)(int gtrid, int num_particps, int *particp_indices, void *block_particps_ids, int collect)
void log_2pc_read_prepare(THREAD_ENTRY *thread_p, int acquire_locks, log_tdes *tdes, LOG_LSA *log_lsa, LOG_PAGE *log_page_p)
int log_2pc_start(THREAD_ENTRY *thread_p)
int num_prepared_loose_end_indices
static void log_2pc_recovery_recv_ack(THREAD_ENTRY *thread_p, LOG_LSA *log_lsa, LOG_PAGE *log_page_p, int *ack_list, int *ack_count)
bool log_2pc_send_commit_decision(int gtrid, int num_particps, int *particps_indices, void *block_particps_ids)
bool LSA_ISNULL(const log_lsa *lsa_ptr)
void log_2pc_dump_participants(FILE *fp, int block_length, void *block_particps_ids)
static int log_2pc_make_global_tran_id(TRANID tranid)
bool log_2pc_send_abort_decision(int gtrid, int num_particps, int *particps_indices, void *block_particps_ids, bool collect)
static void log_2pc_recovery_aborted_informing_participants(THREAD_ENTRY *thread_p, LOG_TDES *tdes)
#define LOG_SET_CURRENT_TRAN_INDEX(thrd, index)
int count(int &result, const cub_regex_object ®, const std::string &src, const int position, const INTL_CODESET codeset)
#define ER_LOG_UNKNOWN_TRANINDEX
log_tdes * log_2pc_alloc_coord_info(log_tdes *tdes, int num_particps, int particp_id_length, void *block_particps_ids)
void log_2pc_recovery(THREAD_ENTRY *thread_p)
int(* get_participants)(int *particp_id_length, void **block_particps_ids)
#define LOG_ISTRAN_2PC_PREPARE(tdes)
unsigned int css_gethostid(void)
static void error(const char *msg)
void logtb_free_tran_index(THREAD_ENTRY *thread_p, int tran_index)
#define LOG_FIND_THREAD_TRAN_INDEX(thrd)
#define LOG_READ_ALIGN(thread_p, lsa, log_pgptr)
static void log_2pc_recovery_collecting_participant_votes(THREAD_ENTRY *thread_p, LOG_TDES *tdes)
const char * get_db_user() const
#define free_and_init(ptr)
void LSA_SET_NULL(log_lsa *lsa_ptr)
void logpb_flush_pages(THREAD_ENTRY *thread_p, LOG_LSA *flush_lsa)
void logpb_copy_from_log(THREAD_ENTRY *thread_p, char *area, int length, LOG_LSA *log_lsa, LOG_PAGE *log_pgptr)
enum log_2pc_execute LOG_2PC_EXECUTE
char *(* sprintf_participant)(void *particp_id)
static void log_2pc_recovery_committed_informing_participants(THREAD_ENTRY *thread_p, LOG_TDES *tdes)
int log_2pc_recovery_prepared(THREAD_ENTRY *thread_p, int gtrids[], int size)
void log_sysop_attach_to_outer(THREAD_ENTRY *thread_p)
static void log_2pc_recovery_commit_decision(THREAD_ENTRY *thread_p, LOG_TDES *tdes)
static int log_2pc_recovery_start(THREAD_ENTRY *thread_p, LOG_TDES *tdes, LOG_LSA *log_lsa, LOG_PAGE *log_page_p, int *ack_list, int *ack_count)
TRAN_STATE log_abort_local(THREAD_ENTRY *thread_p, LOG_TDES *tdes, bool is_local_tran)
#define LOG_ISTRAN_2PC(tdes)
static int log_2pc_get_num_participants(int *partid_len, void **block_particps_ids)
static int log_2pc_recovery_analysis_record(THREAD_ENTRY *thread_p, LOG_RECTYPE record_type, LOG_TDES *tdes, LOG_LSA *log_lsa, LOG_PAGE *log_page_p, int **ack_list, int *ack_count, int *size_ack_list, bool *search_2pc_prepare, bool *search_2pc_start)
TRAN_STATE log_2pc_prepare_global_tran(THREAD_ENTRY *thread_p, int gtrid)
unsigned int num_page_locks
bool log_2pc_is_tran_distributed(log_tdes *tdes)
#define TR_TABLE_CS_EXIT(thread_p)
#define DB_MAX_USER_LENGTH
static void log_2pc_append_decision(THREAD_ENTRY *thread_p, LOG_TDES *tdes, LOG_RECTYPE decsion)
const bool LOG_2PC_DONT_OBTAIN_LOCKS
void lock_unlock_all_shared_get_all_exclusive(THREAD_ENTRY *thread_p, LK_ACQUIRED_LOCKS *acqlocks)
int log_2pc_get_global_tran_info(THREAD_ENTRY *thread_p, int gtrid, void *buffer, int size)
static int log_2pc_attach_client(THREAD_ENTRY *thread_p, LOG_TDES *tdes, LOG_TDES *client_tdes)
#define LOG_READ_ADVANCE_WHEN_DOESNT_FIT(thread_p, length, lsa, log_pgptr)
struct log_2pc_global_data log_2pc_Userfun