45 #define PROXY_MAX_IGNORE_TIMER_CHECK 10 46 #define PROXY_TIMER_CHECK_INTERVAL 1 164 fn_proxy_cas_relay_only
207 if (waiter_p ==
NULL)
220 if (counter !=
NULL && *counter > 0)
334 if (client_info_p !=
NULL)
420 if (func_code <= 0 || func_code >=
CAS_FC_MAX)
435 error = proxy_cas_fn (ctx_p, event_p);
551 PROXY_DEBUG_LOG (
"Context is in_tran status " "and waiting prepare/execute response. " 574 PROXY_DEBUG_LOG (
"context is in_tran status. " "and function code %d, " "cas_is_in_ran %s, " 603 switch (event_p->
type)
662 argc =
net_decode_str (payload, length, &func_code, (
void ***) (&argv));
673 if (func_code <= 0 || func_code >=
CAS_FC_MAX)
710 error = proxy_client_fn (ctx_p, event_p, argc, argv);
812 if (client_info_p ==
NULL)
815 "Unable to find cilent info in shared memory. " "(context id:%d, context uid:%d)", ctx_p->
cid,
884 switch (event_p->
type)
919 static bool is_init =
false;
941 "Not enough virtual memory. " "Failed to alloc context entries. " "(errno:%d, size:%d).", errno, size);
944 memset (proxy_Context.
ent, 0, size);
946 for (i = 0; i < proxy_Context.
size; i++)
948 ctx_p = &(proxy_Context.
ent[
i]);
978 #if defined(PROXY_VERBOSE_DEBUG) 984 fprintf (fp,
"* STMT_LIST: ");
985 for (stmt_list_p = ctx_p->
stmt_list; stmt_list_p; stmt_list_p = stmt_list_p->
next)
987 fprintf (fp,
"%-10d ", stmt_list_p->
stmt_h_id);
995 proxy_context_dump_title (FILE * fp)
998 "%-5s %-10s %-10s %-5s %-5s " "%-5s %-5s %-15s " "%-15s %-15s %-15s " "%-10s %-10s %-10s " "%-10s %-10s \n",
999 "CID",
"UID",
"CLIENT",
"SHARD",
"CAS",
"BUSY",
"IN_TRAN",
"PREPARE_FOR_EXEC",
"FREE_ON_END_TRAN",
1000 "FREE_CONTEXT",
"CLIENT_END_TRAN",
"FUNC_CODE",
"STMT_H_ID",
"STMT_HINT_TYPE",
"ERROR_IND",
"ERROR_CODE");
1010 "%-5d %-10u %-10d %-5d %-5d " "%-5s %-5s %-15s " "%-15s %-15s %-15s %-15s " "%-10d %-10d %-10d " 1018 proxy_context_dump_stmt (fp, ctx_p);
1024 proxy_context_dump_all (FILE * fp)
1030 fprintf (fp,
"* %-20s : %d\n",
"SIZE", proxy_Context.
size);
1033 if (proxy_Context.
ent ==
NULL)
1038 proxy_context_dump_title (fp);
1039 for (i = 0; i < proxy_Context.
size; i++)
1041 ctx_p = &(proxy_Context.
ent[
i]);
1042 proxy_context_dump (fp, ctx_p);
1048 proxy_context_print (
bool print_all)
1057 "idx",
"cid",
"uid",
"busy",
"wait",
"in_tran",
"client_id",
"shard_id",
"cas_id",
"func_code",
"stmt_id");
1058 if (proxy_Context.
ent)
1060 for (i = 0; i < proxy_Context.
size; i++)
1062 ctx_p = &(proxy_Context.
ent[
i]);
1064 if (!print_all && !ctx_p->
is_busy)
1081 static char buffer[BUFSIZ];
1085 return (
char *)
"-";
1088 snprintf (buffer,
sizeof (buffer),
1089 "cid:%d, uid:%u, " "is_busy:%s, is_in_tran:%s, " "is_prepare_for_execute:%s, free_on_end_tran:%s, " 1090 "free_on_client_io_write:%s, " "free_context:%s, is_client_in_tran:%s, " "is_cas_in_tran:%s, " 1091 "waiting_event:(%p, %s), " "func_code:%d, stmt_h_id:%d, stmt_hint_type:%d, " "wait_timeout:%d, " 1092 "client_id:%d, shard_id:%d, cas_id:%d, " "error_ind:%d, error_code:%d, error_msg:[%s] ", ctx_p->
cid,
1101 return (
char *) buffer;
1201 static unsigned int uid = 0;
1210 ctx_p->
uid = (++uid == 0) ? ++uid : uid;
1217 #if defined(PROXY_VERBOSE_DEBUG) 1218 proxy_context_print (
false);
1243 if (client_info_p !=
NULL)
1297 #if defined (ENABLE_UNUSED_FUNCTION) 1299 proxy_context_free_by_cid (
int cid,
unsigned int uid)
1318 ctx_p = &(proxy_Context.
ent[cid]);
1332 if (cli_io_p ==
NULL)
1358 for (stmt_list_p = ctx_p->
stmt_list; stmt_list_p; stmt_list_p = stmt_list_p->
next)
1360 if (stmt_list_p->
stmt_h_id != stmt_h_id)
1384 if (stmt_list_p ==
NULL)
1415 for (stmt_list_p = ctx_p->
stmt_list; stmt_list_p; stmt_list_p = stmt_list_np)
1417 stmt_list_np = stmt_list_p->
next;
1422 PROXY_DEBUG_LOG (
"remove prepared statement from context. " "(context:(%s), stmt:(%s))",
1490 "proxy service temporarily unavailable");
1510 static bool is_init =
false;
1512 if (is_init ==
true)
1599 PROXY_DEBUG_LOG (
"wakeup context(cid:%d, uid:%u) by shard(%d)/cas(%d).", ctx_p->
cid, ctx_p->
uid, shard_id, cas_id);
1604 if (cas_io_p ==
NULL)
1606 PROXY_DEBUG_LOG (
"failed to proxy_cas_alloc_by_ctx. " "(shard_id:%d, cas_id:%d, ctx_cid:%d, ctx_uid:%d", shard_id,
1619 if (event_p ==
NULL)
1658 if (event_p ==
NULL)
1684 event_p->
type = type;
1692 memset (((
void *) &event_p->
buffer), 0, sizeof (event_p->
buffer));
1706 memcpy ((
void *) new_event_p, (
void *) event_p, offsetof (
T_PROXY_EVENT, buffer));
1730 if (event_p ==
NULL)
1735 length = req_func (driver_info, &msg);
1762 if (event_p ==
NULL)
1767 length = resp_func (driver_info, &msg, argv);
1781 int (*err_func) (
char *driver_info,
char **buffer,
int error_ind,
int error_code,
1782 const char *error_msg,
char is_in_tran),
int error_ind,
int error_code,
1783 const char *error_msg,
char is_in_tran)
1790 if (event_p ==
NULL)
1795 length = err_func (driver_info, &msg, error_ind, error_code, error_msg, is_in_tran);
1814 event_p->
buffer.
data = (
char *) malloc (size *
sizeof (
char));
1858 event_p->
type = type;
1882 event_p->
cas_id = cas_id;
1896 #if defined (ENABLE_UNUSED_FUNCTION) 1921 static char buffer[LINE_MAX];
1923 if (event_p ==
NULL)
1925 return (
char *)
"-";
1928 snprintf (buffer,
sizeof (buffer),
"type:%d, from_cas:%s, cid:%d, uid:%u, shard_id:%d, cas_id:%d", event_p->
type,
1931 return (
char *) buffer;
1937 static int num_called = 0;
1943 if (num_called != 0)
1949 diff_time = now - old;
1966 static char buffer[LINE_MAX];
1968 size_t head_len, ws_len, tail_len;
1973 return (
char *)
"-";
1977 if (len < (
int)
sizeof (buffer))
1983 head_len =
sizeof (buffer) / 2;
1984 tail_len =
sizeof (buffer) - head_len - ws_len;
1987 memcpy (buffer, sql, head_len);
1990 *(buffer + head_len) =
'\n';
1991 memset ((buffer + head_len + 1), (
int)
'.', ws_len - 2);
1992 *(buffer + head_len + ws_len - 1) =
'\n';
1996 from = (to - tail_len);
1997 memcpy ((buffer + head_len + ws_len), from, tail_len);
1999 buffer[LINE_MAX - 1] =
'\0';
T_PROXY_EVENT * proxy_event_new_with_req(char *driver_info, unsigned int type, int from, T_PROXY_EVENT_FUNC req_func)
void proxy_context_set_out_tran(T_PROXY_CONTEXT *ctx_p)
int fn_proxy_client_not_supported(T_PROXY_CONTEXT *ctx_p, T_PROXY_EVENT *event_p, int argc, char **argv)
int(* T_PROXY_EVENT_FUNC)(char *driver_info, char **buffer)
int fn_proxy_client_con_close(T_PROXY_CONTEXT *ctx_p, T_PROXY_EVENT *event_p, int argc, char **argv)
char driver_info[SRV_CON_CLIENT_INFO_SIZE]
T_CONTEXT_STMT * proxy_context_add_stmt(T_PROXY_CONTEXT *ctx_p, T_SHARD_STMT *stmt_p)
int fn_proxy_client_set_db_parameter(T_PROXY_CONTEXT *ctx_p, T_PROXY_EVENT *event_p, int argc, char **argv)
int(* T_PROXY_CLIENT_FUNC)(T_PROXY_CONTEXT *ctx_p, T_PROXY_EVENT *event_p, int argc, char **argv)
void proxy_event_free(T_PROXY_EVENT *event_p)
static void proxy_handler_process_client_wakeup_by_shard(T_PROXY_EVENT *event_p)
int fn_proxy_client_check_cas(T_PROXY_CONTEXT *ctx_p, T_PROXY_EVENT *event_p, int argc, char **argv)
int proxy_wakeup_context_by_shard(T_WAIT_CONTEXT *waiter_p, int shard_id, int cas_id)
void proxy_io_buffer_clear(T_IO_BUFFER *io_buffer)
void proxy_event_set_buffer(T_PROXY_EVENT *event_p, char *data, unsigned int size)
int fn_proxy_cas_check_cas(T_PROXY_CONTEXT *ctx_p, T_PROXY_EVENT *event_p)
int proxy_handler_initialize(void)
T_PROXY_CONTEXT * proxy_context_find(int cid, unsigned int uid)
#define PROXY_DEBUG_LOG(fmt, args...)
int fn_proxy_client_get_db_parameter(T_PROXY_CONTEXT *ctx_p, T_PROXY_EVENT *event_p, int argc, char **argv)
T_PROXY_INFO * proxy_info_p
static void proxy_handler_process_cas_response(T_PROXY_EVENT *event_p)
void proxy_event_set_shard(T_PROXY_EVENT *event_p, int shard_id, int cas_id)
int fn_proxy_cas_execute_array(T_PROXY_CONTEXT *ctx_p, T_PROXY_EVENT *event_p)
int shard_stmt_unpin(T_SHARD_STMT *stmt_p)
T_PROXY_CONTEXT * proxy_context_find_by_socket_client_io(T_SOCKET_IO *sock_io_p)
#define PROXY_INVALID_CONTEXT
void proxy_available_cas_wait_timer(void)
char database_user[SRV_CON_DBUSER_SIZE]
int fn_proxy_cas_prepare(T_PROXY_CONTEXT *ctx_p, T_PROXY_EVENT *event_p)
T_PROXY_EVENT * proxy_event_dup(T_PROXY_EVENT *event_p)
int shard_queue_enqueue(T_SHARD_QUEUE *q, void *v)
#define PROXY_EVENT_FROM_CAS
void proxy_event_set_type_from(T_PROXY_EVENT *event_p, unsigned int type, int from_cas)
int fn_proxy_client_prepare_and_execute(T_PROXY_CONTEXT *ctx_p, T_PROXY_EVENT *event_p, int argc, char **argv)
T_PROXY_EVENT * proxy_event_new_with_rsp_ex(char *driver_info, unsigned int type, int from, T_PROXY_EVENT_FUNC_EX resp_func, void *argv)
void shard_shm_init_client_info_request(T_CLIENT_INFO *client_info_p)
bool proxy_event_io_read_complete(T_PROXY_EVENT *event_p)
char * proxy_get_driver_info_by_ctx(T_PROXY_CONTEXT *ctx_p)
int fn_proxy_client_cursor(T_PROXY_CONTEXT *ctx_p, T_PROXY_EVENT *event_p, int argc, char **argv)
static int proxy_context_initialize(void)
void proxy_context_set_error(T_PROXY_CONTEXT *ctx_p, int error_ind, int error_code)
T_APPL_SERVER_INFO * shard_shm_get_as_info(T_PROXY_INFO *proxy_info_p, T_SHM_APPL_SERVER *shm_as_p, int shard_id, int as_id)
void proxy_timer_process(void)
static void proxy_handler_process_cas_event(T_PROXY_EVENT *event_p)
int proxy_send_response_to_client(T_PROXY_CONTEXT *ctx_p, T_PROXY_EVENT *event_p)
int fn_proxy_get_shard_info(T_PROXY_CONTEXT *ctx_p, T_PROXY_EVENT *event_p, int argc, char **argv)
char * shard_str_sqls(char *sql)
void proxy_handler_process(void)
T_PROXY_EVENT * proxy_event_new_with_error(char *driver_info, unsigned int type, int from, int(*err_func)(char *driver_info, char **buffer, int error_ind, int error_code, const char *error_msg, char is_in_tran), int error_ind, int error_code, const char *error_msg, char is_in_tran)
void proxy_context_free_stmt(T_PROXY_CONTEXT *ctx_p)
T_SHARD_STMT * prepared_stmt
void proxy_cas_io_free_by_ctx(int shard_id, int cas_id, int ctx_cid, int unsigned ctx_uid)
void proxy_context_set_in_tran(T_PROXY_CONTEXT *ctx_p, int shard_id, int cas_id)
int proxy_context_send_error(T_PROXY_CONTEXT *ctx_p)
bool proxy_handler_is_cas_in_tran(int shard_id, int cas_id)
#define CAS_MAKE_PROTO_VER(DRIVER_INFO)
#define DOES_CLIENT_MATCH_THE_PROTOCOL(CLIENT, MATCH)
#define PROXY_TIMER_CHECK_INTERVAL
int proxy_check_cas_error(char *read_msg)
void proxy_client_io_free_by_ctx(int client_id, int ctx_cid, int ctx_uid)
char * proxy_str_event(T_PROXY_EVENT *event_p)
T_CONTEXT_STMT * stmt_list
void proxy_unset_force_out_tran(char *msg)
char database_passwd[SRV_CON_DBPASSWD_SIZE]
void shard_statement_wait_timer(void)
bool free_on_client_io_write
int fn_proxy_client_execute(T_PROXY_CONTEXT *ctx_p, T_PROXY_EVENT *event_p, int argc, char **argv)
void shard_shm_init_client_info(T_CLIENT_INFO *client_info_p)
static void proxy_handler_process_client_conn_error(T_PROXY_EVENT *event_p)
int fn_proxy_client_conn_error(T_PROXY_CONTEXT *ctx_p)
#define PROXY_INVALID_FUNC_CODE
void proxy_waiter_free(T_WAIT_CONTEXT *waiter)
void proxy_context_timeout(T_PROXY_CONTEXT *ctx_p)
T_PROXY_CONTEXT_GLOBAL proxy_Context
int proxy_waiter_comp_fn(const void *arg1, const void *arg2)
void proxy_context_set_error_with_msg(T_PROXY_CONTEXT *ctx_p, int error_ind, int error_code, const char *error_msg)
int proxy_event_realloc_buffer(T_PROXY_EVENT *event_p, unsigned int size)
#define CAS_ERROR_INDICATOR
int get_data_length(char *buffer)
union t_socket_io::@41 id
char * shard_str_stmt(T_SHARD_STMT *stmt_p)
T_CLIENT_INFO * shard_shm_get_client_info(T_PROXY_INFO *proxy_info_p, int idx)
static void proxy_context_clear(T_PROXY_CONTEXT *ctx_p)
#define SHARD_TEMPORARY_UNAVAILABLE
void * shard_cqueue_dequeue(T_SHARD_CQUEUE *q)
int fn_proxy_client_prepare(T_PROXY_CONTEXT *ctx_p, T_PROXY_EVENT *event_p, int argc, char **argv)
void proxy_client_io_free(T_CLIENT_IO *cli_io_p)
#define PROXY_INVALID_CAS
int proxy_event_alloc_buffer(T_PROXY_EVENT *event_p, unsigned int size)
static void proxy_handler_process_client_request(T_PROXY_EVENT *event_p)
static void proxy_context_destroy(void)
int proxy_io_make_error_msg(char *driver_info, char **buffer, int error_ind, int error_code, const char *error_msg, char is_in_tran)
void proxy_handler_destroy(void)
static int proxy_handler_process_cas_error(T_PROXY_EVENT *event_p)
T_CAS_IO * proxy_cas_alloc_by_ctx(int client_id, int shard_id, int cas_id, int ctx_cid, unsigned int ctx_uid, int timeout, int func_code)
struct t_proxy_context T_PROXY_CONTEXT
int fn_proxy_cas_relay_only(T_PROXY_CONTEXT *ctx_p, T_PROXY_EVENT *event_p)
void proxy_context_clear_error(T_PROXY_CONTEXT *ctx_p)
static void error(const char *msg)
T_CLIENT_IO * proxy_client_io_find_by_fd(int client_id, SOCKET fd)
int fn_proxy_client_cursor_close(T_PROXY_CONTEXT *ctx_p, T_PROXY_EVENT *event_p, int argc, char **argv)
int shard_stmt_pin(T_SHARD_STMT *stmt_p)
void shard_shm_set_client_info_response(T_CLIENT_INFO *client_info_p)
static void proxy_handler_process_client_wakeup_by_statement(T_PROXY_EVENT *event_p)
T_PROXY_CONTEXT * proxy_context_new(void)
static void proxy_context_free_shard(T_PROXY_CONTEXT *ctx_p)
static void proxy_handler_process_cas_conn_error(T_PROXY_EVENT *event_p)
void proxy_waiter_timeout(T_SHARD_QUEUE *waitq, INT64 *counter, int now)
int fn_proxy_cas_fetch(T_PROXY_CONTEXT *ctx_p, T_PROXY_EVENT *event_p)
int fn_proxy_client_schema_info(T_PROXY_CONTEXT *ctx_p, T_PROXY_EVENT *event_p, int argc, char **argv)
bool waiting_dummy_prepare
void proxy_cas_release_by_ctx(int shard_id, int cas_id, int ctx_cid, unsigned int ctx_uid)
void shard_stmt_check_waiter_and_wakeup(T_SHARD_STMT *stmt_p)
char * proxy_str_context(T_PROXY_CONTEXT *ctx_p)
T_PROXY_HANDLER proxy_Handler
void shard_queue_destroy(T_SHARD_QUEUE *q)
int fn_proxy_cas_execute(T_PROXY_CONTEXT *ctx_p, T_PROXY_EVENT *event_p)
int fn_proxy_cas_prepare_and_execute(T_PROXY_CONTEXT *ctx_p, T_PROXY_EVENT *event_p)
bool is_prepare_for_execute
int net_decode_str(char *msg, int msg_size, char *func_code, void ***ret_argv)
T_PROXY_EVENT * waiting_event
static T_PROXY_CAS_FUNC proxy_cas_fn_table[]
#define PROXY_LOG(level, fmt, args...)
bool shard_shm_set_as_client_info(T_PROXY_INFO *proxy_info_p, T_SHM_APPL_SERVER *shm_as_p, int shard_id, int as_id, unsigned int ip_addr, char *driver_info, char *driver_version)
void shard_stmt_free(T_SHARD_STMT *stmt_p)
T_APPL_SERVER_INFO * as_info
static void proxy_context_free_client(T_PROXY_CONTEXT *ctx_p)
void shard_cqueue_destroy(T_SHARD_CQUEUE *q)
T_SHM_APPL_SERVER * shm_as_p
T_PROXY_EVENT * proxy_event_new_with_rsp(char *driver_info, unsigned int type, int from, T_PROXY_EVENT_FUNC resp_func)
T_SHM_PROXY * shm_proxy_p
int fn_proxy_client_fetch(T_PROXY_CONTEXT *ctx_p, T_PROXY_EVENT *event_p, int argc, char **argv)
#define PROXY_MAX_IGNORE_TIMER_CHECK
static void proxy_handler_process_client_event(T_PROXY_EVENT *event_p)
void * shard_queue_dequeue(T_SHARD_QUEUE *q)
int fn_proxy_cas_end_tran(T_PROXY_CONTEXT *ctx_p, T_PROXY_EVENT *event_p)
int fn_proxy_client_execute_array(T_PROXY_CONTEXT *ctx_p, T_PROXY_EVENT *event_p, int argc, char **argv)
int shard_cqueue_enqueue(T_SHARD_CQUEUE *q, void *e)
INT64 num_proxy_error_processed
void * shard_queue_peek_value(T_SHARD_QUEUE *q)
T_CONTEXT_STMT * proxy_context_find_stmt(T_PROXY_CONTEXT *ctx_p, int stmt_h_id)
int shard_queue_initialize(T_SHARD_QUEUE *q)
T_SHARD_STMT * shard_stmt_find_by_stmt_h_id(int stmt_h_id)
void proxy_context_free(T_PROXY_CONTEXT *ctx_p)
#define PROXY_INVALID_CLIENT
#define PROXY_INVALID_SHARD
int fn_proxy_client_get_db_version(T_PROXY_CONTEXT *ctx_p, T_PROXY_EVENT *event_p, int argc, char **argv)
T_PROXY_EVENT * proxy_event_new(unsigned int type, int from_cas)
int fn_proxy_cas_schema_info(T_PROXY_CONTEXT *ctx_p, T_PROXY_EVENT *event_p)
int proxy_socket_io_delete(SOCKET fd)
static T_PROXY_CLIENT_FUNC proxy_client_fn_table[]
#define PROXY_EVENT_FROM_CLIENT
int fn_proxy_client_end_tran(T_PROXY_CONTEXT *ctx_p, T_PROXY_EVENT *event_p, int argc, char **argv)
int fn_proxy_client_close_req_handle(T_PROXY_CONTEXT *ctx_p, T_PROXY_EVENT *event_p, int argc, char **argv)
void proxy_event_set_context(T_PROXY_EVENT *event_p, int cid, unsigned int uid)
int proxy_wakeup_context_by_statement(T_WAIT_CONTEXT *waiter_p)
int(* T_PROXY_EVENT_FUNC_EX)(char *driver_info, char **buffer, void *argv)
char * proxy_str_client_io(T_CLIENT_IO *cli_io_p)
int shard_cqueue_initialize(T_SHARD_CQUEUE *q, int size)
int(* T_PROXY_CAS_FUNC)(T_PROXY_CONTEXT *ctx_p, T_PROXY_EVENT *event_p)
void shard_shm_set_client_info_request(T_CLIENT_INFO *client_info_p, int func_code)
T_WAIT_CONTEXT * proxy_waiter_new(int ctx_cid, unsigned int ctx_uid, int timeout)
#define SHARD_STMT_INVALID_HANDLE_ID