CUBRID Engine  latest
shard_proxy_io.c
Go to the documentation of this file.
1 /*
2  * Copyright 2008 Search Solution Corporation
3  * Copyright 2016 CUBRID Corporation
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 
20 /*
21  * shard_proxy_io.c -
22  *
23  */
24 
25 #ident "$Id$"
26 
27 
28 #include <assert.h>
29 #include <signal.h>
30 #include <string.h>
31 #if defined(LINUX)
32 #include <sys/epoll.h>
33 #include <sys/types.h>
34 #include <sys/socket.h>
35 #endif /* LINUX */
36 
37 #include "porting.h"
38 #include "shard_proxy_io.h"
39 #include "shard_proxy_handler.h"
40 #include "shard_proxy_function.h"
41 #include "cas_protocol.h"
42 #include "cas_error.h"
43 #include "shard_shm.h"
44 #include "broker_acl.h"
45 
46 #ifndef min
47 #define min(a,b) ((a) < (b) ? (a) : (b))
48 #endif
49 #ifndef max
50 #define max(a,b) ((a) > (b) ? (a) : (b))
51 #endif
52 
53 #if defined (SUPPRESS_STRLEN_WARNING)
54 #define strlen(s1) ((int) strlen(s1))
55 #endif /* defined (SUPPRESS_STRLEN_WARNING) */
56 
57 #if defined(WINDOWS)
58 #define O_NONBLOCK FIONBIO
59 #define HZ 1000
60 #endif /* WINDOWS */
61 
62 #define PROC_TYPE_CLIENT 0
63 #define PROC_TYPE_CAS 1
64 #define PROC_TYPE_BROKER 2
65 
66 #define READ_TYPE 1
67 #define WRITE_TYPE 2
68 
69 #define CLIENT_READ_ERROR(i) io_error(i, PROC_TYPE_CLIENT, READ_TYPE)
70 #define CLIENT_WRITE_ERROR(i) io_error(i, PROC_TYPE_CLIENT, WRITE_TYPE)
71 #define CAS_READ_ERROR(i) io_error(i, PROC_TYPE_CAS, READ_TYPE)
72 #define CAS_WRITE_ERROR(i) io_error(i, PROC_TYPE_CAS, WRITE_TYPE)
73 
74 #define MAX_NUM_NEW_CLIENT 5
75 
/*
 * Port layout relative to the broker port:
 *   client port = broker_port + PROXY_START_PORT + proxy_index
 *   cas port    = broker_port + PROXY_START_PORT + proxy_max_count + proxy_index
 *
 * Each expansion is wrapped in parentheses so that it behaves as a single
 * operand when used inside a larger expression.
 */
#define PROXY_START_PORT 1
#define GET_CLIENT_PORT(broker_port, proxy_index) \
  ((broker_port) + PROXY_START_PORT + (proxy_index))
#define GET_CAS_PORT(broker_port, proxy_index, proxy_max_count) \
  ((broker_port) + PROXY_START_PORT + (proxy_max_count) + (proxy_index))
79 
81 extern T_SHM_PROXY *shm_proxy_p;
85 extern int proxy_id;
86 
89 
90 typedef T_CAS_IO *(*T_FUNC_FIND_CAS) (int shard_id, int cas_id, int ctx_cid, unsigned int ctx_uid);
91 
92 extern void proxy_term (void);
93 extern bool proxy_Keep_running;
94 
95 extern const char *rel_build_number (void);
96 
97 static int shard_io_set_fl (int fd, int flags);
98 #if defined (ENABLE_UNUSED_FUNCTION)
99 static int shard_io_clr_fl (int fd, int flags);
100 static int shard_io_setsockbuf (int fd, int size);
101 #endif /* ENABLE_UNUSED_FUNCTION */
102 
103 static void proxy_socket_io_clear (T_SOCKET_IO * sock_io_p);
104 static int proxy_socket_io_initialize (void);
105 static void proxy_socket_io_destroy (void);
106 static T_SOCKET_IO *proxy_socket_io_add (SOCKET fd, bool from_cas);
108 static int proxy_socket_io_new_client (SOCKET lsnr_fd);
109 
110 static int proxy_process_client_register (T_SOCKET_IO * sock_io_p);
111 static int proxy_process_client_request (T_SOCKET_IO * sock_io_p);
112 static int proxy_process_client_conn_error (T_SOCKET_IO * sock_io_p);
113 static int proxy_process_client_write_error (T_SOCKET_IO * sock_io_p);
114 static int proxy_process_client_read_error (T_SOCKET_IO * sock_io_p);
115 static int proxy_process_client_message (T_SOCKET_IO * sock_io_p);
116 static int proxy_process_cas_register (T_SOCKET_IO * sock_io_p);
117 static int proxy_process_cas_response (T_SOCKET_IO * sock_io_p);
118 static int proxy_process_cas_conn_error (T_SOCKET_IO * sock_io_p);
119 static int proxy_process_cas_write_error (T_SOCKET_IO * sock_io_p);
120 static int proxy_process_cas_read_error (T_SOCKET_IO * sock_io_p);
121 static int proxy_process_cas_message (T_SOCKET_IO * sock_io_p);
122 static int proxy_socket_io_write_internal (T_SOCKET_IO * sock_io_p);
123 static void proxy_socket_io_write_to_cas (T_SOCKET_IO * sock_io_p);
124 static void proxy_socket_io_write_to_client (T_SOCKET_IO * sock_io_p);
125 
126 static int proxy_socket_io_read_internal (T_SOCKET_IO * sock_io_p);
127 static void proxy_socket_io_read_from_cas_next (T_SOCKET_IO * sock_io_p);
128 static void proxy_socket_io_read_from_cas_first (T_SOCKET_IO * sock_io_p);
129 static void proxy_socket_io_read_from_cas (T_SOCKET_IO * sock_io_p);
130 static void proxy_socket_io_read_from_client_next (T_SOCKET_IO * sock_io_p);
131 static void proxy_socket_io_read_from_client_first (T_SOCKET_IO * sock_io_p);
132 static void proxy_socket_io_read_from_client (T_SOCKET_IO * sock_io_p);
133 static void proxy_socket_io_write (T_SOCKET_IO * sock_io_p);
134 static void proxy_socket_io_write_error (T_SOCKET_IO * sock_io_p);
135 static void proxy_socket_io_read (T_SOCKET_IO * sock_io_p);
136 static void proxy_socket_io_read_error (T_SOCKET_IO * sock_io_p);
137 
138 static int proxy_client_io_initialize (void);
139 static void proxy_client_io_destroy (void);
140 static T_CLIENT_IO *proxy_client_io_new (SOCKET fd, char *driver_info);
141 
142 static int proxy_cas_io_initialize (int shard_id, T_CAS_IO ** cas_io_pp, int size);
143 static int proxy_shard_io_initialize (void);
144 static void proxy_shard_io_destroy (void);
145 static T_SHARD_IO *proxy_shard_io_find (int shard_id);
146 static T_CAS_IO *proxy_cas_io_new (int shard_id, int cas_id, SOCKET fd);
147 static void proxy_cas_io_free (int shard_id, int cas_id);
148 static T_CAS_IO *proxy_cas_io_find_by_fd (int shard_id, int cas_id, SOCKET fd);
149 
150 static int proxy_client_add_waiter_by_shard (T_SHARD_IO * shard_io_p, int ctx_cid, int ctx_uid, int timeout);
151 static void proxy_client_check_waiter_and_wakeup (T_SHARD_IO * shard_io_p, T_CAS_IO * cas_io_p);
152 
153 #if defined(WINDOWS)
154 static int proxy_io_inet_lsnr (int port);
155 static int proxy_io_client_lsnr (void);
156 static SOCKET proxy_io_client_accept (SOCKET lsnr_fd);
157 #else /* WINDOWS */
158 static SOCKET proxy_io_connect_to_broker (void);
159 static int proxy_io_register_to_broker (void);
160 static int proxy_io_unixd_lsnr (char *unixd_sock_name);
161 #endif /* !WINDOWS */
162 static int proxy_io_cas_lsnr (void);
163 static SOCKET proxy_io_accept (SOCKET lsnr_fd);
164 static SOCKET proxy_io_cas_accept (SOCKET lsnr_fd);
165 
166 static void proxy_init_net_buf (T_NET_BUF * net_buf);
167 
168 static int proxy_io_make_ex_get_int (char *driver_info, char **buffer, int *argv);
169 static void proxy_set_conn_info (int func_code, int ctx_cid, int ctx_uid, int shard_id, int cas_id);
170 static T_CAS_IO *proxy_find_idle_cas_by_asc (int shard_id, int cas_id, int ctx_cid, unsigned int ctx_uid);
171 static T_CAS_IO *proxy_find_idle_cas_by_desc (int shard_id, int cas_id, int ctx_cid, unsigned int ctx_uid);
172 static T_CAS_IO *proxy_find_idle_cas_by_conn_info (int shard_id, int cas_id, int ctx_cid, unsigned int ctx_uid);
173 static T_CAS_IO *proxy_cas_alloc_by_shard_and_cas_id (int client_id, int shard_id, int cas_id, int ctx_cid,
174  unsigned int ctx_uid);
175 static T_CAS_IO *proxy_cas_alloc_anything (int client_id, int shard_id, int cas_id, int ctx_cid, unsigned int ctx_uid,
176  T_FUNC_FIND_CAS function);
177 static int proxy_check_authorization (T_PROXY_CONTEXT * ctx_p, const char *db_name, const char *db_user,
178  const char *db_passwd);
179 
180 #if defined(LINUX)
181 static int proxy_get_max_socket (void);
182 static int proxy_add_epoll_event (int fd, unsigned int events);
183 static int proxy_mod_epoll_event (int fd, unsigned int events);
184 static int proxy_del_epoll_event (int fd);
185 
186 static int max_Socket = 0;
187 static int ep_Fd = INVALID_SOCKET;
188 static struct epoll_event *ep_Event = NULL;
189 #else /* LINUX */
190 int maxfd = 0;
192 #endif /* !LINUX */
193 
194 #if defined(WINDOWS)
195 int broker_port = 0;
196 int accept_ip_addr = 0;
197 SOCKET client_lsnr_fd = INVALID_SOCKET;
198 #else /* WINDOWS */
200 #endif /* !WINDOWS */
202 
206 
207 /* SHARD only supports client versions 8.2.0 and later */
209 
210 /***
211  THIS FUNCTION IS LOCATED IN ORIGINAL CAS FILES
212  ***/
213 #if 1 /* SHARD TODO -- remove this functions -- tigger */
214 void
216 {
217  header->msg_body_size_ptr = (int *) (header->buf);
218  header->info_ptr = (char *) (header->buf + MSG_HEADER_MSG_SIZE);
219 
220  *(header->msg_body_size_ptr) = 0;
225 }
226 
/*
 * set_data_length () - store a length in the first 4 bytes of a message buffer
 *   return: none
 *   buffer(out): message buffer; the first sizeof (int) bytes are overwritten
 *   length(in): length value in host byte order
 *
 * Note: the value is stored in network byte order (htonl). It is copied with
 *       memcpy instead of being stored through a casted int pointer, so the
 *       buffer does not have to be int-aligned and no aliasing rule is broken.
 */
void
set_data_length (char *buffer, int length)
{
  int net_length;

  assert (buffer);

  /* length : first 4 bytes */
  net_length = htonl (length);
  memcpy (buffer, &net_length, sizeof (net_length));
}
236 
/*
 * get_data_length () - read the length stored in the first 4 bytes of a message buffer
 *   return: length converted from network to host byte order
 *   buffer(in): message buffer holding at least sizeof (int) readable bytes
 *
 * Note: the bytes are copied out with memcpy instead of being loaded through
 *       a casted int pointer, so the buffer does not have to be int-aligned.
 */
int
get_data_length (char *buffer)
{
  int net_length;

  assert (buffer);

  /* length : first 4 bytes */
  memcpy (&net_length, buffer, sizeof (net_length));

  return ntohl (net_length);
}
248 
249 int
250 get_msg_length (char *buffer)
251 {
252  return (get_data_length (buffer) + MSG_HEADER_SIZE);
253 }
254 
255 static int
256 get_dbinfo_length (char *driver_info)
257 {
258  T_BROKER_VERSION client_version = CAS_MAKE_PROTO_VER (driver_info);
259 
260  if (client_version < CAS_MAKE_VER (8, 2, 0))
261  {
263  }
264  else if (client_version < CAS_MAKE_VER (8, 4, 0))
265  {
267  }
268  return SRV_CON_DB_INFO_SIZE;
269 }
270 
271 int
272 net_decode_str (char *msg, int msg_size, char *func_code, void ***ret_argv)
273 {
274  int remain_size = msg_size;
275  char *cur_p = msg;
276  char *argp;
277  int i_val;
278  void **argv = NULL;
279  int argc = 0;
280 
281  *ret_argv = (void **) NULL;
282 
283  if (remain_size < 1)
284  return CAS_ER_COMMUNICATION;
285 
286  *func_code = *cur_p;
287  cur_p += 1;
288  remain_size -= 1;
289 
290  while (remain_size > 0)
291  {
292  if (remain_size < 4)
293  {
294  FREE_MEM (argv);
295  return CAS_ER_COMMUNICATION;
296  }
297  argp = cur_p;
298  memcpy ((char *) &i_val, cur_p, 4);
299  i_val = ntohl (i_val);
300  remain_size -= 4;
301  cur_p += 4;
302 
303  if (remain_size < i_val)
304  {
305  FREE_MEM (argv);
306  return CAS_ER_COMMUNICATION;
307  }
308 
309  argc++;
310  argv = (void **) REALLOC (argv, sizeof (void *) * argc);
311  if (argv == NULL)
312  return CAS_ER_NO_MORE_MEMORY;
313 
314  argv[argc - 1] = argp;
315 
316  cur_p += i_val;
317  remain_size -= i_val;
318  }
319 
320  *ret_argv = argv;
321  return argc;
322 }
323 #endif /* if 1 */
324 
325 static int
326 shard_io_set_fl (int fd, int flags)
327 { /* flags are file status flags to turn on */
328 #if defined(WINDOWS)
329  u_long argp;
330 
331  if (flags == O_NONBLOCK)
332  {
333  argp = 1; /* 1:non-blocking, 0:blocking */
334  if (ioctlsocket ((SOCKET) fd, FIONBIO, &argp) == SOCKET_ERROR)
335  {
336  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to ioctlsocket(%d, FIONBIO, %u). " "(error:%d).", fd, argp,
337  SOCKET_ERROR);
338  return -1;
339  }
340  }
341 #else
342  int val;
343 
344  if ((val = fcntl (fd, F_GETFL, 0)) < 0)
345  {
346  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to fcntl(%d, F_GETFL). (val:%d, errno:%d).", fd, val, errno);
347  return -1;
348  }
349 
350  val |= flags; /* turn on flags */
351 
352  if (fcntl (fd, F_SETFL, val) < 0)
353  {
354  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to fcntl(%d, F_SETFL, %d). (errno:%d).", fd, val, errno);
355  return -1;
356  }
357 #endif
358  return 1;
359 }
360 
361 #if defined (ENABLE_UNUSED_FUNCTION)
362 static int
363 shard_io_clr_fl (int fd, int flags)
364 { /* flags are file status flags to turn on */
365 #if defined(WINDOWS)
366  u_long argp;
367 
368  if (flags == O_NONBLOCK)
369  {
370  argp = 0; /* 1:non-blocking, 0:blocking */
371  if (ioctlsocket ((SOCKET) fd, FIONBIO, &argp) == SOCKET_ERROR)
372  {
373  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to ioctlsocket(%d, FIONBIO, %u). (error:%d).", fd, argp,
374  SOCKET_ERROR);
375  return -1;
376  }
377  }
378 #else
379  int val;
380 
381  if ((val = fcntl (fd, F_GETFL, 0)) < 0)
382  {
383  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to fcntl(%d, F_GETFL). (val:%d, errno:%d).", fd, val, errno);
384  return -1;
385  }
386 
387  val &= ~flags; /* turn off flags */
388 
389  if (fcntl (fd, F_SETFL, val) < 0)
390  {
391  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to fcntl(%d, F_SETFL, %d). (errno:%d).", fd, val, errno);
392  return -1;
393  }
394 #endif
395  return 1;
396 }
397 
398 static int
399 shard_io_setsockbuf (int fd, int size)
400 {
401  int n, val;
402  socklen_t len;
403 
404  val = size;
405  len = sizeof (int);
406  if (getsockopt (fd, SOL_SOCKET, SO_SNDBUF, (char *) &val, &len) < 0)
407  {
408  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to getsockopt(%d, SO_SND_BUF, %d, %d). " "(errno:%d).", fd, val, len,
409  errno);
410  return -1;
411  }
412  if (val < size)
413  {
414  val = size;
415  len = sizeof (int);
416  setsockopt (fd, SOL_SOCKET, SO_SNDBUF, (char *) &val, len);
417  }
418 
419  val = size;
420  len = sizeof (int);
421  if (getsockopt (fd, SOL_SOCKET, SO_RCVBUF, (char *) &val, &len) < 0)
422  {
423  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to getsockopt(%d, SO_RCV_BUF, %d, %d). " "(errno:%d).", fd, val, len,
424  errno);
425 
426  return -1;
427  }
428  if (val < size)
429  {
430  val = size;
431  len = sizeof (int);
432  setsockopt (fd, SOL_SOCKET, SO_RCVBUF, (char *) &val, len);
433  }
434 
435  return 1;
436 }
437 #endif /* ENABLE_UNUSED_FUNCTION */
438 
/*
 * proxy_dup_msg () - make a heap copy of a complete message
 *   return: newly allocated copy, or NULL if the length taken from the
 *           message is not positive or the allocation fails
 *   msg(in): message whose total length is obtained with get_msg_length ()
 *
 * Note: the length comes from the message itself. A value that is zero or
 *       negative is rejected here; converted to size_t for malloc/memcpy it
 *       would otherwise become a huge size.
 */
char *
proxy_dup_msg (char *msg)
{
  char *copy_p;
  int length;

  length = get_msg_length (msg);
  if (length <= 0)
    {
      return NULL;
    }

  copy_p = (char *) malloc ((size_t) length);
  if (copy_p != NULL)
    {
      memcpy (copy_p, msg, (size_t) length);
    }

  return copy_p;
}
455 
456 void
458 {
459  int pos;
460 
461  assert (msg);
462 
464 
465  msg[pos] = CAS_INFO_STATUS_ACTIVE;
466 
467  return;
468 }
469 
470 void
472 {
473  int pos;
474 
475  assert (msg);
476 
478 
479  msg[pos] = CAS_INFO_STATUS_INACTIVE;
480 
481  return;
482 }
483 
484 void
486 {
487  int pos;
488 
489  assert (msg);
490 
492 
494 
495  return;
496 }
497 
498 void
500 {
501  int pos;
502 
503  assert (msg);
504 
506 
508 
509  return;
510 }
511 
512 int
513 proxy_make_net_buf (T_NET_BUF * net_buf, int size, T_BROKER_VERSION client_version)
514 {
515  net_buf_init (net_buf, client_version);
516 
517  net_buf->data = (char *) MALLOC (size);
518  if (net_buf->data == NULL)
519  {
521  "Not enough virtual memory. " "Failed to alloc net buffer. " "(errno:%d, size:%d).", errno, size);
522  return -1;
523  }
524  net_buf->alloc_size = size;
525 
526  return 0;
527 }
528 
529 static void
531 {
532  MSG_HEADER msg_header;
533 
534  assert (net_buf);
535  assert (net_buf->data);
536 
537  init_msg_header (&msg_header);
538 
539  *(msg_header.msg_body_size_ptr) = htonl (net_buf->data_size);
540 
541  /* length */
542  memcpy (net_buf->data, msg_header.buf, NET_BUF_HEADER_MSG_SIZE);
543 
544  /* cas info */
545  /* 0:cas status */
547 
548  /* 3:cci default autocommit */
551 
553 
554  memcpy (net_buf->data + NET_BUF_HEADER_MSG_SIZE, msg_header.info_ptr, CAS_INFO_SIZE);
555 
556  return;
557 }
558 
559 static int
560 proxy_io_make_ex_get_int (char *driver_info, char **buffer, int *argv)
561 {
562  int error;
563  T_NET_BUF net_buf;
564  T_BROKER_VERSION client_version;
565 
566  assert (buffer);
567  assert (*buffer == NULL);
568  assert (argv != NULL);
569 
570  client_version = CAS_MAKE_PROTO_VER (driver_info);
571 
572  error = proxy_make_net_buf (&net_buf, SHARD_NET_BUF_ALLOC_SIZE, client_version);
573  if (error)
574  {
575  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to make net buffer. (error:%d).", error);
576  goto error_return;
577  }
578 
579  proxy_init_net_buf (&net_buf);
580 
581  /* error code */
582  net_buf_cp_int (&net_buf, 0 /* success */ , NULL);
583  /* int arg1 */
584  net_buf_cp_int (&net_buf, *argv, NULL);
585 
586  *buffer = net_buf.data;
587  set_data_length (*buffer, net_buf.data_size);
588 
589  net_buf.data = NULL;
590 
591  return (net_buf.data_size + MSG_HEADER_SIZE);
592 
593 error_return:
594  *buffer = NULL;
595 
596  return -1;
597 }
598 
599 /* error */
600 int
601 proxy_io_make_error_msg (char *driver_info, char **buffer, int error_ind, int error_code, const char *error_msg,
602  char is_in_tran)
603 {
604  int error;
605  T_NET_BUF net_buf;
606  T_BROKER_VERSION client_version;
607 
608  assert (buffer);
609  assert (*buffer == NULL);
610 
611  client_version = CAS_MAKE_PROTO_VER (driver_info);
612 
613  error = proxy_make_net_buf (&net_buf, SHARD_NET_BUF_ALLOC_SIZE, client_version);
614  if (error)
615  {
616  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to make net buffer. " "(error:%d).", error);
617  goto error_return;
618  }
619 
620  proxy_init_net_buf (&net_buf);
621 
622  if (is_in_tran)
623  {
625  }
626 
627  if (error_ind != CAS_NO_ERROR)
628  {
629  if (client_version >= CAS_MAKE_VER (8, 3, 0))
630  {
631  /* error indicator */
632  net_buf_cp_int (&net_buf, error_ind, NULL);
633  }
634  }
635 
636  error_code = proxy_convert_error_code (error_ind, error_code, driver_info, client_version, PROXY_CONV_ERR_TO_OLD);
637 
638  /* error code */
639  net_buf_cp_int (&net_buf, error_code, NULL);
640 
641  if (error_msg && error_msg[0])
642  {
643  /* error message */
644  net_buf_cp_str (&net_buf, error_msg, strlen (error_msg) + 1);
645  }
646 
647  *buffer = net_buf.data;
648  set_data_length (*buffer, net_buf.data_size);
649 
650  net_buf.data = NULL;
651 
652  PROXY_DEBUG_LOG ("make error to send to the client. " "(error_ind:%d, error_code:%d, errro_msg:%s)", error_ind,
653  error_code, (error_msg && error_msg[0]) ? error_msg : "-");
654 
655  return (net_buf.data_size + MSG_HEADER_SIZE);
656 
657 error_return:
658  *buffer = NULL;
659 
660  return -1;
661 }
662 
663 int
664 proxy_io_make_no_error (char *driver_info, char **buffer)
665 {
666  return proxy_io_make_error_msg (driver_info, buffer, CAS_NO_ERROR, CAS_NO_ERROR, NULL, false);
667 }
668 
/*
 * proxy_io_make_con_close_ok () - same message as proxy_io_make_no_error ()
 *   return: result of proxy_io_make_no_error ()
 *   driver_info(in): passed through unchanged
 *   buffer(out): passed through unchanged
 */
int
proxy_io_make_con_close_ok (char *driver_info, char **buffer)
{
  int msg_size = proxy_io_make_no_error (driver_info, buffer);

  return msg_size;
}
674 
/*
 * proxy_io_make_end_tran_ok () - same message as proxy_io_make_no_error ()
 *   return: result of proxy_io_make_no_error ()
 *   driver_info(in): passed through unchanged
 *   buffer(out): passed through unchanged
 */
int
proxy_io_make_end_tran_ok (char *driver_info, char **buffer)
{
  int msg_size = proxy_io_make_no_error (driver_info, buffer);

  return msg_size;
}
680 
/*
 * proxy_io_make_check_cas_ok () - same message as proxy_io_make_no_error ()
 *   return: result of proxy_io_make_no_error ()
 *   driver_info(in): passed through unchanged
 *   buffer(out): passed through unchanged
 */
int
proxy_io_make_check_cas_ok (char *driver_info, char **buffer)
{
  int msg_size = proxy_io_make_no_error (driver_info, buffer);

  return msg_size;
}
686 
/*
 * proxy_io_make_set_db_parameter_ok () - same message as proxy_io_make_no_error ()
 *   return: result of proxy_io_make_no_error ()
 *   driver_info(in): passed through unchanged
 *   buffer(out): passed through unchanged
 */
int
proxy_io_make_set_db_parameter_ok (char *driver_info, char **buffer)
{
  int msg_size = proxy_io_make_no_error (driver_info, buffer);

  return msg_size;
}
692 
/*
 * proxy_io_make_ex_get_isolation_level () - build a success reply carrying one int
 *   return: result of proxy_io_make_ex_get_int ()
 *   driver_info(in): passed through unchanged
 *   buffer(out): passed through unchanged
 *   argv(in): treated as a pointer to the int value to send
 */
int
proxy_io_make_ex_get_isolation_level (char *driver_info, char **buffer, void *argv)
{
  int *value_p = (int *) argv;

  return proxy_io_make_ex_get_int (driver_info, buffer, value_p);
}
698 
/*
 * proxy_io_make_ex_get_lock_timeout () - build a success reply carrying one int
 *   return: result of proxy_io_make_ex_get_int ()
 *   driver_info(in): passed through unchanged
 *   buffer(out): passed through unchanged
 *   argv(in): treated as a pointer to the int value to send
 */
int
proxy_io_make_ex_get_lock_timeout (char *driver_info, char **buffer, void *argv)
{
  int *value_p = (int *) argv;

  return proxy_io_make_ex_get_int (driver_info, buffer, value_p);
}
704 
705 int
706 proxy_io_make_end_tran_request (char *driver_info, char **buffer, bool commit)
707 {
708  int error;
709  T_NET_BUF net_buf;
710  unsigned char func_code;
711  unsigned char tran_commit;
712  T_BROKER_VERSION client_version;
713 
714  assert (buffer);
715  assert (*buffer == NULL);
716 
717  client_version = CAS_MAKE_PROTO_VER (driver_info);
718  error = proxy_make_net_buf (&net_buf, SHARD_NET_BUF_ALLOC_SIZE, client_version);
719  if (error)
720  {
721  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to make net buffer. (error:%d).", error);
722  goto error_return;
723  }
724 
725  proxy_init_net_buf (&net_buf);
726 
727  func_code = CAS_FC_END_TRAN;
728  tran_commit = (commit) ? CCI_TRAN_COMMIT : CCI_TRAN_ROLLBACK;
729 
730  /* function code */
731  net_buf_cp_byte (&net_buf, (unsigned char) func_code);
732 
733  /* arg1 : commit or rollback */
734  net_buf_cp_int (&net_buf, NET_SIZE_BYTE, NULL);
735  net_buf_cp_byte (&net_buf, tran_commit);
736 
737  *buffer = net_buf.data;
738  set_data_length (*buffer, net_buf.data_size);
739 
740  net_buf.data = NULL;
741 
742  return (net_buf.data_size + MSG_HEADER_SIZE);
743 
744 error_return:
745  *buffer = NULL;
746 
747  return -1;
748 }
749 
750 #if defined (ENABLE_UNUSED_FUNCTION)
/*
 * proxy_io_make_end_tran_commit () - build an end-transaction request that commits
 *   return: result of proxy_io_make_end_tran_request ()
 *   driver_info(in): passed through unchanged
 *   buffer(out): passed through unchanged
 *
 * Note: compiled only when ENABLE_UNUSED_FUNCTION is defined. The previous
 *       body called proxy_io_make_end_tran_request () without its
 *       driver_info argument and could not compile; the parameter list now
 *       mirrors proxy_io_make_end_tran_abort (). Any prototype kept under
 *       the same guard has to carry the same parameter list.
 */
int
proxy_io_make_end_tran_commit (char *driver_info, char **buffer)
{
  return proxy_io_make_end_tran_request (driver_info, buffer, true);
}
756 #endif /* ENABLE_UNUSED_FUNCTION */
757 
/*
 * proxy_io_make_end_tran_abort () - build an end-transaction request that rolls back
 *   return: result of proxy_io_make_end_tran_request ()
 *   driver_info(in): passed through unchanged
 *   buffer(out): passed through unchanged
 */
int
proxy_io_make_end_tran_abort (char *driver_info, char **buffer)
{
  int msg_size;

  msg_size = proxy_io_make_end_tran_request (driver_info, buffer, false /* rollback */ );

  return msg_size;
}
763 
764 int
765 proxy_io_make_close_req_handle_ok (char *driver_info, char **buffer, bool is_in_tran)
766 {
767  int error;
768  char *p;
769  T_NET_BUF net_buf;
770  T_BROKER_VERSION client_version;
771 
772  assert (buffer);
773  assert (*buffer == NULL);
774 
775  client_version = CAS_MAKE_PROTO_VER (driver_info);
776  error = proxy_make_net_buf (&net_buf, SHARD_NET_BUF_ALLOC_SIZE, client_version);
777  if (error)
778  {
779  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to make net buffer. (error:%d).", error);
780  goto error_return;
781  }
782 
783  proxy_init_net_buf (&net_buf);
784 
785  p = (char *) (net_buf.data + MSG_HEADER_MSG_SIZE);
786  if (is_in_tran)
787  {
789  }
790  else
791  {
793  }
794 
795  /* error code */
796  net_buf_cp_int (&net_buf, 0 /* success */ , NULL);
797 
798  *buffer = net_buf.data;
799  set_data_length (*buffer, net_buf.data_size);
800 
801  net_buf.data = NULL;
802 
803  return (net_buf.data_size + MSG_HEADER_SIZE);
804 
805 error_return:
806  *buffer = NULL;
807 
808  return -1;
809 }
810 
811 #if defined (ENABLE_UNUSED_FUNCTION)
/*
 * proxy_io_make_close_req_handle_in_tran_ok () - close-req-handle reply with is_in_tran true
 *   return: result of proxy_io_make_close_req_handle_ok ()
 *   driver_info(in): passed through unchanged
 *   buffer(out): passed through unchanged
 *
 * Note: compiled only when ENABLE_UNUSED_FUNCTION is defined. The previous
 *       body called proxy_io_make_close_req_handle_ok () without its
 *       driver_info argument and could not compile; the parameter list now
 *       mirrors proxy_io_make_close_req_handle_out_tran_ok (). Any prototype
 *       kept under the same guard has to carry the same parameter list.
 */
int
proxy_io_make_close_req_handle_in_tran_ok (char *driver_info, char **buffer)
{
  return proxy_io_make_close_req_handle_ok (driver_info, buffer, true /* in_tran */ );
}
817 #endif /* ENABLE_UNUSED_FUNCTION */
818 
/*
 * proxy_io_make_close_req_handle_out_tran_ok () - close-req-handle reply with is_in_tran false
 *   return: result of proxy_io_make_close_req_handle_ok ()
 *   driver_info(in): passed through unchanged
 *   buffer(out): passed through unchanged
 */
int
proxy_io_make_close_req_handle_out_tran_ok (char *driver_info, char **buffer)
{
  int msg_size;

  msg_size = proxy_io_make_close_req_handle_ok (driver_info, buffer, false /* out_tran */ );

  return msg_size;
}
824 
/*
 * proxy_io_make_cursor_close_out_tran_ok () - reuses the close-req-handle reply with is_in_tran false
 *   return: result of proxy_io_make_close_req_handle_ok ()
 *   driver_info(in): passed through unchanged
 *   buffer(out): passed through unchanged
 */
int
proxy_io_make_cursor_close_out_tran_ok (char *driver_info, char **buffer)
{
  int msg_size;

  msg_size = proxy_io_make_close_req_handle_ok (driver_info, buffer, false /* out_tran */ );

  return msg_size;
}
830 
831 int
832 proxy_io_make_get_db_version (char *driver_info, char **buffer)
833 {
834  int error;
835  T_NET_BUF net_buf;
836  char *p;
837  T_BROKER_VERSION client_version;
838 
839  assert (buffer);
840  assert (*buffer == NULL);
841 
842  client_version = CAS_MAKE_PROTO_VER (driver_info);
843  error = proxy_make_net_buf (&net_buf, SHARD_NET_BUF_ALLOC_SIZE, client_version);
844  if (error)
845  {
846  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to make net buffer. (error:%d).", error);
847  goto error_return;
848  }
849 
850  proxy_init_net_buf (&net_buf);
851 
852  p = (char *) rel_build_number ();
853 
854  net_buf_cp_int (&net_buf, 0 /* ok */ , NULL);
855  if (p == NULL)
856  {
857  net_buf_cp_byte (&net_buf, '\0');
858  }
859  else
860  {
861  net_buf_cp_str (&net_buf, p, strlen (p) + 1);
862  }
863 
864  *buffer = net_buf.data;
865  set_data_length (*buffer, net_buf.data_size);
866 
867  net_buf.data = NULL;
868 
869  return (net_buf.data_size + MSG_HEADER_SIZE);
870 
871 error_return:
872  *buffer = NULL;
873 
874  return -1;
875 }
876 
/*
 * proxy_io_make_client_conn_ok () - build the connection reply: one zeroed int
 *   return: sizeof (int) on success, 0 if the allocation fails
 *   driver_info(in): not used
 *   buffer(out): receives a newly allocated, zero-filled buffer of
 *                sizeof (int) bytes (dummy port id), or NULL on failure
 */
int
proxy_io_make_client_conn_ok (char *driver_info, char **buffer)
{
  /* calloc hands back the buffer already zero-filled (dummy port id) */
  char *reply = (char *) calloc (1, sizeof (int));

  *buffer = reply;
  if (reply == NULL)
    {
      return 0;
    }

  return sizeof (int);
}
891 
892 int
893 proxy_io_make_client_proxy_alive (char *driver_info, char **buffer)
894 {
895  int error;
896  T_NET_BUF net_buf;
897  T_BROKER_VERSION client_version;
898 
899  assert (buffer);
900 
901  client_version = CAS_MAKE_PROTO_VER (driver_info);
902  error = proxy_make_net_buf (&net_buf, MSG_HEADER_SIZE, client_version);
903  if (error)
904  {
905  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to make net buffer. (error:%d).", error);
906  *buffer = NULL;
907  return -1;
908  }
909 
910  net_buf.data_size = 0;
911  proxy_init_net_buf (&net_buf);
912 
913  *buffer = net_buf.data;
914  set_data_length (*buffer, net_buf.data_size);
915 
916  net_buf.data = NULL;
917 
918  return MSG_HEADER_SIZE;
919 }
920 
921 int
922 proxy_io_make_client_dbinfo_ok (char *driver_info, char **buffer)
923 {
924  char *p;
925  char dbms_type;
926  int reply_size, reply_nsize;
927  int cas_info_size;
928  int proxy_pid;
930  T_BROKER_VERSION client_version;
931 
932  assert (buffer);
933 
934  memset (broker_info, 0, BROKER_INFO_SIZE);
935 
936  client_version = CAS_MAKE_PROTO_VER (driver_info);
937  if (DOES_CLIENT_UNDERSTAND_THE_PROTOCOL (client_version, PROTOCOL_V5))
938  {
939  if (proxy_info_p->appl_server == APPL_SERVER_CAS_ORACLE)
940  {
941  dbms_type = CAS_PROXY_DBMS_ORACLE;
942  }
943  else if (proxy_info_p->appl_server == APPL_SERVER_CAS_MYSQL
944  || proxy_info_p->appl_server == APPL_SERVER_CAS_MYSQL51)
945  {
946  dbms_type = CAS_PROXY_DBMS_MYSQL;
947  }
948  else
949  {
950  dbms_type = CAS_PROXY_DBMS_CUBRID;
951  }
952  }
953  else
954  {
955  dbms_type = CAS_DBMS_CUBRID;
956  }
957 
958  cas_bi_make_broker_info (broker_info, dbms_type, shm_as_p->statement_pooling, shm_as_p->cci_pconnect);
959 
960  if (DOES_CLIENT_UNDERSTAND_THE_PROTOCOL (client_version, PROTOCOL_V4))
961  {
962  reply_size = CAS_CONNECTION_REPLY_SIZE;
963  }
964  else if (DOES_CLIENT_UNDERSTAND_THE_PROTOCOL (client_version, PROTOCOL_V3))
965  {
966  reply_size = CAS_CONNECTION_REPLY_SIZE_V3;
967  }
968  else
969  {
971  }
972 
973  *buffer = (char *) malloc (PROXY_CONNECTION_REPLY_SIZE (reply_size) * sizeof (char));
974  if (*buffer == NULL)
975  {
976  return -1;
977  }
978 
979  reply_nsize = htonl (reply_size);
980  cas_info_size = htonl (CAS_INFO_SIZE);
981  proxy_pid = htonl (getpid ());
982 
983  /* length */
984  p = *(buffer);
985  memcpy (p, &reply_nsize, PROTOCOL_SIZE);
986  p += PROTOCOL_SIZE;
987 
988  /* cas info */
989  memcpy (p, &cas_info_size, CAS_INFO_SIZE);
990  p += CAS_INFO_SIZE;
991 
992  /* proxy pid */
993  memcpy (p, &proxy_pid, CAS_PID_SIZE);
994  p += CAS_PID_SIZE;
995 
996  /* brokerinfo */
997  memcpy (p, broker_info, BROKER_INFO_SIZE);
998  p += BROKER_INFO_SIZE;
999 
1000  /* proxy id */
1001  if (DOES_CLIENT_UNDERSTAND_THE_PROTOCOL (client_version, PROTOCOL_V4))
1002  {
1003  int v = htonl (proxy_id + 1);
1004 
1005  memcpy (p, &v, CAS_PID_SIZE);
1006  p += CAS_PID_SIZE;
1007  }
1008 
1009  /* session id */
1010  if (DOES_CLIENT_UNDERSTAND_THE_PROTOCOL (client_version, PROTOCOL_V3))
1011  {
1012  memset ((char *) p, 0, DRIVER_SESSION_SIZE);
1013  p += DRIVER_SESSION_SIZE;
1014  }
1015  else
1016  {
1017  memset ((char *) p, 0, SESSION_ID_SIZE);
1018  p += SESSION_ID_SIZE;
1019  }
1020 
1021  return PROXY_CONNECTION_REPLY_SIZE (reply_size);
1022 }
1023 
1024 int
1025 proxy_io_make_client_acl_fail (char *driver_info, char **buffer)
1026 {
1027  char err_msg[1024];
1028  snprintf (err_msg, sizeof (err_msg), "Authorization error.(Address is rejected)");
1029 
1030  return proxy_io_make_error_msg (driver_info, buffer, DBMS_ERROR_INDICATOR, CAS_ER_NOT_AUTHORIZED_CLIENT, err_msg,
1031  false);
1032 }
1033 
1034 int
1035 proxy_io_make_shard_info (char *driver_info, char **buffer)
1036 {
1037  int error;
1038  int length;
1039  int shard_index;
1040  T_NET_BUF net_buf;
1041  T_SHARD_CONN *shard_conn_p;
1042  T_BROKER_VERSION client_version;
1043 
1044  assert (buffer);
1045  assert (*buffer == NULL);
1046 
1047  client_version = CAS_MAKE_PROTO_VER (driver_info);
1048  error = proxy_make_net_buf (&net_buf, MSG_HEADER_SIZE, client_version);
1049  if (error)
1050  {
1051  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to make net buffer. (error:%d).", error);
1052  *buffer = NULL;
1053  return -1;
1054  }
1055 
1056  net_buf.data_size = 0;
1057  proxy_init_net_buf (&net_buf);
1058 
1059  /* shard count */
1060  net_buf_cp_int (&net_buf, proxy_info_p->max_shard, NULL);
1061 
1062  /* N * shard info */
1063  for (shard_index = 0; shard_index < shm_conn_p->num_shard_conn; shard_index++)
1064  {
1065  shard_conn_p = &shm_conn_p->shard_conn[shard_index];
1066 
1067  /* shard id */
1068  net_buf_cp_int (&net_buf, shard_index, NULL);
1069 
1070  /* shard db name */
1071  length = strlen (shard_conn_p->db_name) + 1 /* NTS */ ;
1072  net_buf_cp_int (&net_buf, length, NULL);
1073  net_buf_cp_str (&net_buf, shard_conn_p->db_name, length);
1074 
1075  /* shard db server */
1076  length = strlen (shard_conn_p->db_conn_info) + 1 /* NTS */ ;
1077  net_buf_cp_int (&net_buf, length, NULL);
1078  net_buf_cp_str (&net_buf, shard_conn_p->db_conn_info, length);
1079  }
1080 
1081  *buffer = net_buf.data;
1082  set_data_length (*buffer, net_buf.data_size);
1083 
1084  net_buf.data = NULL;
1085  return (net_buf.data_size + MSG_HEADER_SIZE);
1086 }
1087 
1088 int
1089 proxy_io_make_check_cas (char *driver_info, char **buffer)
1090 {
1091  int error;
1092  T_NET_BUF net_buf;
1093  T_BROKER_VERSION client_version;
1094 
1095  assert (buffer);
1096  assert (*buffer == NULL);
1097 
1098  client_version = CAS_MAKE_PROTO_VER (driver_info);
1099  error = proxy_make_net_buf (&net_buf, MSG_HEADER_SIZE, client_version);
1100  if (error)
1101  {
1102  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to make net buffer. (error:%d).", error);
1103  *buffer = NULL;
1104  return -1;
1105  }
1106 
1107  net_buf.data_size = 0;
1108  proxy_init_net_buf (&net_buf);
1109 
1110  /* function code */
1111  net_buf_cp_byte (&net_buf, (unsigned char) CAS_FC_CHECK_CAS);
1112 
1113  *buffer = net_buf.data;
1114  set_data_length (*buffer, net_buf.data_size);
1115 
1116  proxy_set_force_out_tran (*buffer);
1117 
1118  net_buf.data = NULL;
1119  return (net_buf.data_size + MSG_HEADER_SIZE);
1120 }
1121 
1122 void
1124 {
1125  assert (io_buffer);
1126 
1127  io_buffer->length = 0;
1128  io_buffer->offset = 0;
1129  FREE_MEM (io_buffer->data);
1130 
1131  return;
1132 }
1133 
1134 static void
1136 {
1137  assert (sock_io_p);
1138 
1139 /* UNUSED */
1140 #if 0
1141  ENTER_FUNC ();
1142 
1143  PROXY_DEBUG_LOG ("free socket io.(fd:%d,from_cas:%s,shard/cas:%d/%d).\n", sock_io_p->fd,
1144  (sock_io_p->from_cas == PROXY_EVENT_FROM_CAS) ? "cas" : "client", sock_io_p->id.shard.shard_id,
1145  sock_io_p->id.shard.cas_id);
1146 #endif
1147 
1148  sock_io_p->fd = INVALID_SOCKET;
1149  sock_io_p->status = SOCK_IO_IDLE;
1150  sock_io_p->ip_addr = 0;
1151  sock_io_p->from_cas = PROXY_EVENT_FROM_CLIENT;
1152  sock_io_p->id.shard.shard_id = PROXY_INVALID_SHARD;
1153  sock_io_p->id.shard.cas_id = PROXY_INVALID_CAS;
1154 
1155  if (sock_io_p->read_event)
1156  {
1157  proxy_event_free (sock_io_p->read_event);
1158  sock_io_p->read_event = NULL;
1159  }
1160 
1161  if (sock_io_p->write_event)
1162  {
1163  proxy_event_free (sock_io_p->write_event);
1164  sock_io_p->write_event = NULL;
1165  }
1166 
1167 /* UNUSED */
1168 #if 0
1169  EXIT_FUNC ();
1170 #endif
1171  return;
1172 }
1173 
1174 
1175 static int
1177 {
1178  int i;
1179  int size;
1180  T_SOCKET_IO *sock_io_p;
1181 
1182  if (proxy_Socket_io.ent)
1183  {
1184  /* already initialized */
1185  return 0;
1186  }
1187 
1188 #if defined(LINUX)
1189  proxy_Socket_io.max_socket = max_Socket;
1190 #else /* LINUX */
1191  proxy_Socket_io.max_socket = MAX_FD;
1192 #endif /* !LINUX */
1193  proxy_Socket_io.cur_socket = 0;
1194 
1195  size = sizeof (T_SOCKET_IO) * proxy_Socket_io.max_socket;
1196  proxy_Socket_io.ent = (T_SOCKET_IO *) malloc (size);
1197  if (proxy_Socket_io.ent == NULL)
1198  {
1200  "Not enough virtual memory. " "Failed to alloc socket entry. " "(errno:%d, size:%d).", errno, size);
1201  return -1;
1202  }
1203  memset (proxy_Socket_io.ent, 0, size);
1204 
1205  for (i = 0; i < proxy_Socket_io.max_socket; i++)
1206  {
1207  sock_io_p = &(proxy_Socket_io.ent[i]);
1208 
1209  proxy_socket_io_clear (sock_io_p);
1210  }
1211 
1212  return 0;
1213 }
1214 
/*
 * Socket I/O table teardown.
 * NOTE(review): the declarator line (function name and parameter list) is
 * missing from this listing; presumably proxy_socket_io_destroy (void) -
 * confirm against the original file.
 *
 * For every entry that still holds a socket: removes it from the poll set
 * (epoll on LINUX, the allset / wnewset fd sets otherwise), closes it and
 * clears the entry. Finally resets the counters and frees the table.
 * Does nothing when the table was never allocated.
 */
1215 static void
1217 {
1218  int i;
1219  T_SOCKET_IO *sock_io_p;
1220 
1221  if (proxy_Socket_io.ent == NULL)
1222  {
1223  return;
1224  }
1225 
1226  for (i = 0; i < proxy_Socket_io.max_socket; i++)
1227  {
1228  sock_io_p = &(proxy_Socket_io.ent[i]);
1229 
1230  if (sock_io_p->fd != INVALID_SOCKET)
1231  {
1232 #if defined(LINUX)
 /* sockets in SOCK_IO_CLOSE_WAIT were already removed from epoll by the
  * connection-error handlers in this file */
1233  if (sock_io_p->status != SOCK_IO_CLOSE_WAIT)
1234  {
1235  (void) proxy_del_epoll_event (sock_io_p->fd);
1236  }
1237 #else /* LINUX */
1238  FD_CLR (sock_io_p->fd, &allset);
1239  FD_CLR (sock_io_p->fd, &wnewset);
1240 #endif /* !LINUX */
1241  CLOSE_SOCKET (sock_io_p->fd);
1242  }
 /* also frees any pending read / write event attached to the entry */
1243  proxy_socket_io_clear (sock_io_p);
1244  }
1245 
1246  proxy_Socket_io.max_socket = 0;
1247  proxy_Socket_io.cur_socket = 0;
 /* NOTE(review): the early-return test above relies on ent being NULL after
  * this call; presumably FREE_MEM resets the pointer - confirm */
1248  FREE_MEM (proxy_Socket_io.ent);
1249 
1250  return;
1251 }
1252 
1253 #if defined(PROXY_VERBOSE_DEBUG)
1254 void
1255 proxy_socket_io_print (bool print_all)
1256 {
1257  int i;
1258  int client_id, shard_id, cas_id;
1259  char *from_cas;
1260  T_SOCKET_IO *sock_io_p;
1261 
1262  PROXY_LOG (PROXY_LOG_MODE_SCHEDULE_DETAIL, "* SOCKET IO *");
1263  PROXY_LOG (PROXY_LOG_MODE_SCHEDULE_DETAIL, "%-20s = %d", "max_socket", proxy_Socket_io.max_socket);
1264  PROXY_LOG (PROXY_LOG_MODE_SCHEDULE_DETAIL, "%-20s = %d", "cur_socket", proxy_Socket_io.cur_socket);
1265 
1266  PROXY_LOG (PROXY_LOG_MODE_SCHEDULE_DETAIL, "%-7s %-5s %-8s %-16s %-8s %-10s %-10s %-10s", "idx", "fd", "status",
1267  "ip_addr", "cas", "client_id", "shard_id", "cas_id");
1268  if (proxy_Socket_io.ent)
1269  {
1270  for (i = 0; i < proxy_Socket_io.max_socket; i++)
1271  {
1272  sock_io_p = &(proxy_Socket_io.ent[i]);
1273  if (!print_all && IS_INVALID_SOCKET (sock_io_p->fd))
1274  {
1275  continue;
1276  }
1277 
1278  if (sock_io_p->from_cas)
1279  {
1280  client_id = PROXY_INVALID_CAS;
1281  shard_id = sock_io_p->id.shard.shard_id;
1282  cas_id = sock_io_p->id.shard.cas_id;
1283  from_cas = (char *) "cas";
1284  }
1285  else
1286  {
1287  client_id = sock_io_p->id.client_id;
1288  shard_id = PROXY_INVALID_SHARD;
1289  cas_id = PROXY_INVALID_CAS;
1290  from_cas = (char *) "client";
1291  }
1292 
1293  PROXY_LOG (PROXY_LOG_MODE_SCHEDULE_DETAIL, "[%-5d] %-5d %-8d %-16s %-8s %-10d %-10d %-10d", i,
1294  sock_io_p->fd, sock_io_p->status, inet_ntoa (*((struct in_addr *) &sock_io_p->ip_addr)), from_cas,
1295  client_id, shard_id, cas_id);
1296 
1297  }
1298  }
1299 
1300  return;
1301 }
1302 #endif /* PROXY_VERBOSE_DEBUG */
1303 
1304 static T_SOCKET_IO *
1305 proxy_socket_io_add (SOCKET fd, bool from_cas)
1306 {
1307 #if defined(LINUX)
1308  int error;
1309 #endif
1310  T_SOCKET_IO *sock_io_p;
1311 
1312  assert (proxy_Socket_io.ent);
1313  if (proxy_Socket_io.ent == NULL)
1314  {
1315  return NULL;
1316  }
1317 
1318  if (fd >= proxy_Socket_io.max_socket)
1319  {
1320  PROXY_LOG (PROXY_LOG_MODE_ERROR, "socket fd exceeds max socket fd. (fd:%d, max socket fd:%d).", fd,
1321  proxy_Socket_io.max_socket);
1322  return NULL;
1323  }
1324 
1325  if (proxy_Socket_io.cur_socket >= proxy_Socket_io.max_socket)
1326  {
1328  "Number of socket entry exceeds max_socket. " "(current_socket:%d, max_socket:%d).",
1329  proxy_Socket_io.cur_socket, proxy_Socket_io.max_socket);
1330  return NULL;
1331  }
1332 
1333  sock_io_p = &(proxy_Socket_io.ent[fd]);
1334 
1335  if (sock_io_p->fd > INVALID_SOCKET || sock_io_p->status != SOCK_IO_IDLE)
1336  {
1337  assert (false);
1338  proxy_Keep_running = false;
1339  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Receive duplicated socket fd. " "(received socket:%d, status:%d)",
1340  sock_io_p->fd, sock_io_p->status);
1341  return NULL;
1342  }
1343 
1344  shard_io_set_fl (fd, O_NONBLOCK);
1345 
1346 #if defined(LINUX)
1347  error = proxy_add_epoll_event (fd, EPOLLIN | EPOLLOUT);
1348  if (error < 0)
1349  {
1350  CLOSE_SOCKET (fd);
1351  return NULL;
1352  }
1353 #else /* LINUX */
1354  FD_SET (fd, &allset);
1355  maxfd = max (maxfd, fd);
1356 #endif /* !LINUX */
1357 
1358  sock_io_p->fd = fd;
1359  sock_io_p->status = SOCK_IO_REG_WAIT;
1360  sock_io_p->ip_addr = 0;
1361 
1362  sock_io_p->from_cas = from_cas;
1363 
1364  sock_io_p->id.shard.shard_id = PROXY_INVALID_SHARD;
1365  sock_io_p->id.shard.cas_id = PROXY_INVALID_CAS;
1366 
1367  assert (sock_io_p->read_event == NULL);
1368  assert (sock_io_p->write_event == NULL);
1369  sock_io_p->read_event = NULL;
1370  sock_io_p->write_event = NULL;
1371 
1372  proxy_Socket_io.cur_socket++;
1373 
1374  PROXY_LOG (PROXY_LOG_MODE_SHARD_DETAIL, "New socket io created. (fd:%d).", fd);
1375 #if defined(PROXY_VERBOSE_DEBUG)
1376  proxy_socket_io_print (false);
1377 #endif /* PROXY_VERBOSE_DEBUG */
1378 
1379  return sock_io_p;
1380 }
1381 
1382 int
1384 {
1385  T_SOCKET_IO *sock_io_p;
1386 
1387  ENTER_FUNC ();
1388 
1389  assert (proxy_Socket_io.ent);
1390  assert (proxy_Socket_io.cur_socket > 0);
1391 
1392  if (fd >= proxy_Socket_io.max_socket)
1393  {
1394  PROXY_LOG (PROXY_LOG_MODE_ERROR, "socket fd exceeds max socket fd. (fd:%d, max_socket_fd:%d).", fd,
1395  proxy_Socket_io.max_socket);
1396  EXIT_FUNC ();
1397  return -1;
1398  }
1399 
1400  sock_io_p = &(proxy_Socket_io.ent[fd]);
1401 
1402  if (sock_io_p->fd != INVALID_SOCKET)
1403  {
1404 #if defined(LINUX)
1405  if (sock_io_p->status != SOCK_IO_CLOSE_WAIT)
1406  {
1407  (void) proxy_del_epoll_event (sock_io_p->fd);
1408  }
1409 #else /* LINUX */
1410  FD_CLR (sock_io_p->fd, &allset);
1411  FD_CLR (sock_io_p->fd, &wnewset);
1412 #endif /* !LINUX */
1413 
1414  PROXY_DEBUG_LOG ("Close socket. (fd:%d).", sock_io_p->fd);
1415  CLOSE_SOCKET (sock_io_p->fd);
1416  }
1417  proxy_socket_io_clear (sock_io_p);
1418  proxy_Socket_io.cur_socket--;
1419 
1420  EXIT_FUNC ();
1421  return 0;
1422 }
1423 
/*
 * Marks the client socket that belongs to a proxy context as established.
 * NOTE(review): the declarator line (function name and parameter list) is
 * missing from this listing; the body reads ctx_p->client_id, ctx_p->cid and
 * ctx_p->uid, so it presumably takes a T_PROXY_CONTEXT *ctx_p - confirm.
 *
 * return: 0 on success, -1 when no client I/O entry matches the context.
 */
1424 int
1426 {
1427  T_CLIENT_IO *cli_io_p = NULL;
1428  T_SOCKET_IO *sock_io_p = NULL;
1429 
1430  /* find client io */
1431  cli_io_p = proxy_client_io_find_by_ctx (ctx_p->client_id, ctx_p->cid, ctx_p->uid);
1432  if (cli_io_p == NULL)
1433  {
1434  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to find client. " "(client_id:%d, context id:%d, context uid:%u).",
1435  ctx_p->client_id, ctx_p->cid, ctx_p->uid);
1436 
 /* NOTE(review): EXIT_FUNC () is used here although no ENTER_FUNC () is visible in this function */
1437  EXIT_FUNC ();
1438  return -1;
1439  }
1440 
 /* NOTE(review): the lookup result is only guarded by assert; proxy_socket_io_find ()
  * returns NULL for an idle entry, which would be dereferenced when assertions are disabled */
1441  sock_io_p = proxy_socket_io_find (cli_io_p->fd);
1442  assert (sock_io_p != NULL);
1443 
1444  sock_io_p->status = SOCK_IO_ESTABLISHED;
1445  return 0;
1446 }
1447 
1448 static T_SOCKET_IO *
1450 {
1451  T_SOCKET_IO *sock_io_p;
1452 
1453  assert (proxy_Socket_io.ent);
1454 
1455  sock_io_p = &(proxy_Socket_io.ent[fd]);
1456  return (sock_io_p->status == SOCK_IO_IDLE) ? NULL : sock_io_p;
1457 }
1458 
/*
 * Attaches an outgoing event to a socket and enables write polling for it.
 * NOTE(review): the declarator line is missing from this listing; callers in
 * this file invoke proxy_socket_set_write_event (sock_io_p, event_p), so this
 * is presumably that function - confirm.
 *
 * return: 0 when event_p has been attached as sock_io_p->write_event,
 *         -1 when a write event is already pending or, on LINUX,
 *         proxy_mod_epoll_event () fails.
 *
 * On failure event_p is NOT freed here (callers in this file free it
 * themselves); only an already pending write event of the socket is freed.
 */
1459 int
1461 {
1462 #if defined(LINUX)
1463  int error;
1464 #endif
1465 
1466  assert (sock_io_p);
1467  assert (event_p);
1468 
1469  if (sock_io_p->write_event)
1470  {
1471  /* the protocol between driver and proxy must be broken */
1472  goto error_return;
1473  }
1474 
 /* ask to be notified when the socket becomes writable */
1475 #if defined(LINUX)
1476  error = proxy_mod_epoll_event (sock_io_p->fd, EPOLLIN | EPOLLOUT);
1477  if (error < 0)
1478  {
1479  goto error_return;
1480  }
1481 #else /* LINUX */
1482  FD_SET (sock_io_p->fd, &wnewset);
1483 #endif /* !LINUX */
1484 
1485  event_p->buffer.offset = 0; // set offset to start of the write buffer
1486  sock_io_p->write_event = event_p;
1487 
1488  return 0;
1489 
1490 error_return:
1491 
 /* drop the previously pending write event, if any; event_p itself is left to the caller */
1492  if (sock_io_p->write_event)
1493  {
1494  proxy_event_free (sock_io_p->write_event);
1495  sock_io_p->write_event = NULL;
1496  }
1497  return -1;
1498 }
1499 
/*
 * Accepts a new client connection handed over through the listener socket.
 * NOTE(review): the declarator line is missing from this listing; the body
 * reads lsnr_fd, so it presumably takes the listener socket as its parameter -
 * confirm the name and signature against the original file.
 *
 * Steps: obtain the client socket, register it in the socket I/O table,
 * create the client I/O entry, publish the client in shared memory and, on
 * non-WINDOWS builds, queue the initial response as the socket's write event.
 *
 * return: 0 on success, -1 on failure.
 */
1500 static int
1502 {
1503  int client_ip;
1504  SOCKET client_fd;
1505  T_CLIENT_IO *cli_io_p;
1506  T_SOCKET_IO *sock_io_p;
1507  T_CLIENT_INFO *client_info_p;
1508  int proxy_status = 0;
1509  char driver_info[SRV_CON_CLIENT_INFO_SIZE];
1510 #if !defined (WINDOWS)
1511  T_PROXY_EVENT *event_p;
1512  int length;
1513  int error;
1514 #endif
1515 
1516  proxy_info_p->num_connect_requests++;
1517 
1518 #if defined(WINDOWS)
1519  client_fd = lsnr_fd;
1520  client_ip = accept_ip_addr;
1521  memset (driver_info, 0, SRV_CON_CLIENT_INFO_SIZE);
 /* NOTE(review): two lines of the WINDOWS branch are missing from this listing here */
1524 #else /* WINDOWS */
1525  client_fd = recv_fd (lsnr_fd, &client_ip, driver_info);
1526  if (client_fd < 0)
1527  {
1528  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to receive socket fd. " "(lsnf_fd:%d, client_fd:%d).", lsnr_fd,
1529  client_fd);
1530 
1531  /* shard_broker must be abnormal status */
1532  proxy_Keep_running = false;
1533  return -1;
1534  }
1535 
 /* report proxy status 0 back over the listener socket */
1536  proxy_status = 0;
1537  length = WRITESOCKET (lsnr_fd, &proxy_status, sizeof (proxy_status));
1538  if (length != sizeof (proxy_status))
1539  {
1540  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to send proxy status to broker. " "(lsnr_fd:%d).", lsnr_fd);
1541  CLOSE_SOCKET (client_fd);
1542  return -1;
1543  }
1544 #endif /* !WINDOWS */
1545 
 /* NOTE(review): ENTER_FUNC () comes after the early returns above, and the two
  * failure returns right below have no matching EXIT_FUNC () */
1546  ENTER_FUNC ();
1547 
1548  sock_io_p = proxy_socket_io_add (client_fd, PROXY_IO_FROM_CLIENT);
1549  if (sock_io_p == NULL)
1550  {
1551  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to add socket entry. (client fd:%d).", client_fd);
 /* NOTE(review): proxy_socket_io_add () already closes the fd when the epoll
  * registration fails (LINUX), so this can be a second close of the same fd */
1552  CLOSE_SOCKET (client_fd);
1553  return -1;
1554  }
1555 
1556  cli_io_p = proxy_client_io_new (client_fd, driver_info);
1557  if (cli_io_p == NULL)
1558  {
 /* proxy_socket_io_delete () closes the socket and releases the table entry */
1559  proxy_socket_io_delete (client_fd);
1560  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to create client entry. (client fd:%d).", client_fd);
1561  return -1;
1562  }
1563 
1564  sock_io_p->ip_addr = client_ip;
1565  sock_io_p->id.client_id = cli_io_p->client_id;
1566 
1567  /* set shared memory T_CLIENT_INFO */
 /* NOTE(review): the result is used without a NULL check here, while the
  * register handler in this file does check it - confirm it cannot be NULL */
1568  client_info_p = shard_shm_get_client_info (proxy_info_p, cli_io_p->client_id);
1569  client_info_p->client_id = cli_io_p->client_id;
1570  client_info_p->client_ip = client_ip;
1571  client_info_p->connect_time = time (NULL);
1572  memcpy (client_info_p->driver_info, cli_io_p->driver_info, SRV_CON_CLIENT_INFO_SIZE);
1573 
1574 #if !defined(WINDOWS)
1575  /* send client_conn_ok to the client */
 /* NOTE(review): the right-hand side of this assignment (two lines) is missing from this listing */
1576  event_p =
1579  if (event_p == NULL)
1580  {
1581  proxy_socket_io_read_error (sock_io_p);
1582  EXIT_FUNC ();
1583  return -1;
1584  }
1585 
1586  /* set write event to the socket io */
1587  error = proxy_socket_set_write_event (sock_io_p, event_p);
1588  if (error)
1589  {
1590  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to set write buffer. " "(fd:%d). event(%s).", client_fd,
1591  proxy_str_event (event_p));
1592 
 /* the event was not attached to the socket, so it is freed here */
1593  proxy_event_free (event_p);
1594  event_p = NULL;
1595 
1596  proxy_socket_io_read_error (sock_io_p);
1597 
1598  EXIT_FUNC ();
1599  return -1;
1600  }
1601 #endif /* !WINDOWS */
1602 
1603  EXIT_FUNC ();
1604 
1605  return 0;
1606 }
1607 
/*
 * Handles the first message read from a client socket (connection info).
 * NOTE(review): the declarator line is missing from this listing; the client
 * message dispatcher in this file calls proxy_process_client_register (sock_io_p)
 * for sockets in SOCK_IO_REG_WAIT, so this is presumably that function - confirm.
 *
 * The read buffer is parsed in place as consecutive fixed-size fields:
 * db name, db user, db password and, depending on the driver version,
 * url and session id. The handler then
 *   - answers health-check requests (db name == HEALTH_CHECK_DUMMY_DB),
 *   - applies the access-control check when shm_as_p->access_control is set,
 *   - runs proxy_check_authorization (),
 *   - when proxy_info_p->fixed_shard_user is false, queues an event on
 *     proxy_Handler.cli_rcv_q; otherwise replies to the client and marks the
 *     socket SOCK_IO_ESTABLISHED (or sends the context's pending error).
 *
 * return: the value of error at exit (0 when no step failed), or -1 on the
 *         early-return failure paths. The read event is freed on every path
 *         that reaches clear_event_and_return.
 */
1608 static int
1610 {
1611  int error = 0;
1612  char *db_name = NULL, *db_user = NULL, *db_passwd = NULL;
1613  char *url = NULL, *db_sessionid = NULL;
1614  struct timeval client_start_time;
1615  T_PROXY_CONTEXT *ctx_p;
 /* NOTE(review): one declaration line is missing from this listing here;
  * read_buffer is used below without a visible declaration */
1617  T_PROXY_EVENT *event_p;
1618  T_CLIENT_INFO *client_info_p;
1619  unsigned char *ip_addr;
1620  char len;
1621  char *driver_info;
1622  T_BROKER_VERSION client_version;
1623  char driver_version[SRV_CON_VER_STR_MAX_SIZE];
1624  char err_msg[256];
1625 
1626  ENTER_FUNC ();
1627 
1628  assert (sock_io_p);
1629  assert (sock_io_p->read_event);
1630 
1631  driver_info = proxy_get_driver_info_by_fd (sock_io_p);
1632 
1633  gettimeofday (&client_start_time, NULL);
1634 
1635  read_buffer = &(sock_io_p->read_event->buffer);
1636  assert (read_buffer); // __FOR_DEBUG
1637 
 /* first field of the message: database name, force-terminated in place */
1638  db_name = read_buffer->data;
1639  db_name[SRV_CON_DBNAME_SIZE - 1] = '\0';
1640 
1641  ctx_p = proxy_context_find_by_socket_client_io (sock_io_p);
1642  if (ctx_p == NULL)
1643  {
1644  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to find context for this socket. (fd:%d)", sock_io_p->fd);
1645  error = -1;
1646  goto clear_event_and_return;
1647  }
1648 
1649  if (ctx_p->error_ind != CAS_NO_ERROR)
1650  {
1651  /* Skip authorization and process error */
1652  goto connection_established;
1653  }
1654 
1655  if (strcmp (db_name, HEALTH_CHECK_DUMMY_DB) == 0)
1656  {
1657  PROXY_DEBUG_LOG ("Incoming health check request from client.");
1658  /* send proxy_alive response to the client */
 /* NOTE(review): the right-hand side of this assignment (two lines) is missing from this listing */
1659  event_p =
1662  if (event_p == NULL)
1663  {
1664  proxy_socket_io_read_error (sock_io_p);
1665  EXIT_FUNC ();
1666  return -1;
1667  }
1668 
1669  /* set write event to the socket io */
1670  error = proxy_socket_set_write_event (sock_io_p, event_p);
1671  if (error)
1672  {
1673  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to set write buffer. " "(fd:%d). event(%s).", sock_io_p->fd,
1674  proxy_str_event (event_p));
1675 
1676  proxy_event_free (event_p);
1677  event_p = NULL;
1678 
1679  proxy_socket_io_read_error (sock_io_p);
1680  EXIT_FUNC ();
1681  return -1;
1682  }
 /* health check answered: the socket status is left unchanged */
1683  goto clear_event_and_return;
1684  }
1685 
 /* second field: database user; an empty user defaults to "PUBLIC" */
1686  db_user = db_name + SRV_CON_DBNAME_SIZE;
1687  db_user[SRV_CON_DBUSER_SIZE - 1] = '\0';
1688  if (db_user[0] == '\0')
1689  {
1690  strcpy (db_user, "PUBLIC");
1691  }
1692 
 /* third field: database password */
1693  db_passwd = db_user + SRV_CON_DBUSER_SIZE;
1694  db_passwd[SRV_CON_DBPASSWD_SIZE - 1] = '\0';
1695 
1696  client_version = CAS_MAKE_PROTO_VER (driver_info);
1697  if (client_version >= CAS_MAKE_VER (8, 2, 0))
1698  {
 /* drivers from 8.2.0 on also send a url field; a driver version string is derived below */
1699  url = db_passwd + SRV_CON_DBPASSWD_SIZE;
1700  url[SRV_CON_URL_SIZE - 1] = '\0';
1701  driver_version[0] = '\0';
1702  if (DOES_CLIENT_UNDERSTAND_THE_PROTOCOL (client_version, PROTOCOL_V5))
1703  {
 /* a one-byte length followed by the version text is read right after the url's terminator */
1704  len = *(url + strlen (url) + 1);
1705  if (len > 0 && len < SRV_CON_VER_STR_MAX_SIZE)
1706  {
1707  memcpy (driver_version, url + strlen (url) + 2, (int) len);
1708  driver_version[(int) len] = '\0';
1709  }
1710  else
1711  {
1712  snprintf (driver_version, SRV_CON_VER_STR_MAX_SIZE, "PROTOCOL V%d",
1713  (int) (CAS_PROTO_VER_MASK & client_version));
1714  }
1715  }
1716  else if (DOES_CLIENT_UNDERSTAND_THE_PROTOCOL (client_version, PROTOCOL_V1))
1717  {
1718  char *ver;
1719 
1720  CAS_PROTO_TO_VER_STR (&ver, (int) (CAS_PROTO_VER_MASK & client_version));
1721 
1722  strncpy_bufsize (driver_version, ver);
1723  }
1724  else
1725  {
1726  snprintf (driver_version, SRV_CON_VER_STR_MAX_SIZE, "%d.%d.%d", CAS_VER_TO_MAJOR (client_version),
1727  CAS_VER_TO_MINOR (client_version), CAS_VER_TO_PATCH (client_version));
1728  }
 /* publish the driver version in the client's shared-memory record */
1729  client_info_p = shard_shm_get_client_info (proxy_info_p, sock_io_p->id.client_id);
1730  if (client_info_p)
1731  {
1732  memcpy (client_info_p->driver_version, driver_version, sizeof (driver_version));
1733  }
1734  }
1735 
1736  /* SHARD DO NOT SUPPORT SESSION */
 /* the session id field is located and terminated, but not used further in this function */
1737  if (client_version >= CAS_MAKE_VER (8, 4, 0))
1738  {
1739  db_sessionid = url + SRV_CON_URL_SIZE;
1740  db_sessionid[SRV_CON_DBSESS_ID_SIZE - 1] = '\0';
1741  }
1742 
1743  ip_addr = (unsigned char *) (&sock_io_p->ip_addr);
1744 
1745  /* check acl */
1746  if (shm_as_p->access_control)
1747  {
1748  if (access_control_check_right (shm_as_p, db_name, db_user, ip_addr) < 0)
1749  {
1750  proxy_info_p->num_connect_rejected++;
1751 
1752  snprintf (err_msg, sizeof (err_msg), "Authorization error.(Address is rejected)");
 /* NOTE(review): one line is missing from this listing here; presumably it
  * records err_msg as the context's error - confirm */
1754 
 /* NOTE(review): db_passwd is written to the error log in clear text */
1755  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Authentication failure. " "(db_name:[%s], db_user:[%s], db_passwd:[%s]).",
1756  db_name, db_user, db_passwd);
1757 
1758  if (shm_as_p->access_log == ON)
1759  {
1760  proxy_access_log (&client_start_time, sock_io_p->ip_addr, (db_name) ? (const char *) db_name : "-",
1761  (db_user) ? (const char *) db_user : "-", true);
1762  }
1763 
1764  goto connection_established;
1765  }
1766  }
1767 
1768  error = proxy_check_authorization (ctx_p, db_name, db_user, db_passwd);
1769  if (error < 0)
1770  {
1771  goto connection_established;
1772  }
1773 
 /* NOTE(review): strncpy does not terminate when the source fills the limit;
  * presumably the destination fields are zero-initialized and large enough - confirm */
1774  strncpy (ctx_p->database_user, db_user, SRV_CON_DBUSER_SIZE - 1);
1775  strncpy (ctx_p->database_passwd, db_passwd, SRV_CON_DBPASSWD_SIZE - 1);
1776 
1777  if (proxy_info_p->fixed_shard_user == false)
1778  {
 /* NOTE(review): the right-hand side of this assignment (two lines) is missing from this listing */
1779  event_p =
1782  if (event_p == NULL)
1783  {
1784  proxy_socket_io_read_error (sock_io_p);
1785 
1786  EXIT_FUNC ();
1787  return -1;
1788  }
1789 
1790  proxy_event_set_context (event_p, ctx_p->cid, ctx_p->uid);
1791 
 /* hand the event to the client receive queue; the reply is not sent from here on this path */
1792  error = shard_queue_enqueue (&proxy_Handler.cli_rcv_q, (void *) event_p);
1793  if (error)
1794  {
1795  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to queue client event. " "context(%s). event(%s).",
1796  proxy_str_context (ctx_p), proxy_str_event (event_p));
1797 
1798  proxy_event_free (event_p);
1799  event_p = NULL;
1800 
1801  proxy_context_free (ctx_p);
1802 
1803  EXIT_FUNC ();
1804  return -1;
1805  }
1806 
1807  goto clear_event_and_return;
1808  }
1809 
1810 connection_established:
1811  if (ctx_p->error_ind != CAS_NO_ERROR)
1812  {
1813  /*
1814  * Process error message if exists.
1815  * context will be freed after sending error message.
1816  */
1817  proxy_context_send_error (ctx_p);
1818  proxy_context_clear_error (ctx_p);
1819 
1820  ctx_p->free_on_client_io_write = true;
1821  }
1822  else
1823  {
1824  /* send dbinfo_ok to the client */
 /* NOTE(review): the right-hand side of this assignment (two lines) is missing from this listing */
1825  event_p =
1828  if (event_p == NULL)
1829  {
1830  proxy_socket_io_read_error (sock_io_p);
1831  EXIT_FUNC ();
1832  return -1;
1833  }
1834 
1835  /* set write event to the socket io */
1836  error = proxy_socket_set_write_event (sock_io_p, event_p);
1837  if (error)
1838  {
1839  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to set write buffer. " "(fd:%d). event(%s).", sock_io_p->fd,
1840  proxy_str_event (event_p));
1841 
1842  proxy_event_free (event_p);
1843  event_p = NULL;
1844 
1845  proxy_socket_io_read_error (sock_io_p);
1846  EXIT_FUNC ();
1847  return -1;
1848  }
1849 
1850  ctx_p->is_connected = true;
1851 
1852  sock_io_p->status = SOCK_IO_ESTABLISHED;
1853 
1854  if (shm_as_p->access_log == ON)
1855  {
1856  proxy_access_log (&client_start_time, sock_io_p->ip_addr, (db_name) ? (const char *) db_name : "-",
1857  (db_user) ? (const char *) db_user : "-", true);
1858  }
1859  }
1860 
1861 clear_event_and_return:
 /* the registration message has been fully consumed: release the read event */
1862  assert (sock_io_p->read_event);
1863  proxy_event_free (sock_io_p->read_event);
1864  sock_io_p->read_event = NULL;
1865 
1866  EXIT_FUNC ();
1867  return error;
1868 }
1869 
/*
 * Forwards a fully read client message to the client receive queue.
 * NOTE(review): the declarator line is missing from this listing; the client
 * message dispatcher in this file calls proxy_process_client_request (sock_io_p)
 * for sockets in SOCK_IO_ESTABLISHED, so this is presumably that function - confirm.
 *
 * The read event is detached from the socket first; from then on it is owned
 * by this function until it is queued on proxy_Handler.cli_rcv_q.
 *
 * return: 0 when the event was queued, -1 when no context is found for the
 *         socket or queueing fails (the event is freed in both cases; the
 *         context is freed as well when queueing fails).
 */
1870 static int
1872 {
1873  int error;
1874  T_PROXY_CONTEXT *ctx_p;
1875  T_PROXY_EVENT *event_p;
1876 
1877  ENTER_FUNC ();
1878 
1879  assert (sock_io_p);
1880  assert (sock_io_p->read_event);
1881  assert (sock_io_p->from_cas == PROXY_EVENT_FROM_CLIENT);
1882 
 /* detach the event from the socket */
1883  event_p = sock_io_p->read_event;
1884  sock_io_p->read_event = NULL;
1885 
1886  ctx_p = proxy_context_find_by_socket_client_io (sock_io_p);
1887  if (ctx_p == NULL)
1888  {
1889  proxy_event_free (event_p);
1890  event_p = NULL;
 /* NOTE(review): this return has no matching EXIT_FUNC () */
1891  return -1;
1892  }
1893 
 /* NOTE(review): one line is missing from this listing here */
1895  proxy_event_set_context (event_p, ctx_p->cid, ctx_p->uid);
1896 
1897  error = shard_queue_enqueue (&proxy_Handler.cli_rcv_q, (void *) event_p);
1898  if (error)
1899  {
1900  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to queue client event. " "context(%s). event(%s).",
1901  proxy_str_context (ctx_p), proxy_str_event (event_p));
1902 
1903  proxy_event_free (event_p);
1904  event_p = NULL;
1905 
1906  proxy_context_free (ctx_p);
1907 
1908  EXIT_FUNC ();
1909  return -1;
1910  }
1911 
1912  EXIT_FUNC ();
1913  return 0;
1914 }
1915 
/*
 * Reacts to a broken client connection.
 * NOTE(review): the declarator line is missing from this listing; the client
 * read / write error handlers in this file call
 * proxy_process_client_conn_error (sock_io_p), so this is presumably that
 * function - confirm.
 *
 * Puts the socket into SOCK_IO_CLOSE_WAIT, stops polling it, and queues a
 * connection-error event for the socket's context on proxy_Handler.cli_rcv_q.
 *
 * return: 0 when the event was queued, -1 otherwise. The socket is not closed
 *         here; the read / write error handlers in this file close it when
 *         this function fails.
 */
1916 static int
1918 {
1919  int error = 0;
1920  T_PROXY_CONTEXT *ctx_p;
1921  T_PROXY_EVENT *event_p;
1922 
1923  ENTER_FUNC ();
1924 
1925  assert (sock_io_p);
 /* NOTE(review): this only rejects fd == 0; the CAS counterpart in this file
  * asserts fd != INVALID_SOCKET instead */
1926  assert (sock_io_p->fd);
1927  assert (sock_io_p->from_cas == PROXY_EVENT_FROM_CLIENT);
1928 
1929  sock_io_p->status = SOCK_IO_CLOSE_WAIT;
1930 
1931 #if defined(LINUX)
1932  error = proxy_del_epoll_event (sock_io_p->fd);
1933  if (error < 0)
1934  {
 /* NOTE(review): this return and the ctx_p == NULL return below have no matching EXIT_FUNC () */
1935  return -1;
1936  }
1937 #else /* LINUX */
1938  /* disable socket read/write */
1939  FD_CLR (sock_io_p->fd, &allset);
1940  FD_CLR (sock_io_p->fd, &wnewset);
1941 #endif /* !LINUX */
1942 
1943  ctx_p = proxy_context_find_by_socket_client_io (sock_io_p);
1944  if (ctx_p == NULL)
1945  {
1946  return -1;
1947  }
1948 
 /* NOTE(review): the line assigning event_p is missing from this listing here;
  * judging from the log text below it creates a PROXY_EVENT_CLIENT_CONN_ERROR event - confirm */
1950  if (event_p == NULL)
1951  {
1952  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to make new event. (%s, %s).", "PROXY_EVENT_CLIENT_CONN_ERROR",
1953  "PROXY_EVENT_FROM_CLIENT");
1954 
1955  proxy_context_free (ctx_p);
1956 
1957  EXIT_FUNC ();
1958  return -1;
1959  }
1960  proxy_event_set_context (event_p, ctx_p->cid, ctx_p->uid);
1961 
1962  error = shard_queue_enqueue (&proxy_Handler.cli_rcv_q, (void *) event_p);
1963  if (error)
1964  {
1965  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to queue client event. " "context(%s). event(%s).",
1966  proxy_str_context (ctx_p), proxy_str_event (event_p));
1967 
1968  proxy_event_free (event_p);
1969  event_p = NULL;
1970 
1971  proxy_context_free (ctx_p);
1972 
1973  EXIT_FUNC ();
1974  return -1;
1975  }
1976 
1977  EXIT_FUNC ();
1978  return 0;
1979 }
1980 
/*
 * Handles a write failure on a client socket.
 * NOTE(review): the declarator line (function name and parameter list) is
 * missing from this listing; the body uses sock_io_p, presumably a
 * T_SOCKET_IO * parameter - confirm the name against the original file.
 *
 * Drops the pending write event, then runs the client connection-error
 * processing; when that fails, the socket is closed directly.
 *
 * return: the result of proxy_process_client_conn_error ().
 */
1981 static int
1983 {
1984  int error;
1985 
1986  if (sock_io_p->write_event)
1987  {
1988  proxy_event_free (sock_io_p->write_event);
1989  sock_io_p->write_event = NULL;
1990  }
1991 
1992  error = proxy_process_client_conn_error (sock_io_p);
1993  if (error)
1994  {
 /* NOTE(review): the fd is closed but the table entry is not cleared here */
1995  if (sock_io_p->fd != INVALID_SOCKET)
1996  {
1997  CLOSE_SOCKET (sock_io_p->fd);
1998  }
1999  }
2000 
2001  return error;
2002 }
2003 
/*
 * Handles a read failure on a client socket.
 * NOTE(review): the declarator line (function name and parameter list) is
 * missing from this listing; the body uses sock_io_p, presumably a
 * T_SOCKET_IO * parameter - confirm the name against the original file.
 *
 * Drops the pending read event (if any), then runs the client
 * connection-error processing; when that fails, the socket is closed directly.
 *
 * return: the result of proxy_process_client_conn_error ().
 */
2004 static int
2006 {
2007  int error;
2008 
2009  assert (sock_io_p);
2010 
2011 #if defined(LINUX)
2012  /*
2013  * If connection error event was triggered by EPOLLERR, EPOLLHUP,
2014  * there could be no error events.
2015  */
2016 #else /* LINUX */
2017  assert (sock_io_p->read_event);
2018 #endif /* !LINUX */
2019  if (sock_io_p->read_event)
2020  {
2021  proxy_event_free (sock_io_p->read_event);
2022  sock_io_p->read_event = NULL;
2023  }
2024 
2025  error = proxy_process_client_conn_error (sock_io_p);
2026  if (error)
2027  {
 /* NOTE(review): the fd is closed but the table entry is not cleared here */
2028  if (sock_io_p->fd != INVALID_SOCKET)
2029  {
2030  CLOSE_SOCKET (sock_io_p->fd);
2031  }
2032  }
2033 
2034  return error;
2035 }
2036 
/*
 * Dispatches a complete client message according to the socket's status.
 * NOTE(review): the declarator line (function name and parameter list) is
 * missing from this listing; the body uses sock_io_p, presumably a
 * T_SOCKET_IO * parameter - confirm the name against the original file.
 *
 * SOCK_IO_REG_WAIT    -> proxy_process_client_register ()
 * SOCK_IO_ESTABLISHED -> proxy_process_client_request ()
 * any other status    -> nothing is done
 *
 * return: the handler's result, or 0 when no handler was called.
 */
2037 static int
2039 {
2040  int error = 0;
2041 
2042  assert (sock_io_p);
2043 
2044  switch (sock_io_p->status)
2045  {
2046  case SOCK_IO_REG_WAIT:
2047  error = proxy_process_client_register (sock_io_p);
2048  break;
2049 
2050  case SOCK_IO_ESTABLISHED:
2051  error = proxy_process_client_request (sock_io_p);
2052  break;
2053 
2054  default:
2055  break;
2056  }
2057 
2058  return error;
2059 }
2060 
/*
 * Handles the registration message sent by a CAS right after it connects.
 * NOTE(review): the declarator line is missing from this listing; the CAS
 * message dispatcher in this file calls proxy_process_cas_register (sock_io_p)
 * for sockets in SOCK_IO_REG_WAIT, so this is presumably that function - confirm.
 *
 * The message body after the header is read as: one byte function code,
 * a 4-byte shard id and a 4-byte cas id (both converted with ntohl).
 * A CAS I/O entry is created for (shard_id, cas_id, fd), the socket becomes
 * SOCK_IO_ESTABLISHED and waiting clients of the shard are woken up.
 *
 * return: 0 on success, -1 when the CAS entry cannot be created.
 *         The read event is freed on both paths.
 */
2061 static int
2063 {
2064  int length;
2065  char *p;
2066  int shard_id = PROXY_INVALID_SHARD, cas_id = PROXY_INVALID_CAS;
2067  char func_code;
2068  T_SHARD_IO *shard_io_p = NULL;
2069  T_CAS_IO *cas_io_p = NULL;
 /* NOTE(review): one declaration line is missing from this listing here;
  * read_buffer is used below without a visible declaration */
2071 
2072  ENTER_FUNC ();
2073 
2074  assert (sock_io_p);
2075  assert (sock_io_p->read_event);
2076 
2077  read_buffer = &(sock_io_p->read_event->buffer);
2078 
 /* NOTE(review): length is only used by the assert below; the fields are read
  * without a runtime check that the message is long enough */
2079  length = get_msg_length (read_buffer->data);
2080  assert (read_buffer->offset == length);
2081 
2082  /* func code */
2083  p = (char *) (read_buffer->data + MSG_HEADER_SIZE);
2084  memcpy (&func_code, p, sizeof (char));
2085  p += sizeof (char);
2086 
2087  /* shard id */
2088  memcpy (&shard_id, p, sizeof (int));
2089  shard_id = ntohl (shard_id);
2090  p += sizeof (int);
2091 
 /* cas id */
2092  memcpy (&cas_id, p, sizeof (int));
2093  cas_id = ntohl (cas_id);
2094 
2095  cas_io_p = proxy_cas_io_new (shard_id, cas_id, sock_io_p->fd);
2096  if (cas_io_p == NULL)
2097  {
2098  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to create CAS entry. " "(shard_id:%d, cas_id:%d, fd:%d).", shard_id,
2099  cas_id, sock_io_p->fd);
2100  goto error_return;
2101  }
2102  assert (sock_io_p->from_cas == PROXY_EVENT_FROM_CAS);
2103 
2104  /* fill socket io entry */
2105  sock_io_p->status = SOCK_IO_ESTABLISHED;
2106  sock_io_p->id.shard.shard_id = shard_id;
2107  sock_io_p->id.shard.cas_id = cas_id;
2108 
2109  /* SHARD TODO : !!! check protocol */
2110 
2111 
 /* NOTE(review): shard_io_p is only guarded by assert before it is dereferenced below */
2112  shard_io_p = proxy_shard_io_find (shard_id);
2113  assert (shard_io_p);
2114 
2115  /* set cas io status */
2116  cas_io_p->status = CAS_IO_CONNECTED;
2117 
2118  PROXY_DEBUG_LOG ("New CAS registered. (shard_id:%d). CAS(%s).", shard_io_p->shard_id, proxy_str_cas_io (cas_io_p));
2119 
2120  proxy_client_check_waiter_and_wakeup (shard_io_p, cas_io_p);
2121 
2122  /* in this case, we should free event now */
2123  proxy_event_free (sock_io_p->read_event);
2124  sock_io_p->read_event = NULL;
2125 
2126  EXIT_FUNC ();
2127  return 0;
2128 
2129 error_return:
2130 
2131  /* in this case, we should free event now */
2132  proxy_event_free (sock_io_p->read_event);
2133  sock_io_p->read_event = NULL;
2134 
 /* the only goto to this label is taken with cas_io_p == NULL, so as written
  * the else branch is the one that runs */
2135  if (cas_io_p && shard_id >= 0 && cas_id >= 0)
2136  {
2137  proxy_cas_io_free (shard_id, cas_id);
2138  }
2139  else
2140  {
2141  /* cas have to retry register to proxy. */
2142  proxy_socket_io_delete (sock_io_p->fd);
2143  }
2144 
2145  EXIT_FUNC ();
2146  return -1;
2147 }
2148 
/*
 * Forwards a fully read CAS message to the CAS receive queue.
 * NOTE(review): the declarator line is missing from this listing; the CAS
 * message dispatcher in this file calls proxy_process_cas_response (sock_io_p)
 * for sockets in SOCK_IO_ESTABLISHED, so this is presumably that function - confirm.
 *
 * The read event is detached from the socket first. The CAS entry must exist
 * and be inside a transaction, and its context must be found; otherwise the
 * event is freed and the socket or CAS entry is released.
 *
 * return: 0 when the event was queued on proxy_Handler.cas_rcv_q, -1 otherwise.
 */
2149 static int
2151 {
2152  int error;
2153  T_PROXY_CONTEXT *ctx_p;
2154  T_CAS_IO *cas_io_p;
2155  T_PROXY_EVENT *event_p;
2156 
2157  ENTER_FUNC ();
2158 
2159  assert (sock_io_p);
2160  assert (sock_io_p->read_event);
2161  assert (sock_io_p->status != SOCK_IO_IDLE);
2162  assert (sock_io_p->from_cas == PROXY_EVENT_FROM_CAS);
2163 
 /* detach the event from the socket */
2164  event_p = sock_io_p->read_event;
2165  sock_io_p->read_event = NULL;
2166 
2167  cas_io_p = proxy_cas_io_find_by_fd (sock_io_p->id.shard.shard_id, sock_io_p->id.shard.cas_id, sock_io_p->fd);
2168  if (cas_io_p == NULL)
2169  {
2170  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Unable to find CAS entry. " "(shard_id:%d, cas_id:%d, fd:%d). " "event(%s).",
2171  sock_io_p->id.shard.shard_id, sock_io_p->id.shard.cas_id, sock_io_p->fd, proxy_str_event (event_p));
2172 
2173  proxy_event_free (event_p);
2174  event_p = NULL;
2175 
2176  proxy_socket_io_delete (sock_io_p->fd);
2177  EXIT_FUNC ();
2178  return -1;
2179  }
2180 
2181  if (cas_io_p->is_in_tran == false)
2182  {
2183  /* in case, cas session timeout !!! */
2184 
 /* NOTE(review): the line opening this call is missing from the listing;
  * the arguments below look like the tail of a log call - confirm */
2186  "Unexpected CAS transaction status. " "(expected tran status:%d). CAS(%s). event(%s).", true,
2187  proxy_str_cas_io (cas_io_p), proxy_str_event (event_p));
2188 
2189  proxy_event_free (event_p);
2190  event_p = NULL;
2191 
2192  proxy_cas_io_free (cas_io_p->shard_id, cas_io_p->cas_id);
2193  EXIT_FUNC ();
2194  return -1;
2195  }
2196 
2197  ctx_p = proxy_context_find (cas_io_p->ctx_cid, cas_io_p->ctx_uid);
2198  if (ctx_p == NULL)
2199  {
2200  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Unable to find context. CAS(%s). event(%s).", proxy_str_cas_io (cas_io_p),
2201  proxy_str_event (event_p));
2202 
2203  proxy_event_free (event_p);
2204  event_p = NULL;
2205 
2206  proxy_cas_io_free (cas_io_p->shard_id, cas_io_p->cas_id);
2207  EXIT_FUNC ();
2208  return -1;
2209  }
2210 
 /* NOTE(review): one line is missing from this listing here */
2212  proxy_event_set_context (event_p, ctx_p->cid, ctx_p->uid);
2213 
2214  error = shard_queue_enqueue (&proxy_Handler.cas_rcv_q, (void *) event_p);
2215  if (error)
2216  {
2217  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to queue client event. " "context(%s). event(%s).",
2218  proxy_str_context (ctx_p), proxy_str_event (event_p));
2219 
2220  proxy_event_free (event_p);
2221  event_p = NULL;
2222 
2223  proxy_context_free (ctx_p);
2224 
2225  EXIT_FUNC ();
2226  return -1;
2227  }
2228 
2229  /* in this case, we should detach event from socket io */
 /* (already done at the top of the function; this assignment repeats it) */
2230  sock_io_p->read_event = NULL;
2231 
2232  EXIT_FUNC ();
2233  return 0;
2234 }
2235 
/*
 * Reacts to a broken CAS connection.
 * NOTE(review): the declarator line is missing from this listing; the CAS
 * read / write error handlers in this file call
 * proxy_process_cas_conn_error (sock_io_p), so this is presumably that
 * function - confirm.
 *
 * Puts the socket into SOCK_IO_CLOSE_WAIT and stops polling it. If the CAS is
 * not inside a transaction its entry is simply freed; otherwise the CAS is
 * marked CAS_IO_CLOSE_WAIT and a connection-error event for its context is
 * queued on proxy_Handler.cas_rcv_q.
 *
 * return: 0 only when the event was queued; -1 on every other path, including
 *         the "CAS not in transaction" case.
 */
2236 static int
2238 {
2239  int error;
2240  T_PROXY_CONTEXT *ctx_p;
2241  T_CAS_IO *cas_io_p;
2242  T_PROXY_EVENT *event_p;
2243 
2244  ENTER_FUNC ();
2245 
2246  assert (sock_io_p);
2247  assert (sock_io_p->fd != INVALID_SOCKET);
2248  assert (sock_io_p->from_cas == PROXY_EVENT_FROM_CAS);
2249 
2250  sock_io_p->status = SOCK_IO_CLOSE_WAIT;
2251 
2252 #if defined(LINUX)
2253  error = proxy_del_epoll_event (sock_io_p->fd);
2254  if (error == -1)
2255  {
 /* NOTE(review): this return has no matching EXIT_FUNC () */
2256  return -1;
2257  }
2258 #else /* LINUX */
2259  /* disable socket read/write */
2260  FD_CLR (sock_io_p->fd, &allset);
2261  FD_CLR (sock_io_p->fd, &wnewset);
2262 #endif /* !LINUX */
2263 
2264  cas_io_p = proxy_cas_io_find_by_fd (sock_io_p->id.shard.shard_id, sock_io_p->id.shard.cas_id, sock_io_p->fd);
2265  if (cas_io_p == NULL)
2266  {
2267  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Unable to find CAS entry. " "(shard_id:%d, cas_id:%d, fd:%d). ",
2268  sock_io_p->id.shard.shard_id, sock_io_p->id.shard.cas_id, sock_io_p->fd);
2269 
 /* no CAS entry owns this socket: close it and release the table entry */
2270  proxy_socket_io_delete (sock_io_p->fd);
2271  EXIT_FUNC ();
2272  return -1;
2273  }
2274 
2275  PROXY_DEBUG_LOG ("Detect CAS connection error. CAS(%s).", proxy_str_cas_io (cas_io_p));
2276 
2277  if (cas_io_p->is_in_tran == false)
2278  {
2279  /* __FOR_DEBUG */
2280  assert (cas_io_p->ctx_cid == PROXY_INVALID_CONTEXT);
2281  assert (cas_io_p->ctx_uid == 0);
2282 
 /* no context is attached to an idle CAS, so nothing has to be notified */
2283  proxy_cas_io_free (cas_io_p->shard_id, cas_io_p->cas_id);
2284 
2285  EXIT_FUNC ();
2286  return -1;
2287  }
2288  cas_io_p->status = CAS_IO_CLOSE_WAIT;
2289 
2290 
2291  ctx_p = proxy_context_find (cas_io_p->ctx_cid, cas_io_p->ctx_uid);
2292  if (ctx_p == NULL)
2293  {
2294  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Unable to find context. CAS(%s).", proxy_str_cas_io (cas_io_p));
2295 
2296  proxy_cas_io_free (cas_io_p->shard_id, cas_io_p->cas_id);
2297 
2298  EXIT_FUNC ();
2299  return -1;
2300  }
2301 
 /* NOTE(review): the line assigning event_p is missing from this listing here;
  * judging from the log text below it creates a PROXY_EVENT_CAS_CONN_ERROR event - confirm */
2303  if (event_p == NULL)
2304  {
2305  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to make new event. (%s, %s). context(%s).", "PROXY_EVENT_CAS_CONN_ERROR",
2306  "PROXY_EVENT_FROM_CAS", proxy_str_context (ctx_p));
2307 
2308  proxy_context_free (ctx_p);
2309 
2310  EXIT_FUNC ();
2311  return -1;
2312  }
2313  proxy_event_set_context (event_p, ctx_p->cid, ctx_p->uid);
2314 
2315  error = shard_queue_enqueue (&proxy_Handler.cas_rcv_q, (void *) event_p);
2316  if (error)
2317  {
2318  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to queue client event. " "context(%s). event(%s).",
2319  proxy_str_context (ctx_p), proxy_str_event (event_p));
2320  proxy_event_free (event_p);
2321  event_p = NULL;
2322 
2323  proxy_context_free (ctx_p);
2324  EXIT_FUNC ();
2325  return -1;
2326  }
2327 
2328  EXIT_FUNC ();
2329  return 0;
2330 }
2331 
2332 int
2334 {
2335  int error;
2336 
2337  if (sock_io_p->write_event)
2338  {
2339  proxy_event_free (sock_io_p->read_event);
2340  sock_io_p->read_event = NULL;
2341  }
2342 
2343  error = proxy_process_cas_conn_error (sock_io_p);
2344  if (error)
2345  {
2346  if (sock_io_p->fd != INVALID_SOCKET)
2347  {
2348  CLOSE_SOCKET (sock_io_p->fd);
2349  }
2350  }
2351 
2352  return error;
2353 }
2354 
/*
 * Handles a read failure on a CAS socket.
 * NOTE(review): the declarator line (function name and parameter list) is
 * missing from this listing; the body uses sock_io_p, presumably a
 * T_SOCKET_IO * parameter - confirm the name against the original file.
 *
 * Drops the pending read event (if any), then runs the CAS connection-error
 * processing; when that fails and the entry still holds a socket, the socket
 * is closed directly.
 *
 * return: the result of proxy_process_cas_conn_error ().
 */
2355 int
2357 {
2358  int error;
2359 
2360  assert (sock_io_p);
2361 
2362 #if defined(LINUX)
2363  /*
2364  * If connection error event was triggered by EPOLLERR, EPOLLHUP,
2365  * there could be no error events.
2366  */
2367 #else /* LINUX */
2368  assert (sock_io_p->read_event);
2369 #endif /* !LINUX */
2370  if (sock_io_p->read_event)
2371  {
2372  proxy_event_free (sock_io_p->read_event);
2373  sock_io_p->read_event = NULL;
2374  }
2375 
2376  error = proxy_process_cas_conn_error (sock_io_p);
2377  if (error)
2378  {
 /* several failure paths of the conn-error processing already release the
  * entry through proxy_socket_io_delete (), which leaves fd == INVALID_SOCKET */
2379  if (sock_io_p->fd != INVALID_SOCKET)
2380  {
2381  CLOSE_SOCKET (sock_io_p->fd);
2382  }
2383  }
2384 
2385  return error;
2386 }
2387 
/*
 * Dispatches a complete CAS message according to the socket's status.
 * NOTE(review): the declarator line (function name and parameter list) is
 * missing from this listing; the body uses sock_io_p, presumably a
 * T_SOCKET_IO * parameter - confirm the name against the original file.
 *
 * SOCK_IO_REG_WAIT    -> proxy_process_cas_register ()
 * SOCK_IO_ESTABLISHED -> proxy_process_cas_response ()
 * any other status    -> nothing is done
 *
 * return: the handler's result, or 0 when no handler was called.
 */
2388 static int
2390 {
2391  int error = 0;
2392 
2393  assert (sock_io_p);
2394 
2395  switch (sock_io_p->status)
2396  {
2397  case SOCK_IO_REG_WAIT:
2398  error = proxy_process_cas_register (sock_io_p);
2399  break;
2400 
2401  case SOCK_IO_ESTABLISHED:
2402  error = proxy_process_cas_response (sock_io_p);
2403  break;
2404 
2405  default:
2406  break;
2407  }
2408 
2409  return error;
2410 }
2411 
/*
 * Writes as much as possible of the socket's pending write event.
 * NOTE(review): the declarator line is missing from this listing; the two
 * write wrappers in this file call proxy_socket_io_write_internal (sock_io_p),
 * so this is presumably that function - confirm.
 *
 * The send buffer's offset marks how much has been written so far.
 *
 * return:
 *   > 0  bytes written by this call (the event stays attached while data
 *        remains, and is freed once the buffer is fully sent)
 *   0    nothing was written: either the buffer was already fully sent
 *        (event freed) or the write would block / was interrupted (event kept)
 *   < 0  write failure or inconsistent buffer (offset beyond length);
 *        the event is freed
 *
 * Whenever the event is freed, write polling for the socket is switched off.
 */
2412 static int
2414 {
2415  int write_len;
2416  int remain;
2417  char *p;
2418  T_IO_BUFFER *send_buffer;
2419 
2420  // __FOR_DEBUG
2421  assert (sock_io_p);
2422  assert (sock_io_p->write_event);
2423 
2424  send_buffer = &(sock_io_p->write_event->buffer);
2425 
2426  if (send_buffer->length == send_buffer->offset)
2427  {
 /* everything was already sent */
2428  write_len = 0;
2429  goto write_end;
2430  }
2431  else if (send_buffer->length < send_buffer->offset)
2432  {
 /* offset ran past the end of the buffer: treated as a failure */
2433  assert (false);
2434 
2435  write_len = -1;
2436  goto write_end;
2437  }
2438 
2439  remain = send_buffer->length - send_buffer->offset;
2440 
2441  p = (char *) (send_buffer->data + send_buffer->offset);
2442 
2443  write_len = WRITESOCKET (sock_io_p->fd, p, remain);
2444  if (write_len < 0)
2445  {
 /* a would-block (or, outside WINDOWS, interrupted) write is not an error:
  * keep the event and retry when the socket is writable again */
2446 #if defined(WINDOWS)
2447  int error;
2448 
2449  error = WSAGetLastError ();
2450  if (error == WSAEWOULDBLOCK)
2451 #else
2452  if ((errno == EWOULDBLOCK) || (errno == EAGAIN) || (errno == EINTR))
2453 #endif
2454  {
2455 #if defined(LINUX)
2456  /* Nothing to do. epoll events has not been changed. */
2457 #else /* LINUX */
2458  FD_SET (sock_io_p->fd, &wnewset);
2459 #endif /* !LINUX */
2460  return 0;
2461  }
2462 
 /* real write failure: write_len stays negative */
2463  goto write_end;
2464  }
2465 
2466  send_buffer->offset += write_len;
2467 
2468  if (send_buffer->offset < send_buffer->length)
2469  {
 /* partial write: keep the event and keep polling for writability */
2470 #if defined(LINUX)
2471  /* Nothing to do. epoll events has not been changed. */
2472 #else /* LINUX */
2473  FD_SET (sock_io_p->fd, &wnewset);
2474 #endif /* !LINUX */
2475  return write_len;
2476  }
2477 
2478 write_end:
2479 
 /* finished or failed: release the event and stop polling for writability */
2480  proxy_event_free (sock_io_p->write_event);
2481  sock_io_p->write_event = NULL;
2482 
2483 #if defined(LINUX)
2484  (void) proxy_mod_epoll_event (sock_io_p->fd, EPOLLIN);
2485 #else /* LINUX */
2486  FD_CLR (sock_io_p->fd, &wnewset);
2487 #endif /* !LINUX */
2488 
2489  return write_len;
2490 }
2491 
/*
 * Runs one proxy_socket_io_write_internal () pass on sock_io_p and reports a
 * negative result through proxy_socket_io_write_error ().
 *
 * NOTE(review): the declarator line (function name and parameter list) is
 * absent from this listing; presumably proxy_socket_io_write_to_cas - confirm
 * against the upstream file.
 */
2492 static void
2494 {
2495  int len;
2496 
2497  assert (sock_io_p);
2498 
2499  len = proxy_socket_io_write_internal (sock_io_p);
2500  if (len < 0)
2501  {
2502  proxy_socket_io_write_error (sock_io_p);
2503  }
2504 
2505  return;
2506 }
2507 
/*
 * Runs one proxy_socket_io_write_internal () pass on sock_io_p and reports a
 * negative result through proxy_socket_io_write_error (). Afterwards, if a
 * context is still attached to the socket, is flagged
 * free_on_client_io_write, and no write event remains pending, the client's
 * shared-memory T_CLIENT_INFO is re-initialized and the context is freed.
 *
 * Note that the context lookup is performed even after a write error was
 * reported.
 *
 * NOTE(review): the declarator line (function name and parameter list) is
 * absent from this listing; presumably proxy_socket_io_write_to_client -
 * confirm against the upstream file.
 */
2508 static void
2510 {
2511  int len;
2512  T_PROXY_CONTEXT *ctx_p;
2513  T_CLIENT_INFO *client_info_p;
2514 
2515  assert (sock_io_p);
2516 
2517  len = proxy_socket_io_write_internal (sock_io_p);
2518  if (len < 0)
2519  {
2520  proxy_socket_io_write_error (sock_io_p);
2521  }
2522 
2523  ctx_p = proxy_context_find_by_socket_client_io (sock_io_p);
2524  if (ctx_p == NULL)
2525  {
2526  return;
2527  }
2528 
2529  if (ctx_p->free_on_client_io_write && sock_io_p->write_event == NULL)
2530  {
2531  /* init shared memory T_CLIENT_INFO */
2532  client_info_p = shard_shm_get_client_info (proxy_info_p, sock_io_p->id.client_id);
2533  shard_shm_init_client_info (client_info_p);
2534 
2535  proxy_context_free (ctx_p);
2536  }
2537 
2538  return;
2539 }
2540 
2541 static int
2543 {
2544  int error;
2545  int read_len, remain, total_len;
2546  char *buffer;
2548 
2549  assert (sock_io_p);
2550  assert (sock_io_p->read_event);
2551 
2552 read_again:
2553 
2554  read_buffer = &(sock_io_p->read_event->buffer);
2555  buffer = (char *) (read_buffer->data + read_buffer->offset);
2556  remain = read_buffer->length - read_buffer->offset;
2557 
2558  read_len = READSOCKET (sock_io_p->fd, buffer, remain);
2559  if (read_len < 0)
2560  {
2561 #if defined(WINDOWS)
2562  error = WSAGetLastError ();
2563  if ((error == WSAECONNRESET) || (error == WSAECONNABORTED))
2564  {
2565  return -1; /* failed */
2566  }
2567  else if (error == WSAEWOULDBLOCK)
2568 #else
2569  if ((errno == EWOULDBLOCK) || (errno == EAGAIN) || (errno == EINTR))
2570 #endif
2571  {
2572  return 0; /* retry */
2573  }
2574  return -1; /* disconnected */
2575  }
2576  else if (read_len == 0)
2577  {
2578  return -1; /* disconnected */
2579  }
2580 
2581  read_buffer->offset += read_len;
2582 
2583  /* common message header */
2584  if (read_buffer->length == MSG_HEADER_SIZE && read_buffer->length == read_buffer->offset)
2585  {
2586  /* expand buffer to receive payload */
2587  total_len = get_msg_length (read_buffer->data);
2588  if (total_len == read_buffer->offset)
2589  {
2590  /* no more data */
2591  return read_buffer->offset;
2592  }
2593 
2594  error = proxy_event_realloc_buffer (sock_io_p->read_event, total_len);
2595  if (error)
2596  {
2597  PROXY_DEBUG_LOG ("Failed to realloc event buffer. (error:%d).", error);
2598  proxy_socket_io_read_error (sock_io_p);
2599  return -1;
2600  }
2601 
2602  goto read_again;
2603  }
2604 
2605  return read_len;
2606 }
2607 
/*
 * Performs one proxy_socket_io_read_internal () pass on sock_io_p. A negative
 * result is reported through proxy_socket_io_read_error (); otherwise, once
 * proxy_event_io_read_complete () says the read event is complete, the
 * message is handed to proxy_process_cas_message ().
 *
 * NOTE(review): the declarator line (function name and parameter list) is
 * absent from this listing; presumably proxy_socket_io_read_from_cas_next -
 * confirm against the upstream file.
 */
2608 /* proxy_socket_io_read_internal */
2609 static void
2611 {
2612  int error;
2613 
2614  assert (sock_io_p);
2615 
2616  error = proxy_socket_io_read_internal (sock_io_p);
2617  if (error < 0)
2618  {
2619  proxy_socket_io_read_error (sock_io_p);
2620  return;
2621  }
2622 
2623  if (proxy_event_io_read_complete (sock_io_p->read_event))
2624  {
2625  proxy_process_cas_message (sock_io_p);
2626  }
2627 
2628  return;
2629 }
2630 
2631 static void
2633 {
2634  int error;
2635  int length;
2636 
2637  assert (sock_io_p);
2638  assert (sock_io_p->read_event);
2639 
2640  length = MSG_HEADER_SIZE;
2641  error = proxy_event_alloc_buffer (sock_io_p->read_event, length);
2642  if (error)
2643  {
2644  proxy_socket_io_read_error (sock_io_p);
2645  return;
2646  }
2647 
2649 }
2650 
2651 static void
2653 {
2654  T_IO_BUFFER *r_buf;
2655 
2656  assert (sock_io_p);
2657  assert (sock_io_p->read_event);
2658 
2659  r_buf = &(sock_io_p->read_event->buffer);
2660 
2661  if (r_buf->length == 0)
2662  {
2664  }
2665  else
2666  {
2668  }
2669 
2670  return;
2671 }
2672 
/*
 * Performs one proxy_socket_io_read_internal () pass on sock_io_p. A negative
 * result is reported through proxy_socket_io_read_error (); otherwise, once
 * proxy_event_io_read_complete () says the read event is complete, the
 * message is handed to proxy_process_client_message ().
 *
 * NOTE(review): the declarator line (function name and parameter list) is
 * absent from this listing; presumably proxy_socket_io_read_from_client_next
 * - confirm against the upstream file.
 */
2673 static void
2675 {
2676  int error;
2677 
2678  assert (sock_io_p);
2679 
2680  error = proxy_socket_io_read_internal (sock_io_p);
2681  if (error < 0)
2682  {
2683  proxy_socket_io_read_error (sock_io_p);
2684  return;
2685  }
2686 
2687  if (proxy_event_io_read_complete (sock_io_p->read_event))
2688  {
2689  proxy_process_client_message (sock_io_p);
2690  }
2691 
2692  return;
2693 }
2694 
2695 static void
2697 {
2698  int error;
2699  int length;
2700  char *driver_info;
2701 
2702  assert (sock_io_p);
2703  assert (sock_io_p->read_event);
2704 
2705  if (sock_io_p->status == SOCK_IO_REG_WAIT)
2706  {
2707  driver_info = proxy_get_driver_info_by_fd (sock_io_p);
2708  length = get_dbinfo_length (driver_info);
2709  }
2710  else
2711  {
2712  length = MSG_HEADER_SIZE;
2713  }
2714 
2715  error = proxy_event_alloc_buffer (sock_io_p->read_event, length);
2716  if (error)
2717  {
2718  proxy_socket_io_write_error (sock_io_p);
2719  return;
2720  }
2721 
2723 }
2724 
2725 static void
2727 {
2728  T_IO_BUFFER *r_buf;
2729 
2730  assert (sock_io_p);
2731  assert (sock_io_p->read_event);
2732 
2733  r_buf = &(sock_io_p->read_event->buffer);
2734 
2735  if (r_buf->length == 0)
2736  {
2738  }
2739  else
2740  {
2742  }
2743 
2744  return;
2745 }
2746 
/*
 * Write-readiness handler for a socket entry.
 *
 *   - SOCK_IO_CLOSE_WAIT: any pending write event is freed, write interest is
 *     dropped (EPOLLIN only on LINUX, FD_CLR on wnewset otherwise) and the
 *     function returns.
 *   - SOCK_IO_IDLE: asserts and logs, then continues below.
 *   - no write event pending: write interest is dropped, a debug message is
 *     logged and the function returns.
 *   - otherwise the write is delegated to proxy_socket_io_write_to_cas () or
 *     proxy_socket_io_write_to_client () depending on sock_io_p->from_cas.
 *
 * NOTE(review): the declarator line (function name and parameter list) is
 * absent from this listing; presumably proxy_socket_io_write - confirm
 * against the upstream file.
 */
2747 static void
2749 {
2750  assert (sock_io_p);
2751 
2752  if (sock_io_p->status == SOCK_IO_CLOSE_WAIT)
2753  {
2754  PROXY_DEBUG_LOG ("Unexpected socket status. (fd:%d, status:%d). \n", sock_io_p->fd, sock_io_p->status);
2755 
2756  /*
2757  * free writer event when sock status is 'close wait'
2758  */
2759  if (sock_io_p->write_event)
2760  {
2761  proxy_event_free (sock_io_p->write_event);
2762  sock_io_p->write_event = NULL;
2763  }
2764 
2765 #if defined(LINUX)
2766  (void) proxy_mod_epoll_event (sock_io_p->fd, EPOLLIN);
2767 #else /* LINUX */
2768  FD_CLR (sock_io_p->fd, &wnewset);
2769 #endif /* !LINUX */
2770 
2771  return;
2772  }
2773  else if (sock_io_p->status == SOCK_IO_IDLE)
2774  {
2775  assert (false);
2776  PROXY_DEBUG_LOG ("Unexpected socket status. (fd:%d, status:%d). \n", sock_io_p->fd, sock_io_p->status);
2777  }
2778 
2779  if (sock_io_p->write_event == NULL)
2780  {
2781 #if defined(LINUX)
2782  (void) proxy_mod_epoll_event (sock_io_p->fd, EPOLLIN);
2783 #else /* LINUX */
2784  FD_CLR (sock_io_p->fd, &wnewset);
2785 #endif /* !LINUX */
2786 
2787  PROXY_DEBUG_LOG ("Write event couldn't be NULL. (fd:%d, status:%d). \n", sock_io_p->fd, sock_io_p->status);
2788  return;
2789  }
2790 
2791  if (sock_io_p->from_cas)
2792  {
2793  proxy_socket_io_write_to_cas (sock_io_p);
2794  }
2795  else
2796  {
2797  proxy_socket_io_write_to_client (sock_io_p);
2798  }
2799 
2800  return;
2801 }
2802 
2803 static void
2805 {
2806  assert (sock_io_p);
2807 
2808  if (sock_io_p->from_cas)
2809  {
2810  proxy_process_cas_write_error (sock_io_p);
2811  }
2812  else
2813  {
2815  }
2816 
2817  return;
2818 }
2819 
2820 static void
2822 {
2823  assert (sock_io_p);
2824 
2825  if (sock_io_p->status == SOCK_IO_CLOSE_WAIT)
2826  {
2827  /* this socket connection will be close */
2828 
2829  PROXY_DEBUG_LOG ("Unexpected socket status. " "socket will be closed. " "(fd:%d, status:%d).", sock_io_p->fd,
2830  sock_io_p->status);
2831  //
2832  // proxy_io_buffer_clear (&sock_io_p->recv_buffer);
2833 
2834  // assert (false);
2835  return;
2836  }
2837 
2838  if (sock_io_p->read_event == NULL)
2839  {
2840  sock_io_p->read_event = proxy_event_new (PROXY_EVENT_IO_READ, sock_io_p->from_cas);
2841  if (sock_io_p->read_event == NULL)
2842  {
2843  assert (false); /* __FOR_DEBUG */
2844 
2845  proxy_socket_io_read_error (sock_io_p);
2846 
2847  return;
2848  }
2849  }
2850 
2851  /* __FOR_DEBUG */
2852  assert (sock_io_p->read_event->type == PROXY_EVENT_IO_READ);
2853 
2854  if (sock_io_p->from_cas)
2855  {
2856  proxy_socket_io_read_from_cas (sock_io_p);
2857  }
2858  else
2859  {
2861  }
2862 
2863  return;
2864 }
2865 
/*
 * Routes a read failure on sock_io_p to proxy_process_cas_read_error () when
 * sock_io_p->from_cas is set, and to proxy_process_client_read_error ()
 * otherwise.
 *
 * NOTE(review): the declarator line (function name and parameter list) is
 * absent from this listing; presumably proxy_socket_io_read_error - confirm
 * against the upstream file.
 */
2866 static void
2868 {
2869  assert (sock_io_p);
2870 
2871  if (sock_io_p->from_cas)
2872  {
2873  proxy_process_cas_read_error (sock_io_p);
2874  }
2875  else
2876  {
2877  proxy_process_client_read_error (sock_io_p);
2878  }
2879 
2880  return;
2881 }
2882 
2883 static int
2885 {
2886  int error;
2887  int i, size;
2888  T_CLIENT_IO *client_io_ent_p;
2889 
2890  proxy_Client_io.max_client = shm_proxy_p->max_client;
2891  proxy_Client_io.cur_client = 0;
2892  proxy_Client_io.max_context = shm_proxy_p->max_context;
2893 
2894  error = shard_cqueue_initialize (&proxy_Client_io.freeq, proxy_Client_io.max_context);
2895  if (error)
2896  {
2897  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to initialize client entries. " "(error:%d).", error);
2898  return -1;
2899  }
2900 
2901  /* make client io entry */
2902  size = proxy_Client_io.max_context * sizeof (T_CLIENT_IO);
2903  proxy_Client_io.ent = (T_CLIENT_IO *) malloc (size);
2904  if (proxy_Client_io.ent == NULL)
2905  {
2907  "Not enough virtual memory. " "Failed to alloc client entries. " "(errno:%d, size:%d).", errno, size);
2908  goto error_return;
2909  }
2910 
2911  for (i = 0; i < proxy_Client_io.max_context; i++)
2912  {
2913  client_io_ent_p = &(proxy_Client_io.ent[i]);
2914 
2915  client_io_ent_p->client_id = i;
2916  client_io_ent_p->is_busy = false;
2917  client_io_ent_p->fd = INVALID_SOCKET;
2918  client_io_ent_p->ctx_cid = PROXY_INVALID_CONTEXT;
2919  client_io_ent_p->ctx_uid = 0;
2920 
2921  error = shard_cqueue_enqueue (&proxy_Client_io.freeq, (void *) client_io_ent_p);
2922  if (error)
2923  {
2924  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to initialize client free queue." "(error:%d).", error);
2925  goto error_return;
2926  }
2927  }
2928 
2929  return 0;
2930 
2931 error_return:
2933 
2934  return -1;
2935 }
2936 
/*
 * Tears down the global client table proxy_Client_io: destroys the free
 * queue, releases the entry array with FREE_MEM, and resets max_client to -1
 * and cur_client to 0. max_context is left untouched.
 *
 * NOTE(review): the declarator line (function name and parameter list) is
 * absent from this listing; presumably proxy_client_io_destroy - confirm
 * against the upstream file.
 */
2937 static void
2939 {
2940  shard_cqueue_destroy (&proxy_Client_io.freeq);
2941  FREE_MEM (proxy_Client_io.ent);
2942  proxy_Client_io.max_client = -1;
2943  proxy_Client_io.cur_client = 0;
2944 }
2945 
2946 #if defined(PROXY_VERBOSE_DEBUG)
2947 void
2948 proxy_client_io_print (bool print_all)
2949 {
2950  int i;
2951  T_CLIENT_IO *cli_io_p = NULL;
2952 
2953  PROXY_LOG (PROXY_LOG_MODE_SCHEDULE_DETAIL, "* CLIENT IO *\n");
2954  PROXY_LOG (PROXY_LOG_MODE_SCHEDULE_DETAIL, "%-20s = %d", "max_client", proxy_Client_io.max_client);
2955  PROXY_LOG (PROXY_LOG_MODE_SCHEDULE_DETAIL, "%-20s = %d", "max_context", proxy_Client_io.max_context);
2956  PROXY_LOG (PROXY_LOG_MODE_SCHEDULE_DETAIL, "%-20s = %d", "cur_client", proxy_Client_io.cur_client);
2957 
2958  PROXY_LOG (PROXY_LOG_MODE_SCHEDULE_DETAIL, "%-7s %-10s %-5s %-5s %-10s %-10s", "idx", "client_id", "busy", "fd",
2959  "context_id", "uid");
2960  if (proxy_Client_io.ent)
2961  {
2962  for (i = 0; i < proxy_Client_io.max_context; i++)
2963  {
2964  cli_io_p = &(proxy_Client_io.ent[i]);
2965  if (!print_all && !cli_io_p->is_busy)
2966  {
2967  continue;
2968  }
2969  PROXY_LOG (PROXY_LOG_MODE_SCHEDULE_DETAIL, "[%-5d] %-10d %-5s %-5d %-10d %-10u", i, cli_io_p->client_id,
2970  (cli_io_p->is_busy) ? "YES" : "NO", cli_io_p->fd, cli_io_p->ctx_cid, cli_io_p->ctx_uid);
2971  }
2972  }
2973 
2974  return;
2975 }
2976 #endif /* PROXY_VERBOSE_DEBUG */
2977 
/*
 * Formats a client entry (client_id, is_busy, fd, ctx_cid, ctx_uid) for log
 * output. Returns "-" when cli_io_p is NULL.
 *
 * The text is written into a function-local static buffer, so the returned
 * pointer is overwritten by the next call.
 *
 * NOTE(review): the declarator line (function name and parameter list) is
 * absent from this listing; presumably proxy_str_client_io - confirm against
 * the upstream file.
 */
2978 char *
2980 {
2981  static char buffer[LINE_MAX];
2982 
2983  if (cli_io_p == NULL)
2984  {
2985  return (char *) "-";
2986  }
2987 
2988  snprintf (buffer, sizeof (buffer), "client_id:%d, is_busy:%s, fd:%d, ctx_cid:%d, ctx_uid:%u", cli_io_p->client_id,
2989  (cli_io_p->is_busy) ? "Y" : "N", cli_io_p->fd, cli_io_p->ctx_cid, cli_io_p->ctx_uid);
2990 
2991  return (char *) buffer;
2992 }
2993 
2994 static T_CLIENT_IO *
2995 proxy_client_io_new (SOCKET fd, char *driver_info)
2996 {
2997  T_PROXY_CONTEXT *ctx_p;
2998  T_CLIENT_IO *cli_io_p = NULL;
2999 
3000  cli_io_p = (T_CLIENT_IO *) shard_cqueue_dequeue (&proxy_Client_io.freeq);
3001 
3002  if (cli_io_p)
3003  {
3004  cli_io_p->fd = fd;
3005  cli_io_p->is_busy = true;
3006 
3007  proxy_Client_io.cur_client++;
3008  proxy_info_p->cur_client = proxy_Client_io.cur_client;
3009 
3010  ctx_p = proxy_context_new ();
3011  if (ctx_p == NULL)
3012  {
3013  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Unable to make new context. client(%s).", proxy_str_client_io (cli_io_p));
3014 
3015  proxy_client_io_free (cli_io_p);
3016  return NULL;
3017  }
3018 
3019  cli_io_p->ctx_cid = ctx_p->cid;
3020  cli_io_p->ctx_uid = ctx_p->uid;
3021  memcpy (cli_io_p->driver_info, driver_info, SRV_CON_CLIENT_INFO_SIZE);
3022 
3023  ctx_p->client_id = cli_io_p->client_id;
3024 
3025  PROXY_LOG (PROXY_LOG_MODE_SHARD_DETAIL, "New client connected. client(%s).", proxy_str_client_io (cli_io_p));
3026 
3027  if (proxy_Client_io.cur_client > proxy_Client_io.max_client)
3028  {
3029  /*
3030  * Error message would be retured when processing
3031  * register(db_info) request.
3032  */
3033  char err_msg[256];
3034 
3035  snprintf (err_msg, sizeof (err_msg), "Proxy refused client connection. max clients exceeded");
3036 
3038  }
3039  }
3040 
3041 #if defined(PROXY_VERBOSE_DEBUG)
3042  proxy_client_io_print (false);
3043 #endif /* PROXY_VERBOSE_DEBUG */
3044 
3045  return cli_io_p;
3046 }
3047 
/*
 * Releases a busy client entry: deletes its socket entry through
 * proxy_socket_io_delete (), resets fd / is_busy / context ids / driver_info,
 * decrements proxy_Client_io.cur_client (clamped at 0) and mirrors the
 * counter into proxy_info_p->cur_client, then puts the entry back on the
 * free queue.
 *
 * The entry is expected to hold a valid fd and to be marked busy (asserted).
 *
 * NOTE(review): the declarator line (function name and parameter list) is
 * absent from this listing; presumably proxy_client_io_free - confirm
 * against the upstream file.
 */
3048 void
3050 {
3051  int error;
3052 
3053  assert (cli_io_p->fd != INVALID_SOCKET);
3054  assert (cli_io_p->is_busy == true);
3055 
3056  proxy_socket_io_delete (cli_io_p->fd);
3057 
3058  cli_io_p->fd = INVALID_SOCKET;
3059  cli_io_p->is_busy = false;
3060  cli_io_p->ctx_cid = PROXY_INVALID_CONTEXT;
3061  cli_io_p->ctx_uid = 0;
3062  memset (cli_io_p->driver_info, 0, SRV_CON_CLIENT_INFO_SIZE);
3063 
3064  proxy_Client_io.cur_client--;
3065  if (proxy_Client_io.cur_client < 0)
3066  {
3067  assert (false);
3068  proxy_Client_io.cur_client = 0;
3069  }
3070  proxy_info_p->cur_client = proxy_Client_io.cur_client;
3071 
3072  error = shard_cqueue_enqueue (&proxy_Client_io.freeq, (void *) cli_io_p);
3073  if (error)
3074  {
3075  assert (false);
3076  return;
3077  }
3078 
3079  return;
3080 }
3081 
3082 void
3083 proxy_client_io_free_by_ctx (int client_id, int ctx_cid, int ctx_uid)
3084 {
3085  T_CLIENT_IO *cli_io_p;
3086 
3087  cli_io_p = proxy_client_io_find_by_ctx (client_id, ctx_cid, ctx_uid);
3088  if (cli_io_p == NULL)
3089  {
3090  return;
3091  }
3092 
3093  proxy_client_io_free (cli_io_p);
3094 
3095  return;
3096 }
3097 
3098 
3099 T_CLIENT_IO *
3100 proxy_client_io_find_by_ctx (int client_id, int ctx_cid, unsigned int ctx_uid)
3101 {
3102  T_CLIENT_IO *cli_io_p;
3103 
3104  if (client_id < 0 || client_id >= proxy_Client_io.max_context)
3105  {
3106  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Invalid client id. (client_id;%d, max_context:%d).", client_id,
3107  proxy_Client_io.max_context);
3108  return NULL;
3109  }
3110 
3111  cli_io_p = &(proxy_Client_io.ent[client_id]);
3112 
3113  if (cli_io_p->ctx_cid != ctx_cid || cli_io_p->ctx_uid != ctx_uid)
3114  {
3115  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Unable to find client by context. " "(context id:%d/%d, context uid:%d/%d).",
3116  cli_io_p->ctx_cid, ctx_cid, cli_io_p->ctx_uid, ctx_uid);
3117  return NULL;
3118  }
3119 
3120  return (cli_io_p->is_busy) ? cli_io_p : NULL;
3121 }
3122 
/*
 * Looks up the client entry at index client_id. Returns NULL (after logging)
 * when client_id is outside [0, proxy_Client_io.max_context), and NULL when
 * the entry is not busy; otherwise returns the entry.
 *
 * The entry's fd is compared with the requested fd only through assert (),
 * i.e. only in assert-enabled builds.
 *
 * NOTE(review): the declarator line (function name and parameter list) is
 * absent from this listing; presumably proxy_client_io_find_by_fd - confirm
 * against the upstream file.
 */
3123 T_CLIENT_IO *
3125 {
3126  T_CLIENT_IO *cli_io_p;
3127 
3128  if (client_id < 0 || client_id >= proxy_Client_io.max_context)
3129  {
3130  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Invalid client id. (client_id:%d, max_context:%d).", client_id,
3131  proxy_Client_io.max_context);
3132  return NULL;
3133  }
3134 
3135  cli_io_p = &(proxy_Client_io.ent[client_id]);
3136 
3137  /* client io's socket fd must be the same requested fd */
3138  assert (cli_io_p->fd == fd);
3139 
3140  return (cli_io_p->is_busy) ? cli_io_p : NULL;
3141 }
3142 
/*
 * Queues event_p as the pending write event of the socket entry that owns
 * cli_io_p->fd. Returns 0 on success, -1 when no socket entry is found for
 * the fd or when proxy_socket_set_write_event () fails (both are logged).
 *
 * NOTE(review): the declarator line (function name and parameter list) is
 * absent from this listing; presumably proxy_client_io_write - confirm
 * against the upstream file.
 */
3143 int
3145 {
3146  int error;
3147 
3148  T_SOCKET_IO *sock_io_p;
3149 
3150  assert (cli_io_p);
3151  assert (event_p);
3152 
3153  sock_io_p = proxy_socket_io_find (cli_io_p->fd);
3154  if (sock_io_p == NULL)
3155  {
3156  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to find socket entry. (fd:%d).", cli_io_p->fd);
3157  return -1;
3158  }
3159 
3160  error = proxy_socket_set_write_event (sock_io_p, event_p);
3161  if (error)
3162  {
3163  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to set write buffer. " "client(%s). event(%s).",
3164  proxy_str_client_io (cli_io_p), proxy_str_event (event_p));
3165  return -1;
3166  }
3167 
3168  return 0;
3169 }
3170 
3171 static int
3172 proxy_cas_io_initialize (int shard_id, T_CAS_IO ** cas_io_pp, int size)
3173 {
3174  int i;
3175  T_CAS_IO *cas_io_p;
3176  T_CAS_IO *buffer;
3177 
3178  assert (cas_io_pp);
3179 
3180  buffer = (T_CAS_IO *) malloc (sizeof (T_CAS_IO) * size);
3181  if (buffer == NULL)
3182  {
3183  return -1;
3184  }
3185 
3186  for (i = 0; i < size; i++)
3187  {
3188  cas_io_p = &(buffer[i]);
3189 
3190  cas_io_p->cas_id = i;
3191  cas_io_p->shard_id = shard_id;
3192  cas_io_p->is_in_tran = false;
3193  cas_io_p->ctx_cid = PROXY_INVALID_CONTEXT;
3194  cas_io_p->ctx_uid = 0;
3195  cas_io_p->fd = INVALID_SOCKET;
3196  }
3197 
3198  *cas_io_pp = buffer;
3199 
3200  return 0;
3201 }
3202 
/*
 * Queues event_p as the pending write event of the socket entry that owns
 * cas_io_p->fd. Returns 0 on success, -1 when no socket entry is found for
 * the fd (also asserted) or when proxy_socket_set_write_event () fails.
 *
 * NOTE(review): the declarator line (function name and parameter list) is
 * absent from this listing; presumably proxy_cas_io_write - confirm against
 * the upstream file.
 */
3203 int
3205 {
3206  int error;
3207  T_SOCKET_IO *sock_io_p;
3208 
3209  assert (cas_io_p);
3210  assert (event_p);
3211 
3212  sock_io_p = proxy_socket_io_find (cas_io_p->fd);
3213  if (sock_io_p == NULL)
3214  {
3215  assert (false);
3216  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Unable to find socket entry. (fd:%d).", cas_io_p->fd);
3217  return -1;
3218  }
3219 
3220  error = proxy_socket_set_write_event (sock_io_p, event_p);
3221  if (error)
3222  {
3223  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to set write buffer. " "CAS(%s). event(%s).",
3224  proxy_str_cas_io (cas_io_p), proxy_str_event (event_p));
3225  return -1;
3226  }
3227 
3228  return 0;
3229 }
3230 
3231 static int
3233 {
3234  int error;
3235  int i;
3236  T_SHARD_IO *shard_io_p;
3237  T_SHARD_INFO *shard_info_p;
3238  int max_appl_server;
3239 
3240  proxy_Shard_io.max_shard = proxy_info_p->max_shard;
3241 
3242  proxy_Shard_io.ent = (T_SHARD_IO *) malloc (sizeof (T_SHARD_IO) * proxy_Shard_io.max_shard);
3243  if (proxy_Shard_io.ent == NULL)
3244  {
3245  return -1;
3246  }
3247 
3248  shard_info_p = shard_shm_find_shard_info (proxy_info_p, 0);
3249  max_appl_server = shard_info_p->max_appl_server;
3250 
3251  for (i = 0; i < proxy_Shard_io.max_shard; i++)
3252  {
3253  shard_io_p = &(proxy_Shard_io.ent[i]);
3254 
3255  shard_io_p->shard_id = i;
3256  shard_io_p->max_num_cas = max_appl_server;
3257  shard_io_p->cur_num_cas = 0;
3258  shard_io_p->num_cas_in_tran = 0;
3259  error = shard_queue_initialize (&shard_io_p->waitq);
3260  if (error)
3261  {
3262  goto error_return;
3263  }
3264 
3265  error = proxy_cas_io_initialize (shard_io_p->shard_id, &shard_io_p->ent, shard_io_p->max_num_cas);
3266  if (error)
3267  {
3268  goto error_return;
3269  }
3270  }
3271 
3272  return 0;
3273 
3274 error_return:
3276  return -1;
3277 }
3278 
/*
 * Tears down the global shard table proxy_Shard_io: for each of the
 * max_shard entries the wait queue is destroyed and the CAS entry array is
 * released, then the shard array itself is released.
 *
 * NOTE(review): the loop dereferences proxy_Shard_io.ent without a NULL
 * check - confirm callers only invoke this after a successful allocation.
 * NOTE(review): the declarator line (function name and parameter list) is
 * absent from this listing; presumably proxy_shard_io_destroy - confirm
 * against the upstream file.
 */
3279 static void
3281 {
3282  int i;
3283  T_SHARD_IO *shard_io_p;
3284 
3285  for (i = 0; i < proxy_Shard_io.max_shard; i++)
3286  {
3287  shard_io_p = &(proxy_Shard_io.ent[i]);
3288 
3289  shard_queue_destroy (&shard_io_p->waitq);
3290  FREE_MEM (shard_io_p->ent);
3291  }
3292 
3293  FREE_MEM (proxy_Shard_io.ent);
3294 
3295  return;
3296 }
3297 
3298 #if defined(PROXY_VERBOSE_DEBUG)
3299 void
3300 proxy_shard_io_print (bool print_all)
3301 {
3302  int i, j;
3303  T_SHARD_IO *shard_io_p;
3304  T_CAS_IO *cas_io_p;
3305 
3306  PROXY_LOG (PROXY_LOG_MODE_SCHEDULE_DETAIL, "* SHARD IO *");
3307  PROXY_LOG (PROXY_LOG_MODE_SCHEDULE_DETAIL, "%-20s = %d", "max_shard", proxy_Shard_io.max_shard);
3308 
3309  PROXY_LOG (PROXY_LOG_MODE_SCHEDULE_DETAIL, "%-7s %-15s %-15s %-15s %-15s", "idx", "shard_id", "max_cas", "cur_cas",
3310  "in_tran");
3311  if (proxy_Shard_io.ent)
3312  {
3313  for (i = 0; i < proxy_Shard_io.max_shard; i++)
3314  {
3315  shard_io_p = &(proxy_Shard_io.ent[i]);
3316 
3317  PROXY_LOG (PROXY_LOG_MODE_SCHEDULE_DETAIL, "[%-5d] %-15d %-15d %-15d %-15d", i, shard_io_p->shard_id,
3318  shard_io_p->max_num_cas, shard_io_p->cur_num_cas, shard_io_p->num_cas_in_tran);
3319 
3320  PROXY_LOG (PROXY_LOG_MODE_SCHEDULE_DETAIL, " <cas> %-7s %-10s %-10s %-10s %-10s", "idx", "cas_id",
3321  "shard_id", "in_tran", "fd");
3322  if (shard_io_p->ent)
3323  {
3324  for (j = 0; j < shard_io_p->max_num_cas; j++)
3325  {
3326  cas_io_p = &(shard_io_p->ent[j]);
3327  if (!print_all && IS_INVALID_SOCKET (cas_io_p->fd))
3328  {
3329  continue;
3330  }
3331 
3332  PROXY_LOG (PROXY_LOG_MODE_SCHEDULE_DETAIL, " [%-5d] %-10d %-10d %-10s %-10d", j,
3333  cas_io_p->cas_id, cas_io_p->shard_id, (cas_io_p->is_in_tran) ? "YES" : "NO", cas_io_p->fd);
3334  }
3335  }
3336  }
3337  }
3338 
3339  return;
3340 }
3341 #endif /* PROXY_VERBOSE_DEBUG */
3342 
/*
 * Formats a CAS entry (cas_id, shard_id, is_in_tran, status, ctx_cid,
 * ctx_uid, fd) for log output. Returns "-" when cas_io_p is NULL.
 *
 * The text is written into a function-local static buffer, so the returned
 * pointer is overwritten by the next call.
 *
 * NOTE(review): the declarator line (function name and parameter list) is
 * absent from this listing; presumably proxy_str_cas_io - confirm against
 * the upstream file.
 */
3343 char *
3345 {
3346  static char buffer[LINE_MAX];
3347 
3348  if (cas_io_p == NULL)
3349  {
3350  return (char *) "-";
3351  }
3352 
3353  snprintf (buffer, sizeof (buffer),
3354  "cas_id:%d, shard_id:%d, is_in_tran:%s, " "status:%d, ctx_cid:%d, ctx_uid:%u, fd:%d", cas_io_p->cas_id,
3355  cas_io_p->shard_id, (cas_io_p->is_in_tran) ? "Y" : "N", cas_io_p->status, cas_io_p->ctx_cid,
3356  cas_io_p->ctx_uid, cas_io_p->fd);
3357 
3358  return (char *) buffer;
3359 }
3360 
3361 static T_SHARD_IO *
3362 proxy_shard_io_find (int shard_id)
3363 {
3364  T_SHARD_IO *shard_io_p;
3365 
3366  if (shard_id < 0 || shard_id >= proxy_Shard_io.max_shard)
3367  {
3368  PROXY_DEBUG_LOG ("Invalid shard id. (shard_id:%d, max_shard:%d).", shard_id, proxy_Shard_io.max_shard);
3369  return NULL;
3370  }
3371 
3372  shard_io_p = &(proxy_Shard_io.ent[shard_id]);
3373 
3374  return shard_io_p;
3375 }
3376 
3377 static T_CAS_IO *
3378 proxy_cas_io_new (int shard_id, int cas_id, SOCKET fd)
3379 {
3380  T_SHARD_IO *shard_io_p;
3381  T_CAS_IO *cas_io_p;
3382 
3383  if (shard_id < 0 || shard_id >= proxy_Shard_io.max_shard)
3384  {
3385  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Invalid shard id. (shard_id:%d, max_shard:%d).", shard_id,
3386  proxy_Shard_io.max_shard);
3387  return NULL;
3388  }
3389 
3390  shard_io_p = &(proxy_Shard_io.ent[shard_id]);
3391  if (cas_id < 0 || cas_id >= shard_io_p->max_num_cas)
3392  {
3393  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Invalid CAS id. (cas_id:%d, max_num_cas:%d).", cas_id, shard_io_p->max_num_cas);
3394  return NULL;
3395  }
3396 
3397  cas_io_p = &(shard_io_p->ent[cas_id]);
3398 
3399  if (cas_io_p->fd != INVALID_SOCKET || cas_io_p->is_in_tran)
3400  {
3401  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Already registered CAS. " "(fd:%d, shard_id:%d, cas_id:%d, max_num_cas:%d).",
3402  cas_io_p->fd, shard_id, cas_id, shard_io_p->max_num_cas);
3403  return NULL;
3404  }
3405  assert (cas_io_p->fd == INVALID_SOCKET);
3406  assert (cas_io_p->is_in_tran == false);
3407 
3408  cas_io_p->status = CAS_IO_NOT_CONNECTED;
3409  cas_io_p->fd = fd;
3410  cas_io_p->is_in_tran = false;
3411  cas_io_p->ctx_cid = PROXY_INVALID_CONTEXT;
3412  cas_io_p->ctx_uid = 0;
3413 
3414  shard_io_p->cur_num_cas++;
3415 
3416  PROXY_LOG (PROXY_LOG_MODE_SHARD_DETAIL, "New CAS connected. CAS(%s).", proxy_str_cas_io (cas_io_p));
3417 #if defined(PROXY_VERBOSE_DEBUG)
3418  proxy_shard_io_print (false);
3419 #endif /* PROXY_VERBOSE_DEBUG */
3420 
3421  return cas_io_p;
3422 }
3423 
3424 /* by socket event */
3425 static void
3426 proxy_cas_io_free (int shard_id, int cas_id)
3427 {
3428  T_SHARD_IO *shard_io_p;
3429  T_CAS_IO *cas_io_p;
3430 
3431  ENTER_FUNC ();
3432 
3433  if (shard_id < 0 || shard_id >= proxy_Shard_io.max_shard)
3434  {
3435  PROXY_DEBUG_LOG ("Invalid shard id. (shard_id:%d, max_shard:%d).", shard_id, proxy_Shard_io.max_shard);
3436  EXIT_FUNC ();
3437  return;
3438  }
3439 
3440  shard_io_p = &(proxy_Shard_io.ent[shard_id]);
3441  if (cas_id < 0 || cas_id >= shard_io_p->max_num_cas)
3442  {
3443  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Invalid CAS id. (cas_id:%d, max_num_cas:%d).", cas_id, shard_io_p->max_num_cas);
3444  EXIT_FUNC ();
3445  return;
3446  }
3447 
3448  cas_io_p = &(shard_io_p->ent[cas_id]);
3449 
3450  proxy_socket_io_delete (cas_io_p->fd);
3451 
3452  if (cas_io_p->fd != INVALID_SOCKET)
3453  {
3454  cas_io_p->fd = INVALID_SOCKET;
3456  }
3457 
3458  if (cas_io_p->is_in_tran == true)
3459  {
3460  shard_io_p->num_cas_in_tran--;
3461 
3462  PROXY_DEBUG_LOG ("shard/CAS status. (num_cas_in_tran=%d, shard_id=%d).", shard_io_p->num_cas_in_tran,
3463  shard_io_p->shard_id);
3464 
3465  assert (shard_io_p->num_cas_in_tran >= 0);
3466  if (shard_io_p->num_cas_in_tran < 0)
3467  {
3468  shard_io_p->num_cas_in_tran = 0;
3469  }
3470  }
3471 
3472 
3473  cas_io_p->is_in_tran = false;
3474  cas_io_p->ctx_cid = PROXY_INVALID_CONTEXT;
3475  cas_io_p->ctx_uid = 0;
3476 
3477  cas_io_p->status = CAS_IO_NOT_CONNECTED;
3478  shard_io_p->cur_num_cas--;
3479 
3480  EXIT_FUNC ();
3481  return;
3482 }
3483 
3484 /* by context */
3485 void
3486 proxy_cas_io_free_by_ctx (int shard_id, int cas_id, int ctx_cid, int unsigned ctx_uid)
3487 {
3488  T_SHARD_IO *shard_io_p;
3489  T_CAS_IO *cas_io_p;
3490 
3491  ENTER_FUNC ();
3492 
3493  if (shard_id < 0 || shard_id >= proxy_Shard_io.max_shard)
3494  {
3495 
3496  PROXY_DEBUG_LOG ("Invalid shard id. (shard_id:%d, max_shard:%d).", shard_id, proxy_Shard_io.max_shard);
3497  EXIT_FUNC ();
3498  return;
3499  }
3500 
3501  shard_io_p = &(proxy_Shard_io.ent[shard_id]);
3502  if (cas_id < 0 || cas_id >= shard_io_p->max_num_cas)
3503  {
3504  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Invalid CAS id. (cas_id:%d, max_num_cas:%d).", cas_id, shard_io_p->max_num_cas);
3505  EXIT_FUNC ();
3506  return;
3507  }
3508 
3509  cas_io_p = &(shard_io_p->ent[cas_id]);
3510 
3511  if (cas_io_p->is_in_tran == false || cas_io_p->ctx_cid != ctx_cid || cas_io_p->ctx_uid != ctx_uid)
3512  {
3513  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Unable to find CAS entry. " "(context id:%d, context uid:%d). CAS(%S).",
3514  ctx_cid, ctx_uid, proxy_str_cas_io (cas_io_p));
3515 
3516  assert (false);
3517 
3518  EXIT_FUNC ();
3519  return;
3520  }
3521 
3522  proxy_socket_io_delete (cas_io_p->fd);
3523 
3524  if (cas_io_p->fd != INVALID_SOCKET)
3525  {
3526  cas_io_p->fd = INVALID_SOCKET;
3528  }
3529 
3530  cas_io_p->is_in_tran = false;
3531  cas_io_p->status = CAS_IO_NOT_CONNECTED;
3532  cas_io_p->ctx_cid = PROXY_INVALID_CONTEXT;
3533  cas_io_p->ctx_uid = 0;
3534 
3535  shard_io_p->cur_num_cas--;
3536  assert (shard_io_p->cur_num_cas >= 0);
3537  if (shard_io_p->cur_num_cas < 0)
3538  {
3539  shard_io_p->cur_num_cas = 0;
3540  }
3541 
3542  shard_io_p->num_cas_in_tran--;
3543  PROXY_DEBUG_LOG ("Shard/CAS status. (num_cas_in_tran=%d, shard_id=%d).", shard_io_p->num_cas_in_tran,
3544  shard_io_p->shard_id);
3545  assert (shard_io_p->num_cas_in_tran >= 0);
3546  if (shard_io_p->num_cas_in_tran < 0)
3547  {
3548  shard_io_p->num_cas_in_tran = 0;
3549  }
3550 
3551  EXIT_FUNC ();
3552  return;
3553 }
3554 
3555 static T_CAS_IO *
3556 proxy_cas_io_find_by_fd (int shard_id, int cas_id, SOCKET fd)
3557 {
3558  T_SHARD_IO *shard_io_p;
3559  T_CAS_IO *cas_io_p;
3560 
3561  if (shard_id < 0 || cas_id < 0 || fd == INVALID_SOCKET)
3562  {
3563  PROXY_DEBUG_LOG ("Unable to find CAS entry. " "(shard:%d, cas:%d, fd:%d).", shard_id, cas_id, fd);
3564  return NULL;
3565  }
3566 
3567  /* __FOR_DEBUG */
3568  assert (shard_id <= proxy_Shard_io.max_shard);
3569  shard_io_p = &(proxy_Shard_io.ent[shard_id]);
3570 
3571  /* __FOR_DEBUG */
3572  assert (cas_id <= shard_io_p->max_num_cas);
3573  cas_io_p = &(shard_io_p->ent[cas_id]);
3574 
3575  assert (cas_io_p->fd == fd);
3576 
3577  return cas_io_p;
3578 }
3579 
3580 T_CAS_IO *
3581 proxy_cas_find_io_by_ctx (int shard_id, int cas_id, int ctx_cid, unsigned int ctx_uid)
3582 {
3583  /* in case, find cas explicitly */
3584  T_SHARD_IO *shard_io_p;
3585  T_CAS_IO *cas_io_p;
3586 
3587  if (0 > shard_id || shard_id >= proxy_Shard_io.max_shard)
3588  {
3589  PROXY_DEBUG_LOG ("Invalid shard id. (shard_id:%d, max_shard:%d).", shard_id, proxy_Shard_io.max_shard);
3590  return NULL;
3591  }
3592  shard_io_p = &(proxy_Shard_io.ent[shard_id]);
3593 
3594  if (0 > cas_id || cas_id >= shard_io_p->max_num_cas)
3595  {
3596  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Invalid CAS id. (cas_id:%d, max_num_cas:%d).", cas_id, shard_io_p->max_num_cas);
3597  return NULL;
3598  }
3599  cas_io_p = &(shard_io_p->ent[cas_id]);
3600 
3601  if (cas_io_p->is_in_tran == false || cas_io_p->ctx_cid != ctx_cid || cas_io_p->ctx_uid != ctx_uid)
3602  {
3603  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Unable to find CAS entry. " "(context id:%d, context uid:%d). CAS(%S).",
3604  ctx_cid, ctx_uid, proxy_str_cas_io (cas_io_p));
3605  return NULL;
3606  }
3607 
3608  return cas_io_p;
3609 }
3610 
3611 T_CAS_IO *
3612 proxy_cas_alloc_by_ctx (int client_id, int shard_id, int cas_id, int ctx_cid, unsigned int ctx_uid, int timeout,
3613  int func_code)
3614 {
3615  int error;
3616  T_SHARD_IO *shard_io_p;
3617  T_CAS_IO *cas_io_p;
3618  T_CLIENT_INFO *client_info_p;
3619 
3620  unsigned int curr_shard_id = 0;
3621  static unsigned int last_shard_id = 0;
3622 
3623  /* valid shard id */
3624  if ((shard_id < 0 && cas_id >= 0) || (shard_id >= proxy_Shard_io.max_shard))
3625  {
3626  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Invalid shard/CAS id is requested. " "(shard_id:%d, cas_id:%d). ", shard_id,
3627  cas_id);
3628 
3629  assert (false);
3630 
3631  return NULL;
3632  }
3633 
3634  if (shard_id >= 0 && cas_id >= 0)
3635  {
3636  cas_io_p = proxy_cas_alloc_by_shard_and_cas_id (client_id, shard_id, cas_id, ctx_cid, ctx_uid);
3637  }
3638  else
3639  {
3640  if (func_code == CAS_FC_CHECK_CAS)
3641  {
3642  cas_io_p =
3643  proxy_cas_alloc_anything (client_id, shard_id, cas_id, ctx_cid, ctx_uid, proxy_find_idle_cas_by_desc);
3644  }
3645  else
3646  {
3647  cas_io_p =
3648  proxy_cas_alloc_anything (client_id, shard_id, cas_id, ctx_cid, ctx_uid, proxy_find_idle_cas_by_conn_info);
3649  if (cas_io_p == NULL)
3650  {
3651  cas_io_p =
3652  proxy_cas_alloc_anything (client_id, shard_id, cas_id, ctx_cid, ctx_uid, proxy_find_idle_cas_by_asc);
3653  }
3654  }
3655  }
3656 
3657  if (cas_io_p == NULL)
3658  {
3659  goto set_waiter;
3660  }
3661 
3662  shard_id = cas_io_p->shard_id;
3663  cas_id = cas_io_p->cas_id;
3664 
3665  client_info_p = shard_shm_get_client_info (proxy_info_p, client_id);
3666  if (client_info_p == NULL)
3667  {
3669  "Unable to find cilent info in shared memory. " "(context id:%d, context uid:%d)", ctx_cid, ctx_uid);
3670  }
3671  else
3673  (proxy_info_p, shm_as_p, cas_io_p->shard_id, cas_io_p->cas_id, client_info_p) == false)
3674  {
3675 
3676  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Unable to find CAS info in shared memory. " "(shard_id:%d, cas_id:%d).",
3677  cas_io_p->shard_id, cas_io_p->cas_id);
3678  }
3679 
3680  cas_io_p->is_in_tran = true;
3681  cas_io_p->ctx_cid = ctx_cid;
3682  cas_io_p->ctx_uid = ctx_uid;
3683 
3684  proxy_set_conn_info (func_code, ctx_cid, ctx_uid, shard_id, cas_id);
3685 
3686  return cas_io_p;
3687 
3688 set_waiter:
3689  if (shard_id >= 0)
3690  {
3691  curr_shard_id = (unsigned int) shard_id;
3692  }
3693  else
3694  {
3695  curr_shard_id = (unsigned int) (last_shard_id + 1) % proxy_Shard_io.max_shard;
3696 
3697  last_shard_id = curr_shard_id;
3698  }
3699 
3700  shard_io_p = &(proxy_Shard_io.ent[curr_shard_id]);
3701  if (shard_io_p->cur_num_cas <= 0)
3702  {
3704  "Failed to allocate shard/cas. " "No available cas in this shard. "
3705  "Wait until shard has available cas. " "(cur_num_cas:%d, max_num_cas:%d)", shard_io_p->cur_num_cas,
3706  shard_io_p->max_num_cas);
3707  }
3708 
3709  error = proxy_client_add_waiter_by_shard (shard_io_p, ctx_cid, ctx_uid, timeout);
3710  if (error)
3711  {
3712  return NULL;
3713  }
3714 
3716 }
3717 
3718 void
3719 proxy_cas_release_by_ctx (int shard_id, int cas_id, int ctx_cid, unsigned int ctx_uid)
3720 {
3721  T_SHARD_IO *shard_io_p;
3722  T_CAS_IO *cas_io_p;
3723 
3724  ENTER_FUNC ();
3725 
3726  if (0 > shard_id || shard_id >= proxy_Shard_io.max_shard)
3727  {
3728  PROXY_DEBUG_LOG ("Invalid shard id. (shard_id:%d, max_shard:%d).", shard_id, proxy_Shard_io.max_shard);
3729  EXIT_FUNC ();
3730  return;
3731  }
3732  shard_io_p = &(proxy_Shard_io.ent[shard_id]);
3733 
3734  if (0 > cas_id || cas_id >= shard_io_p->max_num_cas)
3735  {
3736  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Invalid CAS id. (cas_id:%d, max_num_cas:%d).", cas_id, shard_io_p->max_num_cas);
3737  EXIT_FUNC ();
3738  return;
3739  }
3740  cas_io_p = &(shard_io_p->ent[cas_id]);
3741 
3742  if (cas_io_p->is_in_tran == false || cas_io_p->ctx_cid != ctx_cid || cas_io_p->ctx_uid != ctx_uid)
3743  {
3744  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Unable to find CAS entry. " "(context id:%d, context uid:%d). CAS(%S).",
3745  ctx_cid, ctx_uid, proxy_str_cas_io (cas_io_p));
3746 
3747  assert (false);
3748 
3749  EXIT_FUNC ();
3750  return;
3751  }
3752 
3753  cas_io_p->is_in_tran = false;
3754  cas_io_p->ctx_cid = PROXY_INVALID_CONTEXT;
3755  cas_io_p->ctx_uid = 0;
3756 
3757  /* decrease number of cas in transaction */
3758  shard_io_p->num_cas_in_tran--;
3759 
3760  PROXY_DEBUG_LOG ("Shard status. (num_cas_in_tran=%d, shard_id=%d).", shard_io_p->num_cas_in_tran,
3761  shard_io_p->shard_id);
3762 
3763  assert (shard_io_p->num_cas_in_tran >= 0);
3764  if (shard_io_p->num_cas_in_tran < 0)
3765  {
3766  shard_io_p->num_cas_in_tran = 0;
3767  }
3768 
3769  /* check and wakeup shard/cas waiter */
3770  proxy_client_check_waiter_and_wakeup (shard_io_p, cas_io_p);
3771 
3772  EXIT_FUNC ();
3773  return;
3774 }
3775 
3776 static int
3777 proxy_client_add_waiter_by_shard (T_SHARD_IO * shard_io_p, int ctx_cid, int ctx_uid, int timeout)
3778 {
3779  int error;
3780  T_WAIT_CONTEXT *waiter_p;
3781  T_SHARD_INFO *shard_info_p = NULL;
3782 
3783  ENTER_FUNC ();
3784 
3785  assert (shard_io_p);
3786 
3787  waiter_p = proxy_waiter_new (ctx_cid, ctx_uid, timeout);
3788  if (waiter_p == NULL)
3789  {
3790  EXIT_FUNC ();
3791  return -1;
3792  }
3793 
3794  PROXY_DEBUG_LOG ("Context(context id:%d, context uid:%u) " "is waiting on shard(shard_id:%d).", ctx_cid, ctx_uid,
3795  shard_io_p->shard_id);
3796 
3797  error = shard_queue_ordered_enqueue (&shard_io_p->waitq, (void *) waiter_p, proxy_waiter_comp_fn);
3798  if (error)
3799  {
3800  EXIT_FUNC ();
3801  return -1;
3802  }
3803 
3804  shard_info_p = shard_shm_find_shard_info (proxy_info_p, shard_io_p->shard_id);
3805  if (shard_info_p != NULL)
3806  {
3807  shard_info_p->waiter_count++;
3808  PROXY_DEBUG_LOG ("Add waiter by shard. (waiter_count:%d).", shard_info_p->waiter_count);
3809  }
3810 
3811  EXIT_FUNC ();
3812  return 0;
3813 }
3814 
/*
 * Dequeues a waiter from shard_io_p->waitq and tries to wake its context up
 * on the CAS described by cas_io_p (shard_id / cas_id).
 *
 * NOTE(review): the line carrying the function name and parameter list is
 * absent from this listing; the body uses shard_io_p and cas_io_p.
 */
3815 static void
3817 {
3818  int error;
3819  T_WAIT_CONTEXT *waiter_p = NULL;
3820  T_SHARD_INFO *shard_info_p = NULL;
3821 
3822  ENTER_FUNC ();
3823 
3824  assert (shard_io_p);
3825  assert (cas_io_p);
3826 
3827  waiter_p = (T_WAIT_CONTEXT *) shard_queue_dequeue (&shard_io_p->waitq);
3828  while (waiter_p)
3829  {
3830  PROXY_DEBUG_LOG ("Wakeup waiter by shard. (shard_id:%d, cas_id:%d).", cas_io_p->shard_id, cas_io_p->cas_id);
3831 
/* keep the shared-memory waiter counter in step with the dequeue, never below zero */
3832  shard_info_p = shard_shm_find_shard_info (proxy_info_p, shard_io_p->shard_id);
3833  if ((shard_info_p != NULL) && (shard_info_p->waiter_count > 0))
3834  {
3835  shard_info_p->waiter_count--;
3836  PROXY_DEBUG_LOG ("Wakeup context by shard. (waiter_count:%d).", shard_info_p->waiter_count);
3837  }
3838 
3839  error = proxy_wakeup_context_by_shard (waiter_p, cas_io_p->shard_id, cas_io_p->cas_id);
3840  if (error)
3841  {
3842  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to wakeup context by shard. " "(error:%d, shard_id:%d, cas_id:%d).",
3843  error, cas_io_p->shard_id, cas_io_p->cas_id);
/* NOTE(review): no further waiter is dequeued before 'continue'; the loop condition re-tests
 * waiter_p after FREE_MEM. Presumably FREE_MEM nulls the pointer, which would end the loop
 * without trying the next waiter - confirm whether that is intended. */
3844  FREE_MEM (waiter_p);
3845  continue;
3846  }
3847 
/* one successful wakeup is enough: release the waiter record and stop */
3848  FREE_MEM (waiter_p);
3849  break;
3850  }
3851 
3852  EXIT_FUNC ();
3853  return;
3854 }
3855 
3856 #if !defined(WINDOWS)
/*
 * Opens an AF_UNIX stream socket and connects it to the path stored in
 * shm_as_p->port_name.
 *
 * Returns the connected descriptor, -1 when the port name is NULL or the
 * socket cannot be created, and -4 when connect() fails.
 *
 * NOTE(review): the line carrying the function name and parameter list is
 * absent from this listing.
 */
3857 static SOCKET
3859 {
3860  SOCKET fd;
3861  int len;
3862 
3863  struct sockaddr_un shard_sock_addr;
3864  char *port_name;
3865 
3866  if ((port_name = shm_as_p->port_name) == NULL)
3867  {
3868  return (-1);
3869  }
3870 
3871  /* FOR DEBUG */
3872  PROXY_LOG (PROXY_LOG_MODE_NOTICE, "Connect to broker. (port_name:[%s]).", port_name);
3873 
3874  if ((fd = socket (AF_UNIX, SOCK_STREAM, 0)) < 0)
3875  {
3876  return (-1);
3877  }
3878 
3879  memset (&shard_sock_addr, 0, sizeof (shard_sock_addr));
3880  shard_sock_addr.sun_family = AF_UNIX;
/* NOTE(review): strcpy does not bound the copy to sizeof (sun_path); confirm port_name can never be longer */
3881  strcpy (shard_sock_addr.sun_path, port_name);
3882 #ifdef _SOCKADDR_LEN /* 4.3BSD Reno and later */
3883  len = sizeof (shard_sock_addr.sun_len) + sizeof (shard_sock_addr.sun_family) + strlen (shard_sock_addr.sun_path) + 1;
3884  shard_sock_addr.sun_len = len;
3885 #else /* vanilla 4.3BSD */
3886  len = strlen (shard_sock_addr.sun_path) + sizeof (shard_sock_addr.sun_family) + 1;
3887 #endif
3888 
3889  if (connect (fd, (struct sockaddr *) &shard_sock_addr, len) != 0)
3890  {
/* the descriptor is closed before reporting the connect failure */
3891  CLOSESOCKET (fd);
3892  return (-4);
3893  }
3894 
3895  return fd;
3896 }
3897 
/*
 * Registers this proxy with the broker: obtains a connected descriptor
 * (retrying with one-second sleeps), writes proxy_id in network byte order,
 * then adds the descriptor to the event set and stores it in broker_conn_fd.
 *
 * Returns 0 on success and -1 on any failure.
 *
 * NOTE(review): the function-name line and the statement inside the retry
 * loop that assigns fd are absent from this listing.
 */
3898 static int
3900 {
3901  SOCKET fd = INVALID_SOCKET;
3902  int error;
3903  int tmp_proxy_id;
3904  int len;
3905  int num_retry = 0;
3906 
3907  PROXY_LOG (PROXY_LOG_MODE_NOTICE, "Register to broker. ");
3908 
/* sleep one second per attempt; repeat while fd is still invalid and the retry counter is below 5 */
3909  do
3910  {
3911  sleep (1);
3913  }
3914  while (IS_INVALID_SOCKET (fd) && (num_retry++) < 5);
3915  if (IS_INVALID_SOCKET (fd))
3916  {
3917  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to connect to broker. (fd:%d).", fd);
3918  return -1;
3919  }
3920 
3921  tmp_proxy_id = htonl (proxy_id);
3922  len = WRITESOCKET (fd, &tmp_proxy_id, sizeof (tmp_proxy_id));
3923  if (len != sizeof (tmp_proxy_id))
3924  {
/* NOTE(review): this path returns without closing fd - looks like a descriptor leak; confirm */
3925  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to register to broker. (fd:%d).", fd);
3926  return -1;
3927  }
3928 
3929 #if defined(LINUX)
3930  shard_io_set_fl (fd, O_NONBLOCK);
3931 
3932  error = proxy_add_epoll_event (fd, EPOLLIN | EPOLLPRI);
3933  if (error < 0)
3934  {
/* NOTE(review): proxy_add_epoll_event() in this file already closes fd when epoll_ctl fails, so this is a second close */
3935  CLOSE_SOCKET (fd);
3936  return -1;
3937  }
3938 #else /* LINUX */
3939  FD_SET (fd, &allset);
3940  maxfd = max (maxfd, fd);
3941 #endif /* !LINUX */
3942  broker_conn_fd = fd;
3943 
3944  return 0;
3945 }
3946 #endif /* !WINDOWS */
3947 
3948 #if defined(WINDOWS)
3949 static int
3950 proxy_io_inet_lsnr (int port)
3951 {
3952  SOCKET fd;
3953  int len, backlog_size;
3954  int one = 1;
3955  struct sockaddr_in shard_sock_addr;
3956 
3957  fd = socket (AF_INET, SOCK_STREAM, 0);
3958  if (IS_INVALID_SOCKET (fd))
3959  {
3960  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Fail to create socket. (Error:%d).", WSAGetLastError ());
3961  return (-1);
3962  }
3963  if ((setsockopt (fd, SOL_SOCKET, SO_REUSEADDR, (char *) &one, sizeof (one))) < 0)
3964  {
3965  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Fail to set socket option. (Error:%d).", WSAGetLastError ());
3966  return (-1);
3967  }
3968 
3969  memset (&shard_sock_addr, 0, sizeof (struct sockaddr_in));
3970  shard_sock_addr.sin_family = AF_INET;
3971  shard_sock_addr.sin_port = htons ((unsigned short) (port));
3972  len = sizeof (struct sockaddr_in);
3973  shard_sock_addr.sin_addr.s_addr = htonl (INADDR_ANY);
3974 
3975  /* bind the name to the descriptor */
3976  if (bind (fd, (struct sockaddr *) &shard_sock_addr, len) < 0)
3977  {
3978  CLOSESOCKET (fd);
3979  return (-2);
3980  }
3981 
3982  /* SHARD TODO -- modify backlog size to max_client_size or other rule -- tigger */
3983  backlog_size = 10;
3984  if (listen (fd, backlog_size) < 0)
3985  {
3986  /* tell kernel we're a server */
3987  CLOSESOCKET (fd);
3988  return (-3);
3989  }
3990 
3991  return fd;
3992 }
3993 #else /* WINDOWS */
3994 static int
3995 proxy_io_unixd_lsnr (char *unixd_sock_name)
3996 {
3997  SOCKET fd;
3998  int len, backlog_size;
3999  struct sockaddr_un shard_sock_addr;
4000 
4001  if ((fd = socket (AF_UNIX, SOCK_STREAM, 0)) < 0)
4002  {
4003  return (-1);
4004  }
4005 
4006  memset (&shard_sock_addr, 0, sizeof (shard_sock_addr));
4007  shard_sock_addr.sun_family = AF_UNIX;
4008  strcpy (shard_sock_addr.sun_path, unixd_sock_name);
4009 
4010 #ifdef _SOCKADDR_LEN /* 4.3BSD Reno and later */
4011  len = sizeof (shard_sock_addr.sun_len) + sizeof (shard_sock_addr.sun_family) + strlen (shard_sock_addr.sun_path) + 1;
4012  shard_sock_addr.sun_len = len;
4013 #else /* vanilla 4.3BSD */
4014  len = strlen (shard_sock_addr.sun_path) + sizeof (shard_sock_addr.sun_family) + 1;
4015 #endif
4016 
4017  /* bind the name to the descriptor */
4018  if (bind (fd, (struct sockaddr *) &shard_sock_addr, len) < 0)
4019  {
4020  CLOSESOCKET (fd);
4021  return (-2);
4022  }
4023 
4024  /* SHARD TODO -- modify backlog size to max_client_size or other rule -- tigger */
4025  backlog_size = 10;
4026  if (listen (fd, backlog_size) < 0)
4027  {
4028  /* tell kernel we're a server */
4029  CLOSESOCKET (fd);
4030  return (-3);
4031  }
4032 
4033  return fd;
4034 }
4035 #endif /* !WINDOWS */
4036 
/*
 * Creates the listening socket that CAS processes connect to and stores it
 * in cas_lsnr_fd. On WINDOWS a TCP port derived via GET_CAS_PORT is used;
 * otherwise the UNIX-domain path proxy_info_p->port_name is used.
 *
 * Returns 0 on success and -1 on failure.
 *
 * NOTE(review): the line carrying the function name and parameter list is
 * absent from this listing.
 */
4037 static int
4039 {
4040  int error = 0;
4041  SOCKET fd;
4042 
4043 #if defined(WINDOWS)
4044  int port = GET_CAS_PORT (broker_port, proxy_info_p->proxy_id,
4045  shm_proxy_p->num_proxy);
4046 
4047  /* FOR DEBUG */
4048  PROXY_LOG (PROXY_LOG_MODE_NOTICE, "Listen CAS socket. (port number:[%d])", port);
4049 
4050  fd = proxy_io_inet_lsnr (port);
4051 #else /* WINDOWS */
4052  /* FOR DEBUG */
4053  PROXY_LOG (PROXY_LOG_MODE_NOTICE, "Listen CAS socket. (port name:[%s])", proxy_info_p->port_name);
4054 
4055  fd = proxy_io_unixd_lsnr (proxy_info_p->port_name);
4056 #endif /* !WINDOWS */
4057 
/* both listener helpers report failure with negative values (-1, -2, -3) */
4058  if (IS_INVALID_SOCKET (fd))
4059  {
4060  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to listen CAS socket. (fd:%d).", fd);
4061  return -1; /* FAILED */
4062  }
4063 
4064 #if defined(LINUX)
4065  shard_io_set_fl (fd, O_NONBLOCK);
4066 
4067  error = proxy_add_epoll_event (fd, EPOLLIN | EPOLLPRI);
4068  if (error < 0)
4069  {
/* NOTE(review): proxy_add_epoll_event() in this file already closes fd when epoll_ctl fails, so this is a second close */
4070  CLOSE_SOCKET (fd);
4071  return -1;
4072  }
4073 #else /* LINUX */
4074  FD_SET (fd, &allset);
4075  maxfd = max (maxfd, fd);
4076 #endif /* !LINUX */
4077  cas_lsnr_fd = fd;
4078 
4079  return 0; /* SUCCESS */
4080 }
4081 
4082 #if defined(WINDOWS)
4083 static int
4084 proxy_io_client_lsnr (void)
4085 {
4086  SOCKET fd;
4087 
4088  int port = GET_CLIENT_PORT (broker_port, proxy_info_p->proxy_id);
4089 
4090  /* FOR DEBUG */
4091  PROXY_LOG (PROXY_LOG_MODE_NOTICE, "Listen Client socket. " "(port number:[%d])", port);
4092 
4093  fd = proxy_io_inet_lsnr (port);
4094  if (IS_INVALID_SOCKET (fd))
4095  {
4096  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to listen Client socket. (fd:%d).", fd);
4097  return -1; /* FAIELD */
4098  }
4099 
4100  proxy_info_p->proxy_port = port;
4101  client_lsnr_fd = fd;
4102  FD_SET (client_lsnr_fd, &allset);
4103  maxfd = max (maxfd, client_lsnr_fd);
4104 
4105  return 0;
4106 }
4107 #endif
4108 
/*
 * Accepts one pending connection on lsnr_fd.
 *
 * Returns the accepted descriptor, or -1 when accept() yields an invalid
 * socket. On WINDOWS the first four bytes of the peer's sin_addr are copied
 * into accept_ip_addr.
 *
 * NOTE(review): the line carrying the function name and parameter list is
 * absent from this listing; the body uses lsnr_fd.
 */
4109 static SOCKET
4111 {
4112  socklen_t len;
4113  SOCKET fd;
4114 #if defined(WINDOWS)
4115  struct sockaddr_in shard_sock_addr;
4116 #else /* WINDOWS */
4117  struct sockaddr_un shard_sock_addr;
4118 #endif /* !WINDOWS */
4119 
4120  len = sizeof (shard_sock_addr);
4121  fd = accept (lsnr_fd, (struct sockaddr *) &shard_sock_addr, &len);
4122  if (IS_INVALID_SOCKET (fd))
4123  {
4124  return (-1);
4125  }
4126 
4127 #if defined(WINDOWS)
4128  memcpy (&accept_ip_addr, &(shard_sock_addr.sin_addr), 4);
4129 #endif /* WINDOWS */
4130 
4131  return (fd);
4132 }
4133 
/*
 * Thin wrapper: forwards lsnr_fd to proxy_io_accept() and returns its result.
 *
 * NOTE(review): the line carrying the function name and parameter list is
 * absent from this listing.
 */
4134 static SOCKET
4136 {
4137  return proxy_io_accept (lsnr_fd);
4138 }
4139 
4140 #if defined(WINDOWS)
4141 static SOCKET
4142 proxy_io_client_accept (SOCKET lsnr_fd)
4143 {
4144  return proxy_io_accept (lsnr_fd);
4145 }
4146 #endif
4147 
/*
 * Brings up the proxy I/O layer in this order: event mechanism (epoll on
 * LINUX, fd_sets otherwise), client listener (WINDOWS) or broker
 * registration (non-WINDOWS), CAS listener, then the socket, client and
 * shard/CAS entry tables.
 *
 * Returns 0 on success. Failures of the event mechanism, listener or broker
 * registration return -1; failures of the three table initializers return
 * that initializer's error value.
 *
 * NOTE(review): the line carrying the function name and parameter list is
 * absent from this listing.
 */
4148 int
4150 {
4151  int error;
4152 
4153 #if defined(LINUX)
4154  max_Socket = proxy_get_max_socket ();
4155  assert (max_Socket > PROXY_RESERVED_FD);
4156 
4157  /* create epoll */
4158  ep_Fd = epoll_create (max_Socket);
4159  if (ep_Fd == INVALID_SOCKET)
4160  {
4161  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to create epoll fd. (errno=%d[%s])", errno, strerror (errno));
4162  return -1;
4163  }
4164 
/* one epoll_event slot per possible socket; this array is later passed to epoll_wait() */
4165  ep_Event = (epoll_event *) calloc (max_Socket, sizeof (struct epoll_event));
4166  if (ep_Event == NULL)
4167  {
4168  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Not enough virtual memory for epoll event. (error:%d[%s]).", errno,
4169  strerror (errno));
4170  return -1;
4171  }
4172 
4173 #else /* LINUX */
4174  /* clear fds */
4175  FD_ZERO (&allset);
4176  FD_ZERO (&wnewset);
4177 #endif /* !LINUX */
4178 
4179 #if defined(WINDOWS)
4180  broker_port = shm_as_p->broker_port;
4181 
4182  if (broker_port <= 0)
4183  {
4184  return (-1);
4185  }
4186 
4187  /* make listener for client */
4188  error = proxy_io_client_lsnr ();
4189  if (error)
4190  {
4191  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to initialize Client socket listener. (error:%d).", error);
4192  return -1;
4193  }
4194 #else /* WINDOWS */
4195  /* register to broker */
4196  error = proxy_io_register_to_broker ();
4197  if (error)
4198  {
4199  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Unable to register to broker. (error:%d).", error);
4200  return -1;
4201  }
4202 #endif /* !WINDOWS */
4203 
4204  /* make listener for cas */
4205  error = proxy_io_cas_lsnr ();
4206  if (error)
4207  {
4208  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to initialize CAS socket listener. (error:%d).", error);
4209  return -1;
4210  }
4211 
4212  error = proxy_socket_io_initialize ();
4213  if (error)
4214  {
4215  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to initialize socket entries. (error:%d).", error);
4216  return error;
4217  }
4218 
4219  /* initialize client io */
4220  error = proxy_client_io_initialize ();
4221  if (error)
4222  {
4223  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to initialize client entries. (error:%d).", error);
4224  return error;
4225  }
4226 
4227  /* initialize shard/cas io */
4228  error = proxy_shard_io_initialize ();
4229  if (error)
4230  {
4231  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to initialize shard/CAS entries. (error:%d).", error);
4232  return error;
4233  }
4234 
4235  return 0;
4236 }
4237 
/*
 * Tear-down counterpart of the I/O initialisation; the visible part releases
 * the epoll event array on LINUX.
 *
 * NOTE(review): the function-name line and the first statements of the body
 * are absent from this listing, so any other cleanup done here is not shown.
 */
4238 void
4240 {
4245 
4246 #if defined (LINUX)
4247  FREE_MEM (ep_Event);
4248 #endif /* LINUX */
4249 
4250  return;
4251 }
4252 
/*
 * Closes every valid descriptor recorded in proxy_Socket_io.ent[], then the
 * listener/connection descriptors and, on LINUX, the epoll descriptor.
 * Always returns 0.
 *
 * NOTE(review): the function-name line and the statements inside the
 * "cas listen fd" and "broker connection fd" blocks are absent from this
 * listing.
 */
4253 int
4255 {
4256  int i;
4257  T_SOCKET_IO *sock_io_p;
4258 
4259  /* close client/cas fd */
4260  for (i = 0; i < proxy_Socket_io.max_socket; i++)
4261  {
4262  sock_io_p = &(proxy_Socket_io.ent[i]);
4263  if (sock_io_p->fd == INVALID_SOCKET)
4264  {
4265  continue;
4266  }
4267  CLOSE_SOCKET (sock_io_p->fd);
4268  }
4269 
4270  /* close cas listen fd */
4271  if (cas_lsnr_fd != INVALID_SOCKET)
4272  {
4274  }
4275 
4276 #if defined(WINDOWS)
4277  /* close client listen fd */
4278  if (client_lsnr_fd != INVALID_SOCKET)
4279  {
4280  CLOSE_SOCKET (client_lsnr_fd);
4281  }
4282 #else /* WINDOWS */
4283  /* close broker connection fd */
4285  {
4287  }
4288 #endif /* !WINDOWS */
4289 
4290 #if defined(LINUX)
4291  if (ep_Fd != INVALID_SOCKET)
4292  {
4293  CLOSE_SOCKET (ep_Fd);
4294  }
4295 #endif /* LINUX */
4296  return 0;
4297 }
4298 
/*
 * One iteration of the proxy event loop: waits for readiness (epoll_wait on
 * LINUX, select otherwise) with a timeout derived from HZ, accepts new CAS
 * connections, handles activity on the broker connection / client listener,
 * and dispatches read/write handling for every ready socket entry.
 *
 * Returns 0 normally (including on timeout, EINTR and accept failure) and -1
 * when the wait call fails with an error other than EINTR or, on LINUX, when
 * the CAS listener or broker connection reports EPOLLERR/EPOLLHUP (in which
 * case proxy_Keep_running is also cleared).
 *
 * NOTE(review): the function-name line and the statements handling a
 * readable broker connection are absent from this listing.
 */
4299 int
4301 {
4302  int cas_fd;
4303  int i;
4304 #if defined(WINDOWS)
4305  int client_fd;
4306 #endif
4307  int n;
4308 
4309 #if defined(LINUX)
4310  int sock_fd;
4311  int timeout;
4312 #else /* LINUX */
4313  struct timeval tv;
4314 #endif /* !LINUX */
4315 
4316  T_SOCKET_IO *sock_io_p = NULL;
4317 
4318 #if defined(LINUX)
/* epoll_wait takes milliseconds */
4319  timeout = 1000 / HZ;
4320 
4321  n = epoll_wait (ep_Fd, ep_Event, max_Socket, timeout);
4322 #else /* LINUX */
/* select() modifies its sets, so work on copies of the registered sets */
4323  rset = allset;
4324  wset = wallset = wnewset;
4325 
4326  tv.tv_sec = 0;
4327  tv.tv_usec = 1000000 / HZ;
4328  n = select (maxfd + 1, &rset, &wset, NULL, &tv);
4329 #endif /* !LINUX */
4330  if (n < 0)
4331  {
/* NOTE(review): on EINTR execution falls through with n < 0; the LINUX loop below then runs
 * zero times, while the select() branch goes on to inspect rset/wset - confirm this is intended */
4332  if (errno != EINTR)
4333  {
/* the message says "select" even when the failing call was epoll_wait */
4334  perror ("select error");
4335  return -1;
4336  }
4337  }
4338  else if (n == 0)
4339  {
4340  /* print_statistics (); */
4341  return 0; /* or -1 */
4342  }
4343 
4344 #if defined(LINUX)
4345  for (i = 0; i < n; i++)
4346  {
4347  if (cas_lsnr_fd == ep_Event[i].data.fd) /* new cas */
4348  {
4349  if ((ep_Event[i].events & EPOLLERR) || (ep_Event[i].events & EPOLLHUP))
4350  {
4351  proxy_Keep_running = false;
4352  return -1;
4353  }
4354  else if (ep_Event[i].events & EPOLLIN || ep_Event[i].events & EPOLLPRI)
4355  {
4356  cas_fd = proxy_io_cas_accept (cas_lsnr_fd);
4357  if (cas_fd >= 0)
4358  {
/* the accepted descriptor is closed again when no socket entry can be created for it */
4359  if (proxy_socket_io_add (cas_fd, PROXY_IO_FROM_CAS) == NULL)
4360  {
4361  PROXY_DEBUG_LOG ("Close socket. (fd:%d). \n", cas_fd);
4362  CLOSE_SOCKET (cas_fd);
4363  }
4364  }
4365  else
4366  {
/* returning here skips the remaining ready events of this iteration */
4367  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Accept socket failure. (fd:%d).", cas_fd);
4368  return 0; /* or -1 */
4369  }
4370 
4371  }
4372  }
4373  else if (broker_conn_fd == ep_Event[i].data.fd) /* new client */
4374  {
4375  if ((ep_Event[i].events & EPOLLERR) || (ep_Event[i].events & EPOLLHUP))
4376  {
4377  proxy_Keep_running = false;
4378  return -1;
4379  }
4380  else if (ep_Event[i].events & EPOLLIN || ep_Event[i].events & EPOLLPRI)
4381  {
4383  }
4384  }
4385  else
4386  {
4387  sock_fd = ep_Event[i].data.fd;
/* NOTE(review): '<=' admits sock_fd == max_socket, while proxy_io_close_all_fd() iterates
 * i < max_socket; confirm ent[] really has max_socket + 1 entries */
4388  assert (sock_fd <= proxy_Socket_io.max_socket);
4389 
/* socket entries are indexed directly by descriptor number */
4390  sock_io_p = &(proxy_Socket_io.ent[sock_fd]);
4391  if (sock_io_p->fd != sock_fd)
4392  {
4393  assert (false);
4394  continue;
4395  }
4396 
4397  if ((ep_Event[i].events & EPOLLERR) || (ep_Event[i].events & EPOLLHUP))
4398  {
/* on error/hangup, still read if input is flagged; otherwise take the read-error path */
4399  if (ep_Event[i].events & EPOLLIN)
4400  {
4401  proxy_socket_io_read (sock_io_p);
4402  }
4403  else
4404  {
4405  proxy_socket_io_read_error (sock_io_p);
4406  }
4407  }
4408  else
4409  {
/* writes are serviced before reads for the same entry */
4410  if (ep_Event[i].events & EPOLLOUT)
4411  {
4412  proxy_socket_io_write (sock_io_p);
4413  }
4414 
4415  if (ep_Event[i].events & EPOLLIN)
4416  {
4417  proxy_socket_io_read (sock_io_p);
4418  }
4419  }
4420  }
4421  }
4422 #else /* LINUX */
4423 
4424  /* new cas */
4425  if (FD_ISSET (cas_lsnr_fd, &rset))
4426  {
4427  cas_fd = proxy_io_cas_accept (cas_lsnr_fd);
4428  if (cas_fd >= 0)
4429  {
4430  if (proxy_socket_io_add (cas_fd, PROXY_IO_FROM_CAS) == NULL)
4431  {
4432  PROXY_DEBUG_LOG ("Close socket. (fd:%d). \n", cas_fd);
4433  CLOSE_SOCKET (cas_fd);
4434  }
4435  }
4436  else
4437  {
4438  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Accept socket failure. (fd:%d).", cas_fd);
4439  return 0; /* or -1 */
4440  }
4441  }
4442 
4443  /* new client */
4444 #if defined(WINDOWS)
4445  if (FD_ISSET (client_lsnr_fd, &rset))
4446  {
4447  client_fd = proxy_io_client_accept (client_lsnr_fd);
4448  if (client_fd < 0)
4449  {
4450  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Accept socket failure. (fd:%d).", client_fd);
4451  return 0; /* or -1 */
4452  }
4453  proxy_socket_io_new_client (client_fd);
4454  }
4455 #else /* WINDOWS */
4456  if (FD_ISSET (broker_conn_fd, &rset))
4457  {
4459  }
4460 #endif /* !WINDOWS */
4461 
4462  /* process socket io */
4463  for (i = 0; i <= maxfd; i++)
4464  {
4465  sock_io_p = &(proxy_Socket_io.ent[i]);
4466  if (IS_INVALID_SOCKET (sock_io_p->fd))
4467  {
4468  continue;
4469  }
/* validity is re-tested before each handler call, presumably because the write handler may
 * close the entry's descriptor - confirm */
4470  if (!IS_INVALID_SOCKET (sock_io_p->fd) && FD_ISSET (sock_io_p->fd, &wset))
4471  {
4472  proxy_socket_io_write (sock_io_p);
4473  }
4474  if (!IS_INVALID_SOCKET (sock_io_p->fd) && FD_ISSET (sock_io_p->fd, &rset))
4475  {
4476  proxy_socket_io_read (sock_io_p);
4477  }
4478  }
4479 #endif /* !LINUX */
4480 
4481  return 0;
4482 }
4483 
/*
 * Returns the driver_info buffer of the client I/O entry that matches the
 * given context (looked up by ctx_p->client_id, ctx_p->cid, ctx_p->uid).
 * When ctx_p is NULL or no entry is found, a static placeholder buffer
 * beginning with "CUBRK" is returned instead, so the result is never NULL.
 *
 * NOTE(review): the function-name line and part of the placeholder
 * initializer are absent from this listing.
 */
4484 char *
4486 {
4487  static char dummy_info[SRV_CON_CLIENT_INFO_SIZE] = {
4488  'C', 'U', 'B', 'R', 'K',
4492  0,
4493  0
4494  };
4495  T_CLIENT_IO *cli_io_p;
4496 
4497  assert (ctx_p);
4498 
/* release-build fallback for the assert above */
4499  if (ctx_p == NULL)
4500  {
4501  return dummy_info;
4502  }
4503 
4504  cli_io_p = proxy_client_io_find_by_ctx (ctx_p->client_id, ctx_p->cid, ctx_p->uid);
4505  if (cli_io_p == NULL)
4506  {
4507  return dummy_info;
4508  }
4509 
4510  return cli_io_p->driver_info;
4511 
4512 }
4513 
/*
 * Returns the driver_info buffer of the client I/O entry that matches the
 * given socket entry (looked up by sock_io_p->id.client_id and
 * sock_io_p->fd). When sock_io_p is NULL or no entry is found, a static
 * placeholder buffer beginning with "CUBRK" is returned instead, so the
 * result is never NULL.
 *
 * NOTE(review): the function-name line and part of the placeholder
 * initializer are absent from this listing.
 */
4514 char *
4516 {
4517  static char dummy_info[SRV_CON_CLIENT_INFO_SIZE] = {
4518  'C', 'U', 'B', 'R', 'K',
4522  0,
4523  0
4524  };
4525  T_CLIENT_IO *cli_io_p;
4526 
4527  assert (sock_io_p);
4528 
/* release-build fallback for the assert above */
4529  if (sock_io_p == NULL)
4530  {
4531  return dummy_info;
4532  }
4533 
4534  cli_io_p = proxy_client_io_find_by_fd (sock_io_p->id.client_id, sock_io_p->fd);
4535  if (cli_io_p == NULL)
4536  {
4537  return dummy_info;
4538  }
4539 
4540  return cli_io_p->driver_info;
4541 
4542 }
4543 
/*
 * Walks every shard, runs proxy_waiter_timeout() on its wait queue with the
 * current time, and resets the shard's shared-memory waiter_count to zero
 * when the queue turns out to be empty while the counter is still positive.
 *
 * NOTE(review): the line carrying the function name and parameter list is
 * absent from this listing.
 */
4544 void
4546 {
4547  T_SHARD_IO *shard_io_p;
4548  T_SHARD_INFO *shard_info_p = NULL;
4549  T_WAIT_CONTEXT *waiter_p;
4550  int i, now;
4551 
4552  now = time (NULL);
4553 
4554  for (i = 0; i < proxy_Shard_io.max_shard; i++)
4555  {
4556  shard_io_p = &(proxy_Shard_io.ent[i]);
/* NOTE(review): shard_info_p is not looked up for this shard before this call. It is NULL on the
 * first pass and otherwise holds whatever an earlier iteration assigned below, so the counter
 * passed here may belong to a different shard - confirm */
4557  proxy_waiter_timeout (&shard_io_p->waitq, (shard_info_p != NULL) ? &shard_info_p->waiter_count : NULL, now);
4558 
4559  waiter_p = (T_WAIT_CONTEXT *) shard_queue_peek_value (&shard_io_p->waitq);
4560  if (waiter_p == NULL)
4561  {
/* queue is empty: make the shared-memory counter agree */
4562  shard_info_p = shard_shm_find_shard_info (proxy_info_p, shard_io_p->shard_id);
4563  if ((shard_info_p != NULL) && (shard_info_p->waiter_count > 0))
4564  {
4565  shard_info_p->waiter_count = 0;
4566  PROXY_DEBUG_LOG ("Clear shard(%d) waiter queue by timer.", shard_io_p->shard_id);
4567  }
4568  }
4569  }
4570 
4571  return;
4572 }
4573 
4574 static void
4575 proxy_set_conn_info (int func_code, int ctx_cid, int ctx_uid, int shard_id, int cas_id)
4576 {
4577  T_PROXY_CONTEXT *ctx_p = NULL;
4578  T_APPL_SERVER_INFO *as_info_p = NULL;
4579 
4580  ctx_p = proxy_context_find (ctx_cid, ctx_uid);
4581  assert (ctx_p != NULL);
4582 
4583  as_info_p = shard_shm_get_as_info (proxy_info_p, shm_as_p, shard_id, cas_id);
4584  assert (as_info_p != NULL);
4585 
4586  if (func_code == CAS_FC_CHECK_CAS)
4587  {
4588  as_info_p->force_reconnect = true;
4589  }
4590 
4591  if (proxy_info_p->appl_server == APPL_SERVER_CAS_MYSQL)
4592  {
4593  if (strcmp (as_info_p->database_user, ctx_p->database_user) == 0
4594  && strcmp (as_info_p->database_passwd, ctx_p->database_passwd) == 0)
4595  {
4596  return;
4597  }
4598  }
4599  else
4600  {
4601  if (strcasecmp (as_info_p->database_user, ctx_p->database_user) == 0
4602  && strcmp (as_info_p->database_passwd, ctx_p->database_passwd) == 0)
4603  {
4604  return;
4605  }
4606  }
4607 
4608  /* this cas will reconnect to database. */
4609  shard_stmt_del_all_srv_h_id_for_shard_cas (shard_id, cas_id);
4610 
4611  strncpy_bufsize (as_info_p->database_user, ctx_p->database_user);
4612  strncpy_bufsize (as_info_p->database_passwd, ctx_p->database_passwd);
4613 }
4614 
4615 static T_CAS_IO *
4616 proxy_cas_alloc_by_shard_and_cas_id (int client_id, int shard_id, int cas_id, int ctx_cid, unsigned int ctx_uid)
4617 {
4618  T_SHARD_IO *shard_io_p = NULL;
4619  T_CAS_IO *cas_io_p = NULL;
4620 
4621  assert (shard_id >= 0 && cas_id >= 0);
4622 
4623  shard_io_p = &(proxy_Shard_io.ent[shard_id]);
4624 
4625  if (0 > cas_id || cas_id >= shard_io_p->max_num_cas)
4626  {
4627  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Unable to find available CAS. " "(shard_id:%d, cas_id:%d, max_num_cas:%d).",
4628  shard_id, cas_id, shard_io_p->max_num_cas);
4629  return NULL;
4630  }
4631  cas_io_p = &(shard_io_p->ent[cas_id]);
4632 
4633  if ((cas_io_p->ctx_cid != PROXY_INVALID_CONTEXT && cas_io_p->ctx_cid != ctx_cid)
4634  || (cas_io_p->ctx_uid != 0 && cas_io_p->ctx_uid != ctx_uid))
4635  {
4636  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Invalid CAS status. " "(context id:%d, context uid:%d). CAS(%s). ", ctx_cid,
4637  ctx_uid, proxy_str_cas_io (cas_io_p));
4638 
4639  assert (false);
4640  return NULL;
4641  }
4642 
4643  if (cas_io_p->status != CAS_IO_CONNECTED)
4644  {
4645  PROXY_DEBUG_LOG ("Unexpected CAS status. (context id:%d, context uid:%d). " "CAS(%s). ", ctx_cid, ctx_uid,
4646  proxy_str_cas_io (cas_io_p));
4647  return NULL;
4648  }
4649 
4650  if (cas_io_p->ctx_cid == PROXY_INVALID_CONTEXT)
4651  {
4652  shard_io_p->num_cas_in_tran++;
4653 
4654  PROXY_DEBUG_LOG ("Shard IO status. (num_cas_in_tran=%d, shard_id=%d).", shard_io_p->num_cas_in_tran,
4655  shard_io_p->shard_id);
4656 
4657  assert (shard_io_p->cur_num_cas >= shard_io_p->num_cas_in_tran);
4658  assert (cas_io_p->is_in_tran == false);
4659  assert (cas_io_p->ctx_uid == 0);
4660  }
4661 
4662  return cas_io_p;
4663 }
4664 
4665 static T_CAS_IO *
4666 proxy_find_idle_cas_by_asc (int shard_id, int cas_id, int ctx_cid, unsigned int ctx_uid)
4667 {
4668  int i = 0;
4669  T_CAS_IO *cas_io_p = NULL;
4670  T_SHARD_IO *shard_io_p = NULL;
4671 
4672  shard_io_p = &(proxy_Shard_io.ent[shard_id]);
4673 
4674  for (i = 0; i < shard_io_p->cur_num_cas; i++)
4675  {
4676  cas_io_p = &(shard_io_p->ent[i]);
4677 
4678  if (cas_io_p->is_in_tran || cas_io_p->status != CAS_IO_CONNECTED)
4679  {
4680  continue;
4681  }
4682 
4683  return cas_io_p;
4684  }
4685 
4686  return NULL;
4687 }
4688 
4689 static T_CAS_IO *
4690 proxy_find_idle_cas_by_desc (int shard_id, int cas_id, int ctx_cid, unsigned int ctx_uid)
4691 {
4692  int i = 0;
4693  T_CAS_IO *cas_io_p = NULL;
4694  T_SHARD_IO *shard_io_p = NULL;
4695 
4696  shard_io_p = &(proxy_Shard_io.ent[shard_id]);
4697 
4698  for (i = shard_io_p->cur_num_cas - 1; i >= 0; i--)
4699  {
4700  cas_io_p = &(shard_io_p->ent[i]);
4701 
4702  if (cas_io_p->is_in_tran || cas_io_p->status != CAS_IO_CONNECTED)
4703  {
4704  continue;
4705  }
4706 
4707  return cas_io_p;
4708  }
4709 
4710  return NULL;
4711 }
4712 
4713 static T_CAS_IO *
4714 proxy_find_idle_cas_by_conn_info (int shard_id, int cas_id, int ctx_cid, unsigned int ctx_uid)
4715 {
4716  int i = 0;
4717  T_CAS_IO *cas_io_p = NULL;
4718  T_SHARD_IO *shard_io_p = NULL;
4719  T_PROXY_CONTEXT *ctx_p = NULL;
4720  T_APPL_SERVER_INFO *as_info_p = NULL;
4721 
4722  shard_io_p = &(proxy_Shard_io.ent[shard_id]);
4723 
4724  ctx_p = proxy_context_find (ctx_cid, ctx_uid);
4725  assert (ctx_p != NULL);
4726 
4727  for (i = 0; i < shard_io_p->cur_num_cas; i++)
4728  {
4729  as_info_p = shard_shm_get_as_info (proxy_info_p, shm_as_p, shard_id, i);
4730 
4731  cas_io_p = &(shard_io_p->ent[i]);
4732 
4733  if (cas_io_p->is_in_tran || cas_io_p->status != CAS_IO_CONNECTED)
4734  {
4735  continue;
4736  }
4737 
4738  if (proxy_info_p->appl_server == APPL_SERVER_CAS_MYSQL)
4739  {
4740  if (strcmp (as_info_p->database_user, ctx_p->database_user)
4741  || strcmp (as_info_p->database_passwd, ctx_p->database_passwd))
4742  {
4743  continue;
4744  }
4745  }
4746  else
4747  {
4748  if (strcasecmp (as_info_p->database_user, ctx_p->database_user)
4749  || strcmp (as_info_p->database_passwd, ctx_p->database_passwd))
4750  {
4751  continue;
4752  }
4753  }
4754 
4755  return cas_io_p;
4756  }
4757 
4758  return NULL;
4759 }
4760 
4761 static T_CAS_IO *
4762 proxy_cas_alloc_anything (int client_id, int shard_id, int cas_id, int ctx_cid, unsigned int ctx_uid,
4763  T_FUNC_FIND_CAS function)
4764 {
4765  int i = 0;
4766  T_SHARD_IO *shard_io_p = NULL;
4767  T_CAS_IO *cas_io_p = NULL;
4768  static int last_shard_id = -1;
4769 
4770  if (shard_id >= 0)
4771  {
4772  shard_io_p = &(proxy_Shard_io.ent[shard_id]);
4773 
4774  cas_io_p = function (shard_id, cas_id, ctx_cid, ctx_uid);
4775  }
4776  else
4777  {
4778  /* select any shard */
4779  shard_id = last_shard_id;
4780  for (i = 0; i < proxy_Shard_io.max_shard; i++)
4781  {
4782  shard_id = (shard_id + 1) % proxy_Shard_io.max_shard;
4783  shard_io_p = &(proxy_Shard_io.ent[shard_id]);
4784  assert (shard_io_p->cur_num_cas >= shard_io_p->num_cas_in_tran);
4785 
4786  if (shard_io_p->cur_num_cas == shard_io_p->num_cas_in_tran)
4787  {
4788  continue;
4789  }
4790 
4791  cas_io_p = function (shard_id, cas_id, ctx_cid, ctx_uid);
4792  if (cas_io_p != NULL)
4793  {
4794  break;
4795  }
4796  }
4797 
4798  if (i >= proxy_Shard_io.max_shard)
4799  {
4800  PROXY_LOG (PROXY_LOG_MODE_SHARD_DETAIL, "Unable to find avaiable shard. (index:%d, max_shard:%d).", i,
4801  proxy_Shard_io.max_shard);
4802 
4803  return NULL;
4804  }
4805 
4806  last_shard_id = shard_id;
4807 
4808  assert (cas_io_p != NULL);
4809  }
4810 
4811  if (cas_io_p != NULL)
4812  {
4813  shard_io_p->num_cas_in_tran++;
4814 
4815  PROXY_DEBUG_LOG ("Shard status. (num_cas_in_tran=%d, shard_id=%d).", shard_io_p->num_cas_in_tran,
4816  shard_io_p->shard_id);
4817 
4818  assert (shard_io_p->cur_num_cas >= shard_io_p->num_cas_in_tran);
4819  assert (cas_io_p->is_in_tran == false);
4820  assert (cas_io_p->ctx_cid == PROXY_INVALID_CONTEXT);
4821  assert (cas_io_p->ctx_uid == 0);
4822  assert (cas_io_p->fd != INVALID_SOCKET);
4823  }
4824 
4825  return cas_io_p;
4826 }
4827 
/*
 * Checks the database name, and when proxy_info_p->fixed_shard_user is set
 * also the user and password, against the shard user obtained from
 * shard_metadata_get_shard_user (shm_user_p).
 *
 * Returns 0 when the caller is authorized and -1 otherwise.
 *
 * For APPL_SERVER_CAS_MYSQL the name and user compare case-sensitively; for
 * other server types they compare case-insensitively. The password always
 * compares case-sensitively.
 *
 * NOTE(review): one statement between building err_msg and the error log is
 * absent from this listing, so how err_msg and ctx_p are consumed is not
 * shown here.
 */
4828 static int
4829 proxy_check_authorization (T_PROXY_CONTEXT * ctx_p, const char *db_name, const char *db_user, const char *db_passwd)
4830 {
4831  T_SHARD_USER *user_p = NULL;
4832  char err_msg[256];
4833 
4834  user_p = shard_metadata_get_shard_user (shm_user_p);
4835  assert (user_p);
4836 
4837  if (proxy_info_p->appl_server == APPL_SERVER_CAS_MYSQL)
4838  {
4839  if (strcmp (db_name, user_p->db_name))
4840  {
4841  goto authorization_error;
4842  }
4843 
/* without a fixed shard user, a matching database name is sufficient */
4844  if (proxy_info_p->fixed_shard_user == false)
4845  {
4846  return 0;
4847  }
4848 
4849  if (strcmp (db_user, user_p->db_user) || strcmp (db_passwd, user_p->db_password))
4850  {
4851  goto authorization_error;
4852  }
4853  }
4854  else
4855  {
4856  if (strcasecmp (db_name, user_p->db_name))
4857  {
4858  goto authorization_error;
4859  }
4860 
/* without a fixed shard user, a matching database name is sufficient */
4861  if (proxy_info_p->fixed_shard_user == false)
4862  {
4863  return 0;
4864  }
4865 
4866  if (strcasecmp (db_user, user_p->db_user) || strcmp (db_passwd, user_p->db_password))
4867  {
4868  goto authorization_error;
4869  }
4870  }
4871 
4872  return 0;
4873 
4874 authorization_error:
4875  snprintf (err_msg, sizeof (err_msg), "Authorization error.");
4877 
/* NOTE(review): the rejected password is written to the log in clear text */
4878  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Authentication failure. " "(db_name:[%s], db_user:[%s], db_passwd:[%s]).", db_name,
4879  db_user, db_passwd);
4880 
4881  return -1;
4882 }
4883 
4884 int
4885 proxy_convert_error_code (int error_ind, int error_code, char *driver_info, T_BROKER_VERSION client_version,
4886  bool to_new)
4887 {
4888  if (error_code >= 0)
4889  {
4890  assert (error_code == 0);
4891  return error_code;
4892  }
4893 
4894  if (client_version < CAS_MAKE_VER (8, 3, 0))
4895  {
4896  if (to_new == PROXY_CONV_ERR_TO_NEW)
4897  {
4898  error_code = CAS_CONV_ERROR_TO_NEW (error_code);
4899  }
4900  else
4901  {
4902  error_code = CAS_CONV_ERROR_TO_OLD (error_code);
4903  }
4904  }
4905  else if (!DOES_CLIENT_MATCH_THE_PROTOCOL (client_version, PROTOCOL_V2)
4906  && !cas_di_understand_renewed_error_code (driver_info) && error_code != NO_ERROR)
4907  {
4908  if (error_ind == CAS_ERROR_INDICATOR || error_code == CAS_ER_NOT_AUTHORIZED_CLIENT)
4909  {
4910  if (to_new == PROXY_CONV_ERR_TO_NEW)
4911  {
4912  error_code = CAS_CONV_ERROR_TO_NEW (error_code);
4913  }
4914  else
4915  {
4916  error_code = CAS_CONV_ERROR_TO_OLD (error_code);
4917  }
4918  }
4919  }
4920 
4921  return error_code;
4922 }
4923 
4924 #if defined(LINUX)
4925 static int
4926 proxy_get_max_socket (void)
4927 {
4928  int max_socket = 0;
4929  T_SHARD_INFO *first_shard_info_p;
4930 
4931  first_shard_info_p = shard_shm_find_shard_info (proxy_info_p, 0);
4932  assert (first_shard_info_p != NULL);
4933 
4934  max_socket = proxy_info_p->max_context;
4935  max_socket += (proxy_info_p->max_shard * first_shard_info_p->max_appl_server);
4936 
4937  return max_socket;
4938 }
4939 
4940 static int
4941 proxy_add_epoll_event (int fd, unsigned int events)
4942 {
4943  int error;
4944  struct epoll_event ep_ev;
4945 
4946  assert (ep_Fd != INVALID_SOCKET);
4947 
4948  memset (&ep_ev, 0, sizeof (struct epoll_event));
4949  ep_ev.data.fd = fd;
4950  ep_ev.events = events;
4951  error = epoll_ctl (ep_Fd, EPOLL_CTL_ADD, fd, &ep_ev);
4952  if (error == -1)
4953  {
4954  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to add epoll event. (fd:%d), (error=%d[%s])", fd, errno,
4955  strerror (errno));
4956  CLOSE_SOCKET (fd);
4957  return error;
4958  }
4959 
4960  return error;
4961 }
4962 
4963 static int
4964 proxy_mod_epoll_event (int fd, unsigned int events)
4965 {
4966  int error;
4967  struct epoll_event ep_ev;
4968 
4969  assert (ep_Fd != INVALID_SOCKET);
4970 
4971  memset (&ep_ev, 0, sizeof (struct epoll_event));
4972  ep_ev.data.fd = fd;
4973  ep_ev.events = events;
4974  error = epoll_ctl (ep_Fd, EPOLL_CTL_MOD, fd, &ep_ev);
4975  if (error == -1)
4976  {
4977  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to modify epoll event. (fd:%d), (error=%d[%s])", fd, errno,
4978  strerror (errno));
4979  CLOSE_SOCKET (fd);
4980  return error;
4981  }
4982 
4983  return error;
4984 }
4985 
4986 static int
4987 proxy_del_epoll_event (int fd)
4988 {
4989  int error;
4990  struct epoll_event ep_ev;
4991 
4992  assert (ep_Fd != INVALID_SOCKET);
4993 
4994  memset (&ep_ev, 0, sizeof (struct epoll_event));
4995  ep_ev.data.fd = INVALID_SOCKET;
4996  /* events will be ignored, and it is only for portability */
4997  ep_ev.events = 0;
4998  error = epoll_ctl (ep_Fd, EPOLL_CTL_DEL, fd, &ep_ev);
4999  if (error == -1)
5000  {
5001  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to delete epoll event. (fd:%d), (error=%d[%s])", fd, errno,
5002  strerror (errno));
5003  CLOSE_SOCKET (fd);
5004  return error;
5005  }
5006 
5007  return error;
5008 
5009 }
5010 #endif /* LINUX */
int proxy_io_set_established_by_ctx(T_PROXY_CONTEXT *ctx_p)
int proxy_io_make_end_tran_request(char *driver_info, char **buffer, bool commit)
static void proxy_init_net_buf(T_NET_BUF *net_buf)
static int proxy_io_unixd_lsnr(char *unixd_sock_name)
void proxy_socket_io_print(bool print_all)
#define NET_SIZE_BYTE
Definition: cas_network.h:41
static int proxy_process_client_register(T_SOCKET_IO *sock_io_p)
static int proxy_process_client_message(T_SOCKET_IO *sock_io_p)
fd_set wset
char driver_info[SRV_CON_CLIENT_INFO_SIZE]
Definition: broker_shm.h:391
void proxy_set_force_out_tran(char *msg)
static void proxy_shard_io_destroy(void)
int proxy_make_net_buf(T_NET_BUF *net_buf, int size, T_BROKER_VERSION client_version)
#define MSG_HEADER_SIZE
Definition: cas_protocol.h:114
#define NO_ERROR
Definition: error_code.h:46
void set_data_length(char *buffer, int length)
void proxy_event_free(T_PROXY_EVENT *event_p)
void init_msg_header(MSG_HEADER *header)
int net_decode_str(char *msg, int msg_size, char *func_code, void ***ret_argv)
int proxy_io_make_shard_info(char *driver_info, char **buffer)
#define CAS_INFO_FLAG_MASK_FORCE_OUT_TRAN
Definition: cas_protocol.h:106
void proxy_set_con_status_out_tran(char *msg)
int proxy_wakeup_context_by_shard(T_WAIT_CONTEXT *waiter_p, int shard_id, int cas_id)
static int proxy_cas_io_initialize(int shard_id, T_CAS_IO **cas_io_pp, int size)
void proxy_io_buffer_clear(T_IO_BUFFER *io_buffer)
T_SHM_SHARD_USER * shm_user_p
Definition: shard_proxy.c:50
int SOCKET
Definition: porting.h:482
static int read_buffer(SOCKET sock_fd, char *buf, int size)
Definition: cas_network.c:527
fd_set wallset
int net_buf_cp_str(T_NET_BUF *net_buf, const char *buf, int size)
Definition: cas_net_buf.c:110
#define SRV_CON_CLIENT_INFO_SIZE
Definition: cas_protocol.h:34
#define PROXY_IO_FROM_CLIENT
static int proxy_socket_io_write_internal(T_SOCKET_IO *sock_io_p)
#define DRIVER_SESSION_SIZE
Definition: cas_protocol.h:130
T_SHARD_INFO * shard_shm_find_shard_info(T_PROXY_INFO *proxy_info_p, int shard_id)
Definition: shard_shm.c:433
char * proxy_str_cas_io(T_CAS_IO *cas_io_p)
static void proxy_socket_io_read_from_client(T_SOCKET_IO *sock_io_p)
int cas_info_size
T_PROXY_CONTEXT * proxy_context_find(int cid, unsigned int uid)
static T_SOCKET_IO * proxy_socket_io_find(SOCKET fd)
#define PROXY_DEBUG_LOG(fmt, args...)
T_SHARD_CONN shard_conn[MAX_SHARD_CONN]
Definition: broker_shm.h:286
void proxy_shard_io_print(bool print_all)
T_PROXY_HANDLER proxy_Handler
#define CAS_CONV_ERROR_TO_OLD(V)
Definition: cas_protocol.h:301
int proxy_io_make_client_acl_fail(char *driver_info, char **buffer)
#define CAS_PROTO_VER_MASK
Definition: cas_protocol.h:293
int argc
Definition: dynamic_load.c:951
unsigned int htonl(unsigned int from)
T_CLIENT_IO * proxy_client_io_find_by_ctx(int client_id, int ctx_cid, unsigned int ctx_uid)
#define CAS_CONNECTION_REPLY_SIZE
Definition: cas_protocol.h:134
static void proxy_socket_io_read_error(T_SOCKET_IO *sock_io_p)
int proxy_cas_io_write(T_CAS_IO *cas_io_p, T_PROXY_EVENT *event_p)
char port_name[SHM_APPL_SERVER_NAME_MAX]
Definition: broker_shm.h:601
int proxy_io_process(void)
int access_control_check_right(T_SHM_APPL_SERVER *shm_as_p, char *dbname, char *dbuser, unsigned char *address)
Definition: broker_acl.c:401
static T_CAS_IO * proxy_find_idle_cas_by_conn_info(int shard_id, int cas_id, int ctx_cid, unsigned int ctx_uid)
T_PROXY_CONTEXT * proxy_context_find_by_socket_client_io(T_SOCKET_IO *sock_io_p)
#define PROXY_INVALID_CONTEXT
void proxy_available_cas_wait_timer(void)
char database_user[SRV_CON_DBUSER_SIZE]
static void proxy_socket_io_write(T_SOCKET_IO *sock_io_p)
static int proxy_process_cas_register(T_SOCKET_IO *sock_io_p)
#define PROXY_CONV_ERR_TO_OLD
static void proxy_socket_io_read_from_client_first(T_SOCKET_IO *sock_io_p)
int get_msg_length(char *buffer)
int proxy_io_make_client_proxy_alive(char *driver_info, char **buffer)
#define NET_BUF_HEADER_MSG_SIZE
Definition: cas_net_buf.h:77
#define CAS_PROTO_INDICATOR
Definition: cas_protocol.h:281
#define SRV_CON_MSG_IDX_PROTO_VERSION
Definition: cas_protocol.h:46
int shard_queue_enqueue(T_SHARD_QUEUE *q, void *v)
#define EXIT_FUNC()
#define PROXY_EVENT_FROM_CAS
void proxy_event_set_type_from(T_PROXY_EVENT *event_p, unsigned int type, int from_cas)
#define CAS_VER_TO_MAJOR(VER)
Definition: cas_protocol.h:317
bool proxy_event_io_read_complete(T_PROXY_EVENT *event_p)
static int proxy_process_cas_message(T_SOCKET_IO *sock_io_p)
char * proxy_get_driver_info_by_ctx(T_PROXY_CONTEXT *ctx_p)
char database_user[SRV_CON_DBUSER_SIZE]
Definition: broker_shm.h:354
#define CAS_CONNECTION_REPLY_SIZE_V3
Definition: cas_protocol.h:132
T_IO_BUFFER buffer
#define MSG_HEADER_INFO_SIZE
Definition: cas_protocol.h:112
static void proxy_cas_io_free(int shard_id, int cas_id)
T_SHARD_QUEUE cli_rcv_q
#define CAS_CONV_ERROR_TO_NEW(V)
Definition: cas_protocol.h:302
fd_set wnewset
#define CLOSESOCKET(fd)
int shard_queue_ordered_enqueue(T_SHARD_QUEUE *q, void *v, SHARD_COMP_FN comp_fn)
int proxy_access_log(struct timeval *start_time, int client_ip_addr, const char *dbname, const char *dbuser, bool accepted)
T_SOCKET_IO_GLOBAL proxy_Socket_io
static void proxy_socket_io_write_error(T_SOCKET_IO *sock_io_p)
static void proxy_client_io_destroy(void)
int proxy_io_make_get_db_version(char *driver_info, char **buffer)
#define CAS_INFO_SIZE
Definition: cas_protocol.h:109
#define SRV_CON_DBPASSWD_SIZE
Definition: cas_protocol.h:56
#define CAS_MAKE_VER(MAJOR, MINOR, PATCH)
Definition: cas_protocol.h:304
static int proxy_io_register_to_broker(void)
int max_appl_server
Definition: broker_shm.h:416
#define SRV_CON_DB_INFO_SIZE
Definition: cas_protocol.h:61
#define INVALID_SOCKET
Definition: porting.h:483
struct t_socket_io T_SOCKET_IO
T_CAS_IO *(* T_FUNC_FIND_CAS)(int shard_id, int cas_id, int ctx_cid, unsigned int ctx_uid)
T_APPL_SERVER_INFO * shard_shm_get_as_info(T_PROXY_INFO *proxy_info_p, T_SHM_APPL_SERVER *shm_as_p, int shard_id, int as_id)
Definition: shard_shm.c:671
T_SHM_PROXY * shm_proxy_p
int proxy_id
Definition: shard_proxy.c:45
#define SRV_CON_DBNAME_SIZE
int max_client
Definition: broker_shm.h:533
#define SRV_CON_VER_STR_MAX_SIZE
Definition: cas_protocol.h:59
void proxy_io_destroy(void)
char db_password[SRV_CON_DBPASSWD_SIZE]
Definition: broker_shm.h:236
int proxy_io_make_close_req_handle_ok(char *driver_info, char **buffer, bool is_in_tran)
bool shard_shm_set_as_client_info_with_db_param(T_PROXY_INFO *proxy_info_p, T_SHM_APPL_SERVER *shm_as_p, int shard_id, int as_id, T_CLIENT_INFO *client_info_p)
Definition: shard_shm.c:724
#define ENTER_FUNC()
char cci_default_autocommit
Definition: broker_shm.h:572
static int proxy_client_add_waiter_by_shard(T_SHARD_IO *shard_io_p, int ctx_cid, int ctx_uid, int timeout)
#define CAS_PROTO_UNPACK_NET_VER(VER)
Definition: cas_protocol.h:296
time_t connect_time
Definition: broker_shm.h:390
#define CAS_INFO_RESERVED_DEFAULT
Definition: cas_protocol.h:110
static void proxy_socket_io_read(T_SOCKET_IO *sock_io_p)
char driver_info[SRV_CON_CLIENT_INFO_SIZE]
static T_SOCKET_IO * proxy_socket_io_add(SOCKET fd, bool from_cas)
#define APPL_SERVER_CAS_MYSQL
Definition: broker_config.h:37
char db_name[SRV_CON_DBNAME_SIZE]
Definition: broker_shm.h:234
INT64 num_connect_requests
Definition: broker_shm.h:506
#define APPL_SERVER_CAS_ORACLE
Definition: broker_config.h:35
int max_context
Definition: broker_shm.h:534
void net_buf_init(T_NET_BUF *net_buf, T_BROKER_VERSION client_version)
Definition: cas_net_buf.c:54
int net_buf_cp_int(T_NET_BUF *net_buf, int value, int *begin_offset)
Definition: cas_net_buf.c:126
void proxy_cas_io_free_by_ctx(int shard_id, int cas_id, int ctx_cid, int unsigned ctx_uid)
int maxfd
int proxy_context_send_error(T_PROXY_CONTEXT *ctx_p)
#define CAS_MAKE_PROTO_VER(DRIVER_INFO)
Definition: cas_protocol.h:307
#define DOES_CLIENT_MATCH_THE_PROTOCOL(CLIENT, MATCH)
Definition: cas_protocol.h:289
static void proxy_socket_io_destroy(void)
void proxy_client_io_free_by_ctx(int client_id, int ctx_cid, int ctx_uid)
#define CAS_VER_TO_PATCH(VER)
Definition: cas_protocol.h:319
char * proxy_str_event(T_PROXY_EVENT *event_p)
int proxy_convert_error_code(int error_ind, int error_code, char *driver_info, T_BROKER_VERSION client_version, bool to_new)
#define assert(x)
static char broker_info[BROKER_INFO_SIZE]
Definition: cas_meta.c:34
struct t_client_io T_CLIENT_IO
void proxy_unset_force_out_tran(char *msg)
char database_passwd[SRV_CON_DBPASSWD_SIZE]
char database_passwd[SRV_CON_DBPASSWD_SIZE]
Definition: broker_shm.h:355
void shard_shm_init_client_info(T_CLIENT_INFO *client_info_p)
Definition: shard_shm.c:756
static int get_dbinfo_length(char *driver_info)
#define MAX_FD
static int proxy_io_cas_lsnr(void)
static int proxy_socket_io_read_internal(T_SOCKET_IO *sock_io_p)
static void proxy_socket_io_read_from_cas_next(T_SOCKET_IO *sock_io_p)
int proxy_socket_set_write_event(T_SOCKET_IO *sock_io_p, T_PROXY_EVENT *event_p)
T_SHM_SHARD_CONN * shm_conn_p
Definition: shard_proxy.c:52
int * msg_body_size_ptr
Definition: cas_network.h:270
#define SRV_CON_DB_INFO_SIZE_PRIOR_8_2_0
Definition: cas_protocol.h:67
bool proxy_Keep_running
Definition: shard_proxy.c:57
int tran_commit(bool retain_lock)
T_CAS_IO * proxy_cas_find_io_by_ctx(int shard_id, int cas_id, int ctx_cid, unsigned int ctx_uid)
#define IS_INVALID_SOCKET(socket)
Definition: porting.h:484
#define PROXY_CONNECTION_REPLY_SIZE(con_reply_size)
int proxy_waiter_comp_fn(const void *arg1, const void *arg2)
T_PROXY_INFO * proxy_info_p
Definition: shard_proxy.c:48
void proxy_context_set_error_with_msg(T_PROXY_CONTEXT *ctx_p, int error_ind, int error_code, const char *error_msg)
static void proxy_socket_io_read_from_cas(T_SOCKET_IO *sock_io_p)
T_CLIENT_IO_GLOBAL proxy_Client_io
T_PROXY_CONTEXT proxy_Context
SOCKET cas_lsnr_fd
char appl_server
Definition: broker_shm.h:470
int proxy_io_make_no_error(char *driver_info, char **buffer)
static T_CLIENT_IO * proxy_client_io_new(SOCKET fd, char *driver_info)
T_SHARD_QUEUE cas_rcv_q
bool fixed_shard_user
Definition: broker_shm.h:489
#define CAS_INFO_FLAG_MASK_NEW_SESSION_ID
Definition: cas_protocol.h:107
int proxy_event_realloc_buffer(T_PROXY_EVENT *event_p, unsigned int size)
#define SRV_CON_URL_SIZE
Definition: cas_protocol.h:57
#define CAS_ERROR_INDICATOR
Definition: cas.h:39
char * data
Definition: cas_net_buf.h:96
int get_data_length(char *buffer)
#define READSOCKET(fd, buf, len)
static T_CAS_IO * proxy_cas_alloc_anything(int client_id, int shard_id, int cas_id, int ctx_cid, unsigned int ctx_uid, T_FUNC_FIND_CAS function)
int proxy_io_make_ex_get_lock_timeout(char *driver_info, char **buffer, void *argv)
#define SRV_CON_DB_INFO_SIZE_PRIOR_8_4_0
Definition: cas_protocol.h:64
int proxy_io_initialize(void)
static void proxy_socket_io_write_to_cas(T_SOCKET_IO *sock_io_p)
#define NULL
Definition: freelistheap.h:34
union t_socket_io::@41 id
static int proxy_process_cas_write_error(T_SOCKET_IO *sock_io_p)
struct t_socket_io::@41::@42 shard
#define strncpy_bufsize(buf, str)
Definition: porting.h:340
static T_CAS_IO * proxy_find_idle_cas_by_desc(int shard_id, int cas_id, int ctx_cid, unsigned int ctx_uid)
static void proxy_socket_io_read_from_cas_first(T_SOCKET_IO *sock_io_p)
INT64 num_connect_rejected
Definition: broker_shm.h:507
T_SHARD_USER * shard_metadata_get_shard_user(T_SHM_SHARD_USER *shm_user_p)
T_CLIENT_INFO * shard_shm_get_client_info(T_PROXY_INFO *proxy_info_p, int idx)
Definition: shard_shm.c:448
char * proxy_get_driver_info_by_fd(T_SOCKET_IO *sock_io_p)
static int proxy_process_client_conn_error(T_SOCKET_IO *sock_io_p)
char db_conn_info[MAX_CONN_INFO_LENGTH]
Definition: broker_shm.h:279
#define PROXY_IO_FROM_CAS
static struct sockaddr_un shard_sock_addr
Definition: broker.c:307
unsigned short htons(unsigned short from)
void proxy_client_io_print(bool print_all)
#define MALLOC(SIZE)
Definition: cas_common.h:53
#define WRITESOCKET(fd, buf, len)
#define FREE_MEM(PTR)
Definition: cas_common.h:58
#define SHARD_TEMPORARY_UNAVAILABLE
static T_CAS_IO * proxy_cas_alloc_by_shard_and_cas_id(int client_id, int shard_id, int cas_id, int ctx_cid, unsigned int ctx_uid)
void * shard_cqueue_dequeue(T_SHARD_CQUEUE *q)
fd_set allset
char driver_version[SRV_CON_VER_STR_MAX_SIZE]
Definition: broker_shm.h:392
int data_size
Definition: cas_net_buf.h:95
char * proxy_dup_msg(char *msg)
static int proxy_shard_io_initialize(void)
void proxy_client_io_free(T_CLIENT_IO *cli_io_p)
T_SHARD_IO_GLOBAL proxy_Shard_io
#define PROXY_INVALID_CAS
Definition: broker_util.h:41
#define BROKER_INFO_SIZE
Definition: cas_protocol.h:116
void cas_bi_make_broker_info(char *broker_info, char dbms_type, char statement_pooling, char cci_pconnect)
Definition: cas_meta.c:214
static T_SHARD_IO * proxy_shard_io_find(int shard_id)
int proxy_event_alloc_buffer(T_PROXY_EVENT *event_p, unsigned int size)
int proxy_io_make_error_msg(char *driver_info, char **buffer, int error_ind, int error_code, const char *error_msg, char is_in_tran)
char * db_name
#define PROXY_CONV_ERR_TO_NEW
T_CAS_IO * proxy_cas_alloc_by_ctx(int client_id, int shard_id, int cas_id, int ctx_cid, unsigned int ctx_uid, int timeout, int func_code)
#define SRV_CON_MSG_IDX_FUNCTION_FLAG
Definition: cas_protocol.h:47
#define max(a, b)
static SOCKET proxy_io_connect_to_broker(void)
void proxy_context_clear_error(T_PROXY_CONTEXT *ctx_p)
static T_CAS_IO * proxy_cas_io_new(int shard_id, int cas_id, SOCKET fd)
int proxy_io_make_set_db_parameter_ok(char *driver_info, char **buffer)
T_SHARD_QUEUE waitq
T_SHM_APPL_SERVER * shm_as_p
Definition: shard_proxy.c:43
#define BROKER_RENEWED_ERROR_CODE
Definition: cas_protocol.h:117
T_PROXY_EVENT * read_event
#define MSG_HEADER_MSG_SIZE
Definition: cas_protocol.h:113
static int proxy_socket_io_new_client(SOCKET lsnr_fd)
static void error(const char *msg)
Definition: gencat.c:331
T_CLIENT_IO * proxy_client_io_find_by_fd(int client_id, SOCKET fd)
static T_CAS_IO * proxy_find_idle_cas_by_asc(int shard_id, int cas_id, int ctx_cid, unsigned int ctx_uid)
T_PROXY_CONTEXT * proxy_context_new(void)
#define SRV_CON_DBSESS_ID_SIZE
Definition: cas_protocol.h:58
#define GET_CAS_PORT(broker_port, proxy_index, proxy_max_count)
fd_set rset
#define PROXY_RESERVED_FD
Definition: broker_shm.h:160
#define BROKER_SUPPORT_HOLDABLE_RESULT
Definition: cas_protocol.h:118
void proxy_waiter_timeout(T_SHARD_QUEUE *waitq, INT64 *counter, int now)
static SOCKET proxy_io_accept(SOCKET lsnr_fd)
static int broker_port
int proxy_client_io_write(T_CLIENT_IO *cli_io_p, T_PROXY_EVENT *event_p)
unsigned short type
void proxy_cas_release_by_ctx(int shard_id, int cas_id, int ctx_cid, unsigned int ctx_uid)
void proxy_term(void)
Definition: shard_proxy.c:65
INT64 waiter_count
Definition: broker_shm.h:420
int proxy_io_close_all_fd(void)
char * proxy_str_context(T_PROXY_CONTEXT *ctx_p)
int proxy_io_make_check_cas_ok(char *driver_info, char **buffer)
int proxy_io_make_ex_get_isolation_level(char *driver_info, char **buffer, void *argv)
#define CAS_VER_TO_MINOR(VER)
Definition: cas_protocol.h:318
void shard_queue_destroy(T_SHARD_QUEUE *q)
static void proxy_socket_io_clear(T_SOCKET_IO *sock_io_p)
const char ** argv
Definition: dynamic_load.c:952
#define SRV_CON_DBUSER_SIZE
Definition: cas_protocol.h:55
#define strlen(s1)
Definition: intl_support.c:43
void shard_stmt_del_all_srv_h_id_for_shard_cas(int shard_id, int cas_id)
static int proxy_process_cas_read_error(T_SOCKET_IO *sock_io_p)
#define PROXY_LOG(level, fmt, args...)
int proxy_io_make_client_dbinfo_ok(char *driver_info, char **buffer)
int alloc_size
Definition: cas_net_buf.h:94
int T_BROKER_VERSION
Definition: cas_protocol.h:342
static SOCKET proxy_io_cas_accept(SOCKET lsnr_fd)
#define CLOSE_SOCKET(X)
Definition: cas_common.h:85
#define CAS_PROTO_TO_VER_STR(MSG_P, VER)
Definition: cas_protocol.h:320
static int proxy_io_make_ex_get_int(char *driver_info, char **buffer, int *argv)
#define DOES_CLIENT_UNDERSTAND_THE_PROTOCOL(CLIENT, REQUIRE)
Definition: cas_protocol.h:290
static SOCKET sock_fd
Definition: broker.c:300
void shard_cqueue_destroy(T_SHARD_CQUEUE *q)
char buf[MSG_HEADER_SIZE]
Definition: cas_network.h:272
T_PROXY_EVENT * proxy_event_new_with_rsp(char *driver_info, unsigned int type, int from, T_PROXY_EVENT_FUNC resp_func)
#define DBMS_ERROR_INDICATOR
Definition: cas.h:40
T_CAS_IO * ent
int i
Definition: dynamic_load.c:954
bool cas_di_understand_renewed_error_code(const char *driver_info)
Definition: cas_meta.c:203
#define HEALTH_CHECK_DUMMY_DB
Definition: cas_protocol.h:156
int proxy_io_make_end_tran_ok(char *driver_info, char **buffer)
void * shard_queue_dequeue(T_SHARD_QUEUE *q)
char port_name[SHM_PROXY_NAME_MAX]
Definition: broker_shm.h:488
static int proxy_process_cas_response(T_SOCKET_IO *sock_io_p)
int shard_cqueue_enqueue(T_SHARD_CQUEUE *q, void *e)
#define CAS_CONNECTION_REPLY_SIZE_PRIOR_PROTOCOL_V3
Definition: cas_protocol.h:131
static T_CAS_IO * proxy_cas_io_find_by_fd(int shard_id, int cas_id, SOCKET fd)
void * shard_queue_peek_value(T_SHARD_QUEUE *q)
static void proxy_set_conn_info(int func_code, int ctx_cid, int ctx_uid, int shard_id, int cas_id)
int proxy_io_make_client_conn_ok(char *driver_info, char **buffer)
#define APPL_SERVER_CAS_MYSQL51
Definition: broker_config.h:36
unsigned int ctx_uid
#define SHARD_NET_BUF_ALLOC_SIZE
Definition: cas_net_buf.h:75
#define PROTOCOL_SIZE
static int shard_io_set_fl(int fd, int flags)
unsigned int ntohl(unsigned int from)
int recv_fd(int fd, int *rid, char *driver_info)
int shard_queue_initialize(T_SHARD_QUEUE *q)
const char * rel_build_number(void)
#define GET_CLIENT_PORT(broker_port, proxy_index)
char db_user[SRV_CON_DBUSER_SIZE]
Definition: broker_shm.h:235
void proxy_context_free(T_PROXY_CONTEXT *ctx_p)
static int proxy_process_client_write_error(T_SOCKET_IO *sock_io_p)
static int proxy_process_client_request(T_SOCKET_IO *sock_io_p)
void proxy_set_con_status_in_tran(char *msg)
static void proxy_socket_io_read_from_client_next(T_SOCKET_IO *sock_io_p)
#define CAS_INFO_FLAG_MASK_AUTOCOMMIT
Definition: cas_protocol.h:105
#define PROXY_INVALID_SHARD
Definition: broker_util.h:38
static int proxy_check_authorization(T_PROXY_CONTEXT *ctx_p, const char *db_name, const char *db_user, const char *db_passwd)
static int proxy_process_cas_conn_error(T_SOCKET_IO *sock_io_p)
SOCKET broker_conn_fd
int proxy_io_make_end_tran_abort(char *driver_info, char **buffer)
T_PROXY_EVENT * proxy_event_new(unsigned int type, int from_cas)
static int proxy_socket_io_initialize(void)
int proxy_socket_io_delete(SOCKET fd)
#define PROXY_EVENT_FROM_CLIENT
int proxy_io_make_close_req_handle_out_tran_ok(char *driver_info, char **buffer)
#define CAS_PID_SIZE
Definition: cas_protocol.h:128
int proxy_io_make_check_cas(char *driver_info, char **buffer)
int net_buf_cp_byte(T_NET_BUF *net_buf, char ch)
Definition: cas_net_buf.c:97
static void proxy_socket_io_write_to_client(T_SOCKET_IO *sock_io_p)
void proxy_event_set_context(T_PROXY_EVENT *event_p, int cid, unsigned int uid)
#define SESSION_ID_SIZE
Definition: cas_protocol.h:129
const char ** p
Definition: dynamic_load.c:945
int proxy_io_make_cursor_close_out_tran_ok(char *driver_info, char **buffer)
char * proxy_str_client_io(T_CLIENT_IO *cli_io_p)
int shard_cqueue_initialize(T_SHARD_CQUEUE *q, int size)
char db_name[MAX_DBNAME_LENGTH]
Definition: broker_shm.h:278
int proxy_io_make_con_close_ok(char *driver_info, char **buffer)
#define CAS_NO_ERROR
Definition: cas.h:41
#define REALLOC(PTR, SIZE)
Definition: cas_common.h:54
unsigned int ctx_uid
static int proxy_process_client_read_error(T_SOCKET_IO *sock_io_p)
T_PROXY_EVENT * write_event
char * info_ptr
Definition: cas_network.h:271
static void proxy_client_check_waiter_and_wakeup(T_SHARD_IO *shard_io_p, T_CAS_IO *cas_io_p)
T_WAIT_CONTEXT * proxy_waiter_new(int ctx_cid, unsigned int ctx_uid, int timeout)
static int proxy_client_io_initialize(void)