38 #include <sys/socket.h> 39 #include <netinet/in.h> 40 #include <arpa/inet.h> 41 #include <netinet/tcp.h> 81 #if defined(CAS_FOR_ORACLE) || defined(CAS_FOR_MYSQL) 82 #define DB_EMPTY_SESSION (0) 91 #ifdef BROKER_RESTART_DEBUG 92 #define PS_CHK_PERIOD 30 94 #define PS_CHK_PERIOD 600 99 #define SELECT_MASK int 101 #define SELECT_MASK fd_set 105 #define IP_ADDR_STR_LEN 20 106 #define BUFFER_SIZE ONE_K 108 #define ENV_BUF_INIT_SIZE 512 109 #define ALIGN_ENV_BUF_SIZE(X) \ 110 ((((X) + ENV_BUF_INIT_SIZE) / ENV_BUF_INIT_SIZE) * ENV_BUF_INIT_SIZE) 112 #define MONITOR_SERVER_INTERVAL 5 115 #define BROKER_LOG(X) \ 118 fp = fopen("broker.log", "a"); \ 121 gettimeofday(&tv, NULL); \ 122 fprintf(fp, "%d %d.%06d %s\n", __LINE__, (int)tv.tv_sec, (int)tv.tv_usec, X); \ 126 #define BROKER_LOG_INT(X) \ 130 gettimeofday(&tv, NULL); \ 131 fp = fopen("broker.log", "a"); \ 133 fprintf(fp, "%d %d.%06d %s=%d\n", __LINE__, (int)tv.tv_sec, (int)tv.tv_usec, #X, X); \ 140 #define SESSION_LOG_WRITE(IP, SID, APPL, INDEX) \ 145 gettimeofday(&tv, NULL); \ 146 ip2str(IP, ip_str); \ 147 fp = fopen("session.log", "a"); \ 149 fprintf(fp, "%d %-15s %d %d.%06d %s %d \n", br_index, ip_str, (int) SID, tv.tv_sec, tv.tv_usec, APPL, INDEX); \ 156 #define EDU_KEY "86999522480552846466422480899195252860256028745" 159 #define V3_WRITE_HEADER_OK_FILE_SOCK(sock_fd) \ 161 char buf[V3_RESPONSE_HEADER_SIZE]; \ 162 memset(buf, '\0', sizeof(buf)); \ 163 sprintf(buf, V3_HEADER_OK); \ 164 write_to_client(sock_fd, buf, sizeof(buf)); \ 167 #define V3_WRITE_HEADER_ERR_SOCK(sockfd) \ 169 char buf[V3_RESPONSE_HEADER_SIZE]; \ 170 memset(buf, '\0', sizeof(buf)); \ 171 sprintf(buf, V3_HEADER_ERR); \ 172 write_to_client(sockfd, buf, sizeof(buf)); \ 175 #define SET_BROKER_ERR_CODE() \ 177 if (shm_br && br_index >= 0) { \ 178 shm_br->br_info[br_index].err_code = uw_get_error_code(); \ 179 shm_br->br_info[br_index].os_err_code = uw_get_os_error_code(); \ 183 #define SET_BROKER_OK_CODE() \ 185 if (shm_br && br_index >= 0) { \ 186 shm_br->br_info[br_index].err_code = 0; \ 190 #define CAS_SEND_ERROR_CODE(FD, VAL) \ 193 write_val = htonl(VAL); \ 194 write_to_client(FD, (char*) &write_val, 4); \ 197 #define JOB_COUNT_MAX 1000000 200 #define NUM_COLLECT_COUNT_PER_INTVL 4 201 #define HANG_COUNT_THRESHOLD_RATIO 0.5 206 #define SOCKET_TIMEOUT_SEC 2 230 static void cleanup (
int signo);
232 #if !defined(WINDOWS) 251 #if !defined(WINDOWS) 261 static int read_from_cas_client (
SOCKET sock_fd,
char *buf,
int size,
int as_index,
int cas_pid);
294 const char *db_host);
297 static int get_cputime_sec (
int pid);
340 static int last_job_fetch_time;
341 static time_t last_session_id = 0;
350 int cur_appl_server_num;
363 if (add_as_index < 0)
396 int cur_appl_server_num, wait_job_cnt;
405 if (cur_appl_server_num <= shm_br->br_info[
br_index].appl_server_min_num || wait_job_cnt > 0)
411 if (drop_as_index < 0)
418 drop_as_info = &shm_appl->
as_info[drop_as_index];
440 if (drop_as_index >= 0)
456 WinMain (HINSTANCE hInstance,
457 HINSTANCE hPrevInstance,
466 pthread_t receiver_thread;
467 pthread_t dispatch_thread;
468 pthread_t cas_monitor_thread;
469 pthread_t psize_check_thread;
470 pthread_t hang_check_thread;
471 pthread_t server_monitor_thread;
472 pthread_t proxy_monitor_thread;
473 #if !defined(WINDOWS) 474 pthread_t proxy_listener_thread;
478 pthread_t service_thread;
494 #if !defined(WINDOWS) 495 signal (SIGCHLD, SIG_IGN);
496 signal (SIGPIPE, SIG_IGN);
528 #if !defined(WINDOWS) 545 thr_index = (
int *) malloc (
sizeof (
int) * num_thr);
546 if (thr_index ==
NULL)
554 if (session_request_q ==
NULL)
559 for (i = 0; i < num_thr; i++)
591 #if !defined(WINDOWS) 610 for (i = 0; i < num_thr; i++)
613 THREAD_BEGIN (service_thread, service_thr_f, thr_index + i);
616 if (i < shm_br->br_info[
br_index].appl_server_min_num)
645 #if !defined(WINDOWS) 657 int proxy_index, shard_index,
i;
672 for (proxy_index = 0; proxy_index < shm_proxy_p->
num_proxy; proxy_index++)
676 for (shard_index = 0; shard_index < proxy_info_p->
num_shard_conn; shard_index++)
682 for (i = shard_info_p->
min_appl_server; i < shard_info_p->max_appl_server; i++)
719 signal (signo, SIG_IGN);
728 #if !defined(WINDOWS) 751 write_val =
htonl (error);
776 struct sockaddr_in clt_sock_addr;
796 #if !defined(WINDOWS) 797 signal (SIGPIPE, SIG_IGN);
802 setsockopt (
sock_fd, IPPROTO_TCP, TCP_DEFER_ACCEPT, (
char *) &timeout,
sizeof (timeout));
807 clt_sock_addr_len =
sizeof (clt_sock_addr);
808 clt_sock_fd = accept (
sock_fd, (
struct sockaddr *) &clt_sock_addr, &clt_sock_addr_len);
821 #if !defined(WINDOWS) && defined(ASYNC_MODE) 822 if (fcntl (clt_sock_fd, F_SETFL, FNDELAY) < 0)
829 setsockopt (clt_sock_fd, IPPROTO_TCP, TCP_NODELAY, (
char *) &one,
sizeof (one));
842 if (strncmp (cas_req_header,
"PING", 4) == 0)
850 if (strncmp (cas_req_header,
"ST", 2) == 0)
854 unsigned int session_id;
856 memcpy ((
char *) &pid, cas_req_header + 2, 4);
858 memcpy ((
char *) &session_id, cas_req_header + 6, 4);
859 session_id =
ntohl (session_id);
892 else if (strncmp (cas_req_header,
"QC", 2) == 0 || strncmp (cas_req_header,
"CANCEL", 6) == 0
893 || strncmp (cas_req_header,
"X1", 2) == 0)
896 #if !defined(WINDOWS) 898 unsigned short client_port = 0;
901 #if !defined(WINDOWS) 902 if (cas_req_header[0] ==
'Q')
904 memcpy ((
char *) &pid, cas_req_header + 2, 4);
905 memcpy ((
char *) &client_port, cas_req_header + 6, 2);
907 client_port =
ntohs (client_port);
911 memcpy ((
char *) &pid, cas_req_header + 6, 4);
924 if (cas_req_header[0] ==
'Q' && client_port > 0
942 if (cas_req_header[0] ==
'X')
1006 memcpy (ip_addr, &(clt_sock_addr.sin_addr), 4);
1016 if (job_queue[0].
id == job_queue_size)
1028 job_count = (job_count >=
JOB_COUNT_MAX) ? 1 : job_count + 1;
1029 new_job.
id = job_count;
1033 new_job.
script[0] =
'\0';
1035 new_job.
port =
ntohs (clt_sock_addr.sin_port);
1036 memcpy (new_job.
ip_addr, &(clt_sock_addr.sin_addr), 4);
1037 strcpy (new_job.
prg_name, cas_client_type_str[(
int) cas_client_type]);
1058 #if defined(WINDOWS) 1071 #if defined(WINDOWS) 1094 #if defined(WINDOWS) 1095 memcpy (&ip_addr, cur_job.
ip_addr, 4);
1104 memcpy (&ip_addr, cur_job.
ip_addr, 4);
1133 #if defined(WINDOWS) 1145 #if !defined(WINDOWS) 1171 gettimeofday (&tv,
NULL);
1172 ts.tv_sec = tv.tv_sec;
1173 ts.tv_nsec = (tv.tv_usec + 30000) * 1000;
1174 if (ts.tv_nsec > 1000000000)
1177 ts.tv_nsec -= 1000000000;
1193 #if !defined (WINDOWS) 1219 #if !defined(WIN_FW) 1225 #if defined(WINDOWS) 1240 int con_status, uts_status;
1245 if (ret_val !=
sizeof (
int))
1258 memcpy (&ip_addr, cur_job.
ip_addr, 4);
1284 session_request_q[as_index] = cur_job;
1288 #if defined(WINDOWS) 1297 service_thr_f (
void *arg)
1299 int self_index = *((
int *) arg);
1309 cur_job = session_request_q[self_index];
1319 memcpy (&ip_addr, cur_job.
ip_addr, 4);
1321 shm_appl->
as_info[self_index].clt_major_version = cur_job.clt_major_version;
1322 shm_appl->
as_info[self_index].clt_minor_version = cur_job.clt_minor_version;
1323 shm_appl->
as_info[self_index].clt_patch_version = cur_job.clt_patch_version;
1325 shm_appl->
as_info[self_index].close_flag = 0;
1344 process_cas_request (cas_pid, self_index, clt_sock_fd, srv_sock_fd, cur_job.clt_major_version);
1362 sock_fd = socket (AF_INET, SOCK_STREAM, 0);
1368 if ((setsockopt (
sock_fd, SOL_SOCKET, SO_REUSEADDR, (
char *) &one,
sizeof (one))) < 0)
1378 memset (&
sock_addr, 0,
sizeof (
struct sockaddr_in));
1384 memcpy (&
sock_addr.sin_addr, &n, sizeof (
int));
1415 struct timeval timeout_val, *timeout_ptr;
1417 if (timeout_sec < 0)
1423 timeout_val.tv_sec = timeout_sec;
1424 timeout_val.tv_usec = 0;
1425 timeout_ptr = &timeout_val;
1430 FD_ZERO (&read_mask);
1431 FD_SET (sock_fd, (fd_set *) (&read_mask));
1432 maxfd = (int) sock_fd + 1;
1441 if (FD_ISSET (sock_fd, (fd_set *) (&read_mask)))
1470 struct timeval timeout_val, *timeout_ptr;
1472 if (timeout_sec < 0)
1478 timeout_val.tv_sec = timeout_sec;
1479 timeout_val.tv_usec = 0;
1480 timeout_ptr = &timeout_val;
1488 FD_ZERO (&write_mask);
1489 FD_SET (sock_fd, (fd_set *) (&write_mask));
1490 maxfd = (int) sock_fd + 1;
1499 if (FD_ISSET (sock_fd, (fd_set *) (&write_mask)))
1531 #if !defined(WINDOWS) 1534 char as_id_env_str[32];
1535 char appl_server_shm_key_env_str[32];
1556 #if !defined(WINDOWS) 1557 signal (SIGCHLD, SIG_IGN);
1571 signal (SIGCHLD, SIG_DFL);
1582 putenv (appl_server_shm_key_env_str);
1584 snprintf (as_id_env_str,
sizeof (as_id_env_str),
"%s=%d",
AS_ID_ENV_STR, as_index);
1585 putenv (as_id_env_str);
1589 snprintf (argv0,
sizeof (argv0) - 1,
"%s", appl_name);
1595 snprintf (argv0,
sizeof (argv0) - 1,
"%s_%s_%d_%d_%d", shm_br->
br_info[br_index].
name, appl_name,
1600 snprintf (argv0,
sizeof (argv0) - 1,
"%s_%s_%d", shm_br->
br_info[br_index].
name, appl_name, as_index + 1);
1604 #if defined(WINDOWS) 1605 pid = run_child (appl_name);
1610 #if !defined(WINDOWS) 1651 #if defined(WINDOWS) 1678 #if defined(WINDOWS) 1687 as_info_p->
pid = new_pid;
1691 if (as_info_p->
psize > 1)
1703 fp = fopen (pid_file_name,
"r");
1706 fscanf (fp,
"%d", &old_pid);
1710 if (as_info_p->
psize > 1)
1712 as_info_p->
pid = old_pid;
1717 unlink (pid_file_name);
1722 if (as_info_p->
psize <= 0)
1724 if (as_info_p->
pid > 0)
1730 as_info_p->
pid = new_pid;
1738 int total_read_size = 0, read_len;
1740 while (total_read_size < size)
1742 read_len =
read_from_client (sock_fd, buf + total_read_size, size - total_read_size);
1745 total_read_size = -1;
1748 total_read_size += read_len;
1750 return total_read_size;
1757 #if defined(WINDOWS) 1758 struct sockaddr_in sock_addr;
1760 struct sockaddr_un sock_addr;
1764 char retry_count = 0;
1768 #if defined(WINDOWS) 1769 srv_sock_fd = socket (AF_INET, SOCK_STREAM, 0);
1773 memset (&sock_addr, 0,
sizeof (
struct sockaddr_in));
1774 sock_addr.sin_family = AF_INET;
1775 sock_addr.sin_port =
htons ((
unsigned short) shm_appl->
as_info[as_index].as_port);
1776 memcpy (&sock_addr.sin_addr, shm_br->
my_ip_addr, 4);
1777 sock_addr_len =
sizeof (
struct sockaddr_in);
1779 srv_sock_fd = socket (AF_UNIX, SOCK_STREAM, 0);
1783 memset (&sock_addr, 0,
sizeof (
struct sockaddr_un));
1784 sock_addr.sun_family = AF_UNIX;
1788 sock_addr_len =
strlen (sock_addr.sun_path) +
sizeof (sock_addr.sun_family) + 1;
1791 if (connect (srv_sock_fd, (
struct sockaddr *) &sock_addr, sock_addr_len) < 0)
1793 if (retry_count < 1)
1809 setsockopt (srv_sock_fd, IPPROTO_TCP, TCP_NODELAY, (
char *) &one,
sizeof (one));
1828 int restart_flag =
OFF;
1840 #if defined(WINDOWS) 1841 else if (as_info_p->
uts_status == UTS_STATUS_BUSY_WAIT)
1854 #if defined(WINDOWS) 1855 if (shm_appl->use_pdh_flag ==
TRUE)
1857 if ((as_info_p->
pid == as_info_p->pdh_pid)
1887 #if defined(WINDOWS) 1889 phandle = OpenProcess (SYNCHRONIZE,
FALSE, as_info_p->
pid);
1890 if (phandle ==
NULL)
1897 CloseHandle (phandle);
1900 if (kill (as_info_p->
pid, 0) < 0)
1913 as_info_p->
pid = new_pid;
1942 int i, tmp_num_busy_uts;
1946 tmp_num_busy_uts = 0;
1957 #if !defined(WINDOWS) 1986 unsigned short request_id;
2004 error =
css_receive_data (conn, request_id, (
char **) &buffer, &buffer_size, 5000);
2007 if (buffer_size ==
sizeof (
int))
2009 server_state =
ntohl (*buffer);
2012 return server_state;
2031 if (strcmp (db_name, list_p[i].
database_name) == 0 && strcmp (db_host, list_p[i].database_host) == 0)
2033 return check_list_cnt;
2037 if (i == UNUSABLE_DATABASE_MAX)
2054 int check_list_cnt = 0;
2059 char **preferred_hosts;
2060 char *unusable_db_name;
2061 char *unusable_db_host;
2093 as_info_p = &(shm_appl->
as_info[
i]);
2098 if (busy_cas_db_name[0] !=
'\0')
2101 if (preferred_hosts !=
NULL)
2103 for (j = 0; preferred_hosts[j] !=
NULL; j++)
2107 preferred_hosts[j]);
2123 for (j = 0; j < db_info_p->
num_hosts; j++)
2135 for (i = 0; i < check_list_cnt; i++)
2151 for (i = 0; i < check_list_cnt; i++)
2173 #if !defined(WINDOWS) 2181 unsigned int cur_index;
2185 int collect_count_interval;
2187 float avg_hang_count;
2198 avg_hang_count = 0.0;
2203 cur_time = time (
NULL);
2208 as_info_p = &(shm_appl->
as_info[
i]);
2222 for (proxy_index = 0; proxy_index < shm_proxy_p->
num_proxy; proxy_index++)
2238 hang_count[cur_index] = cur_hang_count;
2264 #if !defined(WINDOWS) 2283 #if defined(WINDOWS) 2295 #if defined(WINDOWS) 2296 pid = as_info_p->
pid;
2298 cpu_time = get_cputime_sec (pid);
2301 as_info_p->cpu_time = 0;
2304 hProcess = OpenProcess (PROCESS_QUERY_INFORMATION,
FALSE, pid);
2305 if (hProcess ==
NULL)
2309 as_info_p->cpu_time = 0;
2315 as_info_p->cpu_time = cpu_time;
2318 if (pdh_get_value (pid, &workset_size, &pct_cpu,
NULL) >= 0)
2320 as_info_p->pdh_pid =
pid;
2321 as_info_p->pdh_workset = workset_size;
2322 as_info_p->pdh_pct_cpu = pct_cpu;
2327 if (as_info_p->
psize < 0 && as_info_p->
pid > 0)
2329 if (kill (as_info_p->
pid, 0) < 0 && errno == ESRCH)
2347 snprintf (log_filepath,
sizeof (log_filepath),
"%s/%s_%d.log", shm_appl->
proxy_log_dir, br_name,
2350 if (access (log_filepath, F_OK) < 0)
2354 fp = fopen (log_filepath,
"a");
2369 char *access_log_file;
2373 if (access (access_log_file, F_OK) < 0)
2375 fp = fopen (access_log_file,
"a");
2396 #if defined(WINDOWS) 2398 get_cputime_sec (
int pid)
2402 FILETIME ctime, etime, systime, usertime;
2408 hProcess = OpenProcess (PROCESS_QUERY_INFORMATION,
FALSE, pid);
2409 if (hProcess ==
NULL)
2414 if (GetProcessTimes (hProcess, &ctime, &etime, &systime, &usertime) != 0)
2416 ul.HighPart = systime.dwHighDateTime + usertime.dwHighDateTime;
2417 ul.LowPart = systime.dwLowDateTime + usertime.dwLowDateTime;
2418 cputime = ((int) (ul.QuadPart / 10000000));
2420 CloseHandle (hProcess);
2438 if (pdh_init () < 0)
2440 shm_appl->use_pdh_flag =
FALSE;
2445 shm_appl->use_pdh_flag =
TRUE;
2452 if (pdh_get_value (shm_br->
br_info[
br_index].
pid, &workset_size, &pct_cpu, &br_num_thr) < 0)
2470 for (proxy_index = 0; proxy_index < shm_proxy_p->
num_proxy; proxy_index++)
2500 for (proxy_index = 0; proxy_index < shm_proxy_p->
num_proxy; proxy_index++)
2542 if (access (log_filename, F_OK) < 0)
2545 fp = fopen (log_filename,
"a");
2558 if (access (log_filename, F_OK) < 0)
2561 fp = fopen (log_filename,
"a");
2575 char read_buf[1024];
2582 while (msg_size > 0)
2584 read_len = read_from_cas_client (clt_sock_fd, read_buf, msg_size, as_index, cas_pid);
2589 if (send (srv_sock_fd, read_buf, read_len, 0) < read_len)
2591 msg_size -= read_len;
2594 if (recv (srv_sock_fd, (
char *) &msg_size, 4, 0) < 4)
2598 msg_size =
ntohl (msg_size);
2599 while (msg_size > 0)
2601 read_len = recv (srv_sock_fd, read_buf, (msg_size >
sizeof (read_buf) ?
sizeof (read_buf) : msg_size), 0);
2610 msg_size -= read_len;
2616 tmp_p = (
char *) &msg_size;
2619 read_len = read_from_cas_client (clt_sock_fd, tmp_p, tmp_int, as_index, cas_pid);
2624 tmp_int -= read_len;
2627 if (send (srv_sock_fd, (
char *) &msg_size, 4, 0) < 0)
2632 msg_size =
ntohl (msg_size);
2633 while (msg_size > 0)
2636 read_from_cas_client (clt_sock_fd, read_buf, (msg_size >
sizeof (read_buf) ?
sizeof (read_buf) : msg_size),
2642 if (send (srv_sock_fd, read_buf, read_len, 0) < read_len)
2646 msg_size -= read_len;
2649 if (recv (srv_sock_fd, (
char *) &msg_size, 4, 0) < 4)
2658 msg_size =
ntohl (msg_size);
2659 while (msg_size > 0)
2661 read_len = recv (srv_sock_fd, read_buf, (msg_size >
sizeof (read_buf) ?
sizeof (read_buf) : msg_size), 0);
2670 msg_size -= read_len;
2673 if (shm_appl->
as_info[as_index].close_flag || shm_appl->
as_info[as_index].
pid != cas_pid)
2683 read_from_cas_client (
SOCKET sock_fd,
char *buf,
int size,
int as_index,
int cas_pid)
2690 struct timeval timeout = { 1, 0 };
2696 FD_ZERO (&read_mask);
2697 FD_SET (sock_fd, (fd_set *) (&read_mask));
2698 maxfd = sock_fd + 1;
2702 if (shm_appl->
as_info[as_index].close_flag || shm_appl->
as_info[as_index].
pid != cas_pid)
2711 if (FD_ISSET (sock_fd, (fd_set *) (&read_mask)))
2731 int idle_cas_id = -1;
2732 time_t max_wait_time;
2734 time_t cur_time = time (
NULL);
2748 #
if !defined (WINDOWS)
2763 if (wait_time > max_wait_time || wait_cas_id == -1)
2765 max_wait_time = wait_time;
2771 if (wait_cas_id >= 0)
2778 idle_cas_id = wait_cas_id;
2784 #if defined(WINDOWS) 2785 if (idle_cas_id >= 0)
2788 h_proc = OpenProcess (SYNCHRONIZE,
FALSE, shm_appl->
as_info[idle_cas_id].
pid);
2796 CloseHandle (h_proc);
2801 if (idle_cas_id < 0)
2816 int i, drop_as_index, exist_idle_cas;
2817 time_t max_wait_time, wait_time;
2828 return drop_as_index;
2864 max_wait_time = wait_time;
2871 return drop_as_index;
2893 #if !defined(WINDOWS) 2911 #ifdef _SOCKADDR_LEN 2950 SHARD_ERR (
"<BROKER> MASTER_SHM_KEY_ENV_STR:[%d:%x]\n", master_shm_key, master_shm_key);
2981 SHARD_ERR (
"<BROKER> APPL_SERVER_SHM_KEY_STR:[%d:%x]\n", as_shm_key, as_shm_key);
2984 if (shm_appl ==
NULL)
2995 if (shm_proxy_p ==
NULL)
3019 #if defined(WINDOWS) 3028 #if defined(WINDOWS) 3029 phandle = OpenProcess (SYNCHRONIZE,
FALSE, proxy_info_p->
pid);
3030 if (phandle ==
NULL)
3037 CloseHandle (phandle);
3040 if (kill (proxy_info_p->
pid, 0) < 0)
3043 if (kill (proxy_info_p->
pid, 0) < 0)
3055 proxy_info_p->
pid = new_pid;
3070 int tmp_num_busy_uts;
3076 tmp_num_busy_uts = 0;
3077 for (proxy_index = 0; proxy_index < shm_proxy_p->
num_proxy; proxy_index++)
3086 #if !defined(WINDOWS) 3091 #if !defined(WINDOWS) 3097 struct sockaddr_in proxy_sock_addr;
3099 SOCKET max_fd, client_fd;
3101 int ret, select_ret;
3114 select_ret = select (max_fd, &rset,
NULL,
NULL, &tv);
3115 if (select_ret == 0)
3119 else if (select_ret < 0)
3130 proxy_sock_addr_len =
sizeof (proxy_sock_addr);
3131 client_fd = accept (
proxy_sock_fd, (
struct sockaddr *) &proxy_sock_addr, &proxy_sock_addr_len);
3143 ret =
read_from_client (client_fd, ((
char *) &proxy_id),
sizeof (proxy_id));
3152 proxy_id =
htonl (proxy_id);
3182 char proxy_shm_id_env_str[32], proxy_id_env_str[32];
3184 #if !defined(WINDOWS) 3206 #if !defined(WINDOWS) 3207 signal (SIGCHLD, SIG_IGN);
3212 #if !defined(WINDOWS) 3218 signal (SIGCHLD, SIG_DFL);
3226 snprintf (proxy_shm_id_env_str,
sizeof (proxy_shm_id_env_str),
"%s=%d",
PROXY_SHM_KEY_STR,
3228 putenv (proxy_shm_id_env_str);
3230 snprintf (proxy_id_env_str,
sizeof (proxy_id_env_str),
"%s=%d",
PROXY_ID_ENV_STR, proxy_index);
3231 putenv (proxy_id_env_str);
3233 #if !defined(WINDOWS) 3234 if (snprintf (process_name,
sizeof (process_name) - 1,
"%s_%s_%d", shm_appl->
broker_name, proxy_exe_name,
3235 proxy_index + 1) < 0)
3242 #if defined(WINDOWS) 3243 pid = run_child (proxy_exe_name);
3245 execle (proxy_exe_name, process_name,
NULL,
environ);
3248 #if !defined(WINDOWS) 3273 #if defined(WINDOWS) 3282 proxy_info_p->
pid = 0;
3306 proxy_info_p->
pid = new_pid;
3320 ret = snprintf (log_filename,
BROKER_PATH_MAX - 1,
"%s%s_%d_%d_%d.sql.log", dirname, broker_name,
3325 ret = snprintf (log_filename,
BROKER_PATH_MAX - 1,
"%s%s_%d.sql.log", dirname, broker_name, as_index + 1);
3330 log_filename[0] =
'\0';
3344 ret = snprintf (log_filename,
BROKER_PATH_MAX - 1,
"%s%s_%d_%d_%d.slow.log", dirname, broker_name,
3349 ret = snprintf (log_filename,
BROKER_PATH_MAX - 1,
"%s%s_%d.slow.log", dirname, broker_name, as_index + 1);
3354 log_filename[0] =
'\0';
#define SLEEP_MILISEC(sec, msec)
int broker_init_proxy_conn(int max_proxy)
#define SOCKET_TIMEOUT_SEC
char access_log_file[CONF_LOG_FILE_LEN]
char database_name[SRV_CON_DBNAME_SIZE]
#define UW_ER_NO_MORE_MEMORY
#define UNUSABLE_DATABASE_MAX
char database_name[SRV_CON_DBNAME_SIZE]
int ut_set_keepalive(int sock)
DB_INFO * cfg_find_db(const char *db_name)
char * dirname(const char *path)
static SOCKET srv_sock_fd
int max_heap_delete(T_MAX_HEAP_NODE *max_heap, T_MAX_HEAP_NODE *ret)
static THREAD_FUNC proxy_monitor_thr_f(void *arg)
static int read_from_client_with_timeout(SOCKET sock_fd, char *buf, int size, int timeout_sec)
SOCKET broker_find_available_proxy(T_SHM_PROXY *shm_proxy_p)
#define SRV_CON_CLIENT_MAGIC_LEN
static THREAD_FUNC server_monitor_thr_f(void *arg)
static void send_error_to_driver(int sock, int error, char *driver_info)
T_MAX_HEAP_NODE job_queue[JOB_QUEUE_MAX_SIZE+1]
#define SRV_CON_CLIENT_INFO_SIZE
char proxy_log_dir[CONF_LOG_FILE_LEN]
static CSS_CONN_ENTRY * connect_to_master_for_server_monitor(const char *db_name, const char *db_host)
T_SHARD_INFO * shard_shm_find_shard_info(T_PROXY_INFO *proxy_info_p, int shard_id)
#define SRV_CON_CLIENT_MAGIC_STR
#define pthread_mutex_init(a, b)
static void proxy_monitor_worker(T_PROXY_INFO *proxy_info_p, int br_index, int proxy_index)
#define CAS_CONV_ERROR_TO_OLD(V)
#define CAS_PROTO_MAKE_VER(VER)
int broker_add_proxy_conn(SOCKET fd)
unsigned int htonl(unsigned int from)
char ip_addr[IP_ADDR_STR_LEN]
T_PROXY_INFO * shard_shm_find_proxy_info(T_SHM_PROXY *proxy_p, int proxy_id)
#define THREAD_BEGIN(THR_ID, FUNC, ARG)
char port_name[SHM_APPL_SERVER_NAME_MAX]
int parse_int(int *ret_p, const char *str_p, int base)
unsigned char cas_clt_ip[4]
void css_free_conn(CSS_CONN_ENTRY *conn)
bool ut_is_appl_server_ready(int pid, char *ready_flag)
T_BROKER_VERSION clt_version
static char log_filepath[BROKER_PATH_MAX]
char broker_name[BROKER_NAME_LEN]
static T_BROKER_INFO * br_info_p
#define pthread_mutex_unlock(a)
int broker_set_proxy_fds(fd_set *fds)
#define SRV_CON_MSG_IDX_MINOR_VER
#define SHARD_ERR(f, a...)
#define UW_SET_ERROR_CODE(code, os_errno)
#define CAS_LOG_RESET_REOPEN
#define MASTER_SHM_KEY_ENV_STR
#define CAS_PROTO_INDICATOR
#define SRV_CON_MSG_IDX_PROTO_VERSION
static T_SHM_PROXY * shm_proxy_p
static int stop_proxy_server(T_PROXY_INFO *proxy_info_p, int br_index, int proxy_index)
static THREAD_FUNC shard_dispatch_thr_f(void *arg)
static void restart_proxy_server(T_PROXY_INFO *proxy_info_p, int br_index, int proxy_index)
int unusable_databases_cnt[PAIR_LIST]
static void cas_monitor_worker(T_APPL_SERVER_INFO *as_info_p, int br_index, int as_index, int *busy_uts)
#define PROXY_LOG_RESET_REOPEN
static int broker_init_shm(void)
#define UTS_STATUS_RESTART
#define READ_FROM_SOCKET(fd, buf, size)
#define IS_SSL_CLIENT(driver_info)
#define APPL_SERVER_SHM_KEY_STR
SOCKET broker_get_proxy_conn_maxfd(SOCKET proxy_sock_fd)
int main(int argc, char *argv[])
static SOCKET connect_srv(char *br_name, int as_index)
#define SRV_CON_MSG_IDX_CLIENT_TYPE
static const char * cas_client_type_str[]
#define CON_STATUS_LOCK(AS_INFO, LOCK_OWNER)
static THREAD_FUNC dispatch_thr_f(void *arg)
unsigned char my_ip_addr[4]
#define CAS_MAKE_VER(MAJOR, MINOR, PATCH)
T_DB_SERVER unusable_databases[PAIR_LIST][UNUSABLE_DATABASE_MAX]
static pthread_mutex_t run_appl_mutex
int appl_server_hard_limit
static void check_proxy_access_log(T_PROXY_INFO *proxy_info_p)
char broker_name[BROKER_NAME_LEN]
static bool broker_add_new_cas(void)
#define SRV_CON_DB_INFO_SIZE
int appl_server_hard_limit
#define SRV_CON_DBNAME_SIZE
int er_init(const char *msglog_filename, int exit_ask)
#define UW_ER_CANT_CREATE_SOCKET
#define CON_STATUS_LOCK_BROKER
#define PORT_NUMBER_ENV_STR
static int current_dropping_as_index
#define PROXY_STATUS_RESTART
static void psize_check_worker(T_APPL_SERVER_INFO *as_info_p, int br_index, int as_index)
static int stop_appl_server(T_APPL_SERVER_INFO *as_info_p, int br_index, int as_index)
#define CAS_PROTO_UNPACK_NET_VER(VER)
char * get_cubrid_file(T_CUBRID_FILE_ID fid, char *buf, size_t len)
#define NUM_COLLECT_COUNT_PER_INTVL
static pthread_mutex_t clt_table_mutex
#define APPL_SERVER_CAS_ORACLE
char database_host[CUB_MAXHOSTNAMELEN]
static int find_drop_as_index(void)
unsigned short cas_clt_port
int ut_kill_as_process(int pid, char *broker_name, int as_index, int shard_flag)
#define CAS_MAKE_PROTO_VER(DRIVER_INFO)
int css_send_request(CSS_CONN_ENTRY *conn, int command, unsigned short *request_id, const char *arg_buffer, int arg_buffer_size)
static void shard_broker_process(void)
unsigned int unusable_databases_seq
#define UTS_STATUS_CON_WAIT
void er_final(ER_FINAL_CODE do_global_final)
#define CON_STATUS_LOCK_INIT(AS_INFO)
char driver_info[SRV_CON_CLIENT_INFO_SIZE]
static pthread_cond_t clt_table_cond
INT64 num_connect_requests
static THREAD_FUNC receiver_thr_f(void *arg)
static int read_nbytes_from_client(SOCKET sock_fd, char *buf, int size)
static int read_from_client(SOCKET sock_fd, char *buf, int size)
int sysprm_load_and_init(const char *db_name, const char *conf_file, const int load_flags)
int prm_get_master_port_id(void)
#define SRV_CON_CLIENT_MAGIC_STR_SSL
#define IS_INVALID_SOCKET(socket)
int send_fd(int server_fd, int client_fd, int rid, char *driver_info)
static void restart_appl_server(T_APPL_SERVER_INFO *as_info_p, int br_index, int as_index)
static int write_to_client(SOCKET sock_fd, char *buf, int size)
time_t transaction_start_time
static void check_cas_log(char *br_name, T_APPL_SERVER_INFO *as_info_p, int as_index)
static void cleanup(int signo)
static SOCKET proxy_sock_fd
#define strncpy_bufsize(buf, str)
char acl_file[CONF_LOG_FILE_LEN]
static int init_env(void)
static THREAD_FUNC hang_check_thr_f(void *arg)
static struct sockaddr_in sock_addr
void util_free_string_array(char **array)
static struct sockaddr_un shard_sock_addr
char auto_add_appl_server
unsigned short htons(unsigned short from)
char script[PRE_SEND_SCRIPT_SIZE]
static int get_server_state_from_master(CSS_CONN_ENTRY *conn, const char *db_name)
#define WRITE_TO_SOCKET(fd, buf, size)
static T_SHM_APPL_SERVER * shm_appl
static int find_idle_cas(void)
#define HANG_COUNT_THRESHOLD_RATIO
#define SRV_CON_MSG_IDX_FUNCTION_FLAG
int monitor_hang_interval
char ** util_split_string(const char *str, const char *delim)
#define CON_STATUS_LOCK_DESTROY(AS_INFO)
#define SET_BROKER_ERR_CODE()
static THREAD_FUNC psize_check_thr_f(void *arg)
static T_SHM_BROKER * shm_br
T_PROXY_INFO * proxy_info_p
static void error(const char *msg)
char prg_name[PRE_SEND_PRG_NAME_SIZE]
int proxy_access_log_reset
static char run_appl_server_flag
int css_receive_data(CSS_CONN_ENTRY *conn, unsigned short req_id, char **buffer, int *buffer_size, int timeout)
static char database_name[MAX_HA_DBINFO_LENGTH]
static pthread_mutex_t broker_shm_mutex
static int insert_db_server_check_list(T_DB_SERVER *list_p, int check_list_cnt, const char *db_name, const char *db_host)
time_t claimed_alive_time
static int find_add_as_index(void)
unsigned short ntohs(unsigned short from)
char preferred_hosts[SHM_APPL_SERVER_NAME_MAX]
static char run_proxy_flag
#define free_and_init(ptr)
int broker_delete_proxy_conn_by_proxy_id(int proxy_id)
#define PROXY_SHM_KEY_STR
static void get_as_sql_log_filename(char *log_filename, int len, char *broker_name, T_APPL_SERVER_INFO *as_info_p, int as_index)
static void get_as_slow_log_filename(char *log_filename, int len, char *broker_name, T_APPL_SERVER_INFO *as_info_p, int as_index)
static pthread_mutex_t run_proxy_mutex
char name[BROKER_NAME_LEN]
#define CON_STATUS_UNLOCK(AS_INFO, LOCK_OWNER)
int broker_register_proxy_conn(SOCKET fd, int proxy_id)
int uw_acl_check(unsigned char *ip_addr)
#define PROXY_STATUS_START
SOCKET broker_get_readable_proxy_conn(fd_set *fds)
#define IS_NOT_APPL_SERVER_TYPE_CAS(x)
char appl_server_name[APPL_SERVER_NAME_MAX_SIZE]
char driver_version[SRV_CON_VER_STR_MAX_SIZE]
void ut_get_as_port_name(char *port_name, char *broker_name, int as_id, int len)
T_APPL_SERVER_INFO as_info[APPL_SERVER_NUM_LIMIT]
char port_name[SHM_PROXY_NAME_MAX]
static int init_proxy_env(void)
static THREAD_FUNC proxy_listener_thr_f(void *arg)
int uw_acl_make(char *acl_file)
#define SRV_CON_MSG_IDX_PATCH_VER
static int run_proxy_server(T_PROXY_INFO *proxy_info_p, int br_index, int proxy_index)
#define SRV_CON_MSG_IDX_MAJOR_VER
#define CAS_SEND_ERROR_CODE(FD, VAL)
static void check_proxy_log(char *br_name, T_PROXY_INFO *proxy_info_p)
#define pthread_mutex_lock(a)
void set_cubrid_file(T_CUBRID_FILE_ID fid, char *value)
unsigned int ntohl(unsigned int from)
#define APPL_SERVER_NAME_MAX_SIZE
static void proxy_check_worker(int br_index, T_PROXY_INFO *proxy_info_p)
void max_heap_incr_priority(T_MAX_HEAP_NODE *max_heap)
void * uw_shm_open(int shm_key, int which_shm, T_SHM_MODE shm_mode)
void ut_get_as_pid_name(char *pid_name, char *br_name, int as_index, int len)
char slow_log_dir[CONF_LOG_FILE_LEN]
int max_heap_insert(T_MAX_HEAP_NODE *max_heap, int max_heap_size, T_MAX_HEAP_NODE *item)
#define CUB_MAXHOSTNAMELEN
time_t claimed_alive_time
static bool broker_drop_one_cas_by_time_to_kill(void)
#define SET_BROKER_OK_CODE()
static THREAD_FUNC cas_monitor_thr_f(void *arg)
char log_dir[CONF_LOG_FILE_LEN]
static int write_to_client_with_timeout(SOCKET sock_fd, char *buf, int size, int timeout_sec)
CSS_CONN_ENTRY * css_connect_to_master_timeout(const char *host_name, int port_id, int timeout, unsigned short *rid)
int ut_kill_proxy_process(int pid, char *broker_name, int proxy_id)
void cfg_free_directory(DB_INFO *databases)
T_BROKER_VERSION clt_version
int broker_delete_proxy_conn_by_fd(SOCKET fd)
void broker_destroy_proxy_conn(void)
char driver_info[SRV_CON_CLIENT_INFO_SIZE]
static int run_appl_server(T_APPL_SERVER_INFO *as_info_p, int br_index, int as_index)
float ut_get_avg_from_array(int array[], int size)
#define MONITOR_SERVER_INTERVAL