CUBRID Engine  latest
broker.c
Go to the documentation of this file.
1 /*
2  * Copyright 2008 Search Solution Corporation
3  * Copyright 2016 CUBRID Corporation
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 
20 /*
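21  * broker.c - CAS broker process: accepts client connections, queues them as jobs and hands each job to an application server (or to a shard proxy)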
22  */
23 
24 #ident "$Id$"
25 
26 #include <stdio.h>
27 #include <stdlib.h>
28 #include <string.h>
29 #include <signal.h>
30 #include <errno.h>
31 #include <fcntl.h>
32 #include <assert.h>
33 
34 #if !defined(WINDOWS)
35 #include <pthread.h>
36 #include <unistd.h>
37 #include <sys/file.h>
38 #include <sys/socket.h>
39 #include <netinet/in.h>
40 #include <arpa/inet.h>
41 #include <netinet/tcp.h>
42 #include <sys/time.h>
43 #include <sys/un.h>
44 #else
45 #include <io.h>
46 #endif
47 
48 #ifdef BROKER_DEBUG
49 #include <sys/time.h>
50 #endif
51 
52 #include "connection_defs.h"
53 #include "connection_cl.h"
54 
55 #include "system_parameter.h"
56 #include "databases_file.h"
57 #include "util_func.h"
58 
59 #include "cas_error.h"
60 #include "cas_common.h"
61 #include "broker_error.h"
62 #include "broker_env_def.h"
63 #include "broker_shm.h"
64 #include "broker_msg.h"
65 #include "broker_process_size.h"
66 #include "broker_util.h"
67 #include "broker_access_list.h"
68 #include "broker_filename.h"
69 #include "broker_er_html.h"
70 #include "broker_send_fd.h"
71 #include "error_manager.h"
72 #include "shard_shm.h"
73 #include "shard_metadata.h"
74 #include "broker_proxy_conn.h"
75 #include "dbtype_def.h"
76 
77 #if defined(WINDOWS)
78 #include "broker_wsa_init.h"
79 #endif
80 
81 #if defined(CAS_FOR_ORACLE) || defined(CAS_FOR_MYSQL)
82 #define DB_EMPTY_SESSION (0)
83 #endif /* CAS_FOR_ORACLE || CAS_FOR_MYSQL */
84 
85 #ifdef WIN_FW
86 #if !defined(WINDOWS)
87 #error DEFINE ERROR
88 #endif
89 #endif
90 
91 #ifdef BROKER_RESTART_DEBUG
92 #define PS_CHK_PERIOD 30
93 #else
94 #define PS_CHK_PERIOD 600
95 #endif
96 
97 #ifdef ASYNC_MODE
98 #ifdef HPUX10_2
99 #define SELECT_MASK int
100 #else
101 #define SELECT_MASK fd_set
102 #endif
103 #endif
104 
105 #define IP_ADDR_STR_LEN 20
106 #define BUFFER_SIZE ONE_K
107 
108 #define ENV_BUF_INIT_SIZE 512
109 #define ALIGN_ENV_BUF_SIZE(X) \
110  ((((X) + ENV_BUF_INIT_SIZE) / ENV_BUF_INIT_SIZE) * ENV_BUF_INIT_SIZE)
111 
112 #define MONITOR_SERVER_INTERVAL 5
113 
114 #ifdef BROKER_DEBUG
/*
 * BROKER_LOG - append "<line> <sec>.<usec> <text>" to broker.log.
 *   X: C string to log.
 * The timestamp is taken only after the file was opened successfully.
 * Nothing is written when the file cannot be opened.
 */
#define BROKER_LOG(X) \
  do { \
    FILE *broker_log_fp_ = fopen ("broker.log", "a"); \
    if (broker_log_fp_ != NULL) { \
      struct timeval broker_log_now_; \
      gettimeofday (&broker_log_now_, NULL); \
      fprintf (broker_log_fp_, "%d %d.%06d %s\n", __LINE__, \
	       (int) broker_log_now_.tv_sec, (int) broker_log_now_.tv_usec, X); \
      fclose (broker_log_fp_); \
    } \
  } while (0)

/*
 * BROKER_LOG_INT - append "<line> <sec>.<usec> <expr>=<value>" to broker.log.
 *   X: int expression; its source text is logged next to its value.
 * The timestamp is taken before the file is opened.
 */
#define BROKER_LOG_INT(X) \
  do { \
    FILE *broker_log_fp_; \
    struct timeval broker_log_now_; \
    gettimeofday (&broker_log_now_, NULL); \
    broker_log_fp_ = fopen ("broker.log", "a"); \
    if (broker_log_fp_ != NULL) { \
      fprintf (broker_log_fp_, "%d %d.%06d %s=%d\n", __LINE__, \
	       (int) broker_log_now_.tv_sec, (int) broker_log_now_.tv_usec, #X, X); \
      fclose (broker_log_fp_); \
    } \
  } while (0)
137 #endif
138 
139 #ifdef SESSION_LOG
/*
 * SESSION_LOG_WRITE - append one record to session.log.
 *   IP: client address, rendered into text by ip2str ()
 *   SID: session id, logged as int
 *   APPL: C string naming the application
 *   INDEX: int index
 *
 * Record layout: "<br_index> <ip> <sid> <sec>.<usec> <appl> <index> ".
 * Nothing is written when the file cannot be opened.
 *
 * The timeval fields are cast to int because they are printed with %d.
 * Their native types are wider than int on common platforms, and passing
 * them uncast does not match the conversion specifier.  This mirrors what
 * BROKER_LOG does.
 */
#define SESSION_LOG_WRITE(IP, SID, APPL, INDEX) \
  do { \
    FILE *fp; \
    struct timeval tv; \
    char ip_str[64]; \
    gettimeofday (&tv, NULL); \
    ip2str (IP, ip_str); \
    fp = fopen ("session.log", "a"); \
    if (fp) { \
      fprintf (fp, "%d %-15s %d %d.%06d %s %d \n", br_index, ip_str, (int) (SID), \
	       (int) tv.tv_sec, (int) tv.tv_usec, APPL, INDEX); \
      fclose (fp); \
    } \
  } while (0)
153 #endif
154 
155 #ifdef _EDU_
156 #define EDU_KEY "86999522480552846466422480899195252860256028745"
157 #endif
158 
/*
 * V3_WRITE_HEADER_OK_FILE_SOCK - send the V3 "OK" response header.
 *   sock_fd: socket passed on to write_to_client ()
 *
 * A V3_RESPONSE_HEADER_SIZE-byte buffer is zero-filled, V3_HEADER_OK is
 * formatted into its start, and the whole buffer is written.
 *
 * The macro expands to one statement WITHOUT a trailing semicolon, so it
 * can be used as "if (c) V3_WRITE_HEADER_OK_FILE_SOCK (fd); else ...".
 * snprintf bounds the write to the buffer size.
 */
#define V3_WRITE_HEADER_OK_FILE_SOCK(sock_fd) \
  do { \
    char buf[V3_RESPONSE_HEADER_SIZE]; \
    memset (buf, '\0', sizeof (buf)); \
    snprintf (buf, sizeof (buf), V3_HEADER_OK); \
    write_to_client (sock_fd, buf, sizeof (buf)); \
  } while (0)
166 
/*
 * V3_WRITE_HEADER_ERR_SOCK - send the V3 "error" response header.
 *   sockfd: socket passed on to write_to_client ()
 *
 * A V3_RESPONSE_HEADER_SIZE-byte buffer is zero-filled, V3_HEADER_ERR is
 * formatted into its start, and the whole buffer is written.
 *
 * The macro expands to one statement WITHOUT a trailing semicolon, so it
 * is safe in an unbraced if/else.  snprintf bounds the write to the
 * buffer size.
 */
#define V3_WRITE_HEADER_ERR_SOCK(sockfd) \
  do { \
    char buf[V3_RESPONSE_HEADER_SIZE]; \
    memset (buf, '\0', sizeof (buf)); \
    snprintf (buf, sizeof (buf), V3_HEADER_ERR); \
    write_to_client (sockfd, buf, sizeof (buf)); \
  } while (0)
174 
/*
 * SET_BROKER_ERR_CODE - copy the current uw error code and OS error code
 * into this broker's slot of the shared broker table.
 * Does nothing while shm_br is null or br_index is negative.
 */
#define SET_BROKER_ERR_CODE() \
  do { \
    if (!shm_br || br_index < 0) \
      break; \
    shm_br->br_info[br_index].err_code = uw_get_error_code (); \
    shm_br->br_info[br_index].os_err_code = uw_get_os_error_code (); \
  } while (0)

/*
 * SET_BROKER_OK_CODE - reset err_code of this broker's slot to 0.
 * os_err_code is left untouched.
 * Does nothing while shm_br is null or br_index is negative.
 */
#define SET_BROKER_OK_CODE() \
  do { \
    if (!shm_br || br_index < 0) \
      break; \
    shm_br->br_info[br_index].err_code = 0; \
  } while (0)
189 
/*
 * CAS_SEND_ERROR_CODE - send VAL to FD as a 4-byte integer in network
 * byte order, using write_to_client ().
 */
#define CAS_SEND_ERROR_CODE(FD, VAL) \
  do { \
    int cas_net_val_ = htonl (VAL); \
    write_to_client (FD, (char *) &cas_net_val_, 4); \
  } while (0)
196 
197 #define JOB_COUNT_MAX 1000000
198 
199 /* num of collecting counts per monitoring interval */
200 #define NUM_COLLECT_COUNT_PER_INTVL 4
201 #define HANG_COUNT_THRESHOLD_RATIO 0.5
202 
203 #if defined(WINDOWS)
204 #define F_OK 0
205 #else
206 #define SOCKET_TIMEOUT_SEC 2
207 #endif
208 
209 /* server state */
211 {
221 };
222 
223 typedef struct t_clt_table T_CLT_TABLE;
225 {
228 };
229 static void shard_broker_process (void);
230 static void cleanup (int signo);
231 static int init_env (void);
232 #if !defined(WINDOWS)
233 static int init_proxy_env (void);
234 #endif /* !WINDOWS */
235 static int broker_init_shm (void);
236 
237 static void cas_monitor_worker (T_APPL_SERVER_INFO * as_info_p, int br_index, int as_index, int *busy_uts);
238 static void psize_check_worker (T_APPL_SERVER_INFO * as_info_p, int br_index, int as_index);
239 
241 
242 static void proxy_monitor_worker (T_PROXY_INFO * proxy_info_p, int br_index, int proxy_index);
243 
244 static THREAD_FUNC receiver_thr_f (void *arg);
245 static THREAD_FUNC dispatch_thr_f (void *arg);
246 static THREAD_FUNC shard_dispatch_thr_f (void *arg);
247 static THREAD_FUNC psize_check_thr_f (void *arg);
248 static THREAD_FUNC cas_monitor_thr_f (void *arg);
249 static THREAD_FUNC hang_check_thr_f (void *arg);
250 static THREAD_FUNC proxy_monitor_thr_f (void *arg);
251 #if !defined(WINDOWS)
252 static THREAD_FUNC proxy_listener_thr_f (void *arg);
253 #endif /* !WINDOWS */
254 static THREAD_FUNC server_monitor_thr_f (void *arg);
255 
256 static int read_nbytes_from_client (SOCKET sock_fd, char *buf, int size);
257 
258 #if defined(WIN_FW)
259 static THREAD_FUNC service_thr_f (void *arg);
260 static int process_cas_request (int cas_pid, int as_index, SOCKET clt_sock_fd, SOCKET srv_sock_fd);
261 static int read_from_cas_client (SOCKET sock_fd, char *buf, int size, int as_index, int cas_pid);
262 #endif
263 
264 static int write_to_client (SOCKET sock_fd, char *buf, int size);
265 static int write_to_client_with_timeout (SOCKET sock_fd, char *buf, int size, int timeout_sec);
266 static int read_from_client (SOCKET sock_fd, char *buf, int size);
267 static int read_from_client_with_timeout (SOCKET sock_fd, char *buf, int size, int timeout_sec);
268 static int run_appl_server (T_APPL_SERVER_INFO * as_info_p, int br_index, int as_index);
269 static int stop_appl_server (T_APPL_SERVER_INFO * as_info_p, int br_index, int as_index);
270 static void restart_appl_server (T_APPL_SERVER_INFO * as_info_p, int br_index, int as_index);
271 
272 static int run_proxy_server (T_PROXY_INFO * proxy_info_p, int br_index, int proxy_index);
273 static int stop_proxy_server (T_PROXY_INFO * proxy_info_p, int br_index, int proxy_index);
274 static void restart_proxy_server (T_PROXY_INFO * proxy_info_p, int br_index, int proxy_index);
275 static SOCKET connect_srv (char *br_name, int as_index);
276 static int find_idle_cas (void);
277 static int find_drop_as_index (void);
278 static int find_add_as_index (void);
279 static bool broker_add_new_cas (void);
280 
281 static void check_cas_log (char *br_name, T_APPL_SERVER_INFO * as_info_p, int as_index);
282 static void check_proxy_log (char *br_name, T_PROXY_INFO * proxy_info_p);
284 
285 static void get_as_sql_log_filename (char *log_filename, int len, char *broker_name, T_APPL_SERVER_INFO * as_info_p,
286  int as_index);
287 static void get_as_slow_log_filename (char *log_filename, int len, char *broker_name, T_APPL_SERVER_INFO * as_info_p,
288  int as_index);
289 
290 static CSS_CONN_ENTRY *connect_to_master_for_server_monitor (const char *db_name, const char *db_host);
291 static int get_server_state_from_master (CSS_CONN_ENTRY * conn, const char *db_name);
292 
293 static int insert_db_server_check_list (T_DB_SERVER * list_p, int check_list_cnt, const char *db_name,
294  const char *db_host);
295 
296 #if defined(WINDOWS)
297 static int get_cputime_sec (int pid);
298 #endif
299 
301 static struct sockaddr_in sock_addr;
302 static int sock_addr_len;
303 #if defined(WINDOWS)
304 static struct sockaddr_in shard_sock_addr;
305 #else /* WINDOWS */
307 static struct sockaddr_un shard_sock_addr;
308 #endif /* !WINDOWS */
309 
314 
/* Index of this broker's entry in shm_br->br_info[]. The SET_BROKER_*_CODE macros treat a negative value as "not set". */
315 static int br_index = -1;
/* Shard mode switch; main () copies it from br_info_p->shard_flag. */
316 static int br_shard_flag = OFF;
317 
318 #if defined(WIN_FW)
/* Number of service threads; main () sets it to appl_server_max_num of this broker. */
319 static int num_thr;
320 #endif
321 
/* Signalled by receiver_thr_f () after a job was queued; dispatch_thr_f () waits on it together with clt_table_mutex. */
322 static pthread_cond_t clt_table_cond;
323 static pthread_mutex_t clt_table_mutex;
/* NOTE(review): the lock/unlock sites of the next three mutexes and the uses of the two run_*_flag variables are not visible in this listing. */
324 static pthread_mutex_t run_appl_mutex;
325 static pthread_mutex_t broker_shm_mutex;
326 static pthread_mutex_t run_proxy_mutex;
327 static char run_proxy_flag = 0;
328 
329 static char run_appl_server_flag = 0;
330 
332 
/* Worker loops run while this is non-zero; cleanup () clears it. */
333 static int process_flag = 1;
334 
/* Subtracted from the server count when the drop logic estimates how many servers are free. */
335 static int num_busy_uts = 0;
336 
/* receiver_thr_f () compares every accepted descriptor with this value (the update itself is not visible in this listing). */
337 static int max_open_fd = 128;
338 
339 #if defined(WIN_FW)
340 static int last_job_fetch_time;
341 static time_t last_session_id = 0;
/* Array of num_thr job slots allocated in main (); dispatch_thr_f () stores the job for server N in slot N. */
342 static T_MAX_HEAP_NODE *session_request_q;
343 #endif
344 
/* Set to 1 by dispatch_thr_f () while it holds a dequeued job and is looking for an idle server; counted as a waiting job by the drop logic. */
345 static int hold_job = 0;
346 
/*
 * Start one more application server (CAS) for this broker and register it
 * in shared memory.
 *
 * return: true when a server process was started and its slot initialised.
 *         false when the broker already runs appl_server_max_num servers,
 *         when find_add_as_index () yields no slot (negative index), or
 *         when run_appl_server () does not return a positive pid.
 *
 * NOTE(review): the declarator line and listing lines 374 and 388 are
 * missing from this listing. The name is presumably broker_add_new_cas (void),
 * as in the prototype earlier in the file - confirm against the real source.
 */
347 static bool
349 {
350  int cur_appl_server_num;
351  int add_as_index;
352  int pid;
353 
354  cur_appl_server_num = shm_br->br_info[br_index].appl_server_num;
355 
356  /* ADD UTS */
357  if (cur_appl_server_num >= shm_br->br_info[br_index].appl_server_max_num)
358  {
359  return false;
360  }
361 
362  add_as_index = find_add_as_index ();
363  if (add_as_index < 0)
364  {
365  return false;
366  }
367 
368  pid = run_appl_server (&(shm_appl->as_info[add_as_index]), br_index, add_as_index);
369  if (pid <= 0)
370  {
371  return false;
372  }
373 
  /* publish the new process in its slot: idle, in service, no client attached */
375  shm_appl->as_info[add_as_index].pid = pid;
376  shm_appl->as_info[add_as_index].psize = getsize (pid);
377  shm_appl->as_info[add_as_index].psize_time = time (NULL);
378  shm_appl->as_info[add_as_index].uts_status = UTS_STATUS_IDLE;
379  shm_appl->as_info[add_as_index].service_flag = SERVICE_ON;
380  shm_appl->as_info[add_as_index].reset_flag = FALSE;
381 
382  memset (&shm_appl->as_info[add_as_index].cas_clt_ip[0], 0x0, sizeof (shm_appl->as_info[add_as_index].cas_clt_ip));
383  shm_appl->as_info[add_as_index].cas_clt_port = 0;
384  shm_appl->as_info[add_as_index].driver_version[0] = '\0';
385 
  /* both server counters are kept in step */
386  (shm_br->br_info[br_index].appl_server_num)++;
387  (shm_appl->num_appl_server)++;
389 
390  return true;
391 }
392 
/*
 * Try to retire one application server when the broker runs more than
 * appl_server_min_num servers and no job is waiting for a server.
 *
 * return: false when the broker is at or below its minimum, when jobs are
 *         waiting, or when find_drop_as_index () yields no candidate.
 *         true otherwise - including the case where the candidate turned
 *         out to be in use and was put back into service.
 *
 * NOTE(review): the declarator line and listing lines 416, 420, 438, 442,
 * 445 and 449 are missing from this listing, so the function name and any
 * locking around the shared-memory updates cannot be confirmed from here.
 */
393 static bool
395 {
396  int cur_appl_server_num, wait_job_cnt;
397  int drop_as_index;
398  T_APPL_SERVER_INFO *drop_as_info;
399 
400  /* DROP UTS */
401  cur_appl_server_num = shm_br->br_info[br_index].appl_server_num;
  /* waiting jobs = queued jobs (job_queue[0].id) + the job held by the dispatcher - servers that are not busy */
402  wait_job_cnt = shm_appl->job_queue[0].id + hold_job;
403  wait_job_cnt -= (cur_appl_server_num - num_busy_uts);
404 
405  if (cur_appl_server_num <= shm_br->br_info[br_index].appl_server_min_num || wait_job_cnt > 0)
406  {
407  return false;
408  }
409 
410  drop_as_index = find_drop_as_index ();
411  if (drop_as_index < 0)
412  {
413  return false;
414  }
415 
  /* take the candidate out of service first, then decide under the connection-status lock */
417  current_dropping_as_index = drop_as_index;
418  drop_as_info = &shm_appl->as_info[drop_as_index];
419  drop_as_info->service_flag = SERVICE_OFF_ACK;
421 
422  CON_STATUS_LOCK (drop_as_info, CON_STATUS_LOCK_BROKER);
423  if (drop_as_info->uts_status == UTS_STATUS_IDLE)
424  {
425  /* do nothing */
426  }
  /* a busy server may still be dropped when it keeps its connection automatically, is outside a transaction and has been untouched for longer than time_to_kill */
427  else if (drop_as_info->cur_keep_con == KEEP_CON_AUTO && drop_as_info->uts_status == UTS_STATUS_BUSY
428  && drop_as_info->con_status == CON_STATUS_OUT_TRAN
429  && time (NULL) - drop_as_info->last_access_time > shm_br->br_info[br_index].time_to_kill)
430  {
431  drop_as_info->con_status = CON_STATUS_CLOSE;
432  }
433  else
434  {
  /* candidate is in use: put it back into service and skip the stop below */
435  drop_as_info->service_flag = SERVICE_ON;
436  drop_as_index = -1;
437  }
439 
440  if (drop_as_index >= 0)
441  {
443  (shm_br->br_info[br_index].appl_server_num)--;
444  (shm_appl->num_appl_server)--;
446 
447  stop_appl_server (drop_as_info, br_index, drop_as_index);
448  }
450 
451  return true;
452 }
453 
/*
 * Broker process entry point (WinMain on Windows, main elsewhere).
 *
 * Sequence visible in this listing:
 *   1. broker_init_shm (), then copy the shard flag from br_info_p.
 *   2. Install signal handlers; SIGCHLD and SIGPIPE are ignored on non-Windows.
 *   3. Build the access list (non-shard only) and call init_env (); in shard
 *      mode also init_proxy_env () and broker_init_proxy_conn ().
 *   4. Poll every 200 ms until ready_to_service is set for this broker.
 *   5. Start the receiver, dispatch, psize-check, CAS-monitor and
 *      server-monitor threads, plus the proxy threads (shard mode) and the
 *      hang-check thread (when monitor_hang_flag is set).
 *   6. Non-shard mode: loop with a 100 ms sleep while process_flag is set.
 *
 * return: -1 on every path that reaches the end of the function as listed.
 *
 * NOTE(review): listing lines 500-502, 505, 511, 548, 556, 566-567, 605,
 * 627, 638, 646 and 650 are missing, so the mutex initialisation, the
 * shard-mode call after thread start-up, the body of the idle loop and the
 * clean-up under error1 are not visible here.
 */
454 #if defined(WINDOWS)
455 int WINAPI
456 WinMain (HINSTANCE hInstance, // handle to current instance
457  HINSTANCE hPrevInstance, // handle to previous instance
458  LPSTR lpCmdLine, // pointer to command line
459  int nShowCmd // show state of window
460  )
461 #else
462 int
463 main (int argc, char *argv[])
464 #endif
465 {
466  pthread_t receiver_thread;
467  pthread_t dispatch_thread;
468  pthread_t cas_monitor_thread;
469  pthread_t psize_check_thread;
470  pthread_t hang_check_thread;
471  pthread_t server_monitor_thread;
472  pthread_t proxy_monitor_thread;
473 #if !defined(WINDOWS)
474  pthread_t proxy_listener_thread;
475 #endif /* !WINDOWS */
476 
477 #if defined(WIN_FW)
478  pthread_t service_thread;
479  int *thr_index;
480  int i;
481 #endif
482  int error;
483 
484  error = broker_init_shm ();
485  if (error)
486  {
487  goto error1;
488  }
489 
490  br_shard_flag = br_info_p->shard_flag;
491 
  /* cleanup () clears process_flag and exits the process */
492  signal (SIGTERM, cleanup);
493  signal (SIGINT, cleanup);
494 #if !defined(WINDOWS)
495  signal (SIGCHLD, SIG_IGN);
496  signal (SIGPIPE, SIG_IGN);
497 #endif
498 
499  pthread_cond_init (&clt_table_cond, NULL);
503  if (br_shard_flag == ON)
504  {
506  }
507 
508 #if defined(WINDOWS)
509  if (wsa_initialize () < 0)
510  {
512  goto error1;
513  }
514 #endif /* WINDOWS */
515 
  /* the access control list is only built in non-shard mode */
516  if (br_shard_flag == OFF && uw_acl_make (shm_br->br_info[br_index].acl_file) < 0)
517  {
518  goto error1;
519  }
520 
521  if (init_env () == -1)
522  {
523  goto error1;
524  }
525 
526  if (br_shard_flag == ON)
527  {
528 #if !defined(WINDOWS)
529  if (init_proxy_env () == -1)
530  {
531  goto error1;
532  }
533 
534  if (broker_init_proxy_conn (br_info_p->num_proxy) < 0)
535  {
536  goto error1;
537  }
538 #endif /* !WINDOWS */
539  }
540  else
541  {
542 #if defined(WIN_FW)
  /* one service thread, one index cell and one request slot per possible application server */
543  num_thr = shm_br->br_info[br_index].appl_server_max_num;
544 
545  thr_index = (int *) malloc (sizeof (int) * num_thr);
546  if (thr_index == NULL)
547  {
549  goto error1;
550  }
551 
552  /* initialize session request queue. queue size is 1 */
553  session_request_q = (T_MAX_HEAP_NODE *) malloc (sizeof (T_MAX_HEAP_NODE) * num_thr);
554  if (session_request_q == NULL)
555  {
  /* NOTE(review): thr_index is not freed on this path in the lines visible here */
557  goto error1;
558  }
559  for (i = 0; i < num_thr; i++)
560  {
561  session_request_q[i].clt_sock_fd = INVALID_SOCKET;
562  }
563 #endif
564  }
565 
568 
  /* wait (polling every 200 ms) until this broker is flagged ready in shared memory */
569  while (shm_br->br_info[br_index].ready_to_service != true)
570  {
571  SLEEP_MILISEC (0, 200);
572  }
573 
574  THREAD_BEGIN (receiver_thread, receiver_thr_f, NULL);
575 
576  if (br_shard_flag == ON)
577  {
578  THREAD_BEGIN (dispatch_thread, shard_dispatch_thr_f, NULL);
579  }
580  else
581  {
582  THREAD_BEGIN (dispatch_thread, dispatch_thr_f, NULL);
583  }
584  THREAD_BEGIN (psize_check_thread, psize_check_thr_f, NULL);
585  THREAD_BEGIN (cas_monitor_thread, cas_monitor_thr_f, NULL);
586  THREAD_BEGIN (server_monitor_thread, server_monitor_thr_f, NULL);
587 
588  if (br_shard_flag == ON)
589  {
590  THREAD_BEGIN (proxy_monitor_thread, proxy_monitor_thr_f, NULL);
591 #if !defined(WINDOWS)
592  THREAD_BEGIN (proxy_listener_thread, proxy_listener_thr_f, NULL);
593 #endif /* !WINDOWS */
594  }
595 
596  if (shm_br->br_info[br_index].monitor_hang_flag)
597  {
598  THREAD_BEGIN (hang_check_thread, hang_check_thr_f, NULL);
599  }
600 
601  if (br_shard_flag == ON)
602  {
603  br_info_p->err_code = 0; /* DO NOT DELETE!!! : reset error code */
604 
606  }
607  else
608  {
609 #if defined(WIN_FW)
  /* start one service thread per slot; only the first appl_server_min_num slots start in service */
610  for (i = 0; i < num_thr; i++)
611  {
612  thr_index[i] = i;
613  THREAD_BEGIN (service_thread, service_thr_f, thr_index + i);
614  shm_appl->as_info[i].last_access_time = time (NULL);
615  shm_appl->as_info[i].transaction_start_time = (time_t) 0;
616  if (i < shm_br->br_info[br_index].appl_server_min_num)
617  {
618  shm_appl->as_info[i].service_flag = SERVICE_ON;
619  }
620  else
621  {
622  shm_appl->as_info[i].service_flag = SERVICE_OFF_ACK;
623  }
624  }
625 #endif
626 
628 
  /* idle loop: wake every 100 ms; nothing more to do while auto_add_appl_server is OFF */
629  while (process_flag)
630  {
631  SLEEP_MILISEC (0, 100);
632 
633  if (shm_br->br_info[br_index].auto_add_appl_server == OFF)
634  {
635  continue;
636  }
637 
639  } /* end of while (process_flag) */
640  } /* end of if (SHARD == OFF) */
641 
  /* NOTE(review): control also falls through to this label after the idle loop ends, not only on errors */
642 error1:
643  if (br_shard_flag == ON)
644  {
645 #if !defined(WINDOWS)
647 #endif /* !WINDOWS */
648  }
649 
651  return -1;
652 }
653 
/*
 * Shard-mode supervision loop: every 100 ms, while process_flag is set and
 * auto_add_appl_server is not OFF, walk all proxies and their shards and
 * start at most one extra application server per shard that has waiters
 * and is below its max_appl_server.
 *
 * NOTE(review): the declarator line and listing line 658 (presumably the
 * declaration of proxy_info_p) are missing from this listing. The name is
 * presumably shard_broker_process (void), as in the prototype earlier in
 * the file - confirm against the real source.
 */
654 static void
656 {
657  int proxy_index, shard_index, i;
659  T_SHARD_INFO *shard_info_p;
660  T_APPL_SERVER_INFO *as_info_p;
661 
662  while (process_flag)
663  {
664  SLEEP_MILISEC (0, 100);
665 
666  if (shm_br->br_info[br_index].auto_add_appl_server == OFF)
667  {
668  continue;
669  }
670 
671  /* ADD UTS */
672  for (proxy_index = 0; proxy_index < shm_proxy_p->num_proxy; proxy_index++)
673  {
674  proxy_info_p = shard_shm_find_proxy_info (shm_proxy_p, proxy_index);
675 
676  for (shard_index = 0; shard_index < proxy_info_p->num_shard_conn; shard_index++)
677  {
678  shard_info_p = shard_shm_find_shard_info (proxy_info_p, shard_index);
679 
680  if ((shard_info_p->waiter_count > 0) && (shard_info_p->num_appl_server < shard_info_p->max_appl_server))
681  {
  /* only slots from min_appl_server up to max_appl_server are scanned; the shard's slots start at as_info_index_base */
682  for (i = shard_info_p->min_appl_server; i < shard_info_p->max_appl_server; i++)
683  {
684  as_info_p = &(shm_appl->as_info[i + shard_info_p->as_info_index_base]);
685  if (as_info_p->service_flag == SERVICE_OFF)
686  {
687  int pid;
688 
689  as_info_p->uts_status = UTS_STATUS_START;
690  as_info_p->cur_sql_log_mode = shm_appl->sql_log_mode;
691  as_info_p->cur_slow_log_mode = shm_appl->slow_log_mode;
692 
693  pid = run_appl_server (as_info_p, br_index, i + shard_info_p->as_info_index_base);
694  if (pid > 0)
695  {
696  as_info_p->pid = pid;
697  as_info_p->psize = getsize (pid);
698  as_info_p->psize_time = time (NULL);
699  as_info_p->service_flag = SERVICE_ON;
700  as_info_p->reset_flag = FALSE;
701 
702  (shm_br->br_info[br_index].appl_server_num)++;
703  (shard_info_p->num_appl_server)++;
704  (shm_appl->num_appl_server)++;
705  }
  /* one start attempt per shard and pass, whether or not it succeeded */
706  break;
707  }
708  }
709  }
710  }
711  }
712  }
713  return;
714 }
715 
/*
 * cleanup () - handler that main () installs for SIGTERM and SIGINT
 *   signo(in): number of the delivered signal
 *
 * Ignores further deliveries of the same signal, clears process_flag so the
 * worker loops stop, and terminates the process with exit status 0.
 *
 * NOTE(review): listing lines 725 and 729 are missing from this listing, so
 * whatever is released before exit (and inside the shard branch) is not
 * visible here.
 */
716 static void
717 cleanup (int signo)
718 {
719  signal (signo, SIG_IGN);
720 
721  process_flag = 0;
722 #ifdef SOLARIS
723  SLEEP_MILISEC (1, 0);
724 #endif
726  if (br_shard_flag == ON)
727  {
728 #if !defined(WINDOWS)
730 #endif /* !WINDOWS */
731  }
732  exit (0);
733 }
734 
735 static void
736 send_error_to_driver (int sock, int error, char *driver_info)
737 {
738  int write_val;
739  int driver_version;
740 
741  driver_version = CAS_MAKE_PROTO_VER (driver_info);
742 
743  if (error == NO_ERROR)
744  {
745  write_val = 0;
746  }
747  else
748  {
749  if (driver_version == CAS_PROTO_MAKE_VER (PROTOCOL_V2) || cas_di_understand_renewed_error_code (driver_info))
750  {
751  write_val = htonl (error);
752  }
753  else
754  {
755  write_val = htonl (CAS_CONV_ERROR_TO_OLD (error));
756  }
757  }
758 
759  write_to_client (sock, (char *) &write_val, sizeof (int));
760 }
761 
/*
 * Printable names of the client/driver kinds, indexed by the client type
 * value (see the per-entry comments).  receiver_thr_f () copies the
 * selected name into the job's prg_name.
 *
 * Both the strings and the table slots are const, so an entry cannot be
 * re-pointed by accident.
 */
static const char *const cas_client_type_str[] = {
  "UNKNOWN",			/* CAS_CLIENT_NONE */
  "CCI",			/* CAS_CLIENT_CCI */
  "ODBC",			/* CAS_CLIENT_ODBC */
  "JDBC",			/* CAS_CLIENT_JDBC */
  "PHP",			/* CAS_CLIENT_PHP */
  "OLEDB",			/* CAS_CLIENT_OLEDB */
  "INTERNAL_JDBC"		/* CAS_CLIENT_SERVER_SIDE_JDBC */
};
771 
/*
 * receiver_thr_f () - thread body that accepts client connections
 *   arg(in): not used in the lines visible here
 *
 * For every accepted socket the fixed-size request header is read and
 * handled by its leading bytes:
 *   "PING"              - reply 0 and close
 *   "ST"                - reply the fn_status of the addressed server and close
 *   "QC"/"CANCEL"/"X1"  - query cancel: signal the addressed server, reply, close
 *   anything else       - must carry the client magic string and a valid
 *                         client type; after the SSL, version (shard mode)
 *                         and access-list checks the connection is queued
 *                         as a job for the dispatcher.
 * Runs while process_flag is set.
 *
 * NOTE(review): listing lines 777, 814, 816, 996, 1025, 1043, 1046 and 1052
 * are missing from this listing (among them the declaration of clt_sock_fd
 * and, presumably, the locking around the job queue).
 */
772 static THREAD_FUNC
773 receiver_thr_f (void *arg)
774 {
775  T_SOCKLEN clt_sock_addr_len;
776  struct sockaddr_in clt_sock_addr;
778  int job_queue_size;
779  T_MAX_HEAP_NODE *job_queue;
780  T_MAX_HEAP_NODE new_job;
781  int job_count;
782  int read_len;
783  int one = 1;
784  char cas_req_header[SRV_CON_CLIENT_INFO_SIZE];
785  char cas_client_type;
786  char driver_version;
787  T_BROKER_VERSION client_version;
788 #if defined(LINUX)
789  int timeout;
790 #endif /* LINUX */
791 
792  job_queue_size = shm_appl->job_queue_size;
793  job_queue = shm_appl->job_queue;
794  job_count = 1;
795 
796 #if !defined(WINDOWS)
797  signal (SIGPIPE, SIG_IGN);
798 #endif
799 
800 #if defined(LINUX)
  /* TCP_DEFER_ACCEPT with a value of 5 is set on the listening socket; the return value is not checked */
801  timeout = 5;
802  setsockopt (sock_fd, IPPROTO_TCP, TCP_DEFER_ACCEPT, (char *) &timeout, sizeof (timeout));
803 #endif /* LINUX */
804 
805  while (process_flag)
806  {
807  clt_sock_addr_len = sizeof (clt_sock_addr);
808  clt_sock_fd = accept (sock_fd, (struct sockaddr *) &clt_sock_addr, &clt_sock_addr_len);
809  if (IS_INVALID_SOCKET (clt_sock_fd))
810  {
811  continue;
812  }
813 
  /* NOTE(review): the condition guarding the next block is missing from this listing */
815  {
817  CLOSE_SOCKET (clt_sock_fd);
818  continue;
819  }
820 
821 #if !defined(WINDOWS) && defined(ASYNC_MODE)
822  if (fcntl (clt_sock_fd, F_SETFL, FNDELAY) < 0)
823  {
824  CLOSE_SOCKET (clt_sock_fd);
825  continue;
826  }
827 #endif
828 
829  setsockopt (clt_sock_fd, IPPROTO_TCP, TCP_NODELAY, (char *) &one, sizeof (one));
830  ut_set_keepalive (clt_sock_fd);
831 
832  cas_client_type = CAS_CLIENT_NONE;
833 
834  /* read header */
835  read_len = read_nbytes_from_client (clt_sock_fd, cas_req_header, SRV_CON_CLIENT_INFO_SIZE);
836  if (read_len < 0)
837  {
838  CLOSE_SOCKET (clt_sock_fd);
839  continue;
840  }
841 
  /* "PING": answer 0 and close */
842  if (strncmp (cas_req_header, "PING", 4) == 0)
843  {
844  int ret_code = 0;
845  CAS_SEND_ERROR_CODE (clt_sock_fd, ret_code);
846  CLOSE_SOCKET (clt_sock_fd);
847  continue;
848  }
849 
  /* "ST": status query. The header carries a pid at offset 2 and a session id at offset 6, both in network byte order. */
850  if (strncmp (cas_req_header, "ST", 2) == 0)
851  {
852  int status = FN_STATUS_NONE;
853  int pid, i;
854  unsigned int session_id;
855 
856  memcpy ((char *) &pid, cas_req_header + 2, 4);
857  pid = ntohl (pid);
858  memcpy ((char *) &session_id, cas_req_header + 6, 4);
859  session_id = ntohl (session_id);
860 
861  if (shm_br->br_info[br_index].shard_flag == OFF)
862  {
  /* the first in-service server with that pid decides: its fn_status is reported only when the session id matches too */
863  for (i = 0; i < shm_br->br_info[br_index].appl_server_max_num; i++)
864  {
865  if (shm_appl->as_info[i].service_flag == SERVICE_ON && shm_appl->as_info[i].pid == pid)
866  {
867  if (session_id == shm_appl->as_info[i].session_id)
868  {
869  status = shm_appl->as_info[i].fn_status;
870  }
871  break;
872  }
873  }
874  }
875 
876  CAS_SEND_ERROR_CODE (clt_sock_fd, status);
877  CLOSE_SOCKET (clt_sock_fd);
878  continue;
879  }
880 
881  /*
882  * Query cancel message (size in bytes)
883  *
884  * - For client version 8.4.0 patch 1 or below:
885  * |COMMAND("CANCEL",6)|PID(4)|
886  *
887  * - For CAS protocol version 1 or above:
888  * |COMMAND("QC",2)|PID(4)|CLIENT_PORT(2)|RESERVED(2)|
889  *
890  * CLIENT_PORT can be 0 if the client failed to get its local port.
891  */
892  else if (strncmp (cas_req_header, "QC", 2) == 0 || strncmp (cas_req_header, "CANCEL", 6) == 0
893  || strncmp (cas_req_header, "X1", 2) == 0)
894  {
895  int ret_code = 0;
896 #if !defined(WINDOWS)
897  int pid, i;
898  unsigned short client_port = 0;
899 #endif
900 
901 #if !defined(WINDOWS)
902  if (cas_req_header[0] == 'Q')
903  {
904  memcpy ((char *) &pid, cas_req_header + 2, 4);
905  memcpy ((char *) &client_port, cas_req_header + 6, 2);
906  pid = ntohl (pid);
907  client_port = ntohs (client_port);
908  }
909  else
910  {
  /* "CANCEL" and "X1" requests are both decoded here: the pid is read from offset 6 */
911  memcpy ((char *) &pid, cas_req_header + 6, 4);
912  pid = ntohl (pid);
913  }
914 
  /* CAS_ER_QUERY_CANCEL is the answer unless a matching busy server is found below */
915  ret_code = CAS_ER_QUERY_CANCEL;
916  if (shm_br->br_info[br_index].shard_flag == OFF)
917  {
918 
919  for (i = 0; i < shm_br->br_info[br_index].appl_server_max_num; i++)
920  {
921  if (shm_appl->as_info[i].service_flag == SERVICE_ON && shm_appl->as_info[i].pid == pid
922  && shm_appl->as_info[i].uts_status == UTS_STATUS_BUSY)
923  {
  /* NOTE(review): as written a "QC" request is skipped only when BOTH the port and the IP address differ from the recorded client; confirm that && (not ||) is intended */
924  if (cas_req_header[0] == 'Q' && client_port > 0
925  && shm_appl->as_info[i].cas_clt_port != client_port
926  && memcmp (&shm_appl->as_info[i].cas_clt_ip, &clt_sock_addr.sin_addr, 4) != 0)
927  {
928  continue;
929  }
930 
931  ret_code = 0;
932  kill (pid, SIGUSR1);
933  break;
934  }
935  }
936  }
937  else
938  {
939  /* SHARD TODO : not implemented yet */
940  }
941 #endif
942  if (cas_req_header[0] == 'X')
943  {
944  char driver_info[SRV_CON_CLIENT_INFO_SIZE];
945 
  /* NOTE(review): only two bytes of driver_info are set before it is handed to send_error_to_driver (); the rest is uninitialised - confirm the callee reads nothing else */
946  driver_info[SRV_CON_MSG_IDX_PROTO_VERSION] = cas_req_header[2];
947  driver_info[SRV_CON_MSG_IDX_FUNCTION_FLAG] = cas_req_header[3];
948  send_error_to_driver (clt_sock_fd, ret_code, driver_info);
949  }
950  else
951  {
952  ret_code = CAS_CONV_ERROR_TO_OLD (ret_code);
953  CAS_SEND_ERROR_CODE (clt_sock_fd, ret_code);
954  }
955  CLOSE_SOCKET (clt_sock_fd);
956  continue;
957  }
958 
  /* regular connection request: the header must start with one of the two magic strings and carry a client type within range */
959  cas_client_type = cas_req_header[SRV_CON_MSG_IDX_CLIENT_TYPE];
960  if (!(strncmp (cas_req_header, SRV_CON_CLIENT_MAGIC_STR, SRV_CON_CLIENT_MAGIC_LEN) == 0
961  || strncmp (cas_req_header, SRV_CON_CLIENT_MAGIC_STR_SSL, SRV_CON_CLIENT_MAGIC_LEN) == 0)
962  || cas_client_type < CAS_CLIENT_TYPE_MIN || cas_client_type > CAS_CLIENT_TYPE_MAX)
963  {
964  send_error_to_driver (clt_sock_fd, CAS_ER_NOT_AUTHORIZED_CLIENT, cas_req_header);
965  CLOSE_SOCKET (clt_sock_fd);
966  continue;
967  }
968 
  /* the client's SSL choice has to match the broker's use_SSL setting exactly */
969  if ((IS_SSL_CLIENT (cas_req_header) && shm_br->br_info[br_index].use_SSL == OFF)
970  || (!IS_SSL_CLIENT (cas_req_header) && shm_br->br_info[br_index].use_SSL == ON))
971  {
972  send_error_to_driver (clt_sock_fd, CAS_ER_SSL_TYPE_NOT_ALLOWED, cas_req_header);
973  CLOSE_SOCKET (clt_sock_fd);
974  continue;
975  }
976 
977  driver_version = cas_req_header[SRV_CON_MSG_IDX_PROTO_VERSION];
978  if (driver_version & CAS_PROTO_INDICATOR)
979  {
980  /* Protocol version */
981  client_version = CAS_PROTO_UNPACK_NET_VER (driver_version);
982  }
983  else
984  {
985  /* Build version; major, minor, and patch */
986  client_version =
987  CAS_MAKE_VER (cas_req_header[SRV_CON_MSG_IDX_MAJOR_VER], cas_req_header[SRV_CON_MSG_IDX_MINOR_VER],
988  cas_req_header[SRV_CON_MSG_IDX_PATCH_VER]);
989  }
990 
991  if (br_shard_flag == ON)
992  {
993  /* SHARD ONLY SUPPORT client_version.8.2.0 ~ */
994  if (client_version < CAS_MAKE_VER (8, 2, 0))
995  {
997  CLOSE_SOCKET (clt_sock_fd);
998  continue;
999  }
1000  }
1001 
  /* access list check, done only when an access list is loaded */
1002  if (v3_acl != NULL)
1003  {
1004  unsigned char ip_addr[4];
1005 
1006  memcpy (ip_addr, &(clt_sock_addr.sin_addr), 4);
1007 
1008  if (uw_acl_check (ip_addr) < 0)
1009  {
1010  send_error_to_driver (clt_sock_fd, CAS_ER_NOT_AUTHORIZED_CLIENT, cas_req_header);
1011  CLOSE_SOCKET (clt_sock_fd);
1012  continue;
1013  }
1014  }
1015 
  /* job_queue[0].id is compared with the queue capacity: reject the client when the queue is full */
1016  if (job_queue[0].id == job_queue_size)
1017  {
1018  send_error_to_driver (clt_sock_fd, CAS_ER_FREE_SERVER, cas_req_header);
1019  CLOSE_SOCKET (clt_sock_fd);
1020  continue;
1021  }
1022 
1023  if (max_open_fd < clt_sock_fd)
1024  {
1026  }
1027 
  /* job ids restart from 1 once JOB_COUNT_MAX is reached */
1028  job_count = (job_count >= JOB_COUNT_MAX) ? 1 : job_count + 1;
1029  new_job.id = job_count;
1030  new_job.clt_sock_fd = clt_sock_fd;
1031  new_job.recv_time = time (NULL);
1032  new_job.priority = 0;
1033  new_job.script[0] = '\0';
1034  new_job.cas_client_type = cas_client_type;
1035  new_job.port = ntohs (clt_sock_addr.sin_port);
1036  memcpy (new_job.ip_addr, &(clt_sock_addr.sin_addr), 4);
1037  strcpy (new_job.prg_name, cas_client_type_str[(int) cas_client_type]);
1038  new_job.clt_version = client_version;
1039  memcpy (new_job.driver_info, cas_req_header, SRV_CON_CLIENT_INFO_SIZE);
1040 
  /* retry the insert every 100 ms until it succeeds, then wake the dispatcher */
1041  while (1)
1042  {
1044  if (max_heap_insert (job_queue, job_queue_size, &new_job) < 0)
1045  {
1047  SLEEP_MILISEC (0, 100);
1048  }
1049  else
1050  {
1051  pthread_cond_signal (&clt_table_cond);
1053  break;
1054  }
1055  }
1056  }
1057 
1058 #if defined(WINDOWS)
1059  return;
1060 #else
1061  return NULL;
1062 #endif
1063 }
1064 
/*
 * Shard-mode dispatcher thread: takes jobs off the job queue and hands each
 * client to a proxy.
 *   Windows: the value returned by broker_find_available_proxy () is sent to
 *            the client as a 4-byte code, then the client socket is closed.
 *   others:  the client descriptor is passed to the proxy with send_fd () and
 *            a status word is read back; a proxy connection that fails is
 *            deleted and closed.  The broker's copy of the client socket is
 *            closed in every case.
 * Runs while process_flag is set; polls every 30 ms when the queue is empty.
 *
 * NOTE(review): the declarator line and listing lines 1083, 1086, 1090,
 * 1114, 1122 and 1127 are missing from this listing. The name is presumably
 * shard_dispatch_thr_f (void *arg), as in the prototype earlier in the
 * file - confirm against the real source.
 */
1065 static THREAD_FUNC
1067 {
1068  T_MAX_HEAP_NODE *job_queue;
1069  T_MAX_HEAP_NODE cur_job;
1070  int ip_addr;
1071 #if defined(WINDOWS)
1072  int proxy_port;
1073 #else
1074  SOCKET proxy_fd;
1075  int proxy_status;
1076  int ret_val;
1077 #endif
1078 
1079  job_queue = shm_appl->job_queue;
1080 
1081  while (process_flag)
1082  {
1084  if (max_heap_delete (job_queue, &cur_job) < 0)
1085  {
1087  SLEEP_MILISEC (0, 30);
1088  continue;
1089  }
1091 
1092  max_heap_incr_priority (job_queue);
1093 
1094 #if defined(WINDOWS)
1095  memcpy (&ip_addr, cur_job.ip_addr, 4);
1096  proxy_port = broker_find_available_proxy (shm_proxy_p, ip_addr, cur_job.clt_version);
1097 
1098  CAS_SEND_ERROR_CODE (cur_job.clt_sock_fd, proxy_port);
1099  CLOSE_SOCKET (cur_job.clt_sock_fd);
1100 #else /* WINDOWS */
1101  proxy_fd = broker_find_available_proxy (shm_proxy_p);
1102  if (proxy_fd != INVALID_SOCKET)
1103  {
1104  memcpy (&ip_addr, cur_job.ip_addr, 4);
1105  ret_val = send_fd (proxy_fd, cur_job.clt_sock_fd, ip_addr, cur_job.driver_info);
1106  if (ret_val > 0)
1107  {
  /* wait up to SOCKET_TIMEOUT_SEC for the proxy's status word; the value read is not examined in the lines visible here */
1108  ret_val =
1109  read_from_client_with_timeout (proxy_fd, (char *) &proxy_status, sizeof (int), SOCKET_TIMEOUT_SEC);
1110  if (ret_val < 0)
1111  {
1112  broker_delete_proxy_conn_by_fd (proxy_fd);
1113  CLOSE_SOCKET (proxy_fd);
1115  }
1116  }
1117  else
1118  {
1119  broker_delete_proxy_conn_by_fd (proxy_fd);
1120  CLOSE_SOCKET (proxy_fd);
1121 
1123  }
1124  }
1125  else
1126  {
1128  }
1129 
1130  CLOSE_SOCKET (cur_job.clt_sock_fd);
1131 #endif /* !WINDOWS */
1132  }
1133 #if defined(WINDOWS)
1134  return;
1135 #else
1136  return NULL;
1137 #endif
1138 }
1139 
/*
 * dispatch_thr_f () - non-shard dispatcher thread
 *   arg(in): not used in the lines visible here
 *
 * Each pass:
 *   1. acknowledges idle servers that were switched to SERVICE_OFF,
 *   2. takes the next job off the queue, waiting up to about 30 ms on
 *      clt_table_cond when the queue is empty,
 *   3. looks for an idle server, starting a new one when none is idle,
 *   4. records the client in the server's shared-memory slot and hands the
 *      connection over: on Windows by sending the server's port to the
 *      client, elsewhere by passing the descriptor with send_fd (); with
 *      WIN_FW the job is stored in session_request_q instead.
 * Runs while process_flag is set.
 *
 * NOTE(review): listing lines 1146, 1164, 1182, 1191 and 1269 are missing
 * from this listing (among them the declaration of srv_sock_fd and,
 * presumably, the clt_table_mutex lock/unlock calls).
 */
1140 static THREAD_FUNC
1141 dispatch_thr_f (void *arg)
1142 {
1143  T_MAX_HEAP_NODE *job_queue;
1144  T_MAX_HEAP_NODE cur_job;
1145 #if !defined(WINDOWS)
1147 #endif /* !WINDOWS */
1148 
1149  int as_index, i;
1150 
1151  job_queue = shm_appl->job_queue;
1152 
1153  while (process_flag)
1154  {
  /* a server switched to SERVICE_OFF is acknowledged (SERVICE_OFF_ACK) once it is idle */
1155  for (i = 0; i < shm_br->br_info[br_index].appl_server_max_num; i++)
1156  {
1157  if (shm_appl->as_info[i].service_flag == SERVICE_OFF)
1158  {
1159  if (shm_appl->as_info[i].uts_status == UTS_STATUS_IDLE)
1160  shm_appl->as_info[i].service_flag = SERVICE_OFF_ACK;
1161  }
1162  }
1163 
1165  if (max_heap_delete (job_queue, &cur_job) < 0)
1166  {
1167  struct timespec ts;
1168  struct timeval tv;
1169  int r;
1170 
  /* absolute deadline = now + 30 ms */
1171  gettimeofday (&tv, NULL);
1172  ts.tv_sec = tv.tv_sec;
1173  ts.tv_nsec = (tv.tv_usec + 30000) * 1000;
  /* NOTE(review): the test is ">" rather than ">=", so a value of exactly 1000000000 is not normalised although tv_nsec must stay below one second; the wait then presumably fails and the pass is retried - confirm */
1174  if (ts.tv_nsec > 1000000000)
1175  {
1176  ts.tv_sec += 1;
1177  ts.tv_nsec -= 1000000000;
1178  }
1179  r = pthread_cond_timedwait (&clt_table_cond, &clt_table_mutex, &ts);
1180  if (r != 0)
1181  {
1183  continue;
1184  }
  /* after a successful wait a job is expected to be queued; this is enforced by the assert only */
1185  r = max_heap_delete (job_queue, &cur_job);
1186  assert (r == 0);
1187  }
1188 
  /* hold_job marks the dequeued job as still waiting while a server is searched for */
1189  hold_job = 1;
1190  max_heap_incr_priority (job_queue);
1192 
1193 #if !defined (WINDOWS)
1194  retry:
1195 #endif
  /* loop until an idle server is found; start a new one when possible, otherwise poll every 30 ms */
1196  while (1)
1197  {
1198  as_index = find_idle_cas ();
1199  if (as_index < 0)
1200  {
1201  if (broker_add_new_cas ())
1202  {
1203  continue;
1204  }
1205  else
1206  {
1207  SLEEP_MILISEC (0, 30);
1208  }
1209  }
1210  else
1211  {
1212  break;
1213  }
1214  }
1215 
1216  hold_job = 0;
1217 
1218  shm_appl->as_info[as_index].num_connect_requests++;
1219 #if !defined(WIN_FW)
  /* record the client's version, header, type, address and port in the chosen server's slot */
1220  shm_appl->as_info[as_index].clt_version = cur_job.clt_version;
1221  memcpy (shm_appl->as_info[as_index].driver_info, cur_job.driver_info, SRV_CON_CLIENT_INFO_SIZE);
1222  shm_appl->as_info[as_index].cas_client_type = cur_job.cas_client_type;
1223  memcpy (shm_appl->as_info[as_index].cas_clt_ip, cur_job.ip_addr, 4);
1224  shm_appl->as_info[as_index].cas_clt_port = cur_job.port;
1225 #if defined(WINDOWS)
  /* Windows: the client is told the server's port (as_port) and the broker closes its side */
1226  shm_appl->as_info[as_index].uts_status = UTS_STATUS_BUSY_WAIT;
1227  CAS_SEND_ERROR_CODE (cur_job.clt_sock_fd, shm_appl->as_info[as_index].as_port);
1228  CLOSE_SOCKET (cur_job.clt_sock_fd);
1229  shm_appl->as_info[as_index].num_request++;
1230  shm_appl->as_info[as_index].last_access_time = time (NULL);
1231  shm_appl->as_info[as_index].transaction_start_time = (time_t) 0;
1232 #else /* WINDOWS */
1233 
1234  srv_sock_fd = connect_srv (shm_br->br_info[br_index].name, as_index);
1235 
1236  if (!IS_INVALID_SOCKET (srv_sock_fd))
1237  {
1238  int ip_addr;
1239  int ret_val;
1240  int con_status, uts_status;
1241 
  /* handshake: send the slot's con_status and expect CON_STATUS_IN_TRAN back; any failure goes back to "retry" with hold_job already reset to 0 */
1242  con_status = htonl (shm_appl->as_info[as_index].con_status);
1243 
1244  ret_val = write_to_client_with_timeout (srv_sock_fd, (char *) &con_status, sizeof (int), SOCKET_TIMEOUT_SEC);
1245  if (ret_val != sizeof (int))
1246  {
1247  CLOSE_SOCKET (srv_sock_fd);
1248  goto retry;
1249  }
1250 
1251  ret_val = read_from_client_with_timeout (srv_sock_fd, (char *) &con_status, sizeof (int), SOCKET_TIMEOUT_SEC);
1252  if (ret_val != sizeof (int) || ntohl (con_status) != CON_STATUS_IN_TRAN)
1253  {
1254  CLOSE_SOCKET (srv_sock_fd);
1255  goto retry;
1256  }
1257 
  /* pass the client descriptor to the server, then wait for its status word */
1258  memcpy (&ip_addr, cur_job.ip_addr, 4);
1259  ret_val = send_fd (srv_sock_fd, cur_job.clt_sock_fd, ip_addr, cur_job.driver_info);
1260  if (ret_val > 0)
1261  {
1262  ret_val =
1263  read_from_client_with_timeout (srv_sock_fd, (char *) &uts_status, sizeof (int), SOCKET_TIMEOUT_SEC);
1264  }
1265  CLOSE_SOCKET (srv_sock_fd);
1266 
1267  if (ret_val < 0)
1268  {
1270  }
1271  else
1272  {
1273  shm_appl->as_info[as_index].num_request++;
1274  }
1275  }
1276  else
1277  {
1278  goto retry;
1279  }
1280 
  /* the broker's copy of the client socket is closed whether or not the hand-over succeeded */
1281  CLOSE_SOCKET (cur_job.clt_sock_fd);
1282 #endif /* ifdef !WINDOWS */
1283 #else /* !WIN_FW */
1284  session_request_q[as_index] = cur_job;
1285 #endif /* WIN_FW */
1286  }
1287 
1288 #if defined(WINDOWS)
1289  return;
1290 #else
1291  return NULL;
1292 #endif
1293 }
1294 
1295 #if defined(WIN_FW)
/*
 * service_thr_f () - per-CAS relay thread (compiled only when WIN_FW is defined)
 *   return: nothing (ends with a bare "return;")
 *   arg(in): pointer to an int holding the CAS index served by this thread
 *
 * Note: Polls session_request_q[self_index]; when a job with a valid client
 *       socket is queued there, it takes the job, connects to the CAS with
 *       connect_srv () and relays traffic with process_cas_request ().
 *       NOTE(review): clt_sock_fd and srv_sock_fd are used below but their
 *       declaration is not visible (a source line is missing from this
 *       listing right after the self_index declaration).
 */
1296 static THREAD_FUNC
1297 service_thr_f (void *arg)
1298 {
1299  int self_index = *((int *) arg);
1301  int ip_addr;
1302  int cas_pid;
1303  T_MAX_HEAP_NODE cur_job;
1304 
1305  while (process_flag)
1306  {
/* take the queued job and mark the slot empty, or sleep and poll again */
1307  if (!IS_INVALID_SOCKET (session_request_q[self_index].clt_sock_fd))
1308  {
1309  cur_job = session_request_q[self_index];
1310  session_request_q[self_index].clt_sock_fd = INVALID_SOCKET;
1311  }
1312  else
1313  {
1314  SLEEP_MILISEC (0, 10);
1315  continue;
1316  }
1317 
1318  clt_sock_fd = cur_job.clt_sock_fd;
1319  memcpy (&ip_addr, cur_job.ip_addr, 4);
1320 
/* publish the client's version and type in this CAS's shared-memory slot */
1321  shm_appl->as_info[self_index].clt_major_version = cur_job.clt_major_version;
1322  shm_appl->as_info[self_index].clt_minor_version = cur_job.clt_minor_version;
1323  shm_appl->as_info[self_index].clt_patch_version = cur_job.clt_patch_version;
1324  shm_appl->as_info[self_index].cas_client_type = cur_job.cas_client_type;
1325  shm_appl->as_info[self_index].close_flag = 0;
1326  cas_pid = shm_appl->as_info[self_index].pid;
1327 
1328  srv_sock_fd = connect_srv (shm_br->br_info[br_index].name, self_index);
1329  if (IS_INVALID_SOCKET (srv_sock_fd))
1330  {
/* CAS unreachable: report CAS_ER_FREE_SERVER to the driver and release the slot */
1331  send_error_to_driver (clt_sock_fd, CAS_ER_FREE_SERVER, cur_job.driver_info);
1332  shm_appl->as_info[self_index].uts_status = UTS_STATUS_IDLE;
1333  CLOSE_SOCKET (cur_job.clt_sock_fd);
1334  continue;
1335  }
1336  else
1337  {
/* connected: send code 0 to the client and update the request statistics */
1338  CAS_SEND_ERROR_CODE (clt_sock_fd, 0);
1339  shm_appl->as_info[self_index].num_request++;
1340  shm_appl->as_info[self_index].last_access_time = time (NULL);
1341  shm_appl->as_info[self_index].transaction_start_time = (time_t) 0;
1342  }
1343 
1344  process_cas_request (cas_pid, self_index, clt_sock_fd, srv_sock_fd, cur_job.clt_major_version);
1345 
/* both ends are closed once process_cas_request () returns */
1346  CLOSE_SOCKET (clt_sock_fd);
1347  CLOSE_SOCKET (srv_sock_fd);
1348  }
1349 
1350  return;
1351 }
1352 #endif
1353 
/*
 * init_env () - create, bind and listen on the broker's listening socket
 *   return: 0 on success, -1 on any failure
 *
 * Note: The port number is taken from the environment variable named by
 *       PORT_NUMBER_ENV_STR and the socket is bound to INADDR_ANY. The
 *       listen backlog is shm_appl->job_queue_size. sock_fd, sock_addr and
 *       sock_addr_len are not declared in this function.
 *       NOTE(review): every failure branch below shows only "return (-1)";
 *       a source line is missing from this listing just before each of
 *       those returns, so any error reporting done there is not visible.
 *       NOTE(review): sock_fd is not closed on the failure paths shown
 *       here - confirm whether the caller cleans up.
 */
1354 static int
1355 init_env (void)
1356 {
1357  char *port;
1358  int n;
1359  int one = 1;
1360 
1361  /* create the stream socket (AF_INET) that the broker listens on */
1362  sock_fd = socket (AF_INET, SOCK_STREAM, 0);
1363  if (IS_INVALID_SOCKET (sock_fd))
1364  {
1366  return (-1);
1367  }
1368  if ((setsockopt (sock_fd, SOL_SOCKET, SO_REUSEADDR, (char *) &one, sizeof (one))) < 0)
1369  {
1371  return (-1);
1372  }
/* the listening port comes from the environment; its absence is a failure */
1373  if ((port = getenv (PORT_NUMBER_ENV_STR)) == NULL)
1374  {
1376  return (-1);
1377  }
1378  memset (&sock_addr, 0, sizeof (struct sockaddr_in));
1379  sock_addr.sin_family = AF_INET;
/* NOTE(review): atoi () gives no error indication for a malformed port string */
1380  sock_addr.sin_port = htons ((unsigned short) (atoi (port)));
1381  sock_addr_len = sizeof (struct sockaddr_in);
1382 
/* bind to every local address */
1383  n = INADDR_ANY;
1384  memcpy (&sock_addr.sin_addr, &n, sizeof (int));
1385 
1386  if (bind (sock_fd, (struct sockaddr *) &sock_addr, sock_addr_len) < 0)
1387  {
1389  return (-1);
1390  }
1391 
1392  if (listen (sock_fd, shm_appl->job_queue_size) < 0)
1393  {
1395  return (-1);
1396  }
1397 
1398  return (0);
1399 }
1400 
1401 static int
1402 read_from_client (SOCKET sock_fd, char *buf, int size)
1403 {
1404  return read_from_client_with_timeout (sock_fd, buf, size, 60);
1405 }
1406 
1407 static int
1408 read_from_client_with_timeout (SOCKET sock_fd, char *buf, int size, int timeout_sec)
1409 {
1410  int read_len;
1411 #ifdef ASYNC_MODE
1412  SELECT_MASK read_mask;
1413  int nfound;
1414  int maxfd;
1415  struct timeval timeout_val, *timeout_ptr;
1416 
1417  if (timeout_sec < 0)
1418  {
1419  timeout_ptr = NULL;
1420  }
1421  else
1422  {
1423  timeout_val.tv_sec = timeout_sec;
1424  timeout_val.tv_usec = 0;
1425  timeout_ptr = &timeout_val;
1426  }
1427 #endif
1428 
1429 #ifdef ASYNC_MODE
1430  FD_ZERO (&read_mask);
1431  FD_SET (sock_fd, (fd_set *) (&read_mask));
1432  maxfd = (int) sock_fd + 1;
1433  nfound = select (maxfd, &read_mask, (SELECT_MASK *) 0, (SELECT_MASK *) 0, timeout_ptr);
1434  if (nfound < 0)
1435  {
1436  return -1;
1437  }
1438 #endif
1439 
1440 #ifdef ASYNC_MODE
1441  if (FD_ISSET (sock_fd, (fd_set *) (&read_mask)))
1442  {
1443 #endif
1444  read_len = READ_FROM_SOCKET (sock_fd, buf, size);
1445 #ifdef ASYNC_MODE
1446  }
1447  else
1448  {
1449  return -1;
1450  }
1451 #endif
1452 
1453  return read_len;
1454 }
1455 
1456 static int
1457 write_to_client (SOCKET sock_fd, char *buf, int size)
1458 {
1459  return write_to_client_with_timeout (sock_fd, buf, size, 60);
1460 }
1461 
1462 static int
1463 write_to_client_with_timeout (SOCKET sock_fd, char *buf, int size, int timeout_sec)
1464 {
1465  int write_len;
1466 #ifdef ASYNC_MODE
1467  SELECT_MASK write_mask;
1468  int nfound;
1469  int maxfd;
1470  struct timeval timeout_val, *timeout_ptr;
1471 
1472  if (timeout_sec < 0)
1473  {
1474  timeout_ptr = NULL;
1475  }
1476  else
1477  {
1478  timeout_val.tv_sec = timeout_sec;
1479  timeout_val.tv_usec = 0;
1480  timeout_ptr = &timeout_val;
1481  }
1482 #endif
1483 
1484  if (IS_INVALID_SOCKET (sock_fd))
1485  return -1;
1486 
1487 #ifdef ASYNC_MODE
1488  FD_ZERO (&write_mask);
1489  FD_SET (sock_fd, (fd_set *) (&write_mask));
1490  maxfd = (int) sock_fd + 1;
1491  nfound = select (maxfd, (SELECT_MASK *) 0, &write_mask, (SELECT_MASK *) 0, timeout_ptr);
1492  if (nfound < 0)
1493  {
1494  return -1;
1495  }
1496 #endif
1497 
1498 #ifdef ASYNC_MODE
1499  if (FD_ISSET (sock_fd, (fd_set *) (&write_mask)))
1500  {
1501 #endif
1502  write_len = WRITE_TO_SOCKET (sock_fd, buf, size);
1503 #ifdef ASYNC_MODE
1504  }
1505  else
1506  {
1507  return -1;
1508  }
1509 #endif
1510 
1511  return write_len;
1512 }
1513 
1514 /*
 * run_appl_server () - start one CAS process and wait until it reports ready
 *   return: pid of the started CAS, or -1 when ut_is_appl_server_ready ()
 *           reports that it did not become ready
 *   as_info_p(in/out): shared-memory slot of the CAS being started
 *   br_index(in): broker index
 *   as_index(in): cas index
 *
 * Note: activate CAS. On non-Windows builds the CAS is started with
 *       fork () + execle (); on Windows with run_child ().
 *       NOTE(review): source lines are missing from this listing inside the
 *       initial while (1) loop (its condition and the statements around the
 *       "break") and just before the final return, so what the loop waits
 *       for is not visible here.
 */
1525 static int
1526 run_appl_server (T_APPL_SERVER_INFO * as_info_p, int br_index, int as_index)
1527 {
1528  char appl_name[APPL_SERVER_NAME_MAX_SIZE];
1529  int pid;
1530  char argv0[128];
1531 #if !defined(WINDOWS)
1532  int i;
1533 #endif
1534  char as_id_env_str[32];
1535  char appl_server_shm_key_env_str[32];
1536 
1537  while (1)
1538  {
1541  {
1543  SLEEP_MILISEC (0, 100);
1544  continue;
1545  }
1546  else
1547  {
1550  break;
1551  }
1552  }
1553 
1554  as_info_p->service_ready_flag = FALSE;
1555 
1556 #if !defined(WINDOWS)
/* SIGCHLD is ignored in the broker; the child restores SIG_DFL after fork () */
1557  signal (SIGCHLD, SIG_IGN);
1558 
1559  /* shard_cas does not have unix-domain socket */
1560  if (br_shard_flag == OFF)
1561  {
1562  char path[BROKER_PATH_MAX];
1563 
/* remove the socket path left by a previous CAS with this index */
1564  ut_get_as_port_name (path, shm_br->br_info[br_index].name, as_index, BROKER_PATH_MAX);
1565  unlink (path);
1566  }
1567 
1568  pid = fork ();
/* NOTE(review): a fork () failure (pid < 0) is not handled separately here */
1569  if (pid == 0)
1570  {
1571  signal (SIGCHLD, SIG_DFL);
1572 
/* child: close every descriptor from 3 up to max_open_fd */
1573  for (i = 3; i <= max_open_fd; i++)
1574  {
1575  close (i);
1576  }
1577 #endif
/* NOTE(review): strcpy () and the sprintf () below are unbounded; the sprintf () target is 32 bytes */
1578  strcpy (appl_name, shm_appl->appl_server_name);
1579 
/* hand the shared-memory key and the CAS index to the new process through its environment */
1580  sprintf (appl_server_shm_key_env_str, "%s=%d", APPL_SERVER_SHM_KEY_STR,
1581  shm_br->br_info[br_index].appl_server_shm_id);
1582  putenv (appl_server_shm_key_env_str);
1583 
1584  snprintf (as_id_env_str, sizeof (as_id_env_str), "%s=%d", AS_ID_ENV_STR, as_index);
1585  putenv (as_id_env_str);
1586 
/* build argv[0]: the plain program name for CAS_ORACLE, otherwise broker name + program name + 1-based ids */
1587  if (shm_br->br_info[br_index].appl_server == APPL_SERVER_CAS_ORACLE)
1588  {
1589  snprintf (argv0, sizeof (argv0) - 1, "%s", appl_name);
1590  }
1591  else
1592  {
1593  if (br_shard_flag == ON)
1594  {
1595  snprintf (argv0, sizeof (argv0) - 1, "%s_%s_%d_%d_%d", shm_br->br_info[br_index].name, appl_name,
1596  as_info_p->proxy_id + 1, as_info_p->shard_id, as_info_p->shard_cas_id + 1);
1597  }
1598  else
1599  {
1600  snprintf (argv0, sizeof (argv0) - 1, "%s_%s_%d", shm_br->br_info[br_index].name, appl_name, as_index + 1);
1601  }
1602  }
1603 
1604 #if defined(WINDOWS)
1605  pid = run_child (appl_name);
1606 #else
1607  execle (appl_name, argv0, NULL, environ);
1608 #endif
1609 
1610 #if !defined(WINDOWS)
/* reached only when execle () failed; the child exits with status 0 */
1611  exit (0);
1612  }
1613 #endif
1614 
1615  if (br_shard_flag == ON)
1616  {
1617  as_info_p->uts_status = UTS_STATUS_CON_WAIT;
1618  }
1619 
/* recreate the con_status lock for the new process */
1620  CON_STATUS_LOCK_DESTROY (as_info_p);
1621  CON_STATUS_LOCK_INIT (as_info_p);
1622  if (ut_is_appl_server_ready (pid, &as_info_p->service_ready_flag))
1623  {
1624  as_info_p->transaction_start_time = (time_t) 0;
1625  as_info_p->num_restarts++;
1626  }
1627  else
1628  {
1629  pid = -1;
1630  }
1631 
1633 
1634  return pid;
1635 }
1636 
1637 /*
1638  * stop_appl_server () -
1639  * return: NO_ERROR
1640  * as_info_p(in): T_APPL_SERVER_INFO
1641  * br_index(in): broker index
1642  * as_index(in): cas index
1643  *
1644  * Note: inactivate CAS
1645  */
1646 static int
1647 stop_appl_server (T_APPL_SERVER_INFO * as_info_p, int br_index, int as_index)
1648 {
1649  ut_kill_as_process (as_info_p->pid, shm_br->br_info[br_index].name, as_index, br_shard_flag);
1650 
1651 #if defined(WINDOWS)
1652  /* [CUBRIDSUS-2068] make the broker sleep for 0.1 sec when stopping the cas in order to prevent communication error
1653  * occurred on windows. */
1654  SLEEP_MILISEC (0, 100);
1655 #endif
1656 
1657  as_info_p->pid = 0;
1658  as_info_p->last_access_time = time (NULL);
1659  as_info_p->transaction_start_time = (time_t) 0;
1660 
1661  return 0;
1662 }
1663 
1664 /*
1665  * restart_appl_server () -
1666  * return: void
1667  * as_info_p(in): T_APPL_SERVER_INFO
1668  * br_index(in): broker index
1669  * as_index(in): cas index
1670  *
1671  * Note: inactivate and activate CAS
1672  */
1673 static void
1674 restart_appl_server (T_APPL_SERVER_INFO * as_info_p, int br_index, int as_index)
1675 {
1676  int new_pid;
1677 
1678 #if defined(WINDOWS)
1679  ut_kill_as_process (as_info_p->pid, shm_br->br_info[br_index].name, as_info_p->as_id, br_shard_flag);
1680 
1681  /* [CUBRIDSUS-2068] make the broker sleep for 0.1 sec when stopping the cas in order to prevent communication error
1682  * occurred on windows. */
1683 
1684  SLEEP_MILISEC (0, 100);
1685 
1686  new_pid = run_appl_server (as_info_p, br_index, as_index);
1687  as_info_p->pid = new_pid;
1688 #else
1689 
1690  as_info_p->psize = getsize (as_info_p->pid);
1691  if (as_info_p->psize > 1)
1692  {
1693  as_info_p->psize_time = time (NULL);
1694  }
1695  else
1696  {
1697  char pid_file_name[BROKER_PATH_MAX];
1698  FILE *fp;
1699  int old_pid;
1700 
1701  ut_get_as_pid_name (pid_file_name, shm_br->br_info[br_index].name, as_index, BROKER_PATH_MAX);
1702 
1703  fp = fopen (pid_file_name, "r");
1704  if (fp)
1705  {
1706  fscanf (fp, "%d", &old_pid);
1707  fclose (fp);
1708 
1709  as_info_p->psize = getsize (old_pid);
1710  if (as_info_p->psize > 1)
1711  {
1712  as_info_p->pid = old_pid;
1713  as_info_p->psize_time = time (NULL);
1714  }
1715  else
1716  {
1717  unlink (pid_file_name);
1718  }
1719  }
1720  }
1721 
1722  if (as_info_p->psize <= 0)
1723  {
1724  if (as_info_p->pid > 0)
1725  {
1726  ut_kill_as_process (as_info_p->pid, shm_br->br_info[br_index].name, as_index, br_shard_flag);
1727  }
1728 
1729  new_pid = run_appl_server (as_info_p, br_index, as_index);
1730  as_info_p->pid = new_pid;
1731  }
1732 #endif
1733 }
1734 
/*
 * Read exactly `size` bytes into `buf` from `sock_fd` by calling
 * read_from_client () until the requested amount has arrived.
 *   return: total number of bytes read (equals `size` when the loop ran to
 *           completion; 0 when `size` <= 0 because the loop is never
 *           entered), or -1 as soon as one read returns <= 0
 *
 * NOTE(review): the source line carrying this function's name and parameter
 * list is missing from this listing; sock_fd, buf and size are presumably
 * its parameters - confirm against the original file.
 */
1735 static int
1737 {
1738  int total_read_size = 0, read_len;
1739 
1740  while (total_read_size < size)
1741  {
/* ask only for the bytes still outstanding, appended after what has been read */
1742  read_len = read_from_client (sock_fd, buf + total_read_size, size - total_read_size);
1743  if (read_len <= 0)
1744  {
/* error or end of stream: bytes read so far are not reported */
1745  total_read_size = -1;
1746  break;
1747  }
1748  total_read_size += read_len;
1749  }
1750  return total_read_size;
1751 }
1752 
/*
 * connect_srv () - open a connection from the broker to one CAS
 *   return: connected socket, or INVALID_SOCKET on failure
 *   br_name(in): broker name, used to build the CAS socket path (non-Windows)
 *   as_index(in): cas index
 *
 * Note: Windows builds connect over AF_INET to the CAS's as_port at
 *       shm_br->my_ip_addr; other builds connect over an AF_UNIX socket whose
 *       path comes from ut_get_as_port_name (). When the first connect ()
 *       fails, the CAS is killed and started again and the connection is
 *       attempted once more.
 *       NOTE(review): srv_sock_fd is used below but its declaration is not
 *       visible (a source line is missing from this listing after the
 *       sock_addr declaration).
 */
1753 static SOCKET
1754 connect_srv (char *br_name, int as_index)
1755 {
1756  int sock_addr_len;
1757 #if defined(WINDOWS)
1758  struct sockaddr_in sock_addr;
1759 #else
1760  struct sockaddr_un sock_addr;
1761 #endif
1763  int one = 1;
1764  char retry_count = 0;
1765 
1766 retry:
1767 
1768 #if defined(WINDOWS)
1769  srv_sock_fd = socket (AF_INET, SOCK_STREAM, 0);
1770  if (IS_INVALID_SOCKET (srv_sock_fd))
1771  return INVALID_SOCKET;
1772 
1773  memset (&sock_addr, 0, sizeof (struct sockaddr_in));
1774  sock_addr.sin_family = AF_INET;
1775  sock_addr.sin_port = htons ((unsigned short) shm_appl->as_info[as_index].as_port);
1776  memcpy (&sock_addr.sin_addr, shm_br->my_ip_addr, 4);
1777  sock_addr_len = sizeof (struct sockaddr_in);
1778 #else
1779  srv_sock_fd = socket (AF_UNIX, SOCK_STREAM, 0);
1780  if (IS_INVALID_SOCKET (srv_sock_fd))
1781  return INVALID_SOCKET;
1782 
1783  memset (&sock_addr, 0, sizeof (struct sockaddr_un));
1784  sock_addr.sun_family = AF_UNIX;
1785 
1786  ut_get_as_port_name (sock_addr.sun_path, br_name, as_index, sizeof (sock_addr.sun_path));
1787 
/* address length: the path string, the family field and the terminating NUL */
1788  sock_addr_len = strlen (sock_addr.sun_path) + sizeof (sock_addr.sun_family) + 1;
1789 #endif
1790 
1791  if (connect (srv_sock_fd, (struct sockaddr *) &sock_addr, sock_addr_len) < 0)
1792  {
/* first failure only: restart the CAS and retry with a fresh socket */
1793  if (retry_count < 1)
1794  {
1795  int new_pid;
1796 
1797  ut_kill_as_process (shm_appl->as_info[as_index].pid, shm_br->br_info[br_index].name, as_index, br_shard_flag);
1798 
1799  new_pid = run_appl_server (&(shm_appl->as_info[as_index]), br_index, as_index);
1800  shm_appl->as_info[as_index].pid = new_pid;
1801  retry_count++;
1802  CLOSE_SOCKET (srv_sock_fd);
1803  goto retry;
1804  }
1805  CLOSE_SOCKET (srv_sock_fd);
1806  return INVALID_SOCKET;
1807  }
1808 
/* NOTE(review): TCP_NODELAY is requested on the AF_UNIX socket as well; the setsockopt () result is ignored */
1809  setsockopt (srv_sock_fd, IPPROTO_TCP, TCP_NODELAY, (char *) &one, sizeof (one));
1810 
1811  return srv_sock_fd;
1812 }
1813 
1814 /*
 * cas_monitor_worker () - one monitoring pass over a single CAS slot
 *   return: void
 *   as_info_p(in/out): shared-memory slot of the CAS
 *   br_index(in): broker index
 *   as_index(in): cas index
 *   busy_uts(in/out): incremented when this CAS counts as busy
 *
 * Note: monitoring CAS. Slots whose service_flag is not SERVICE_ON are
 *       ignored. The pass counts busy CASes, requests a restart when the
 *       memory limit is exceeded, restarts a CAS whose process has vanished,
 *       and carries out pending UTS_STATUS_RESTART / UTS_STATUS_STOP states.
 *       NOTE(review): proxy_info_p is used below but its declaration is not
 *       visible (a source line is missing from this listing before the
 *       shard_info_p declaration).
 */
1824 static void
1825 cas_monitor_worker (T_APPL_SERVER_INFO * as_info_p, int br_index, int as_index, int *busy_uts)
1826 {
1827  int new_pid;
1828  int restart_flag = OFF;
1830  T_SHARD_INFO *shard_info_p = NULL;
1831 
1832  if (as_info_p->service_flag != SERVICE_ON)
1833  {
1834  return;
1835  }
1836  if (as_info_p->uts_status == UTS_STATUS_BUSY)
1837  {
1838  (*busy_uts)++;
1839  }
1840 #if defined(WINDOWS)
1841  else if (as_info_p->uts_status == UTS_STATUS_BUSY_WAIT)
1842  {
/* BUSY_WAIT older than 10 seconds is released back to IDLE; otherwise it still counts as busy */
1843  if (time (NULL) - as_info_p->last_access_time > 10)
1844  {
1845  as_info_p->uts_status = UTS_STATUS_IDLE;
1846  }
1847  else
1848  {
1849  (*busy_uts)++;
1850  }
1851  }
1852 #endif
1853 
/* request a restart when the CAS exceeds the hard memory limit */
1854 #if defined(WINDOWS)
1855  if (shm_appl->use_pdh_flag == TRUE)
1856  {
1857  if ((as_info_p->pid == as_info_p->pdh_pid)
1858  && (as_info_p->pdh_workset > shm_br->br_info[br_index].appl_server_hard_limit))
1859  {
1860  as_info_p->uts_status = UTS_STATUS_RESTART;
1861  }
1862  }
1863 #else
1864  if (as_info_p->psize > shm_appl->appl_server_hard_limit)
1865  {
1866  as_info_p->uts_status = UTS_STATUS_RESTART;
1867  }
1868 #endif
1869 
1870 /* if (as_info_p->service_flag != SERVICE_ON)
1871  continue; */
1872  /* check cas process status and restart it */
1873 
/* shard: check liveness in BUSY, IDLE and START states; non-shard: only in BUSY */
1874  if (br_shard_flag == ON
1875  && (as_info_p->uts_status == UTS_STATUS_BUSY || as_info_p->uts_status == UTS_STATUS_IDLE
1876  || as_info_p->uts_status == UTS_STATUS_START))
1877  {
1878  restart_flag = ON;
1879  }
1880  else if (br_shard_flag == OFF && as_info_p->uts_status == UTS_STATUS_BUSY)
1881  {
1882  restart_flag = ON;
1883  }
1884 
1885  if (restart_flag)
1886  {
1887 #if defined(WINDOWS)
1888  HANDLE phandle;
/* the process is treated as gone when it can no longer be opened */
1889  phandle = OpenProcess (SYNCHRONIZE, FALSE, as_info_p->pid);
1890  if (phandle == NULL)
1891  {
1892  restart_appl_server (as_info_p, br_index, as_index);
1893  as_info_p->uts_status = UTS_STATUS_IDLE;
1894  }
1895  else
1896  {
1897  CloseHandle (phandle);
1898  }
1899 #else
/* kill () with signal 0 only probes the pid; a failure triggers the restart */
1900  if (kill (as_info_p->pid, 0) < 0)
1901  {
1902  restart_appl_server (as_info_p, br_index, as_index);
1903  as_info_p->uts_status = UTS_STATUS_IDLE;
1904  }
1905 #endif
1906  }
1907 
1908  if (as_info_p->uts_status == UTS_STATUS_RESTART)
1909  {
/* requested restart: stop, start again, and mark the slot idle */
1910  stop_appl_server (as_info_p, br_index, as_index);
1911  new_pid = run_appl_server (as_info_p, br_index, as_index);
1912 
1913  as_info_p->pid = new_pid;
1914  as_info_p->uts_status = UTS_STATUS_IDLE;
1915  }
1916  else if (br_shard_flag == ON && as_info_p->uts_status == UTS_STATUS_STOP)
1917  {
/* shard CAS asked to stop: drop it from the broker, shard and appl counters, then take it out of service */
1918  proxy_info_p = shard_shm_find_proxy_info (shm_proxy_p, as_info_p->proxy_id);
1919  shard_info_p = shard_shm_find_shard_info (proxy_info_p, as_info_p->shard_id);
1920  assert (shard_info_p != NULL);
1921 
1922  (shm_br->br_info[br_index].appl_server_num)--;
1923  (shard_info_p->num_appl_server)--;
1924  (shm_appl->num_appl_server)--;
1925 
1926  as_info_p->service_flag = SERVICE_OFF;
1927  as_info_p->reset_flag = FALSE;
1928 
1929  stop_appl_server (as_info_p, br_index, as_index);
1930 
1931  as_info_p->uts_status = UTS_STATUS_IDLE;
1932  as_info_p->con_status = CON_STATUS_CLOSE;
1933  as_info_p->num_request = 0;
1934 
1935  CON_STATUS_LOCK_DESTROY (as_info_p);
1936  }
1937 }
1938 
/*
 * Monitoring thread body: while process_flag is set, runs
 * cas_monitor_worker () over every CAS slot (appl_server_max_num of them),
 * stores the number of busy CASes counted in that pass into num_busy_uts,
 * then sleeps via SLEEP_MILISEC (0, 100) before the next pass.
 *
 * NOTE(review): the source line carrying this function's name and parameter
 * list is missing from this listing, as is one line just before the sleep.
 */
1939 static THREAD_FUNC
1941 {
1942  int i, tmp_num_busy_uts;
1943 
1944  while (process_flag)
1945  {
/* count into a local so that num_busy_uts is updated once per pass */
1946  tmp_num_busy_uts = 0;
1947  for (i = 0; i < shm_br->br_info[br_index].appl_server_max_num; i++)
1948  {
1949  cas_monitor_worker (&(shm_appl->as_info[i]), br_index, i, &tmp_num_busy_uts);
1950  }
1951 
1952  num_busy_uts = tmp_num_busy_uts;
1954  SLEEP_MILISEC (0, 100);
1955  }
1956 
1957 #if !defined(WINDOWS)
1958  return NULL;
1959 #endif
1960 }
1961 
/*
 * connect_to_master_for_server_monitor () - connect to the master process on
 *                                           a database host
 *   return: connection entry from css_connect_to_master_timeout (), or NULL
 *           when a precondition fails or no master port is configured
 *   db_name(in): database name (not referenced in the lines visible here)
 *   db_host(in): host to connect to
 *
 * Note: The master port comes from prm_get_master_port_id (); the connect
 *       timeout is 5000 milliseconds.
 *       NOTE(review): the condition guarding the first "return NULL" is
 *       missing from this listing.
 */
1962 static CSS_CONN_ENTRY *
1963 connect_to_master_for_server_monitor (const char *db_name, const char *db_host)
1964 {
1965  int port_id;
1966  unsigned short rid;
1967 
1969  {
1970  return NULL;
1971  }
1972 
1973  port_id = prm_get_master_port_id ();
1974  if (port_id <= 0)
1975  {
1976  return NULL;
1977  }
1978 
1979  /* timeout : 5000 milliseconds */
1980  return (css_connect_to_master_timeout (db_host, port_id, 5000, &rid));
1981 }
1982 
/*
 * Ask the master process, over `conn`, for the state of database `db_name`.
 *   return: SERVER_STATE_DEAD when conn is NULL or the request cannot be
 *           sent; the state value received from the master (converted with
 *           ntohl ()) when a reply of exactly sizeof (int) bytes arrives;
 *           SERVER_STATE_UNKNOWN otherwise
 *
 * NOTE(review): the source line carrying this function's name and parameter
 * list is missing from this listing; conn and db_name are presumably its
 * parameters - confirm against the original file.
 */
1983 static int
1985 {
1986  unsigned short request_id;
1987  int error = NO_ERROR;
1988  int server_state;
1989  int buffer_size;
1990  int *buffer = NULL;
1991 
1992  if (conn == NULL)
1993  {
1994  return SERVER_STATE_DEAD;
1995  }
1996 
/* the request payload is the database name including its terminating NUL */
1997  error = css_send_request (conn, GET_SERVER_STATE, &request_id, db_name, (int) strlen (db_name) + 1);
/* note: the css calls are compared against NO_ERRORS, not the NO_ERROR used in the initializer */
1998  if (error != NO_ERRORS)
1999  {
2000  return SERVER_STATE_DEAD;
2001  }
2002 
2003  /* timeout : 5000 milliseconds */
2004  error = css_receive_data (conn, request_id, (char **) &buffer, &buffer_size, 5000);
2005  if (error == NO_ERRORS)
2006  {
2007  if (buffer_size == sizeof (int))
2008  {
2009  server_state = ntohl (*buffer);
2010  free_and_init (buffer);
2011 
2012  return server_state;
2013  }
2014  }
2015 
/* receive failed or the reply had an unexpected size: release any buffer and report UNKNOWN */
2016  if (buffer != NULL)
2017  {
2018  free_and_init (buffer);
2019  }
2020 
2021  return SERVER_STATE_UNKNOWN;
2022 }
2023 
2024 static int
2025 insert_db_server_check_list (T_DB_SERVER * list_p, int check_list_cnt, const char *db_name, const char *db_host)
2026 {
2027  int i;
2028 
2029  for (i = 0; i < check_list_cnt && i < UNUSABLE_DATABASE_MAX; i++)
2030  {
2031  if (strcmp (db_name, list_p[i].database_name) == 0 && strcmp (db_host, list_p[i].database_host) == 0)
2032  {
2033  return check_list_cnt;
2034  }
2035  }
2036 
2037  if (i == UNUSABLE_DATABASE_MAX)
2038  {
2039  return UNUSABLE_DATABASE_MAX;
2040  }
2041 
2042  strncpy_bufsize (list_p[i].database_name, db_name);
2043  strncpy_bufsize (list_p[i].database_host, db_host);
2044  list_p[i].state = -1;
2045 
2046  return i + 1;
2047 }
2048 
/*
 * Server-monitoring thread body. Each pass, while process_flag is set:
 *   1. builds a list of (database, host) pairs to check, from the currently
 *      published unusable-database table and from the databases that busy
 *      CASes are using (preferred hosts plus the hosts found by
 *      cfg_find_db ());
 *   2. asks the master on each host for the server state;
 *   3. writes the pairs found unusable into the other half of the
 *      two-slot table in shared memory and then increments
 *      unusable_databases_seq, which selects the half being read.
 * Monitoring is skipped (and the published table cleared) when
 * monitor_server_flag is off, the broker is a shard broker, the application
 * server is not APPL_SERVER_CAS, or the check list could not be allocated.
 *
 * NOTE(review): the source line carrying this function's name and parameter
 * list is missing from this listing, as are single lines before the malloc,
 * inside the skip branch, at the end of the loop body and after the final
 * free; any sleep between passes is therefore not visible here.
 */
2049 static THREAD_FUNC
2051 {
2052  int i, j, cnt;
2053  int u_index;
2054  int check_list_cnt = 0;
2055  T_APPL_SERVER_INFO *as_info_p;
2056  T_DB_SERVER *check_list;
2057  CSS_CONN_ENTRY *conn = NULL;
2058  DB_INFO *db_info_p = NULL;
2059  char **preferred_hosts;
2060  char *unusable_db_name;
2061  char *unusable_db_host;
2062  char busy_cas_db_name[SRV_CON_DBNAME_SIZE];
2063 
2065 
/* a failed allocation is tolerated: the loop below then only clears the table */
2066  check_list = (T_DB_SERVER *) malloc (sizeof (T_DB_SERVER) * UNUSABLE_DATABASE_MAX);
2067 
2068  while (process_flag)
2069  {
2070  if (!shm_appl->monitor_server_flag || br_shard_flag == ON
2071  || shm_br->br_info[br_index].appl_server != APPL_SERVER_CAS || check_list == NULL)
2072  {
2073  shm_appl->unusable_databases_seq = 0;
2074  memset (shm_appl->unusable_databases_cnt, 0, sizeof (shm_appl->unusable_databases_cnt));
2076  continue;
2077  }
2078 
2079  /* 1. collect server check list */
2080  check_list_cnt = 0;
/* u_index selects the half of the table that is currently published */
2081  u_index = shm_appl->unusable_databases_seq % 2;
2082 
2083  for (i = 0; i < shm_appl->unusable_databases_cnt[u_index]; i++)
2084  {
2085  unusable_db_name = shm_appl->unusable_databases[u_index][i].database_name;
2086  unusable_db_host = shm_appl->unusable_databases[u_index][i].database_host;
2087 
2088  check_list_cnt = insert_db_server_check_list (check_list, check_list_cnt, unusable_db_name, unusable_db_host);
2089  }
2090 
2091  for (i = 0; i < shm_br->br_info[br_index].appl_server_max_num; i++)
2092  {
2093  as_info_p = &(shm_appl->as_info[i]);
2094  if (as_info_p->uts_status == UTS_STATUS_BUSY)
2095  {
/* NOTE(review): busy_cas_db_name is not initialized and strncpy () leaves the last byte untouched, so NUL termination is not guaranteed here */
2096  strncpy (busy_cas_db_name, as_info_p->database_name, SRV_CON_DBNAME_SIZE - 1);
2097 
2098  if (busy_cas_db_name[0] != '\0')
2099  {
/* preferred_hosts is split on ':' */
2100  preferred_hosts = util_split_string (shm_appl->preferred_hosts, ":");
2101  if (preferred_hosts != NULL)
2102  {
2103  for (j = 0; preferred_hosts[j] != NULL; j++)
2104  {
2105  check_list_cnt =
2106  insert_db_server_check_list (check_list, check_list_cnt, busy_cas_db_name,
2107  preferred_hosts[j]);
2108  }
2109 
2110  util_free_string_array (preferred_hosts);
2111  }
2112 
2113  db_info_p = cfg_find_db (busy_cas_db_name);
2114  if (db_info_p == NULL || db_info_p->hosts == NULL)
2115  {
2116  if (db_info_p)
2117  {
2118  cfg_free_directory (db_info_p);
2119  }
2120  continue;
2121  }
2122 
2123  for (j = 0; j < db_info_p->num_hosts; j++)
2124  {
2125  check_list_cnt =
2126  insert_db_server_check_list (check_list, check_list_cnt, busy_cas_db_name, db_info_p->hosts[j]);
2127  }
2128 
2129  cfg_free_directory (db_info_p);
2130  }
2131  }
2132  }
2133 
2134  /* 2. check server state */
2135  for (i = 0; i < check_list_cnt; i++)
2136  {
/* a NULL conn is passed on as is; get_server_state_from_master () is called either way */
2137  conn = connect_to_master_for_server_monitor (check_list[i].database_name, check_list[i].database_host);
2138  check_list[i].state = get_server_state_from_master (conn, check_list[i].database_name);
2139 
2140  if (conn != NULL)
2141  {
2142  css_free_conn (conn);
2143  conn = NULL;
2144  }
2145  }
2146 
2147  /* 3. record server state to the shared memory */
2148  cnt = 0;
/* write into the half that is NOT currently published */
2149  u_index = (shm_appl->unusable_databases_seq + 1) % 2;
2150 
2151  for (i = 0; i < check_list_cnt; i++)
2152  {
/* recorded as unusable: state below SERVER_STATE_REGISTERED, except SERVER_STATE_UNKNOWN */
2153  if (check_list[i].state < SERVER_STATE_REGISTERED && check_list[i].state != SERVER_STATE_UNKNOWN)
2154  {
/* NOTE(review): these strncpy () calls rely on the destination's last byte already being NUL - confirm */
2155  strncpy (shm_appl->unusable_databases[u_index][cnt].database_name, check_list[i].database_name,
2156  SRV_CON_DBNAME_SIZE - 1);
2157  strncpy (shm_appl->unusable_databases[u_index][cnt].database_host, check_list[i].database_host,
2158  CUB_MAXHOSTNAMELEN - 1);
2159  cnt++;
2160  }
2161  }
2162 
/* set the count first, then bump the sequence so that the parity selects the freshly written half */
2163  shm_appl->unusable_databases_cnt[u_index] = cnt;
2164  shm_appl->unusable_databases_seq++;
2165 
2167  }
2168 
2169  free_and_init (check_list);
2170 
2172 
2173 #if !defined(WINDOWS)
2174  return NULL;
2175 #endif
2176 }
2177 
/*
 * Hang-detection thread body. Each pass counts the in-service CASes (or, for
 * a shard broker, proxies) whose claimed_alive_time is older than
 * hang_timeout, stores the count in a ring of NUM_COLLECT_COUNT_PER_INTVL
 * samples, and sets br_info_p->reject_client_flag from the average of the
 * ring. Entries with claimed_alive_time == 0 are not counted.
 *
 * NOTE(review): the source line carrying this function's name and parameter
 * list is missing from this listing, as are the declarations of br_info_p
 * and proxy_info_p and one line before br_info_p is assigned.
 */
2178 static THREAD_FUNC
2180 {
2181  unsigned int cur_index;
2182  int cur_hang_count;
2184  time_t cur_time;
2185  int collect_count_interval;
2186  int hang_count[NUM_COLLECT_COUNT_PER_INTVL] = { 0, 0, 0, 0 };
2187  float avg_hang_count;
2188 
2189  int proxy_index, i;
2191  T_APPL_SERVER_INFO *as_info_p;
2192 
2194 
2195  br_info_p = &(shm_br->br_info[br_index]);
2196  cur_hang_count = 0;
2197  cur_index = 0;
2198  avg_hang_count = 0.0;
/* one sample is taken every monitor_hang_interval / NUM_COLLECT_COUNT_PER_INTVL (passed below as the first SLEEP_MILISEC argument) */
2199  collect_count_interval = br_info_p->monitor_hang_interval / NUM_COLLECT_COUNT_PER_INTVL;
2200 
2201  while (process_flag)
2202  {
2203  cur_time = time (NULL);
2204  if (br_shard_flag == OFF)
2205  {
2206  for (i = 0; i < br_info_p->appl_server_max_num; i++)
2207  {
2208  as_info_p = &(shm_appl->as_info[i]);
2209 
2210  if ((as_info_p->service_flag != SERVICE_ON) || as_info_p->claimed_alive_time == 0)
2211  {
2212  continue;
2213  }
/* hung: last claimed-alive timestamp is older than hang_timeout */
2214  if ((br_info_p->hang_timeout < cur_time - as_info_p->claimed_alive_time))
2215  {
2216  cur_hang_count++;
2217  }
2218  }
2219  }
2220  else
2221  {
2222  for (proxy_index = 0; proxy_index < shm_proxy_p->num_proxy; proxy_index++)
2223  {
2224  proxy_info_p = shard_shm_find_proxy_info (shm_proxy_p, proxy_index);
2225 
2226  if ((proxy_info_p->service_flag != SERVICE_ON) || (proxy_info_p->claimed_alive_time == 0))
2227  {
2228  continue;
2229  }
2230 
2231  if ((br_info_p->hang_timeout < cur_time - proxy_info_p->claimed_alive_time))
2232  {
2233  cur_hang_count++;
2234  }
2235  }
2236  }
2237 
/* overwrite the oldest sample in the ring, then average all samples */
2238  hang_count[cur_index] = cur_hang_count;
2239 
2240  avg_hang_count = ut_get_avg_from_array (hang_count, NUM_COLLECT_COUNT_PER_INTVL);
2241 
2242  if (br_shard_flag == OFF)
2243  {
/* reject new clients once the average reaches appl_server_num * HANG_COUNT_THRESHOLD_RATIO */
2244  br_info_p->reject_client_flag =
2245  (avg_hang_count >= (float) br_info_p->appl_server_num * HANG_COUNT_THRESHOLD_RATIO);
2246  }
2247  else
2248  {
2249  /*
2250  * reject_client_flag for shard broker
2251  * does not depend on the current number of proxies.
2252  * If one proxy hangs for the last 1 min, then
2253  * it will disable shard_broker no matter how many proxies
2254  * there are.
2255  */
2256  br_info_p->reject_client_flag = (avg_hang_count >= 1);
2257  }
2258 
2259  cur_index = (cur_index + 1) % NUM_COLLECT_COUNT_PER_INTVL;
2260  cur_hang_count = 0;
2261 
2262  SLEEP_MILISEC (collect_count_interval, 0);
2263  }
2264 #if !defined(WINDOWS)
2265  return NULL;
2266 #endif
2267 }
2268 
2269 /*
2270  * psize_check_worker () -
2271  * return: void
2272  * as_info_p(in): T_APPL_SERVER_INFO
2273  * br_index(in): broker index
2274  * proxy_index(in): it's only valid in SHARD! proxy index
2275  * shard_index(in): it's only valid in SHARD! shard index
2276  * as_index(in): cas index
2277  *
2278  * Note: check cas psize and cas log
2279  */
2280 static void
2281 psize_check_worker (T_APPL_SERVER_INFO * as_info_p, int br_index, int as_index)
2282 {
2283 #if defined(WINDOWS)
2284  int pid;
2285  int cpu_time;
2286  int workset_size;
2287  float pct_cpu;
2288 #endif
2289 
2290  if (as_info_p->service_flag != SERVICE_ON)
2291  {
2292  return;
2293  }
2294 
2295 #if defined(WINDOWS)
2296  pid = as_info_p->pid;
2297 
2298  cpu_time = get_cputime_sec (pid);
2299  if (cpu_time < 0)
2300  {
2301  as_info_p->cpu_time = 0;
2302 #if 0
2303  HANDLE hProcess;
2304  hProcess = OpenProcess (PROCESS_QUERY_INFORMATION, FALSE, pid);
2305  if (hProcess == NULL)
2306  {
2307  pid = 0;
2308  as_info_p->pid = 0;
2309  as_info_p->cpu_time = 0;
2310  }
2311 #endif
2312  }
2313  else
2314  {
2315  as_info_p->cpu_time = cpu_time;
2316  }
2317 
2318  if (pdh_get_value (pid, &workset_size, &pct_cpu, NULL) >= 0)
2319  {
2320  as_info_p->pdh_pid = pid;
2321  as_info_p->pdh_workset = workset_size;
2322  as_info_p->pdh_pct_cpu = pct_cpu;
2323  }
2324 #else
2325  as_info_p->psize = getsize (as_info_p->pid);
2326 #if 0
2327  if (as_info_p->psize < 0 && as_info_p->pid > 0)
2328  {
2329  if (kill (as_info_p->pid, 0) < 0 && errno == ESRCH)
2330  {
2331  as_info_p->pid = 0;
2332  }
2333  }
2334 #endif
2335 #endif /* WINDOWS */
2336 
2337  check_cas_log (shm_br->br_info[br_index].name, as_info_p, as_index);
2338 }
2339 
/*
 * Recreate a proxy's log file when it has disappeared. When the proxy's
 * cur_proxy_log_mode is not PROXY_LOG_MODE_NONE, the path
 * "<proxy_log_dir>/<br_name>_<proxy_id + 1>.log" is checked; if it does not
 * exist it is created empty and proxy_log_reset is set to
 * PROXY_LOG_RESET_REOPEN.
 *
 * NOTE(review): the source line carrying this function's name and parameter
 * list and the declaration of log_filepath are missing from this listing;
 * br_name and proxy_info_p are presumably its parameters.
 */
2340 static void
2342 {
2344 
2345  if (proxy_info_p->cur_proxy_log_mode != PROXY_LOG_MODE_NONE)
2346  {
2347  snprintf (log_filepath, sizeof (log_filepath), "%s/%s_%d.log", shm_appl->proxy_log_dir, br_name,
2348  proxy_info_p->proxy_id + 1);
2349 
2350  if (access (log_filepath, F_OK) < 0)
2351  {
2352  FILE *fp;
2353 
/* opening in append mode creates the file; the reset flag is set even if fopen () fails */
2354  fp = fopen (log_filepath, "a");
2355  if (fp != NULL)
2356  {
2357  fclose (fp);
2358  }
2359  proxy_info_p->proxy_log_reset = PROXY_LOG_RESET_REOPEN;
2360  }
2361  }
2362 
2363  return;
2364 }
2365 
/*
 * Recreate a proxy's access log file when it does not exist: the path in
 * proxy_info_p->access_log_file is checked with access () and, if missing,
 * created empty by opening it in append mode.
 *
 * NOTE(review): the source line carrying this function's name and parameter
 * list is missing from this listing, as is one line after the fclose ()
 * block; proxy_info_p is presumably the parameter.
 */
2366 static void
2368 {
2369  char *access_log_file;
2370  FILE *fp;
2371 
2372  access_log_file = proxy_info_p->access_log_file;
2373  if (access (access_log_file, F_OK) < 0)
2374  {
2375  fp = fopen (access_log_file, "a");
2376  if (fp != NULL)
2377  {
2378  fclose (fp);
2379  }
2381  }
2382 
2383  return;
2384 }
2385 
2386 
/*
 * Per-proxy check: makes sure the proxy's log file and access log file exist
 * by calling check_proxy_log () and check_proxy_access_log ().
 *
 * NOTE(review): the source line carrying this function's name and parameter
 * list is missing from this listing; br_index and proxy_info_p are
 * presumably its parameters.
 */
2387 static void
2389 {
2390  check_proxy_log (shm_br->br_info[br_index].name, proxy_info_p);
2391  check_proxy_access_log (proxy_info_p);
2392 
2393  return;
2394 }
2395 
2396 #if defined(WINDOWS)
/*
 * get_cputime_sec () - total CPU time (kernel + user) consumed by a process
 *   return: CPU time in whole seconds; 0 when pid <= 0 or the times cannot
 *           be queried; -1 when the process cannot be opened
 *   pid(in): process id
 *
 * Note: Each FILETIME is converted to a 64-bit tick count before the two
 *       are added. Adding the low and high 32-bit halves separately would
 *       drop the carry out of the low word.
 */
static int
get_cputime_sec (int pid)
{
  ULARGE_INTEGER kernel_ticks;
  ULARGE_INTEGER user_ticks;
  HANDLE hProcess;
  FILETIME ctime, etime, systime, usertime;
  int cputime = 0;

  if (pid <= 0)
    {
      return 0;
    }

  hProcess = OpenProcess (PROCESS_QUERY_INFORMATION, FALSE, pid);
  if (hProcess == NULL)
    {
      return -1;
    }

  if (GetProcessTimes (hProcess, &ctime, &etime, &systime, &usertime) != 0)
    {
      kernel_ticks.LowPart = systime.dwLowDateTime;
      kernel_ticks.HighPart = systime.dwHighDateTime;
      user_ticks.LowPart = usertime.dwLowDateTime;
      user_ticks.HighPart = usertime.dwHighDateTime;

      /* FILETIME counts 100-nanosecond intervals: 10,000,000 per second */
      cputime = (int) ((kernel_ticks.QuadPart + user_ticks.QuadPart) / 10000000);
    }
  CloseHandle (hProcess);

  return cputime;
}
2424 
/*
 * psize_check_thr_f () - resource-monitoring thread body (Windows build)
 *   return: nothing
 *   ar(in): not referenced in the lines visible here
 *
 * Note: If pdh_init () fails, use_pdh_flag is cleared and the thread ends.
 *       Otherwise each pass collects pdh data, records the broker's own
 *       working set, cpu percentage, thread count and cpu time, runs
 *       proxy_check_worker () for every proxy of a shard broker and
 *       psize_check_worker () for every CAS slot, then sleeps via
 *       SLEEP_MILISEC (1, 0).
 *       NOTE(review): proxy_info_p is used below but its declaration is not
 *       visible (a source line is missing from this listing before the
 *       shard_info_p declaration).
 */
2425 static THREAD_FUNC
2426 psize_check_thr_f (void *ar)
2427 {
2428  int workset_size;
2429  float pct_cpu;
2430  int cpu_time;
2431  int br_num_thr;
2432  int i;
2433  int proxy_index;
2434 
2436  T_SHARD_INFO *shard_info_p = NULL;
2437 
2438  if (pdh_init () < 0)
2439  {
2440  shm_appl->use_pdh_flag = FALSE;
2441  return;
2442  }
2443  else
2444  {
2445  shm_appl->use_pdh_flag = TRUE;
2446  }
2447 
2448  while (process_flag)
2449  {
2450  pdh_collect ();
2451 
/* figures for the broker process itself */
2452  if (pdh_get_value (shm_br->br_info[br_index].pid, &workset_size, &pct_cpu, &br_num_thr) < 0)
2453  {
2454  shm_br->br_info[br_index].pdh_pct_cpu = 0;
2455  }
2456  else
2457  {
2458  cpu_time = get_cputime_sec (shm_br->br_info[br_index].pid);
/* a negative cpu time leaves the previously recorded value in place */
2459  if (cpu_time >= 0)
2460  {
2461  shm_br->br_info[br_index].cpu_time = cpu_time;
2462  }
2463  shm_br->br_info[br_index].pdh_workset = workset_size;
2464  shm_br->br_info[br_index].pdh_pct_cpu = pct_cpu;
2465  shm_br->br_info[br_index].pdh_num_thr = br_num_thr;
2466  }
2467 
2468  if (br_shard_flag == ON)
2469  {
2470  for (proxy_index = 0; proxy_index < shm_proxy_p->num_proxy; proxy_index++)
2471  {
2472  proxy_info_p = shard_shm_find_proxy_info (shm_proxy_p, proxy_index);
2473 
2474  proxy_check_worker (br_index, proxy_info_p);
2475  }
2476  }
2477 
2478  for (i = 0; i < shm_br->br_info[br_index].appl_server_max_num; i++)
2479  {
2480  psize_check_worker (&(shm_appl->as_info[i]), br_index, i);
2481  }
2482  SLEEP_MILISEC (1, 0);
2483  }
2484 }
2485 
2486 #else /* WINDOWS */
2487 
/*
 * psize_check_thr_f () - (non-WINDOWS build) periodic process check loop
 *   return: NULL once process_flag is cleared
 *
 * Note: While process_flag is set, each pass runs proxy_check_worker () for
 *       every proxy (shard mode only) and psize_check_worker () for every
 *       application-server slot, then sleeps.
 * NOTE(review): the line carrying the function name and parameter list
 *       (embedded number 2489) and one declaration line (2494) are absent
 *       from this listing; proxy_info_p is presumably declared on the
 *       latter - confirm against the real source.
 */
2488 static THREAD_FUNC
2490 {
2491  int i;
2492  int proxy_index;
2493 
2495 
2496  while (process_flag)
2497  {
2498  if (br_shard_flag == ON)
2499  {
2500  for (proxy_index = 0; proxy_index < shm_proxy_p->num_proxy; proxy_index++)
2501  {
2502  proxy_info_p = shard_shm_find_proxy_info (shm_proxy_p, proxy_index);
2503 
2504  proxy_check_worker (br_index, proxy_info_p);
2505  }
2506  }
2507 
2508  for (i = 0; i < shm_br->br_info[br_index].appl_server_max_num; i++)
2509  {
2510  psize_check_worker (&(shm_appl->as_info[i]), br_index, i);
2511  }
2512 
 /* SLEEP_MILISEC takes (sec, msec): pause one second between passes */
2513  SLEEP_MILISEC (1, 0);
2514  }
2515 
2516  return NULL;
2517 }
2518 #endif /* !WINDOWS */
2519 
2520 /*
2521  * check_cas_log () - recreate a CAS's SQL log / slow log file if it has disappeared
2522  * return: void
2523  * br_name(in): broker name
2524  * as_info_p(in): T_APPL_SERVER_INFO
2525  * as_index(in): cas index
2526  * Note: For each log kind that is enabled, builds the log file name and, if
 *       access (F_OK) fails, creates an empty file by opening it in append
 *       mode. A failed fopen () is silently ignored.
 * NOTE(review): two source lines are absent from this listing - the condition
 *       guarding the early return (embedded number 2533) and a statement in
 *       the slow-log branch (2566), presumably the slow-log counterpart of
 *       the cas_log_reset assignment; confirm against the real source.
2527  */
2528 static void
2529 check_cas_log (char *br_name, T_APPL_SERVER_INFO * as_info_p, int as_index)
2530 {
2531  char log_filename[BROKER_PATH_MAX];
2532 
2534  {
2535  return;
2536  }
2537 
2538  if (as_info_p->cur_sql_log_mode != SQL_LOG_MODE_NONE)
2539  {
2540  get_as_sql_log_filename (log_filename, BROKER_PATH_MAX, br_name, as_info_p, as_index);
2541 
2542  if (access (log_filename, F_OK) < 0)
2543  {
2544  FILE *fp;
2545  fp = fopen (log_filename, "a");
2546  if (fp != NULL)
2547  {
2548  fclose (fp);
2549  }
 /* request a reopen via the shared cas_log_reset field, set whether or not the file could be created */
2550  as_info_p->cas_log_reset = CAS_LOG_RESET_REOPEN;
2551  }
2552  }
2553 
2554  if (as_info_p->cur_slow_log_mode != SLOW_LOG_MODE_OFF)
2555  {
2556  get_as_slow_log_filename (log_filename, BROKER_PATH_MAX, br_name, as_info_p, as_index);
2557 
2558  if (access (log_filename, F_OK) < 0)
2559  {
2560  FILE *fp;
2561  fp = fopen (log_filename, "a");
2562  if (fp != NULL)
2563  {
2564  fclose (fp);
2565  }
2567  }
2568  }
2569 }
2570 
2571 #ifdef WIN_FW
2572 static int
2573 process_cas_request (int cas_pid, int as_index, SOCKET clt_sock_fd, SOCKET srv_sock_fd)
2574 {
2575  char read_buf[1024];
2576  int msg_size;
2577  int read_len;
2578  int tmp_int;
2579  char *tmp_p;
2580 
2581  msg_size = SRV_CON_DB_INFO_SIZE;
2582  while (msg_size > 0)
2583  {
2584  read_len = read_from_cas_client (clt_sock_fd, read_buf, msg_size, as_index, cas_pid);
2585  if (read_len <= 0)
2586  {
2587  return -1;
2588  }
2589  if (send (srv_sock_fd, read_buf, read_len, 0) < read_len)
2590  return -1;
2591  msg_size -= read_len;
2592  }
2593 
2594  if (recv (srv_sock_fd, (char *) &msg_size, 4, 0) < 4)
2595  return -1;
2596  if (write_to_client (clt_sock_fd, (char *) &msg_size, 4) < 0)
2597  return -1;
2598  msg_size = ntohl (msg_size);
2599  while (msg_size > 0)
2600  {
2601  read_len = recv (srv_sock_fd, read_buf, (msg_size > sizeof (read_buf) ? sizeof (read_buf) : msg_size), 0);
2602  if (read_len <= 0)
2603  {
2604  return -1;
2605  }
2606  if (write_to_client (clt_sock_fd, read_buf, read_len) < 0)
2607  {
2608  return -1;
2609  }
2610  msg_size -= read_len;
2611  }
2612 
2613  while (1)
2614  {
2615  tmp_int = 4;
2616  tmp_p = (char *) &msg_size;
2617  while (tmp_int > 0)
2618  {
2619  read_len = read_from_cas_client (clt_sock_fd, tmp_p, tmp_int, as_index, cas_pid);
2620  if (read_len <= 0)
2621  {
2622  return -1;
2623  }
2624  tmp_int -= read_len;
2625  tmp_p += read_len;
2626  }
2627  if (send (srv_sock_fd, (char *) &msg_size, 4, 0) < 0)
2628  {
2629  return -1;
2630  }
2631 
2632  msg_size = ntohl (msg_size);
2633  while (msg_size > 0)
2634  {
2635  read_len =
2636  read_from_cas_client (clt_sock_fd, read_buf, (msg_size > sizeof (read_buf) ? sizeof (read_buf) : msg_size),
2637  as_index, cas_pid);
2638  if (read_len <= 0)
2639  {
2640  return -1;
2641  }
2642  if (send (srv_sock_fd, read_buf, read_len, 0) < read_len)
2643  {
2644  return -1;
2645  }
2646  msg_size -= read_len;
2647  }
2648 
2649  if (recv (srv_sock_fd, (char *) &msg_size, 4, 0) < 4)
2650  {
2651  return -1;
2652  }
2653  if (write_to_client (clt_sock_fd, (char *) &msg_size, 4) < 0)
2654  {
2655  return -1;
2656  }
2657 
2658  msg_size = ntohl (msg_size);
2659  while (msg_size > 0)
2660  {
2661  read_len = recv (srv_sock_fd, read_buf, (msg_size > sizeof (read_buf) ? sizeof (read_buf) : msg_size), 0);
2662  if (read_len <= 0)
2663  {
2664  return -1;
2665  }
2666  if (write_to_client (clt_sock_fd, read_buf, read_len) < 0)
2667  {
2668  return -1;
2669  }
2670  msg_size -= read_len;
2671  }
2672 
2673  if (shm_appl->as_info[as_index].close_flag || shm_appl->as_info[as_index].pid != cas_pid)
2674  {
2675  break;
2676  }
2677  }
2678 
2679  return 0;
2680 }
2681 
2682 static int
2683 read_from_cas_client (SOCKET sock_fd, char *buf, int size, int as_index, int cas_pid)
2684 {
2685  int read_len;
2686 #ifdef ASYNC_MODE
2687  SELECT_MASK read_mask;
2688  int nfound;
2689  int maxfd;
2690  struct timeval timeout = { 1, 0 };
2691 #endif
2692 
2693 retry:
2694 
2695 #ifdef ASYNC_MODE
2696  FD_ZERO (&read_mask);
2697  FD_SET (sock_fd, (fd_set *) (&read_mask));
2698  maxfd = sock_fd + 1;
2699  nfound = select (maxfd, &read_mask, (SELECT_MASK *) 0, (SELECT_MASK *) 0, &timeout);
2700  if (nfound < 1)
2701  {
2702  if (shm_appl->as_info[as_index].close_flag || shm_appl->as_info[as_index].pid != cas_pid)
2703  {
2704  return -1;
2705  }
2706  goto retry;
2707  }
2708 #endif
2709 
2710 #ifdef ASYNC_MODE
2711  if (FD_ISSET (sock_fd, (fd_set *) (&read_mask)))
2712  {
2713 #endif
2714  read_len = READ_FROM_SOCKET (sock_fd, buf, size);
2715 #ifdef ASYNC_MODE
2716  }
2717  else
2718  {
2719  return -1;
2720  }
2721 #endif
2722 
2723  return read_len;
2724 }
2725 #endif
2726 
/*
 * find_idle_cas () - pick an application server (CAS) to hand a new client to
 *   return: index of the chosen CAS (its uts_status is set to
 *           UTS_STATUS_BUSY before returning), or -1 if none is available
 *
 * Note: Scans the in-service CAS slots. The first slot in UTS_STATUS_IDLE
 *       (and, on non-WINDOWS, whose process answers kill (pid, 0)) wins.
 *       Otherwise the busy, out-of-transaction slot with the longest time
 *       since last_access_time is remembered; it is re-validated under the
 *       connection-status lock and, if still eligible, switched to
 *       CON_STATUS_CLOSE_AND_CONNECT and chosen.
 * NOTE(review): several source lines are absent from this listing - the
 *       function name line (embedded number 2728), a statement after the
 *       declarations (2736), the start of the "else if" condition (2757)
 *       and one statement before each of the final return paths (2803,
 *       2808). Confirm them against the real source.
 */
2727 static int
2729 {
2730  int i;
2731  int idle_cas_id = -1;
2732  time_t max_wait_time;
2733  int wait_cas_id;
2734  time_t cur_time = time (NULL);
2735 
2737 
2738  wait_cas_id = -1;
2739  max_wait_time = 0;
2740 
2741  for (i = 0; i < shm_br->br_info[br_index].appl_server_max_num; i++)
2742  {
2743  if (shm_appl->as_info[i].service_flag != SERVICE_ON)
2744  {
2745  continue;
2746  }
2747  if (shm_appl->as_info[i].uts_status == UTS_STATUS_IDLE
2748 #if !defined (WINDOWS)
2749  && kill (shm_appl->as_info[i].pid, 0) == 0
2750 #endif
2751  )
2752  {
 /* a truly idle CAS always beats any "waiting" candidate found so far */
2753  idle_cas_id = i;
2754  wait_cas_id = -1;
2755  break;
2756  }
2758  && shm_appl->as_info[i].uts_status == UTS_STATUS_BUSY && shm_appl->as_info[i].cur_keep_con == KEEP_CON_AUTO
2759  && shm_appl->as_info[i].con_status == CON_STATUS_OUT_TRAN && shm_appl->as_info[i].num_holdable_results < 1
2760  && shm_appl->as_info[i].cas_change_mode == CAS_CHANGE_MODE_AUTO)
2761  {
2762  time_t wait_time = cur_time - shm_appl->as_info[i].last_access_time;
2763  if (wait_time > max_wait_time || wait_cas_id == -1)
2764  {
2765  max_wait_time = wait_time;
2766  wait_cas_id = i;
2767  }
2768  }
2769  }
2770 
2771  if (wait_cas_id >= 0)
2772  {
 /* the scan above was unlocked; re-check the candidate's state under the lock before taking it over */
2773  CON_STATUS_LOCK (&(shm_appl->as_info[wait_cas_id]), CON_STATUS_LOCK_BROKER);
2774  if (shm_appl->as_info[wait_cas_id].con_status == CON_STATUS_OUT_TRAN
2775  && shm_appl->as_info[wait_cas_id].num_holdable_results < 1
2776  && shm_appl->as_info[wait_cas_id].cas_change_mode == CAS_CHANGE_MODE_AUTO)
2777  {
2778  idle_cas_id = wait_cas_id;
2779  shm_appl->as_info[wait_cas_id].con_status = CON_STATUS_CLOSE_AND_CONNECT;
2780  }
2781  CON_STATUS_UNLOCK (&(shm_appl->as_info[wait_cas_id]), CON_STATUS_LOCK_BROKER);
2782  }
2783 
2784 #if defined(WINDOWS)
2785  if (idle_cas_id >= 0)
2786  {
 /* WINDOWS has no kill (pid, 0); probe the process by opening a handle to it */
2787  HANDLE h_proc;
2788  h_proc = OpenProcess (SYNCHRONIZE, FALSE, shm_appl->as_info[idle_cas_id].pid);
2789  if (h_proc == NULL)
2790  {
 /* NOTE(review): this indexes with the loop variable i, not idle_cas_id. When the
  * candidate came from the wait_cas_id path the loop ran to completion, so i equals
  * appl_server_max_num and the write lands outside the scanned range. It looks like
  * as_info[idle_cas_id] was intended - confirm and fix. */
2791  shm_appl->as_info[i].uts_status = UTS_STATUS_RESTART;
2792  idle_cas_id = -1;
2793  }
2794  else
2795  {
2796  CloseHandle (h_proc);
2797  }
2798  }
2799 #endif
2800 
2801  if (idle_cas_id < 0)
2802  {
2804  return -1;
2805  }
2806 
2807  shm_appl->as_info[idle_cas_id].uts_status = UTS_STATUS_BUSY;
2809 
2810  return idle_cas_id;
2811 }
2812 
/*
 * find_drop_as_index () - choose an application server slot that may be dropped
 *   return: slot index, or -1 if no slot qualifies
 *
 * Note: In the first branch only the last running slot (appl_server_num - 1)
 *       is considered; it qualifies when it is idle and has been unused for
 *       longer than time_to_kill. In the general case the slots are scanned
 *       from the highest index down: an idle slot past time_to_kill is taken
 *       immediately; an idle slot that is not yet past it suppresses busy
 *       candidates at lower indexes; otherwise the busy, out-of-transaction
 *       slot with the longest wait beyond time_to_kill is chosen.
 * NOTE(review): several source lines are absent from this listing - the
 *       function name line (embedded number 2814), two lines before the
 *       first block (2819-2820, presumably including the condition that
 *       selects the first branch) and one statement before each return
 *       (2827, 2830, 2869). Confirm them against the real source.
 */
2813 static int
2815 {
2816  int i, drop_as_index, exist_idle_cas;
2817  time_t max_wait_time, wait_time;
2818 
2821  {
2822  drop_as_index = shm_br->br_info[br_index].appl_server_num - 1;
2823  wait_time = time (NULL) - shm_appl->as_info[drop_as_index].last_access_time;
2824  if (shm_appl->as_info[drop_as_index].uts_status == UTS_STATUS_IDLE
2825  && wait_time > shm_br->br_info[br_index].time_to_kill)
2826  {
2828  return drop_as_index;
2829  }
2831  return -1;
2832  }
2833 
2834  drop_as_index = -1;
2835  max_wait_time = -1;
2836  exist_idle_cas = 0;
2837 
2838  for (i = shm_br->br_info[br_index].appl_server_max_num - 1; i >= 0; i--)
2839  {
2840  if (shm_appl->as_info[i].service_flag != SERVICE_ON)
2841  continue;
2842 
2843  wait_time = time (NULL) - shm_appl->as_info[i].last_access_time;
2844 
2845  if (shm_appl->as_info[i].uts_status == UTS_STATUS_IDLE)
2846  {
2847  if (wait_time > shm_br->br_info[br_index].time_to_kill)
2848  {
2849  drop_as_index = i;
2850  break;
2851  }
2852  else
2853  {
 /* a recently used idle slot exists: discard any busy candidate and stop collecting new ones */
2854  exist_idle_cas = 1;
2855  drop_as_index = -1;
2856  }
2857  }
2858 
2859  if (shm_appl->as_info[i].uts_status == UTS_STATUS_BUSY && shm_appl->as_info[i].con_status == CON_STATUS_OUT_TRAN
2860  && shm_appl->as_info[i].num_holdable_results < 1
2861  && shm_appl->as_info[i].cas_change_mode == CAS_CHANGE_MODE_AUTO && wait_time > max_wait_time
2862  && wait_time > shm_br->br_info[br_index].time_to_kill && exist_idle_cas == 0)
2863  {
2864  max_wait_time = wait_time;
2865  drop_as_index = i;
2866  }
2867  }
2868 
2870 
2871  return drop_as_index;
2872 }
2873 
/*
 * find_add_as_index () - find a slot in which a new application server can be started
 *   return: index of the first slot whose service_flag is SERVICE_OFF_ACK and
 *           that is not the slot currently being dropped
 *           (current_dropping_as_index), or -1 if there is none
 *
 * NOTE(review): the function name line (embedded number 2875) and one
 *       statement before the loop and before each return (2879, 2884, 2889)
 *       are absent from this listing; confirm them against the real source.
 */
2874 static int
2876 {
2877  int i;
2878 
2880  for (i = 0; i < shm_br->br_info[br_index].appl_server_max_num; i++)
2881  {
2882  if (shm_appl->as_info[i].service_flag == SERVICE_OFF_ACK && current_dropping_as_index != i)
2883  {
2885  return i;
2886  }
2887  }
2888 
2890  return -1;
2891 }
2892 
2893 #if !defined(WINDOWS)
/*
 * (non-WINDOWS) create the UNIX-domain listening socket used by proxies
 *   return: proxy_sock_fd on success, -1 if socket () fails,
 *           -2 if bind () fails, -3 if listen () fails
 *
 * Note: The socket path is shm_appl->port_name; the result is stored in the
 *       file-scope proxy_sock_fd and the address in shard_sock_addr.
 * NOTE(review): the line carrying the function name (embedded number 2895)
 *       and one statement in each of the bind/listen failure branches
 *       (2921, 2928) are absent from this listing, so it cannot be seen
 *       here whether the socket is closed on those paths - confirm.
 */
2894 static int
2896 {
2897  int len;
2898 
2899  if ((proxy_sock_fd = socket (AF_UNIX, SOCK_STREAM, 0)) < 0)
2900  {
2901  return (-1);
2902  }
2903 
2904  /* FOR DEBUG */
2905  SHARD_ERR ("<BROKER> listen to unixdoamin:[%s].\n", shm_appl->port_name);
2906 
2907  memset (&shard_sock_addr, 0, sizeof (shard_sock_addr));
2908  shard_sock_addr.sun_family = AF_UNIX;
2909  strncpy_bufsize (shard_sock_addr.sun_path, shm_appl->port_name);
2910 
 /* address length = fixed header fields + path + terminating NUL */
2911 #ifdef _SOCKADDR_LEN /* 4.3BSD Reno and later */
2912  len = sizeof (shard_sock_addr.sun_len) + sizeof (shard_sock_addr.sun_family) + strlen (shard_sock_addr.sun_path) + 1;
2913  shard_sock_addr.sun_len = len;
2914 #else /* vanilla 4.3BSD */
2915  len = strlen (shard_sock_addr.sun_path) + sizeof (shard_sock_addr.sun_family) + 1;
2916 #endif
2917 
2918  /* bind the name to the descriptor */
2919  if (bind (proxy_sock_fd, (struct sockaddr *) &shard_sock_addr, len) < 0)
2920  {
2922  return (-2);
2923  }
2924 
 /* backlog of 127 pending connections */
2925  if (listen (proxy_sock_fd, 127) < 0)
2926  {
2927  /* tell kernel we're a server */
2929  return (-3);
2930  }
2931 
2932  return (proxy_sock_fd);
2933 }
2934 #endif /* !WINDOWS */
2935 
/*
 * broker_init_shm () - attach the shared memory segments this broker needs
 *   return: 0 on success, -1 on failure
 *
 * Note: Reads the master shared-memory key and the broker port from the
 *       environment, attaches the broker segment (shm_br), locates this
 *       broker's entry by port (br_index, br_info_p), attaches the
 *       application-server segment (shm_appl) and, when shm_appl->shard_flag
 *       is ON, the proxy segment (shm_proxy_p). Segments attached before a
 *       failure are not detached (see the TODO at return_error).
 * NOTE(review): the function name line (embedded number 2937) and one
 *       statement before each "goto return_error" (2946, 2955, 2961, 2975,
 *       2986, 2997) plus the body of the "#if 0" section (3007) are absent
 *       from this listing; confirm them against the real source.
 */
2936 static int
2938 {
2939  char *p;
2940  int i;
2941  int master_shm_key, as_shm_key, port_no, proxy_shm_id;
2942 
2943  p = getenv (MASTER_SHM_KEY_ENV_STR);
2944  if (p == NULL)
2945  {
2947  goto return_error;
2948  }
 /* the return value of parse_int () is not checked here */
2949  parse_int (&master_shm_key, p, 10);
2950  SHARD_ERR ("<BROKER> MASTER_SHM_KEY_ENV_STR:[%d:%x]\n", master_shm_key, master_shm_key);
2951 
2952  shm_br = (T_SHM_BROKER *) uw_shm_open (master_shm_key, SHM_BROKER, SHM_MODE_ADMIN);
2953  if (shm_br == NULL)
2954  {
2956  goto return_error;
2957  }
2958 
2959  if ((p = getenv (PORT_NUMBER_ENV_STR)) == NULL)
2960  {
2962  goto return_error;
2963  }
 /* the return value of parse_int () is not checked here */
2964  parse_int (&port_no, p, 10);
 /* this broker is identified by the port number it was started with */
2965  for (i = 0, br_index = -1; i < shm_br->num_broker; i++)
2966  {
2967  if (shm_br->br_info[i].port == port_no)
2968  {
2969  br_index = i;
2970  break;
2971  }
2972  }
2973  if (br_index == -1)
2974  {
2976  goto return_error;
2977  }
 /* i equals br_index here because the loop exited through break */
2978  br_info_p = &shm_br->br_info[i];
2979 
2980  as_shm_key = br_info_p->appl_server_shm_id;
2981  SHARD_ERR ("<BROKER> APPL_SERVER_SHM_KEY_STR:[%d:%x]\n", as_shm_key, as_shm_key);
2982 
2983  shm_appl = (T_SHM_APPL_SERVER *) uw_shm_open (as_shm_key, SHM_APPL_SERVER, SHM_MODE_ADMIN);
2984  if (shm_appl == NULL)
2985  {
2987  goto return_error;
2988  }
2989 
2990  if (shm_appl->shard_flag == ON)
2991  {
2992  proxy_shm_id = br_info_p->proxy_shm_id;
2993 
2994  shm_proxy_p = (T_SHM_PROXY *) uw_shm_open (proxy_shm_id, SHM_PROXY, SHM_MODE_ADMIN);
2995  if (shm_proxy_p == NULL)
2996  {
2998  goto return_error;
2999  }
3000  }
3001 
3002  return 0;
3003 
3004 return_error:
3005  /* SHARD TODO : NOT IMPLEMENTED YET */
3006 #if 0
3008 #endif
3009 
3010  /* SHARD TODO : DETACH SHARED MEMORY */
3011 
3012  return -1;
3013 }
3014 
/*
 * proxy_monitor_worker () - keep one proxy process alive
 *
 * Note: Does nothing unless the proxy is in service and its pid is not
 *       negative. If the process no longer exists it is restarted through
 *       restart_proxy_server (); if its status is PROXY_STATUS_RESTART it is
 *       stopped and started again. After either restart the shared status is
 *       set to PROXY_STATUS_START and the client / statement-waiter counters
 *       are cleared.
 * NOTE(review): the line carrying the function name and parameter list
 *       (embedded number 3016) is absent from this listing; the body uses
 *       proxy_info_p, br_index and proxy_index.
 */
3015 static void
3017 {
3018  int new_pid;
3019 #if defined(WINDOWS)
3020  HANDLE phandle;
3021 #endif /* WINDOWS */
3022 
3023  if (proxy_info_p->service_flag != SERVICE_ON || proxy_info_p->pid < 0)
3024  {
3025  return;
3026  }
3027 
3028 #if defined(WINDOWS)
 /* liveness probe: a process that cannot be opened is treated as dead */
3029  phandle = OpenProcess (SYNCHRONIZE, FALSE, proxy_info_p->pid);
3030  if (phandle == NULL)
3031  {
3032  restart_proxy_server (proxy_info_p, br_index, proxy_index);
3033  goto shm_init;
3034  }
3035  else
3036  {
3037  CloseHandle (phandle);
3038  }
3039 #else /* WINDOWS */
 /* liveness probe with signal 0; probe again after one second before declaring the process dead */
3040  if (kill (proxy_info_p->pid, 0) < 0)
3041  {
3042  SLEEP_MILISEC (1, 0);
3043  if (kill (proxy_info_p->pid, 0) < 0)
3044  {
3045  restart_proxy_server (proxy_info_p, br_index, proxy_index);
3046  goto shm_init;
3047  }
3048  }
3049 #endif /* !WINDOWS */
3050 
 /* explicit restart request; unlike restart_proxy_server (), this path does not increment num_restarts */
3051  if (proxy_info_p->status == PROXY_STATUS_RESTART)
3052  {
3053  stop_proxy_server (proxy_info_p, br_index, proxy_index);
3054  new_pid = run_proxy_server (proxy_info_p, br_index, proxy_index);
3055  proxy_info_p->pid = new_pid;
3056  goto shm_init;
3057  }
3058 
3059  return;
3060 
3061 shm_init:
3062  proxy_info_p->status = PROXY_STATUS_START;
3063  proxy_info_p->cur_client = 0;
3064  proxy_info_p->stmt_waiter_count = 0;
3065 }
3066 
/*
 * proxy_monitor_thr_f () - thread body that supervises all proxy processes
 *   return: NULL on non-WINDOWS builds once process_flag is cleared
 *
 * Note: While process_flag is set, runs proxy_monitor_worker () for every
 *       proxy in shm_proxy_p and then sleeps 100 milliseconds.
 * NOTE(review): the line carrying the function name and parameter list
 *       (embedded number 3068) and one declaration line (3072, presumably
 *       proxy_info_p) are absent from this listing - confirm.
 */
3067 static THREAD_FUNC
3069 {
3070  int tmp_num_busy_uts;
3071  int proxy_index;
3073 
3074  while (process_flag)
3075  {
 /* tmp_num_busy_uts is reset every pass but never read in the visible body */
3076  tmp_num_busy_uts = 0;
3077  for (proxy_index = 0; proxy_index < shm_proxy_p->num_proxy; proxy_index++)
3078  {
3079  proxy_info_p = shard_shm_find_proxy_info (shm_proxy_p, proxy_index);
3080 
3081  proxy_monitor_worker (proxy_info_p, br_index, proxy_index);
3082  }
3083  SLEEP_MILISEC (0, 100);
3084  }
3085 
3086 #if !defined(WINDOWS)
3087  return NULL;
3088 #endif
3089 }
3090 
3091 #if !defined(WINDOWS)
/*
 * (non-WINDOWS) thread body that accepts proxy connections and registers them
 *   return: NULL once process_flag is cleared
 *
 * Note: Each pass waits up to one second in select () on the listening
 *       socket (proxy_sock_fd) plus the already accepted proxy connections.
 *       A new connection is accepted and handed to broker_add_proxy_conn ();
 *       every readable proxy connection is expected to send its proxy id,
 *       which is then passed to broker_register_proxy_conn ().
 * NOTE(review): the line carrying the function name and parameter list
 *       (embedded number 3093), one declaration line (3095, presumably the
 *       fd_set variables allset/rset) and the statement that computes
 *       max_fd (3111) are absent from this listing - confirm.
 */
3092 static THREAD_FUNC
3094 {
3096  struct timeval tv;
3097  struct sockaddr_in proxy_sock_addr;
3098  T_SOCKLEN proxy_sock_addr_len;
3099  SOCKET max_fd, client_fd;
3100  int proxy_id;
3101  int ret, select_ret;
3102 
3103  while (process_flag)
3104  {
 /* the descriptor set is rebuilt from scratch on every pass */
3105  FD_ZERO (&allset);
3106  FD_SET (proxy_sock_fd, &allset);
3107  broker_set_proxy_fds (&allset);
3108 
3109  rset = allset;
3110 
3112  tv.tv_sec = 1;
3113  tv.tv_usec = 0;
3114  select_ret = select (max_fd, &rset, NULL, NULL, &tv);
3115  if (select_ret == 0)
3116  {
3117  continue;
3118  }
3119  else if (select_ret < 0)
3120  {
 /* every select () error is retried; the EINTR test changes nothing */
3121  if (errno == EINTR)
3122  {
3123  continue;
3124  }
3125  continue;
3126  }
3127 
3128  if (FD_ISSET (proxy_sock_fd, &rset))
3129  {
3130  proxy_sock_addr_len = sizeof (proxy_sock_addr);
 /* NOTE(review): the result of accept () is passed on without being checked for failure;
  * the address buffer is a sockaddr_in although the listening socket is compared against
  * a UNIX-domain setup elsewhere in this file - confirm both are intended. */
3131  client_fd = accept (proxy_sock_fd, (struct sockaddr *) &proxy_sock_addr, &proxy_sock_addr_len);
3132 
3133  ret = broker_add_proxy_conn (client_fd);
3134  if (ret < 0)
3135  {
3136  CLOSE_SOCKET (client_fd);
3137  client_fd = INVALID_SOCKET;
3138  }
3139  }
3140 
3141  while ((client_fd = broker_get_readable_proxy_conn (&rset)) != INVALID_SOCKET)
3142  {
3143  ret = read_from_client (client_fd, ((char *) &proxy_id), sizeof (proxy_id));
3144  if (ret < 0)
3145  {
3146  broker_delete_proxy_conn_by_fd (client_fd);
3147 
3148  CLOSE_SOCKET (client_fd);
3149  client_fd = INVALID_SOCKET;
3150  }
3151 
 /* NOTE(review): there is no "continue" after the failed read above, so execution falls
  * through with client_fd == INVALID_SOCKET and an unset proxy_id into the registration
  * below - looks unintended; confirm. htonl () is applied to a value received from the
  * peer, where ntohl () would be the conventional spelling. */
3152  proxy_id = htonl (proxy_id);
3153  ret = broker_register_proxy_conn (client_fd, proxy_id);
3154  if (ret < 0)
3155  {
3156  broker_delete_proxy_conn_by_fd (client_fd);
3157 
3158  CLOSE_SOCKET (client_fd);
3159  client_fd = INVALID_SOCKET;
3160  }
3161  }
3162  }
3163 
3164  return NULL;
3165 }
3166 #endif /* !WINDOWS */
3167 
3168 /*
3169  * run_proxy_server () - start a proxy process
3170  * return: pid of the started process as returned by fork () (non-WINDOWS) or run_child () (WINDOWS)
3171  * proxy_info_p(in): T_PROXY_INFO of the proxy to start
3172  * br_index(in): broker index
3173  * proxy_index(in): it's only valid in SHARD! proxy index
3174  *
3175  * Note: activate PROXY
3176  * it's only use in SHARD.
 * Starts are serialized through run_proxy_flag. The proxy shared-memory
 * id and the proxy id are passed to the new process through the
 * environment. On non-WINDOWS the child closes descriptors 3..max_open_fd
 * and execs the proxy executable with "<broker>_<exe>_<proxy_index + 1>"
 * as argv[0].
 * NOTE(review): the line carrying the function name and parameter list
 * (embedded number 3179) and three statements inside the flag-wait loop
 * (3191, 3194, 3201 - presumably lock/unlock calls) are absent from this
 * listing; confirm them against the real source.
3177  */
3178 static int
3180 {
3181  const char *proxy_exe_name = NAME_PROXY;
3182  char proxy_shm_id_env_str[32], proxy_id_env_str[32];
3183  int pid;
3184 #if !defined(WINDOWS)
3185  char process_name[APPL_SERVER_NAME_MAX_SIZE];
3186  int i;
3187 #endif
3188 
 /* wait, polling every 100 ms, until no other start is in progress, then claim the flag */
3189  while (1)
3190  {
3192  if (run_proxy_flag)
3193  {
3195  SLEEP_MILISEC (0, 100);
3196  continue;
3197  }
3198  else
3199  {
3200  run_proxy_flag = 1;
3202  break;
3203  }
3204  }
3205 
3206 #if !defined(WINDOWS)
3207  signal (SIGCHLD, SIG_IGN);
3208 #endif
3209 
3210  proxy_info_p->cur_client = 0;
3211 
3212 #if !defined(WINDOWS)
 /* remove a socket file left behind at the proxy's port_name path */
3213  unlink (proxy_info_p->port_name);
3214 
3215  pid = fork ();
3216  if (pid == 0)
3217  {
 /* child process: everything up to the exit (0) below runs only in the child on non-WINDOWS */
3218  signal (SIGCHLD, SIG_DFL);
3219 
3220  for (i = 3; i <= max_open_fd; i++)
3221  {
3222  close (i);
3223  }
3224 #endif
3225 
3226  snprintf (proxy_shm_id_env_str, sizeof (proxy_shm_id_env_str), "%s=%d", PROXY_SHM_KEY_STR,
3227  shm_br->br_info[br_index].proxy_shm_id);
3228  putenv (proxy_shm_id_env_str);
3229 
3230  snprintf (proxy_id_env_str, sizeof (proxy_id_env_str), "%s=%d", PROXY_ID_ENV_STR, proxy_index);
3231  putenv (proxy_id_env_str);
3232 
3233 #if !defined(WINDOWS)
3234  if (snprintf (process_name, sizeof (process_name) - 1, "%s_%s_%d", shm_appl->broker_name, proxy_exe_name,
3235  proxy_index + 1) < 0)
3236  {
3237  assert (false);
3238  exit (0);
3239  }
3240 #endif /* !WINDOWS */
3241 
3242 #if defined(WINDOWS)
3243  pid = run_child (proxy_exe_name);
3244 #else
3245  execle (proxy_exe_name, process_name, NULL, environ);
3246 #endif
3247 
3248 #if !defined(WINDOWS)
 /* reached only if execle () failed */
3249  exit (0);
3250  }
3251 #endif
3252 
 /* a negative pid from a failed fork () is returned to the caller unchanged */
3253  run_proxy_flag = 0;
3254 
3255  return pid;
3256 }
3257 
3258 /*
3259  * stop_proxy_server () - terminate a proxy process
3260  * return: always 0
3261  * proxy_info_p(in): T_PROXY_INFO of the proxy to stop
3262  * br_index(in): broker index
3263  * proxy_index(in): it's only valid in SHARD! proxy index
3264  *
3265  * Note: inactivate Proxy
3266  * it's only use in SHARD.
 * Kills the process through ut_kill_proxy_process () and then clears
 * the pid and the current-client count in the shared proxy info.
 * NOTE(review): the line carrying the function name and parameter list
 * (embedded number 3269) and one statement in the non-WINDOWS branch
 * (3279) are absent from this listing; confirm against the real source.
3267  */
3268 static int
3270 {
3271  ut_kill_proxy_process (proxy_info_p->pid, shm_br->br_info[br_index].name, proxy_index);
3272 
3273 #if defined(WINDOWS)
3274  /* [CUBRIDSUS-2068] make the broker sleep for 0.1 sec when stopping the cas in order to prevent communication error
3275  * occurred on windows. */
3276  SLEEP_MILISEC (0, 100);
3277 #else /* WINDOWS */
3278 
3280 #endif /* !WINDOWS */
3281 
3282  proxy_info_p->pid = 0;
3283  proxy_info_p->cur_client = 0;
3284 
3285  return 0;
3286 }
3287 
3288 /*
3289  * restart_proxy_server () - stop a proxy process and start a new one
3290  * return: void
3291  * proxy_info_p(in): T_PROXY_INFO of the proxy to restart
3292  * br_index(in): broker index
3293  * proxy_index(in): it's only valid in SHARD! proxy index
3294  *
3295  * Note: inactivate and activate Proxy
3296  * it's only use in SHARD.
 * The pid returned by run_proxy_server () is stored without being
 * checked, and num_restarts is incremented unconditionally.
 * NOTE(review): the line carrying the function name and parameter list
 * (embedded number 3299) is absent from this listing.
3297  */
3298 static void
3300 {
3301  int new_pid;
3302 
3303  stop_proxy_server (proxy_info_p, br_index, proxy_index);
3304 
3305  new_pid = run_proxy_server (proxy_info_p, br_index, proxy_index);
3306  proxy_info_p->pid = new_pid;
3307  proxy_info_p->num_restarts++;
3308 }
3309 
/*
 * get_as_sql_log_filename () - build the SQL log file name of an application server
 *   return: void
 *   log_filename(out): receives the name; set to an empty string if formatting fails
 *   len(in): not referenced in the visible body (BROKER_PATH_MAX - 1 is used as the limit)
 *   broker_name(in): broker name
 *   as_info_p(in): supplies proxy_id, shard_id and shard_cas_id in shard mode
 *   as_index(in): cas index, used in non-shard mode
 *
 * Note: The name is "<dirname><broker>_<proxy_id + 1>_<shard_id>_<shard_cas_id + 1>.sql.log"
 *       in shard mode and "<dirname><broker>_<as_index + 1>.sql.log" otherwise.
 *       Truncation by snprintf () is not detected; only a negative return is.
 * NOTE(review): one source line (embedded number 3316) is absent from this
 *       listing - presumably the statement that fills dirname; confirm.
 */
3310 static void
3311 get_as_sql_log_filename (char *log_filename, int len, char *broker_name, T_APPL_SERVER_INFO * as_info_p, int as_index)
3312 {
3313  int ret;
3314  char dirname[BROKER_PATH_MAX];
3315 
3317 
3318  if (br_shard_flag == ON)
3319  {
3320  ret = snprintf (log_filename, BROKER_PATH_MAX - 1, "%s%s_%d_%d_%d.sql.log", dirname, broker_name,
3321  as_info_p->proxy_id + 1, as_info_p->shard_id, as_info_p->shard_cas_id + 1);
3322  }
3323  else
3324  {
3325  ret = snprintf (log_filename, BROKER_PATH_MAX - 1, "%s%s_%d.sql.log", dirname, broker_name, as_index + 1);
3326  }
3327  if (ret < 0)
3328  {
3329  // bad name
3330  log_filename[0] = '\0';
3331  }
3332 }
3333 
/*
 * get_as_slow_log_filename () - build the slow log file name of an application server
 *   return: void
 *   log_filename(out): receives the name; set to an empty string if formatting fails
 *   len(in): not referenced in the visible body (BROKER_PATH_MAX - 1 is used as the limit)
 *   broker_name(in): broker name
 *   as_info_p(in): supplies proxy_id, shard_id and shard_cas_id in shard mode
 *   as_index(in): cas index, used in non-shard mode
 *
 * Note: The name is "<dirname><broker>_<proxy_id + 1>_<shard_id>_<shard_cas_id + 1>.slow.log"
 *       in shard mode and "<dirname><broker>_<as_index + 1>.slow.log" otherwise.
 *       Truncation by snprintf () is not detected; only a negative return is.
 * NOTE(review): one source line (embedded number 3340) is absent from this
 *       listing - presumably the statement that fills dirname; confirm.
 */
3334 static void
3335 get_as_slow_log_filename (char *log_filename, int len, char *broker_name, T_APPL_SERVER_INFO * as_info_p, int as_index)
3336 {
3337  int ret;
3338  char dirname[BROKER_PATH_MAX];
3339 
3341 
3342  if (br_shard_flag == ON)
3343  {
3344  ret = snprintf (log_filename, BROKER_PATH_MAX - 1, "%s%s_%d_%d_%d.slow.log", dirname, broker_name,
3345  as_info_p->proxy_id + 1, as_info_p->shard_id, as_info_p->shard_cas_id + 1);
3346  }
3347  else
3348  {
3349  ret = snprintf (log_filename, BROKER_PATH_MAX - 1, "%s%s_%d.slow.log", dirname, broker_name, as_index + 1);
3350  }
3351  if (ret < 0)
3352  {
3353  // bad name
3354  log_filename[0] = '\0';
3355  }
3356 }
int proxy_shm_id
Definition: shard_proxy.c:46
#define SLEEP_MILISEC(sec, msec)
Definition: util_func.h:40
char monitor_hang_flag
int broker_init_proxy_conn(int max_proxy)
#define APPL_SERVER_CAS
Definition: broker_config.h:34
#define SOCKET_TIMEOUT_SEC
Definition: broker.c:206
char access_log_file[CONF_LOG_FILE_LEN]
Definition: broker_shm.h:492
char database_name[SRV_CON_DBNAME_SIZE]
Definition: broker_shm.h:552
#define UW_ER_NO_MORE_MEMORY
Definition: broker_error.h:40
#define UNUSABLE_DATABASE_MAX
Definition: broker_shm.h:150
char database_name[SRV_CON_DBNAME_SIZE]
Definition: broker_shm.h:352
#define NO_ERROR
Definition: error_code.h:46
int ut_set_keepalive(int sock)
Definition: broker_util.c:289
DB_INFO * cfg_find_db(const char *db_name)
#define TRUE
Definition: broker_admin.c:49
char * dirname(const char *path)
Definition: porting.c:1066
static SOCKET srv_sock_fd
Definition: cas.c:335
int max_heap_delete(T_MAX_HEAP_NODE *max_heap, T_MAX_HEAP_NODE *ret)
static THREAD_FUNC proxy_monitor_thr_f(void *arg)
Definition: broker.c:3068
static int read_from_client_with_timeout(SOCKET sock_fd, char *buf, int size, int timeout_sec)
Definition: broker.c:1408
SOCKET broker_find_available_proxy(T_SHM_PROXY *shm_proxy_p)
#define SRV_CON_CLIENT_MAGIC_LEN
Definition: cas_protocol.h:35
static THREAD_FUNC server_monitor_thr_f(void *arg)
Definition: broker.c:2050
T_BROKER_INFO br_info[1]
Definition: broker_shm.h:661
static void send_error_to_driver(int sock, int error, char *driver_info)
Definition: broker.c:736
int SOCKET
Definition: porting.h:482
T_MAX_HEAP_NODE job_queue[JOB_QUEUE_MAX_SIZE+1]
Definition: broker_shm.h:637
#define SRV_CON_CLIENT_INFO_SIZE
Definition: cas_protocol.h:34
char proxy_log_dir[CONF_LOG_FILE_LEN]
Definition: broker_shm.h:600
static CSS_CONN_ENTRY * connect_to_master_for_server_monitor(const char *db_name, const char *db_host)
Definition: broker.c:1963
T_SHARD_INFO * shard_shm_find_shard_info(T_PROXY_INFO *proxy_info_p, int shard_id)
Definition: shard_shm.c:433
#define SRV_CON_CLIENT_MAGIC_STR
Definition: cas_protocol.h:36
#define pthread_mutex_init(a, b)
Definition: area_alloc.c:48
static void proxy_monitor_worker(T_PROXY_INFO *proxy_info_p, int br_index, int proxy_index)
Definition: broker.c:3016
char reject_client_flag
#define CAS_CONV_ERROR_TO_OLD(V)
Definition: cas_protocol.h:301
#define SHM_BROKER
Definition: broker_shm.h:64
#define CAS_PROTO_MAKE_VER(VER)
Definition: cas_protocol.h:284
int argc
Definition: dynamic_load.c:951
int broker_add_proxy_conn(SOCKET fd)
unsigned int htonl(unsigned int from)
char ip_addr[IP_ADDR_STR_LEN]
Definition: broker.c:227
T_PROXY_INFO * shard_shm_find_proxy_info(T_SHM_PROXY *proxy_p, int proxy_id)
Definition: shard_shm.c:419
#define THREAD_BEGIN(THR_ID, FUNC, ARG)
Definition: cas_common.h:126
char port_name[SHM_APPL_SERVER_NAME_MAX]
Definition: broker_shm.h:601
int parse_int(int *ret_p, const char *str_p, int base)
Definition: porting.c:2290
unsigned char cas_clt_ip[4]
Definition: broker_shm.h:323
#define BROKER_PATH_MAX
Definition: broker_config.h:91
void css_free_conn(CSS_CONN_ENTRY *conn)
bool ut_is_appl_server_ready(int pid, char *ready_flag)
Definition: broker_util.c:498
T_BROKER_VERSION clt_version
Definition: broker_shm.h:303
static char log_filepath[BROKER_PATH_MAX]
Definition: cas_log.c:92
time_t last_access_time
Definition: broker_shm.h:317
char broker_name[BROKER_NAME_LEN]
Definition: cas.c:148
static T_BROKER_INFO * br_info_p
Definition: broker.c:312
char cas_client_type
Definition: cas.c:169
#define pthread_mutex_unlock(a)
Definition: area_alloc.c:51
static int sock_addr_len
Definition: broker.c:302
int broker_set_proxy_fds(fd_set *fds)
#define SRV_CON_MSG_IDX_MINOR_VER
Definition: cas_protocol.h:51
SERVER_STATE
Definition: broker.c:210
#define SHARD_ERR(f, a...)
#define UW_SET_ERROR_CODE(code, os_errno)
Definition: broker_error.h:87
#define SHM_PROXY
Definition: broker_shm.h:65
#define CAS_LOG_RESET_REOPEN
Definition: broker_shm.h:122
#define MASTER_SHM_KEY_ENV_STR
#define CAS_PROTO_INDICATOR
Definition: cas_protocol.h:281
#define SRV_CON_MSG_IDX_PROTO_VERSION
Definition: cas_protocol.h:46
static T_SHM_PROXY * shm_proxy_p
Definition: broker.c:313
static int stop_proxy_server(T_PROXY_INFO *proxy_info_p, int br_index, int proxy_index)
Definition: broker.c:3269
static THREAD_FUNC shard_dispatch_thr_f(void *arg)
Definition: broker.c:1066
static void restart_proxy_server(T_PROXY_INFO *proxy_info_p, int br_index, int proxy_index)
Definition: broker.c:3299
char ** environ
int unusable_databases_cnt[PAIR_LIST]
Definition: broker_shm.h:627
int wsa_initialize()
static void cas_monitor_worker(T_APPL_SERVER_INFO *as_info_p, int br_index, int as_index, int *busy_uts)
Definition: broker.c:1825
#define PROXY_LOG_RESET_REOPEN
Definition: broker_shm.h:124
static int broker_init_shm(void)
Definition: broker.c:2937
#define UTS_STATUS_RESTART
Definition: broker_shm.h:47
#define READ_FROM_SOCKET(fd, buf, size)
Definition: cas_common.h:141
#define IS_SSL_CLIENT(driver_info)
Definition: cas_protocol.h:40
#define APPL_SERVER_SHM_KEY_STR
SOCKET broker_get_proxy_conn_maxfd(SOCKET proxy_sock_fd)
static int process_flag
Definition: broker.c:333
#define UTS_STATUS_START
Definition: broker_shm.h:48
#define IP_ADDR_STR_LEN
Definition: broker.c:105
int main(int argc, char *argv[])
Definition: broker.c:463
static SOCKET connect_srv(char *br_name, int as_index)
Definition: broker.c:1754
#define SRV_CON_MSG_IDX_CLIENT_TYPE
Definition: cas_protocol.h:38
static const char * cas_client_type_str[]
Definition: broker.c:762
#define CON_STATUS_LOCK(AS_INFO, LOCK_OWNER)
Definition: broker_shm.h:106
static THREAD_FUNC dispatch_thr_f(void *arg)
Definition: broker.c:1141
#define UTS_STATUS_BUSY
Definition: broker_shm.h:45
unsigned char my_ip_addr[4]
Definition: broker_shm.h:654
#define CAS_MAKE_VER(MAJOR, MINOR, PATCH)
Definition: cas_protocol.h:304
T_DB_SERVER unusable_databases[PAIR_LIST][UNUSABLE_DATABASE_MAX]
Definition: broker_shm.h:643
static pthread_mutex_t run_appl_mutex
Definition: broker.c:324
int appl_server_hard_limit
static void check_proxy_access_log(T_PROXY_INFO *proxy_info_p)
Definition: broker.c:2367
char broker_name[BROKER_NAME_LEN]
Definition: broker_shm.h:588
static bool broker_add_new_cas(void)
Definition: broker.c:348
int max_appl_server
Definition: broker_shm.h:416
#define SRV_CON_DB_INFO_SIZE
Definition: cas_protocol.h:61
#define INVALID_SOCKET
Definition: porting.h:483
int appl_server_hard_limit
Definition: broker_shm.h:612
#define UTS_STATUS_IDLE
Definition: broker_shm.h:46
#define SRV_CON_DBNAME_SIZE
int er_init(const char *msglog_filename, int exit_ask)
#define UW_ER_CANT_CREATE_SOCKET
Definition: broker_error.h:35
#define CON_STATUS_LOCK_BROKER
Definition: broker_shm.h:72
#define PORT_NUMBER_ENV_STR
static int current_dropping_as_index
Definition: broker.c:331
#define PROXY_STATUS_RESTART
Definition: broker_shm.h:56
static void psize_check_worker(T_APPL_SERVER_INFO *as_info_p, int br_index, int as_index)
Definition: broker.c:2281
static int stop_appl_server(T_APPL_SERVER_INFO *as_info_p, int br_index, int as_index)
Definition: broker.c:1647
#define CAS_PROTO_UNPACK_NET_VER(VER)
Definition: cas_protocol.h:296
static int num_busy_uts
Definition: broker.c:335
char * get_cubrid_file(T_CUBRID_FILE_ID fid, char *buf, size_t len)
#define NUM_COLLECT_COUNT_PER_INTVL
Definition: broker.c:200
static pthread_mutex_t clt_table_mutex
Definition: broker.c:323
#define APPL_SERVER_CAS_ORACLE
Definition: broker_config.h:35
int min_appl_server
Definition: broker_shm.h:415
int maxfd
char database_host[CUB_MAXHOSTNAMELEN]
Definition: broker_shm.h:553
static int find_drop_as_index(void)
Definition: broker.c:2814
unsigned short cas_clt_port
Definition: broker_shm.h:324
int ut_kill_as_process(int pid, char *broker_name, int as_index, int shard_flag)
Definition: broker_util.c:260
#define CAS_MAKE_PROTO_VER(DRIVER_INFO)
Definition: cas_protocol.h:307
int css_send_request(CSS_CONN_ENTRY *conn, int command, unsigned short *request_id, const char *arg_buffer, int arg_buffer_size)
static void shard_broker_process(void)
Definition: broker.c:655
unsigned int unusable_databases_seq
Definition: broker_shm.h:628
#define UTS_STATUS_STOP
Definition: broker_shm.h:53
#define UTS_STATUS_CON_WAIT
Definition: broker_shm.h:52
#define assert(x)
void er_final(ER_FINAL_CODE do_global_final)
#define CON_STATUS_LOCK_INIT(AS_INFO)
Definition: broker_shm.h:100
#define NAME_PROXY
char driver_info[SRV_CON_CLIENT_INFO_SIZE]
Definition: broker_shm.h:304
#define SELECT_MASK
Definition: cas_network.c:59
static pthread_cond_t clt_table_cond
Definition: broker.c:322
INT64 num_connect_requests
Definition: broker_shm.h:358
static THREAD_FUNC receiver_thr_f(void *arg)
Definition: broker.c:773
static int read_nbytes_from_client(SOCKET sock_fd, char *buf, int size)
Definition: broker.c:1736
int proxy_id
Definition: shard_proxy.c:45
int as_info_index_base
Definition: broker_shm.h:422
static int read_from_client(SOCKET sock_fd, char *buf, int size)
Definition: broker.c:1402
#define PROXY_ID_ENV_STR
int num_shard_conn
Definition: broker_shm.h:514
int sysprm_load_and_init(const char *db_name, const char *conf_file, const int load_flags)
int prm_get_master_port_id(void)
#define SRV_CON_CLIENT_MAGIC_STR_SSL
Definition: cas_protocol.h:37
int service_flag
Definition: broker_shm.h:462
#define IS_INVALID_SOCKET(socket)
Definition: porting.h:484
INT64 stmt_waiter_count
Definition: broker_shm.h:480
unsigned short port
#define UW_ER_CANT_BIND
Definition: broker_error.h:37
int send_fd(int server_fd, int client_fd, int rid, char *driver_info)
static void restart_appl_server(T_APPL_SERVER_INFO *as_info_p, int br_index, int as_index)
Definition: broker.c:1674
static int write_to_client(SOCKET sock_fd, char *buf, int size)
Definition: broker.c:1457
INT64 num_restarts
Definition: broker_shm.h:508
time_t transaction_start_time
Definition: broker_shm.h:318
static void check_cas_log(char *br_name, T_APPL_SERVER_INFO *as_info_p, int as_index)
Definition: broker.c:2529
static void cleanup(int signo)
Definition: broker.c:717
static SOCKET proxy_sock_fd
Definition: broker.c:306
#define NULL
Definition: freelistheap.h:34
#define strncpy_bufsize(buf, str)
Definition: porting.h:340
char acl_file[CONF_LOG_FILE_LEN]
pid_t pid
Definition: dynamic_load.c:955
static int init_env(void)
Definition: broker.c:1355
static THREAD_FUNC hang_check_thr_f(void *arg)
Definition: broker.c:2179
static struct sockaddr_in sock_addr
Definition: broker.c:301
void util_free_string_array(char **array)
Definition: util_func.c:292
static struct sockaddr_un shard_sock_addr
Definition: broker.c:307
char auto_add_appl_server
unsigned short htons(unsigned short from)
char script[PRE_SEND_SCRIPT_SIZE]
fd_set allset
static int get_server_state_from_master(CSS_CONN_ENTRY *conn, const char *db_name)
Definition: broker.c:1984
#define UW_ER_SHM_OPEN
Definition: broker_error.h:44
#define WRITE_TO_SOCKET(fd, buf, size)
Definition: cas_common.h:142
static T_SHM_APPL_SERVER * shm_appl
Definition: broker.c:311
static int find_idle_cas(void)
Definition: broker.c:2728
char * db_name
#define HANG_COUNT_THRESHOLD_RATIO
Definition: broker.c:201
#define SRV_CON_MSG_IDX_FUNCTION_FLAG
Definition: cas_protocol.h:47
int monitor_hang_interval
char ** util_split_string(const char *str, const char *delim)
Definition: util_func.c:247
#define CON_STATUS_LOCK_DESTROY(AS_INFO)
Definition: broker_shm.h:103
#define SET_BROKER_ERR_CODE()
Definition: broker.c:175
static int br_shard_flag
Definition: broker.c:316
unsigned int session_id
Definition: broker_shm.h:377
static THREAD_FUNC psize_check_thr_f(void *arg)
Definition: broker.c:2489
static T_SHM_BROKER * shm_br
Definition: broker.c:310
T_PROXY_INFO * proxy_info_p
Definition: shard_proxy.c:48
static void error(const char *msg)
Definition: gencat.c:331
char prg_name[PRE_SEND_PRG_NAME_SIZE]
int proxy_access_log_reset
Definition: broker_shm.h:486
static char run_appl_server_flag
Definition: broker.c:329
static int br_index
Definition: broker.c:315
int css_receive_data(CSS_CONN_ENTRY *conn, unsigned short req_id, char **buffer, int *buffer_size, int timeout)
fd_set rset
int proxy_log_reset
Definition: broker_shm.h:485
static char database_name[MAX_HA_DBINFO_LENGTH]
Definition: cas_execute.c:387
static pthread_mutex_t broker_shm_mutex
Definition: broker.c:325
static int insert_db_server_check_list(T_DB_SERVER *list_p, int check_list_cnt, const char *db_name, const char *db_host)
Definition: broker.c:2025
bool monitor_server_flag
Definition: broker_shm.h:630
time_t claimed_alive_time
Definition: broker_shm.h:319
SOCKET clt_sock_fd
Definition: broker.c:226
INT64 waiter_count
Definition: broker_shm.h:420
static int find_add_as_index(void)
Definition: broker.c:2875
unsigned short ntohs(unsigned short from)
char preferred_hosts[SHM_APPL_SERVER_NAME_MAX]
Definition: broker_shm.h:590
static char run_proxy_flag
Definition: broker.c:327
const char ** argv
Definition: dynamic_load.c:952
#define free_and_init(ptr)
Definition: memory_alloc.h:147
#define strlen(s1)
Definition: intl_support.c:43
int broker_delete_proxy_conn_by_proxy_id(int proxy_id)
unsigned char ip_addr[4]
#define PROXY_SHM_KEY_STR
static void get_as_sql_log_filename(char *log_filename, int len, char *broker_name, T_APPL_SERVER_INFO *as_info_p, int as_index)
Definition: broker.c:3311
static void get_as_slow_log_filename(char *log_filename, int len, char *broker_name, T_APPL_SERVER_INFO *as_info_p, int as_index)
Definition: broker.c:3335
static pthread_mutex_t run_proxy_mutex
Definition: broker.c:326
char name[BROKER_NAME_LEN]
#define JOB_COUNT_MAX
Definition: broker.c:197
int T_BROKER_VERSION
Definition: cas_protocol.h:342
#define CON_STATUS_UNLOCK(AS_INFO, LOCK_OWNER)
Definition: broker_shm.h:109
int broker_register_proxy_conn(SOCKET fd, int proxy_id)
static int hold_job
Definition: broker.c:345
int uw_acl_check(unsigned char *ip_addr)
#define FALSE
Definition: broker_admin.c:50
#define PROXY_STATUS_START
Definition: broker_shm.h:57
#define CLOSE_SOCKET(X)
Definition: cas_common.h:85
SOCKET broker_get_readable_proxy_conn(fd_set *fds)
#define IS_NOT_APPL_SERVER_TYPE_CAS(x)
Definition: broker_config.h:42
socklen_t T_SOCKLEN
Definition: cas_common.h:156
char appl_server_name[APPL_SERVER_NAME_MAX_SIZE]
Definition: broker_shm.h:589
static SOCKET sock_fd
Definition: broker.c:300
char driver_version[SRV_CON_VER_STR_MAX_SIZE]
Definition: broker_shm.h:305
void ut_get_as_port_name(char *port_name, char *broker_name, int as_id, int len)
Definition: broker_util.c:570
T_APPL_SERVER_INFO as_info[APPL_SERVER_NUM_LIMIT]
Definition: broker_shm.h:641
int num_appl_server
Definition: broker_shm.h:417
int i
Definition: dynamic_load.c:954
bool cas_di_understand_renewed_error_code(const char *driver_info)
Definition: cas_meta.c:203
#define THREAD_FUNC
Definition: cas_common.h:148
char port_name[SHM_PROXY_NAME_MAX]
Definition: broker_shm.h:488
static int init_proxy_env(void)
Definition: broker.c:2895
static THREAD_FUNC proxy_listener_thr_f(void *arg)
Definition: broker.c:3093
int uw_acl_make(char *acl_file)
#define SRV_CON_MSG_IDX_PATCH_VER
Definition: cas_protocol.h:52
static int run_proxy_server(T_PROXY_INFO *proxy_info_p, int br_index, int proxy_index)
Definition: broker.c:3179
#define SRV_CON_MSG_IDX_MAJOR_VER
Definition: cas_protocol.h:50
#define CAS_SEND_ERROR_CODE(FD, VAL)
Definition: broker.c:190
int getsize(int pid)
T_ACL * v3_acl
static void check_proxy_log(char *br_name, T_PROXY_INFO *proxy_info_p)
Definition: broker.c:2341
#define pthread_mutex_lock(a)
Definition: area_alloc.c:50
void set_cubrid_file(T_CUBRID_FILE_ID fid, char *value)
unsigned int ntohl(unsigned int from)
#define APPL_SERVER_NAME_MAX_SIZE
Definition: broker_shm.h:120
static void proxy_check_worker(int br_index, T_PROXY_INFO *proxy_info_p)
Definition: broker.c:2388
void max_heap_incr_priority(T_MAX_HEAP_NODE *max_heap)
void * uw_shm_open(int shm_key, int which_shm, T_SHM_MODE shm_mode)
Definition: broker_shm.c:139
void ut_get_as_pid_name(char *pid_name, char *br_name, int as_index, int len)
Definition: broker_util.c:702
char slow_log_dir[CONF_LOG_FILE_LEN]
Definition: broker_shm.h:586
int max_heap_insert(T_MAX_HEAP_NODE *max_heap, int max_heap_size, T_MAX_HEAP_NODE *item)
#define CUB_MAXHOSTNAMELEN
Definition: porting.h:379
time_t claimed_alive_time
Definition: broker_shm.h:511
static bool broker_drop_one_cas_by_time_to_kill(void)
Definition: broker.c:394
#define SET_BROKER_OK_CODE()
Definition: broker.c:183
static THREAD_FUNC cas_monitor_thr_f(void *arg)
Definition: broker.c:1940
#define AS_ID_ENV_STR
char log_dir[CONF_LOG_FILE_LEN]
Definition: broker_shm.h:585
static int write_to_client_with_timeout(SOCKET sock_fd, char *buf, int size, int timeout_sec)
Definition: broker.c:1463
CSS_CONN_ENTRY * css_connect_to_master_timeout(const char *host_name, int port_id, int timeout, unsigned short *rid)
int ut_kill_proxy_process(int pid, char *broker_name, int proxy_id)
Definition: broker_util.c:242
void cfg_free_directory(DB_INFO *databases)
T_BROKER_VERSION clt_version
int broker_delete_proxy_conn_by_fd(SOCKET fd)
const char ** p
Definition: dynamic_load.c:945
void broker_destroy_proxy_conn(void)
static int max_open_fd
Definition: broker.c:337
char driver_info[SRV_CON_CLIENT_INFO_SIZE]
static int run_appl_server(T_APPL_SERVER_INFO *as_info_p, int br_index, int as_index)
Definition: broker.c:1526
float ut_get_avg_from_array(int array[], int size)
Definition: broker_util.c:486
#define MONITOR_SERVER_INTERVAL
Definition: broker.c:112
int cur_proxy_log_mode
Definition: broker_shm.h:464
#define SHM_APPL_SERVER
Definition: broker_shm.h:63