32 #include <sys/epoll.h> 33 #include <sys/types.h> 34 #include <sys/socket.h> 47 #define min(a,b) ((a) < (b) ? (a) : (b)) 50 #define max(a,b) ((a) > (b) ? (a) : (b)) 53 #if defined (SUPPRESS_STRLEN_WARNING) 54 #define strlen(s1) ((int) strlen(s1)) 58 #define O_NONBLOCK FIONBIO 62 #define PROC_TYPE_CLIENT 0 63 #define PROC_TYPE_CAS 1 64 #define PROC_TYPE_BROKER 2 69 #define CLIENT_READ_ERROR(i) io_error(i, PROC_TYPE_CLIENT, READ_TYPE) 70 #define CLIENT_WRITE_ERROR(i) io_error(i, PROC_TYPE_CLIENT, WRITE_TYPE) 71 #define CAS_READ_ERROR(i) io_error(i, PROC_TYPE_CAS, READ_TYPE) 72 #define CAS_WRITE_ERROR(i) io_error(i, PROC_TYPE_CAS, WRITE_TYPE) 74 #define MAX_NUM_NEW_CLIENT 5 76 #define PROXY_START_PORT 1 77 #define GET_CLIENT_PORT(broker_port, proxy_index) (broker_port) + PROXY_START_PORT + (proxy_index) 78 #define GET_CAS_PORT(broker_port, proxy_index, proxy_max_count) (broker_port) + PROXY_START_PORT + (proxy_max_count) + (proxy_index) 90 typedef T_CAS_IO *(*T_FUNC_FIND_CAS) (
int shard_id,
int cas_id,
int ctx_cid,
unsigned int ctx_uid);
98 #if defined (ENABLE_UNUSED_FUNCTION) 99 static int shard_io_clr_fl (
int fd,
int flags);
100 static int shard_io_setsockbuf (
int fd,
int size);
154 static int proxy_io_inet_lsnr (
int port);
155 static int proxy_io_client_lsnr (
void);
169 static void proxy_set_conn_info (
int func_code,
int ctx_cid,
int ctx_uid,
int shard_id,
int cas_id);
174 unsigned int ctx_uid);
178 const char *db_passwd);
181 static int proxy_get_max_socket (
void);
182 static int proxy_add_epoll_event (
int fd,
unsigned int events);
183 static int proxy_mod_epoll_event (
int fd,
unsigned int events);
184 static int proxy_del_epoll_event (
int fd);
186 static int max_Socket = 0;
188 static struct epoll_event *ep_Event =
NULL;
196 int accept_ip_addr = 0;
233 *((
int *) buffer) =
htonl (length);
245 length = *((
int *) (buffer));
246 return ntohl (length);
274 int remain_size = msg_size;
281 *ret_argv = (
void **)
NULL;
290 while (remain_size > 0)
298 memcpy ((
char *) &i_val, cur_p, 4);
299 i_val =
ntohl (i_val);
303 if (remain_size < i_val)
310 argv = (
void **)
REALLOC (argv,
sizeof (
void *) *
argc);
314 argv[argc - 1] = argp;
317 remain_size -= i_val;
331 if (flags == O_NONBLOCK)
334 if (ioctlsocket ((
SOCKET) fd, FIONBIO, &argp) == SOCKET_ERROR)
344 if ((val = fcntl (fd, F_GETFL, 0)) < 0)
352 if (fcntl (fd, F_SETFL, val) < 0)
361 #if defined (ENABLE_UNUSED_FUNCTION) 363 shard_io_clr_fl (
int fd,
int flags)
368 if (flags == O_NONBLOCK)
371 if (ioctlsocket ((
SOCKET) fd, FIONBIO, &argp) == SOCKET_ERROR)
381 if ((val = fcntl (fd, F_GETFL, 0)) < 0)
389 if (fcntl (fd, F_SETFL, val) < 0)
399 shard_io_setsockbuf (
int fd,
int size)
406 if (getsockopt (fd, SOL_SOCKET, SO_SNDBUF, (
char *) &val, &len) < 0)
416 setsockopt (fd, SOL_SOCKET, SO_SNDBUF, (
char *) &val, len);
421 if (getsockopt (fd, SOL_SOCKET, SO_RCVBUF, (
char *) &val, &len) < 0)
432 setsockopt (fd, SOL_SOCKET, SO_RCVBUF, (
char *) &val, len);
447 p = (
char *) malloc (length *
sizeof (
char));
450 memcpy (p, msg, length);
521 "Not enough virtual memory. " "Failed to alloc net buffer. " "(errno:%d, size:%d).", errno, size);
586 *buffer = net_buf.
data;
641 if (error_msg && error_msg[0])
647 *buffer = net_buf.
data;
652 PROXY_DEBUG_LOG (
"make error to send to the client. " "(error_ind:%d, error_code:%d, errro_msg:%s)", error_ind,
653 error_code, (error_msg && error_msg[0]) ? error_msg :
"-");
710 unsigned char func_code;
728 tran_commit = (commit) ? CCI_TRAN_COMMIT : CCI_TRAN_ROLLBACK;
737 *buffer = net_buf.
data;
750 #if defined (ENABLE_UNUSED_FUNCTION) 752 proxy_io_make_end_tran_commit (
char **buffer)
798 *buffer = net_buf.
data;
811 #if defined (ENABLE_UNUSED_FUNCTION) 813 proxy_io_make_close_req_handle_in_tran_ok (
char **buffer)
864 *buffer = net_buf.
data;
880 (*buffer) = (
char *) malloc (
sizeof (
int));
882 if ((*buffer) ==
NULL)
887 memset ((*buffer), 0,
sizeof (
int));
913 *buffer = net_buf.
data;
926 int reply_size, reply_nsize;
979 reply_nsize =
htonl (reply_size);
981 proxy_pid =
htonl (getpid ());
1028 snprintf (err_msg,
sizeof (err_msg),
"Authorization error.(Address is rejected)");
1063 for (shard_index = 0; shard_index < shm_conn_p->
num_shard_conn; shard_index++)
1065 shard_conn_p = &shm_conn_p->
shard_conn[shard_index];
1081 *buffer = net_buf.
data;
1113 *buffer = net_buf.
data;
1143 PROXY_DEBUG_LOG (
"free socket io.(fd:%d,from_cas:%s,shard/cas:%d/%d).\n", sock_io_p->
fd,
1182 if (proxy_Socket_io.
ent)
1197 if (proxy_Socket_io.
ent ==
NULL)
1200 "Not enough virtual memory. " "Failed to alloc socket entry. " "(errno:%d, size:%d).", errno, size);
1203 memset (proxy_Socket_io.
ent, 0, size);
1205 for (i = 0; i < proxy_Socket_io.
max_socket; i++)
1207 sock_io_p = &(proxy_Socket_io.
ent[
i]);
1221 if (proxy_Socket_io.
ent ==
NULL)
1226 for (i = 0; i < proxy_Socket_io.
max_socket; i++)
1228 sock_io_p = &(proxy_Socket_io.
ent[
i]);
1235 (void) proxy_del_epoll_event (sock_io_p->
fd);
1253 #if defined(PROXY_VERBOSE_DEBUG) 1258 int client_id, shard_id, cas_id;
1267 "ip_addr",
"cas",
"client_id",
"shard_id",
"cas_id");
1268 if (proxy_Socket_io.
ent)
1270 for (i = 0; i < proxy_Socket_io.
max_socket; i++)
1272 sock_io_p = &(proxy_Socket_io.
ent[
i]);
1281 shard_id = sock_io_p->
id.
shard.shard_id;
1282 cas_id = sock_io_p->
id.
shard.cas_id;
1283 from_cas = (
char *)
"cas";
1290 from_cas = (
char *)
"client";
1294 sock_io_p->
fd, sock_io_p->
status, inet_ntoa (*((
struct in_addr *) &sock_io_p->
ip_addr)), from_cas,
1295 client_id, shard_id, cas_id);
1313 if (proxy_Socket_io.
ent ==
NULL)
1328 "Number of socket entry exceeds max_socket. " "(current_socket:%d, max_socket:%d).",
1333 sock_io_p = &(proxy_Socket_io.
ent[fd]);
1347 error = proxy_add_epoll_event (fd, EPOLLIN | EPOLLOUT);
1375 #if defined(PROXY_VERBOSE_DEBUG) 1400 sock_io_p = &(proxy_Socket_io.
ent[fd]);
1407 (void) proxy_del_epoll_event (sock_io_p->
fd);
1432 if (cli_io_p ==
NULL)
1455 sock_io_p = &(proxy_Socket_io.
ent[fd]);
1476 error = proxy_mod_epoll_event (sock_io_p->
fd, EPOLLIN | EPOLLOUT);
1508 int proxy_status = 0;
1510 #if !defined (WINDOWS) 1518 #if defined(WINDOWS) 1519 client_fd = lsnr_fd;
1520 client_ip = accept_ip_addr;
1525 client_fd =
recv_fd (lsnr_fd, &client_ip, driver_info);
1537 length =
WRITESOCKET (lsnr_fd, &proxy_status,
sizeof (proxy_status));
1538 if (length !=
sizeof (proxy_status))
1549 if (sock_io_p ==
NULL)
1557 if (cli_io_p ==
NULL)
1564 sock_io_p->
ip_addr = client_ip;
1574 #if !defined(WINDOWS) 1579 if (event_p ==
NULL)
1613 char *url =
NULL, *db_sessionid =
NULL;
1614 struct timeval client_start_time;
1619 unsigned char *ip_addr;
1633 gettimeofday (&client_start_time,
NULL);
1638 db_name = read_buffer->
data;
1646 goto clear_event_and_return;
1652 goto connection_established;
1662 if (event_p ==
NULL)
1683 goto clear_event_and_return;
1688 if (db_user[0] ==
'\0')
1690 strcpy (db_user,
"PUBLIC");
1701 driver_version[0] =
'\0';
1704 len = *(url +
strlen (url) + 1);
1707 memcpy (driver_version, url +
strlen (url) + 2, (
int) len);
1708 driver_version[(int) len] =
'\0';
1732 memcpy (client_info_p->
driver_version, driver_version, sizeof (driver_version));
1743 ip_addr = (
unsigned char *) (&sock_io_p->
ip_addr);
1752 snprintf (err_msg,
sizeof (err_msg),
"Authorization error.(Address is rejected)");
1756 db_name, db_user, db_passwd);
1761 (db_user) ? (
const char *) db_user :
"-",
true);
1764 goto connection_established;
1771 goto connection_established;
1774 strncpy (ctx_p->database_user, db_user, SRV_CON_DBUSER_SIZE - 1);
1782 if (event_p ==
NULL)
1807 goto clear_event_and_return;
1810 connection_established:
1820 ctx_p->free_on_client_io_write =
true;
1828 if (event_p ==
NULL)
1850 ctx_p->is_connected =
true;
1857 (db_user) ? (
const char *) db_user :
"-",
true);
1861 clear_event_and_return:
1932 error = proxy_del_epoll_event (sock_io_p->
fd);
1950 if (event_p ==
NULL)
1953 "PROXY_EVENT_FROM_CLIENT");
2044 switch (sock_io_p->
status)
2084 memcpy (&func_code, p,
sizeof (
char));
2088 memcpy (&shard_id, p,
sizeof (
int));
2089 shard_id =
ntohl (shard_id);
2092 memcpy (&cas_id, p,
sizeof (
int));
2093 cas_id =
ntohl (cas_id);
2096 if (cas_io_p ==
NULL)
2099 cas_id, sock_io_p->
fd);
2106 sock_io_p->
id.
shard.shard_id = shard_id;
2107 sock_io_p->
id.
shard.cas_id = cas_id;
2135 if (cas_io_p && shard_id >= 0 && cas_id >= 0)
2168 if (cas_io_p ==
NULL)
2186 "Unexpected CAS transaction status. " "(expected tran status:%d). CAS(%s). event(%s).",
true,
2253 error = proxy_del_epoll_event (sock_io_p->
fd);
2265 if (cas_io_p ==
NULL)
2303 if (event_p ==
NULL)
2395 switch (sock_io_p->
status)
2441 p = (
char *) (send_buffer->
data + send_buffer->
offset);
2446 #if defined(WINDOWS) 2449 error = WSAGetLastError ();
2450 if (error == WSAEWOULDBLOCK)
2452 if ((errno == EWOULDBLOCK) || (errno == EAGAIN) || (errno == EINTR))
2466 send_buffer->
offset += write_len;
2484 (void) proxy_mod_epoll_event (sock_io_p->
fd, EPOLLIN);
2545 int read_len, remain, total_len;
2555 buffer = (
char *) (read_buffer->
data + read_buffer->
offset);
2561 #if defined(WINDOWS) 2562 error = WSAGetLastError ();
2563 if ((error == WSAECONNRESET) || (error == WSAECONNABORTED))
2567 else if (error == WSAEWOULDBLOCK)
2569 if ((errno == EWOULDBLOCK) || (errno == EAGAIN) || (errno == EINTR))
2576 else if (read_len == 0)
2581 read_buffer->
offset += read_len;
2588 if (total_len == read_buffer->
offset)
2591 return read_buffer->
offset;
2597 PROXY_DEBUG_LOG (
"Failed to realloc event buffer. (error:%d).", error);
2766 (void) proxy_mod_epoll_event (sock_io_p->
fd, EPOLLIN);
2782 (void) proxy_mod_epoll_event (sock_io_p->
fd, EPOLLIN);
2829 PROXY_DEBUG_LOG (
"Unexpected socket status. " "socket will be closed. " "(fd:%d, status:%d).", sock_io_p->
fd,
2904 if (proxy_Client_io.
ent ==
NULL)
2907 "Not enough virtual memory. " "Failed to alloc client entries. " "(errno:%d, size:%d).", errno, size);
2913 client_io_ent_p = &(proxy_Client_io.
ent[
i]);
2916 client_io_ent_p->
is_busy =
false;
2946 #if defined(PROXY_VERBOSE_DEBUG) 2959 "context_id",
"uid");
2960 if (proxy_Client_io.
ent)
2964 cli_io_p = &(proxy_Client_io.
ent[
i]);
2965 if (!print_all && !cli_io_p->
is_busy)
2981 static char buffer[LINE_MAX];
2983 if (cli_io_p ==
NULL)
2985 return (
char *)
"-";
2988 snprintf (buffer,
sizeof (buffer),
"client_id:%d, is_busy:%s, fd:%d, ctx_cid:%d, ctx_uid:%u", cli_io_p->
client_id,
2991 return (
char *) buffer;
3035 snprintf (err_msg,
sizeof (err_msg),
"Proxy refused client connection. max clients exceeded");
3041 #if defined(PROXY_VERBOSE_DEBUG) 3088 if (cli_io_p ==
NULL)
3104 if (client_id < 0 || client_id >= proxy_Client_io.
max_context)
3111 cli_io_p = &(proxy_Client_io.
ent[client_id]);
3113 if (cli_io_p->
ctx_cid != ctx_cid || cli_io_p->
ctx_uid != ctx_uid)
3128 if (client_id < 0 || client_id >= proxy_Client_io.
max_context)
3135 cli_io_p = &(proxy_Client_io.
ent[client_id]);
3154 if (sock_io_p ==
NULL)
3186 for (i = 0; i < size; i++)
3188 cas_io_p = &(buffer[
i]);
3198 *cas_io_pp = buffer;
3213 if (sock_io_p ==
NULL)
3238 int max_appl_server;
3243 if (proxy_Shard_io.
ent ==
NULL)
3251 for (i = 0; i < proxy_Shard_io.
max_shard; i++)
3253 shard_io_p = &(proxy_Shard_io.
ent[
i]);
3285 for (i = 0; i < proxy_Shard_io.
max_shard; i++)
3287 shard_io_p = &(proxy_Shard_io.
ent[
i]);
3298 #if defined(PROXY_VERBOSE_DEBUG) 3311 if (proxy_Shard_io.
ent)
3313 for (i = 0; i < proxy_Shard_io.
max_shard; i++)
3315 shard_io_p = &(proxy_Shard_io.
ent[
i]);
3321 "shard_id",
"in_tran",
"fd");
3322 if (shard_io_p->
ent)
3326 cas_io_p = &(shard_io_p->
ent[j]);
3346 static char buffer[LINE_MAX];
3348 if (cas_io_p ==
NULL)
3350 return (
char *)
"-";
3353 snprintf (buffer,
sizeof (buffer),
3354 "cas_id:%d, shard_id:%d, is_in_tran:%s, " "status:%d, ctx_cid:%d, ctx_uid:%u, fd:%d", cas_io_p->
cas_id,
3358 return (
char *) buffer;
3366 if (shard_id < 0 || shard_id >= proxy_Shard_io.
max_shard)
3372 shard_io_p = &(proxy_Shard_io.
ent[shard_id]);
3383 if (shard_id < 0 || shard_id >= proxy_Shard_io.
max_shard)
3390 shard_io_p = &(proxy_Shard_io.
ent[shard_id]);
3391 if (cas_id < 0 || cas_id >= shard_io_p->
max_num_cas)
3397 cas_io_p = &(shard_io_p->
ent[cas_id]);
3417 #if defined(PROXY_VERBOSE_DEBUG) 3433 if (shard_id < 0 || shard_id >= proxy_Shard_io.
max_shard)
3440 shard_io_p = &(proxy_Shard_io.
ent[shard_id]);
3441 if (cas_id < 0 || cas_id >= shard_io_p->
max_num_cas)
3448 cas_io_p = &(shard_io_p->
ent[cas_id]);
3493 if (shard_id < 0 || shard_id >= proxy_Shard_io.
max_shard)
3501 shard_io_p = &(proxy_Shard_io.
ent[shard_id]);
3502 if (cas_id < 0 || cas_id >= shard_io_p->
max_num_cas)
3509 cas_io_p = &(shard_io_p->
ent[cas_id]);
3563 PROXY_DEBUG_LOG (
"Unable to find CAS entry. " "(shard:%d, cas:%d, fd:%d).", shard_id, cas_id, fd);
3569 shard_io_p = &(proxy_Shard_io.
ent[shard_id]);
3572 assert (cas_id <= shard_io_p->max_num_cas);
3573 cas_io_p = &(shard_io_p->
ent[cas_id]);
3587 if (0 > shard_id || shard_id >= proxy_Shard_io.
max_shard)
3592 shard_io_p = &(proxy_Shard_io.
ent[shard_id]);
3594 if (0 > cas_id || cas_id >= shard_io_p->
max_num_cas)
3599 cas_io_p = &(shard_io_p->
ent[cas_id]);
3620 unsigned int curr_shard_id = 0;
3621 static unsigned int last_shard_id = 0;
3624 if ((shard_id < 0 && cas_id >= 0) || (shard_id >= proxy_Shard_io.
max_shard))
3634 if (shard_id >= 0 && cas_id >= 0)
3649 if (cas_io_p ==
NULL)
3657 if (cas_io_p ==
NULL)
3663 cas_id = cas_io_p->
cas_id;
3666 if (client_info_p ==
NULL)
3669 "Unable to find cilent info in shared memory. " "(context id:%d, context uid:%d)", ctx_cid, ctx_uid);
3673 (proxy_info_p, shm_as_p, cas_io_p->
shard_id, cas_io_p->
cas_id, client_info_p) ==
false)
3691 curr_shard_id = (
unsigned int) shard_id;
3695 curr_shard_id = (
unsigned int) (last_shard_id + 1) % proxy_Shard_io.
max_shard;
3697 last_shard_id = curr_shard_id;
3700 shard_io_p = &(proxy_Shard_io.
ent[curr_shard_id]);
3704 "Failed to allocate shard/cas. " "No available cas in this shard. " 3705 "Wait until shard has available cas. " "(cur_num_cas:%d, max_num_cas:%d)", shard_io_p->
cur_num_cas,
3726 if (0 > shard_id || shard_id >= proxy_Shard_io.
max_shard)
3732 shard_io_p = &(proxy_Shard_io.
ent[shard_id]);
3734 if (0 > cas_id || cas_id >= shard_io_p->
max_num_cas)
3740 cas_io_p = &(shard_io_p->
ent[cas_id]);
3788 if (waiter_p ==
NULL)
3794 PROXY_DEBUG_LOG (
"Context(context id:%d, context uid:%u) " "is waiting on shard(shard_id:%d).", ctx_cid, ctx_uid,
3805 if (shard_info_p !=
NULL)
3856 #if !defined(WINDOWS) 3863 struct sockaddr_un shard_sock_addr;
3874 if ((fd = socket (AF_UNIX, SOCK_STREAM, 0)) < 0)
3879 memset (&shard_sock_addr, 0,
sizeof (shard_sock_addr));
3880 shard_sock_addr.sun_family = AF_UNIX;
3881 strcpy (shard_sock_addr.sun_path, port_name);
3882 #ifdef _SOCKADDR_LEN 3883 len =
sizeof (shard_sock_addr.sun_len) +
sizeof (shard_sock_addr.sun_family) +
strlen (shard_sock_addr.sun_path) + 1;
3884 shard_sock_addr.sun_len = len;
3886 len =
strlen (shard_sock_addr.sun_path) +
sizeof (shard_sock_addr.sun_family) + 1;
3889 if (connect (fd, (
struct sockaddr *) &shard_sock_addr, len) != 0)
3922 len =
WRITESOCKET (fd, &tmp_proxy_id,
sizeof (tmp_proxy_id));
3923 if (len !=
sizeof (tmp_proxy_id))
3932 error = proxy_add_epoll_event (fd, EPOLLIN | EPOLLPRI);
3948 #if defined(WINDOWS) 3950 proxy_io_inet_lsnr (
int port)
3953 int len, backlog_size;
3955 struct sockaddr_in shard_sock_addr;
3957 fd = socket (AF_INET, SOCK_STREAM, 0);
3963 if ((setsockopt (fd, SOL_SOCKET, SO_REUSEADDR, (
char *) &one,
sizeof (one))) < 0)
3969 memset (&shard_sock_addr, 0,
sizeof (
struct sockaddr_in));
3970 shard_sock_addr.sin_family = AF_INET;
3971 shard_sock_addr.sin_port =
htons ((
unsigned short) (port));
3972 len =
sizeof (
struct sockaddr_in);
3973 shard_sock_addr.sin_addr.s_addr =
htonl (INADDR_ANY);
3976 if (bind (fd, (
struct sockaddr *) &shard_sock_addr, len) < 0)
3984 if (listen (fd, backlog_size) < 0)
3998 int len, backlog_size;
3999 struct sockaddr_un shard_sock_addr;
4001 if ((fd = socket (AF_UNIX, SOCK_STREAM, 0)) < 0)
4006 memset (&shard_sock_addr, 0,
sizeof (shard_sock_addr));
4007 shard_sock_addr.sun_family = AF_UNIX;
4008 strcpy (shard_sock_addr.sun_path, unixd_sock_name);
4010 #ifdef _SOCKADDR_LEN 4011 len =
sizeof (shard_sock_addr.sun_len) +
sizeof (shard_sock_addr.sun_family) +
strlen (shard_sock_addr.sun_path) + 1;
4012 shard_sock_addr.sun_len = len;
4014 len =
strlen (shard_sock_addr.sun_path) +
sizeof (shard_sock_addr.sun_family) + 1;
4018 if (bind (fd, (
struct sockaddr *) &shard_sock_addr, len) < 0)
4026 if (listen (fd, backlog_size) < 0)
4043 #if defined(WINDOWS) 4050 fd = proxy_io_inet_lsnr (port);
4067 error = proxy_add_epoll_event (fd, EPOLLIN | EPOLLPRI);
4082 #if defined(WINDOWS) 4084 proxy_io_client_lsnr (
void)
4093 fd = proxy_io_inet_lsnr (port);
4100 proxy_info_p->proxy_port = port;
4101 client_lsnr_fd = fd;
4102 FD_SET (client_lsnr_fd, &
allset);
4114 #if defined(WINDOWS) 4115 struct sockaddr_in shard_sock_addr;
4117 struct sockaddr_un shard_sock_addr;
4121 fd = accept (lsnr_fd, (
struct sockaddr *) &shard_sock_addr, &len);
4127 #if defined(WINDOWS) 4128 memcpy (&accept_ip_addr, &(shard_sock_addr.sin_addr), 4);
4140 #if defined(WINDOWS) 4142 proxy_io_client_accept (
SOCKET lsnr_fd)
4154 max_Socket = proxy_get_max_socket ();
4158 ep_Fd = epoll_create (max_Socket);
4165 ep_Event = (epoll_event *) calloc (max_Socket,
sizeof (
struct epoll_event));
4166 if (ep_Event ==
NULL)
4179 #if defined(WINDOWS) 4188 error = proxy_io_client_lsnr ();
4260 for (i = 0; i < proxy_Socket_io.
max_socket; i++)
4262 sock_io_p = &(proxy_Socket_io.
ent[
i]);
4276 #if defined(WINDOWS) 4304 #if defined(WINDOWS) 4319 timeout = 1000 / HZ;
4321 n = epoll_wait (ep_Fd, ep_Event, max_Socket, timeout);
4327 tv.tv_usec = 1000000 / HZ;
4334 perror (
"select error");
4345 for (i = 0; i < n; i++)
4349 if ((ep_Event[i].events & EPOLLERR) || (ep_Event[i].events & EPOLLHUP))
4354 else if (ep_Event[i].events & EPOLLIN || ep_Event[i].events & EPOLLPRI)
4375 if ((ep_Event[i].events & EPOLLERR) || (ep_Event[i].events & EPOLLHUP))
4380 else if (ep_Event[i].events & EPOLLIN || ep_Event[i].events & EPOLLPRI)
4387 sock_fd = ep_Event[
i].data.fd;
4391 if (sock_io_p->fd != sock_fd)
4397 if ((ep_Event[i].events & EPOLLERR) || (ep_Event[i].events & EPOLLHUP))
4399 if (ep_Event[i].events & EPOLLIN)
4410 if (ep_Event[i].events & EPOLLOUT)
4415 if (ep_Event[i].events & EPOLLIN)
4444 #if defined(WINDOWS) 4445 if (FD_ISSET (client_lsnr_fd, &
rset))
4447 client_fd = proxy_io_client_accept (client_lsnr_fd);
4463 for (i = 0; i <=
maxfd; i++)
4465 sock_io_p = &(proxy_Socket_io.
ent[
i]);
4488 'C',
'U',
'B',
'R',
'K',
4505 if (cli_io_p ==
NULL)
4510 return cli_io_p->driver_info;
4518 'C',
'U',
'B',
'R',
'K',
4529 if (sock_io_p ==
NULL)
4535 if (cli_io_p ==
NULL)
4540 return cli_io_p->driver_info;
4554 for (i = 0; i < proxy_Shard_io.
max_shard; i++)
4556 shard_io_p = &(proxy_Shard_io.
ent[
i]);
4560 if (waiter_p == NULL)
4563 if ((shard_info_p != NULL) && (shard_info_p->
waiter_count > 0))
4621 assert (shard_id >= 0 && cas_id >= 0);
4623 shard_io_p = &(proxy_Shard_io.
ent[shard_id]);
4625 if (0 > cas_id || cas_id >= shard_io_p->
max_num_cas)
4631 cas_io_p = &(shard_io_p->
ent[cas_id]);
4645 PROXY_DEBUG_LOG (
"Unexpected CAS status. (context id:%d, context uid:%d). " "CAS(%s). ", ctx_cid, ctx_uid,
4672 shard_io_p = &(proxy_Shard_io.
ent[shard_id]);
4676 cas_io_p = &(shard_io_p->
ent[
i]);
4696 shard_io_p = &(proxy_Shard_io.
ent[shard_id]);
4698 for (i = shard_io_p->
cur_num_cas - 1; i >= 0; i--)
4700 cas_io_p = &(shard_io_p->
ent[
i]);
4722 shard_io_p = &(proxy_Shard_io.
ent[shard_id]);
4731 cas_io_p = &(shard_io_p->
ent[
i]);
4768 static int last_shard_id = -1;
4772 shard_io_p = &(proxy_Shard_io.
ent[shard_id]);
4774 cas_io_p =
function (shard_id, cas_id, ctx_cid, ctx_uid);
4779 shard_id = last_shard_id;
4780 for (i = 0; i < proxy_Shard_io.
max_shard; i++)
4782 shard_id = (shard_id + 1) % proxy_Shard_io.
max_shard;
4783 shard_io_p = &(proxy_Shard_io.
ent[shard_id]);
4791 cas_io_p =
function (shard_id, cas_id, ctx_cid, ctx_uid);
4792 if (cas_io_p !=
NULL)
4806 last_shard_id = shard_id;
4811 if (cas_io_p !=
NULL)
4839 if (strcmp (db_name, user_p->
db_name))
4841 goto authorization_error;
4851 goto authorization_error;
4856 if (strcasecmp (db_name, user_p->
db_name))
4858 goto authorization_error;
4866 if (strcasecmp (db_user, user_p->
db_user) || strcmp (db_passwd, user_p->
db_password))
4868 goto authorization_error;
4874 authorization_error:
4875 snprintf (err_msg,
sizeof (err_msg),
"Authorization error.");
4879 db_user, db_passwd);
4888 if (error_code >= 0)
4890 assert (error_code == 0);
4926 proxy_get_max_socket (
void)
4941 proxy_add_epoll_event (
int fd,
unsigned int events)
4944 struct epoll_event ep_ev;
4948 memset (&ep_ev, 0,
sizeof (
struct epoll_event));
4950 ep_ev.events = events;
4951 error = epoll_ctl (ep_Fd, EPOLL_CTL_ADD, fd, &ep_ev);
4964 proxy_mod_epoll_event (
int fd,
unsigned int events)
4967 struct epoll_event ep_ev;
4971 memset (&ep_ev, 0,
sizeof (
struct epoll_event));
4973 ep_ev.events = events;
4974 error = epoll_ctl (ep_Fd, EPOLL_CTL_MOD, fd, &ep_ev);
4987 proxy_del_epoll_event (
int fd)
4990 struct epoll_event ep_ev;
4994 memset (&ep_ev, 0,
sizeof (
struct epoll_event));
4998 error = epoll_ctl (ep_Fd, EPOLL_CTL_DEL, fd, &ep_ev);
int proxy_io_set_established_by_ctx(T_PROXY_CONTEXT *ctx_p)
int proxy_io_make_end_tran_request(char *driver_info, char **buffer, bool commit)
static void proxy_init_net_buf(T_NET_BUF *net_buf)
static int proxy_io_unixd_lsnr(char *unixd_sock_name)
void proxy_socket_io_print(bool print_all)
static int proxy_process_client_register(T_SOCKET_IO *sock_io_p)
static int proxy_process_client_message(T_SOCKET_IO *sock_io_p)
char driver_info[SRV_CON_CLIENT_INFO_SIZE]
void proxy_set_force_out_tran(char *msg)
static void proxy_shard_io_destroy(void)
int proxy_make_net_buf(T_NET_BUF *net_buf, int size, T_BROKER_VERSION client_version)
void set_data_length(char *buffer, int length)
void proxy_event_free(T_PROXY_EVENT *event_p)
void init_msg_header(MSG_HEADER *header)
int net_decode_str(char *msg, int msg_size, char *func_code, void ***ret_argv)
int proxy_io_make_shard_info(char *driver_info, char **buffer)
#define CAS_INFO_FLAG_MASK_FORCE_OUT_TRAN
void proxy_set_con_status_out_tran(char *msg)
int proxy_wakeup_context_by_shard(T_WAIT_CONTEXT *waiter_p, int shard_id, int cas_id)
static int proxy_cas_io_initialize(int shard_id, T_CAS_IO **cas_io_pp, int size)
void proxy_io_buffer_clear(T_IO_BUFFER *io_buffer)
T_SHM_SHARD_USER * shm_user_p
static int read_buffer(SOCKET sock_fd, char *buf, int size)
int net_buf_cp_str(T_NET_BUF *net_buf, const char *buf, int size)
#define SRV_CON_CLIENT_INFO_SIZE
#define PROXY_IO_FROM_CLIENT
static int proxy_socket_io_write_internal(T_SOCKET_IO *sock_io_p)
#define DRIVER_SESSION_SIZE
T_SHARD_INFO * shard_shm_find_shard_info(T_PROXY_INFO *proxy_info_p, int shard_id)
char * proxy_str_cas_io(T_CAS_IO *cas_io_p)
static void proxy_socket_io_read_from_client(T_SOCKET_IO *sock_io_p)
T_PROXY_CONTEXT * proxy_context_find(int cid, unsigned int uid)
static T_SOCKET_IO * proxy_socket_io_find(SOCKET fd)
#define PROXY_DEBUG_LOG(fmt, args...)
T_SHARD_CONN shard_conn[MAX_SHARD_CONN]
void proxy_shard_io_print(bool print_all)
T_PROXY_HANDLER proxy_Handler
#define CAS_CONV_ERROR_TO_OLD(V)
int proxy_io_make_client_acl_fail(char *driver_info, char **buffer)
#define CAS_PROTO_VER_MASK
unsigned int htonl(unsigned int from)
T_CLIENT_IO * proxy_client_io_find_by_ctx(int client_id, int ctx_cid, unsigned int ctx_uid)
#define CAS_CONNECTION_REPLY_SIZE
static void proxy_socket_io_read_error(T_SOCKET_IO *sock_io_p)
int proxy_cas_io_write(T_CAS_IO *cas_io_p, T_PROXY_EVENT *event_p)
char port_name[SHM_APPL_SERVER_NAME_MAX]
int proxy_io_process(void)
int access_control_check_right(T_SHM_APPL_SERVER *shm_as_p, char *dbname, char *dbuser, unsigned char *address)
static T_CAS_IO * proxy_find_idle_cas_by_conn_info(int shard_id, int cas_id, int ctx_cid, unsigned int ctx_uid)
T_PROXY_CONTEXT * proxy_context_find_by_socket_client_io(T_SOCKET_IO *sock_io_p)
#define PROXY_INVALID_CONTEXT
void proxy_available_cas_wait_timer(void)
char database_user[SRV_CON_DBUSER_SIZE]
static void proxy_socket_io_write(T_SOCKET_IO *sock_io_p)
static int proxy_process_cas_register(T_SOCKET_IO *sock_io_p)
#define PROXY_CONV_ERR_TO_OLD
static void proxy_socket_io_read_from_client_first(T_SOCKET_IO *sock_io_p)
int get_msg_length(char *buffer)
int proxy_io_make_client_proxy_alive(char *driver_info, char **buffer)
#define NET_BUF_HEADER_MSG_SIZE
#define CAS_PROTO_INDICATOR
#define SRV_CON_MSG_IDX_PROTO_VERSION
int shard_queue_enqueue(T_SHARD_QUEUE *q, void *v)
#define PROXY_EVENT_FROM_CAS
void proxy_event_set_type_from(T_PROXY_EVENT *event_p, unsigned int type, int from_cas)
#define CAS_VER_TO_MAJOR(VER)
bool proxy_event_io_read_complete(T_PROXY_EVENT *event_p)
static int proxy_process_cas_message(T_SOCKET_IO *sock_io_p)
char * proxy_get_driver_info_by_ctx(T_PROXY_CONTEXT *ctx_p)
char database_user[SRV_CON_DBUSER_SIZE]
#define CAS_CONNECTION_REPLY_SIZE_V3
#define MSG_HEADER_INFO_SIZE
static void proxy_cas_io_free(int shard_id, int cas_id)
#define CAS_CONV_ERROR_TO_NEW(V)
int shard_queue_ordered_enqueue(T_SHARD_QUEUE *q, void *v, SHARD_COMP_FN comp_fn)
int proxy_access_log(struct timeval *start_time, int client_ip_addr, const char *dbname, const char *dbuser, bool accepted)
T_SOCKET_IO_GLOBAL proxy_Socket_io
static void proxy_socket_io_write_error(T_SOCKET_IO *sock_io_p)
static void proxy_client_io_destroy(void)
int proxy_io_make_get_db_version(char *driver_info, char **buffer)
#define SRV_CON_DBPASSWD_SIZE
#define CAS_MAKE_VER(MAJOR, MINOR, PATCH)
static int proxy_io_register_to_broker(void)
#define SRV_CON_DB_INFO_SIZE
struct t_socket_io T_SOCKET_IO
T_CAS_IO *(* T_FUNC_FIND_CAS)(int shard_id, int cas_id, int ctx_cid, unsigned int ctx_uid)
T_APPL_SERVER_INFO * shard_shm_get_as_info(T_PROXY_INFO *proxy_info_p, T_SHM_APPL_SERVER *shm_as_p, int shard_id, int as_id)
T_SHM_PROXY * shm_proxy_p
#define SRV_CON_DBNAME_SIZE
#define SRV_CON_VER_STR_MAX_SIZE
void proxy_io_destroy(void)
char db_password[SRV_CON_DBPASSWD_SIZE]
int proxy_io_make_close_req_handle_ok(char *driver_info, char **buffer, bool is_in_tran)
bool shard_shm_set_as_client_info_with_db_param(T_PROXY_INFO *proxy_info_p, T_SHM_APPL_SERVER *shm_as_p, int shard_id, int as_id, T_CLIENT_INFO *client_info_p)
char cci_default_autocommit
static int proxy_client_add_waiter_by_shard(T_SHARD_IO *shard_io_p, int ctx_cid, int ctx_uid, int timeout)
#define CAS_PROTO_UNPACK_NET_VER(VER)
#define CAS_INFO_RESERVED_DEFAULT
static void proxy_socket_io_read(T_SOCKET_IO *sock_io_p)
char driver_info[SRV_CON_CLIENT_INFO_SIZE]
static T_SOCKET_IO * proxy_socket_io_add(SOCKET fd, bool from_cas)
#define APPL_SERVER_CAS_MYSQL
char db_name[SRV_CON_DBNAME_SIZE]
INT64 num_connect_requests
#define APPL_SERVER_CAS_ORACLE
void net_buf_init(T_NET_BUF *net_buf, T_BROKER_VERSION client_version)
int net_buf_cp_int(T_NET_BUF *net_buf, int value, int *begin_offset)
void proxy_cas_io_free_by_ctx(int shard_id, int cas_id, int ctx_cid, int unsigned ctx_uid)
int proxy_context_send_error(T_PROXY_CONTEXT *ctx_p)
#define CAS_MAKE_PROTO_VER(DRIVER_INFO)
#define DOES_CLIENT_MATCH_THE_PROTOCOL(CLIENT, MATCH)
static void proxy_socket_io_destroy(void)
void proxy_client_io_free_by_ctx(int client_id, int ctx_cid, int ctx_uid)
#define CAS_VER_TO_PATCH(VER)
char * proxy_str_event(T_PROXY_EVENT *event_p)
int proxy_convert_error_code(int error_ind, int error_code, char *driver_info, T_BROKER_VERSION client_version, bool to_new)
struct t_client_io T_CLIENT_IO
void proxy_unset_force_out_tran(char *msg)
char database_passwd[SRV_CON_DBPASSWD_SIZE]
char database_passwd[SRV_CON_DBPASSWD_SIZE]
bool free_on_client_io_write
void shard_shm_init_client_info(T_CLIENT_INFO *client_info_p)
static int get_dbinfo_length(char *driver_info)
static int proxy_io_cas_lsnr(void)
static int proxy_socket_io_read_internal(T_SOCKET_IO *sock_io_p)
static void proxy_socket_io_read_from_cas_next(T_SOCKET_IO *sock_io_p)
int proxy_socket_set_write_event(T_SOCKET_IO *sock_io_p, T_PROXY_EVENT *event_p)
T_SHM_SHARD_CONN * shm_conn_p
#define SRV_CON_DB_INFO_SIZE_PRIOR_8_2_0
int tran_commit(bool retain_lock)
T_CAS_IO * proxy_cas_find_io_by_ctx(int shard_id, int cas_id, int ctx_cid, unsigned int ctx_uid)
#define IS_INVALID_SOCKET(socket)
#define PROXY_CONNECTION_REPLY_SIZE(con_reply_size)
int proxy_waiter_comp_fn(const void *arg1, const void *arg2)
T_PROXY_INFO * proxy_info_p
void proxy_context_set_error_with_msg(T_PROXY_CONTEXT *ctx_p, int error_ind, int error_code, const char *error_msg)
static void proxy_socket_io_read_from_cas(T_SOCKET_IO *sock_io_p)
T_CLIENT_IO_GLOBAL proxy_Client_io
T_PROXY_CONTEXT proxy_Context
int proxy_io_make_no_error(char *driver_info, char **buffer)
static T_CLIENT_IO * proxy_client_io_new(SOCKET fd, char *driver_info)
#define CAS_INFO_FLAG_MASK_NEW_SESSION_ID
int proxy_event_realloc_buffer(T_PROXY_EVENT *event_p, unsigned int size)
#define CAS_ERROR_INDICATOR
int get_data_length(char *buffer)
#define READSOCKET(fd, buf, len)
static T_CAS_IO * proxy_cas_alloc_anything(int client_id, int shard_id, int cas_id, int ctx_cid, unsigned int ctx_uid, T_FUNC_FIND_CAS function)
int proxy_io_make_ex_get_lock_timeout(char *driver_info, char **buffer, void *argv)
#define SRV_CON_DB_INFO_SIZE_PRIOR_8_4_0
int proxy_io_initialize(void)
static void proxy_socket_io_write_to_cas(T_SOCKET_IO *sock_io_p)
union t_socket_io::@41 id
static int proxy_process_cas_write_error(T_SOCKET_IO *sock_io_p)
struct t_socket_io::@41::@42 shard
#define strncpy_bufsize(buf, str)
static T_CAS_IO * proxy_find_idle_cas_by_desc(int shard_id, int cas_id, int ctx_cid, unsigned int ctx_uid)
static void proxy_socket_io_read_from_cas_first(T_SOCKET_IO *sock_io_p)
INT64 num_connect_rejected
T_CLIENT_INFO * shard_shm_get_client_info(T_PROXY_INFO *proxy_info_p, int idx)
char * proxy_get_driver_info_by_fd(T_SOCKET_IO *sock_io_p)
static int proxy_process_client_conn_error(T_SOCKET_IO *sock_io_p)
char db_conn_info[MAX_CONN_INFO_LENGTH]
#define PROXY_IO_FROM_CAS
static struct sockaddr_un shard_sock_addr
unsigned short htons(unsigned short from)
void proxy_client_io_print(bool print_all)
#define WRITESOCKET(fd, buf, len)
#define SHARD_TEMPORARY_UNAVAILABLE
static T_CAS_IO * proxy_cas_alloc_by_shard_and_cas_id(int client_id, int shard_id, int cas_id, int ctx_cid, unsigned int ctx_uid)
void * shard_cqueue_dequeue(T_SHARD_CQUEUE *q)
char driver_version[SRV_CON_VER_STR_MAX_SIZE]
char * proxy_dup_msg(char *msg)
static int proxy_shard_io_initialize(void)
void proxy_client_io_free(T_CLIENT_IO *cli_io_p)
T_SHARD_IO_GLOBAL proxy_Shard_io
#define PROXY_INVALID_CAS
static T_SHARD_IO * proxy_shard_io_find(int shard_id)
int proxy_event_alloc_buffer(T_PROXY_EVENT *event_p, unsigned int size)
int proxy_io_make_error_msg(char *driver_info, char **buffer, int error_ind, int error_code, const char *error_msg, char is_in_tran)
#define PROXY_CONV_ERR_TO_NEW
T_CAS_IO * proxy_cas_alloc_by_ctx(int client_id, int shard_id, int cas_id, int ctx_cid, unsigned int ctx_uid, int timeout, int func_code)
#define SRV_CON_MSG_IDX_FUNCTION_FLAG
static SOCKET proxy_io_connect_to_broker(void)
void proxy_context_clear_error(T_PROXY_CONTEXT *ctx_p)
static T_CAS_IO * proxy_cas_io_new(int shard_id, int cas_id, SOCKET fd)
int proxy_io_make_set_db_parameter_ok(char *driver_info, char **buffer)
T_SHM_APPL_SERVER * shm_as_p
#define BROKER_RENEWED_ERROR_CODE
T_PROXY_EVENT * read_event
#define MSG_HEADER_MSG_SIZE
static int proxy_socket_io_new_client(SOCKET lsnr_fd)
static void error(const char *msg)
T_CLIENT_IO * proxy_client_io_find_by_fd(int client_id, SOCKET fd)
static T_CAS_IO * proxy_find_idle_cas_by_asc(int shard_id, int cas_id, int ctx_cid, unsigned int ctx_uid)
T_PROXY_CONTEXT * proxy_context_new(void)
#define SRV_CON_DBSESS_ID_SIZE
#define GET_CAS_PORT(broker_port, proxy_index, proxy_max_count)
#define PROXY_RESERVED_FD
#define BROKER_SUPPORT_HOLDABLE_RESULT
void proxy_waiter_timeout(T_SHARD_QUEUE *waitq, INT64 *counter, int now)
static SOCKET proxy_io_accept(SOCKET lsnr_fd)
int proxy_client_io_write(T_CLIENT_IO *cli_io_p, T_PROXY_EVENT *event_p)
void proxy_cas_release_by_ctx(int shard_id, int cas_id, int ctx_cid, unsigned int ctx_uid)
int proxy_io_close_all_fd(void)
char * proxy_str_context(T_PROXY_CONTEXT *ctx_p)
int proxy_io_make_check_cas_ok(char *driver_info, char **buffer)
int proxy_io_make_ex_get_isolation_level(char *driver_info, char **buffer, void *argv)
#define CAS_VER_TO_MINOR(VER)
void shard_queue_destroy(T_SHARD_QUEUE *q)
static void proxy_socket_io_clear(T_SOCKET_IO *sock_io_p)
#define SRV_CON_DBUSER_SIZE
void shard_stmt_del_all_srv_h_id_for_shard_cas(int shard_id, int cas_id)
static int proxy_process_cas_read_error(T_SOCKET_IO *sock_io_p)
#define PROXY_LOG(level, fmt, args...)
int proxy_io_make_client_dbinfo_ok(char *driver_info, char **buffer)
static SOCKET proxy_io_cas_accept(SOCKET lsnr_fd)
#define CAS_PROTO_TO_VER_STR(MSG_P, VER)
static int proxy_io_make_ex_get_int(char *driver_info, char **buffer, int *argv)
#define DOES_CLIENT_UNDERSTAND_THE_PROTOCOL(CLIENT, REQUIRE)
void shard_cqueue_destroy(T_SHARD_CQUEUE *q)
T_PROXY_EVENT * proxy_event_new_with_rsp(char *driver_info, unsigned int type, int from, T_PROXY_EVENT_FUNC resp_func)
#define DBMS_ERROR_INDICATOR
#define HEALTH_CHECK_DUMMY_DB
int proxy_io_make_end_tran_ok(char *driver_info, char **buffer)
void * shard_queue_dequeue(T_SHARD_QUEUE *q)
char port_name[SHM_PROXY_NAME_MAX]
static int proxy_process_cas_response(T_SOCKET_IO *sock_io_p)
int shard_cqueue_enqueue(T_SHARD_CQUEUE *q, void *e)
#define CAS_CONNECTION_REPLY_SIZE_PRIOR_PROTOCOL_V3
static T_CAS_IO * proxy_cas_io_find_by_fd(int shard_id, int cas_id, SOCKET fd)
void * shard_queue_peek_value(T_SHARD_QUEUE *q)
static void proxy_set_conn_info(int func_code, int ctx_cid, int ctx_uid, int shard_id, int cas_id)
int proxy_io_make_client_conn_ok(char *driver_info, char **buffer)
#define APPL_SERVER_CAS_MYSQL51
#define SHARD_NET_BUF_ALLOC_SIZE
static int shard_io_set_fl(int fd, int flags)
unsigned int ntohl(unsigned int from)
int recv_fd(int fd, int *rid, char *driver_info)
int shard_queue_initialize(T_SHARD_QUEUE *q)
const char * rel_build_number(void)
#define GET_CLIENT_PORT(broker_port, proxy_index)
char db_user[SRV_CON_DBUSER_SIZE]
void proxy_context_free(T_PROXY_CONTEXT *ctx_p)
static int proxy_process_client_write_error(T_SOCKET_IO *sock_io_p)
static int proxy_process_client_request(T_SOCKET_IO *sock_io_p)
void proxy_set_con_status_in_tran(char *msg)
static void proxy_socket_io_read_from_client_next(T_SOCKET_IO *sock_io_p)
#define CAS_INFO_FLAG_MASK_AUTOCOMMIT
#define PROXY_INVALID_SHARD
static int proxy_check_authorization(T_PROXY_CONTEXT *ctx_p, const char *db_name, const char *db_user, const char *db_passwd)
static int proxy_process_cas_conn_error(T_SOCKET_IO *sock_io_p)
int proxy_io_make_end_tran_abort(char *driver_info, char **buffer)
T_PROXY_EVENT * proxy_event_new(unsigned int type, int from_cas)
static int proxy_socket_io_initialize(void)
int proxy_socket_io_delete(SOCKET fd)
#define PROXY_EVENT_FROM_CLIENT
int proxy_io_make_close_req_handle_out_tran_ok(char *driver_info, char **buffer)
int proxy_io_make_check_cas(char *driver_info, char **buffer)
int net_buf_cp_byte(T_NET_BUF *net_buf, char ch)
static void proxy_socket_io_write_to_client(T_SOCKET_IO *sock_io_p)
void proxy_event_set_context(T_PROXY_EVENT *event_p, int cid, unsigned int uid)
int proxy_io_make_cursor_close_out_tran_ok(char *driver_info, char **buffer)
char * proxy_str_client_io(T_CLIENT_IO *cli_io_p)
int shard_cqueue_initialize(T_SHARD_CQUEUE *q, int size)
char db_name[MAX_DBNAME_LENGTH]
int proxy_io_make_con_close_ok(char *driver_info, char **buffer)
#define REALLOC(PTR, SIZE)
static int proxy_process_client_read_error(T_SOCKET_IO *sock_io_p)
T_PROXY_EVENT * write_event
static void proxy_client_check_waiter_and_wakeup(T_SHARD_IO *shard_io_p, T_CAS_IO *cas_io_p)
T_WAIT_CONTEXT * proxy_waiter_new(int ctx_cid, unsigned int ctx_uid, int timeout)
static int proxy_client_io_initialize(void)