Skip to content

File broker.c

File List > broker > broker.c

Go to the documentation of this file

/*
 * Copyright 2008 Search Solution Corporation
 * Copyright 2016 CUBRID Corporation
 *
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 *
 */


/*
 * broker.c -
 */

#ident "$Id$"

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <signal.h>
#include <errno.h>
#include <fcntl.h>
#include <assert.h>

#if !defined(WINDOWS)
#include <pthread.h>
#include <unistd.h>
#include <sys/file.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <netinet/tcp.h>
#include <sys/time.h>
#include <sys/un.h>
#else
#include  <io.h>
#endif

#ifdef BROKER_DEBUG
#include <sys/time.h>
#endif

#include "connection_defs.h"
#include "client_support.h"

#include "system_parameter.h"
#include "databases_file.h"
#include "util_func.h"

#include "cas_error.h"
#include "cas_common.h"
#include "broker_error.h"
#include "broker_env_def.h"
#include "broker_shm.h"
#include "broker_msg.h"
#include "broker_process_size.h"
#include "broker_util.h"
#include "broker_access_list.h"
#include "broker_filename.h"
#include "broker_er_html.h"
#include "broker_send_fd.h"
#include "error_manager.h"
#include "shard_shm.h"
#include "shard_metadata.h"
#include "broker_proxy_conn.h"
#include "dbtype_def.h"

#if defined(WINDOWS)
#include "broker_wsa_init.h"
#endif

#ifdef WIN_FW
#if !defined(WINDOWS)
#error DEFINE ERROR
#endif
#endif

#ifdef BROKER_RESTART_DEBUG
#define     PS_CHK_PERIOD       30
#else
#define     PS_CHK_PERIOD       600
#endif

#ifdef ASYNC_MODE
#ifdef HPUX10_2
#define     SELECT_MASK     int
#else
#define     SELECT_MASK     fd_set
#endif
#endif

#define     IP_ADDR_STR_LEN     20
#define     BUFFER_SIZE     ONE_K

#define     ENV_BUF_INIT_SIZE   512
#define     ALIGN_ENV_BUF_SIZE(X)   \
    ((((X) + ENV_BUF_INIT_SIZE) / ENV_BUF_INIT_SIZE) * ENV_BUF_INIT_SIZE)

#define         MONITOR_SERVER_INTERVAL 5

#ifdef BROKER_DEBUG
#define     BROKER_LOG(X)           \
        do {                \
          FILE      *fp;        \
          fp = fopen("broker.log", "a");    \
          if (fp) {         \
            struct timeval tv;      \
            gettimeofday(&tv, NULL);    \
            fprintf(fp, "%d %d.%06d %s\n", __LINE__, (int)tv.tv_sec, (int)tv.tv_usec, X);   \
            fclose(fp);         \
          }             \
        } while (0)
#define     BROKER_LOG_INT(X)           \
        do {                \
          FILE  *fp;            \
          struct timeval tv;        \
          gettimeofday(&tv, NULL);  \
          fp = fopen("broker.log", "a");        \
          if (fp) {         \
            fprintf(fp, "%d %d.%06d %s=%d\n", __LINE__, (int)tv.tv_sec, (int)tv.tv_usec, #X, X);            \
            fclose(fp);         \
          }             \
        } while (0)
#endif

#ifdef SESSION_LOG
#define     SESSION_LOG_WRITE(IP, SID, APPL, INDEX) \
        do {                \
          FILE  *fp;            \
          struct timeval tv;        \
          char  ip_str[64];     \
          gettimeofday(&tv, NULL);  \
          ip2str(IP, ip_str);       \
          fp = fopen("session.log", "a");   \
          if (fp) {         \
            fprintf(fp, "%d %-15s %d %d.%06d %s %d \n", br_index, ip_str, (int) SID, tv.tv_sec, tv.tv_usec, APPL, INDEX); \
            fclose(fp);         \
          }             \
        } while (0)
#endif

#ifdef _EDU_
#define     EDU_KEY "86999522480552846466422480899195252860256028745"
#endif

#define V3_WRITE_HEADER_OK_FILE_SOCK(sock_fd)   \
    do {                \
      char          buf[V3_RESPONSE_HEADER_SIZE];   \
      memset(buf, '\0', sizeof(buf));   \
      sprintf(buf, V3_HEADER_OK);   \
      write_to_client(sock_fd, buf, sizeof(buf));   \
    } while(0);

#define V3_WRITE_HEADER_ERR_SOCK(sockfd)        \
    do {                \
      char          buf[V3_RESPONSE_HEADER_SIZE];   \
      memset(buf, '\0', sizeof(buf));   \
      sprintf(buf, V3_HEADER_ERR);      \
      write_to_client(sockfd, buf, sizeof(buf)); \
    } while(0);

#define SET_BROKER_ERR_CODE()           \
    do {                    \
      if (shm_br && br_index >= 0) {    \
        shm_br->br_info[br_index].err_code = uw_get_error_code();   \
        shm_br->br_info[br_index].os_err_code = uw_get_os_error_code(); \
      }                 \
    } while (0)

#define SET_BROKER_OK_CODE()                \
    do {                        \
      if (shm_br && br_index >= 0) {        \
        shm_br->br_info[br_index].err_code = 0; \
      }                     \
    } while (0)

#define CAS_SEND_ERROR_CODE(FD, VAL)    \
    do {                \
      int   write_val;      \
      write_val = htonl(VAL);   \
      write_to_client(FD, (char*) &write_val, 4);   \
    } while (0)

#define JOB_COUNT_MAX       130000000

/* num of collecting counts per monitoring interval */
#define NUM_COLLECT_COUNT_PER_INTVL     4
#define HANG_COUNT_THRESHOLD_RATIO      0.5

#if defined(WINDOWS)
#define F_OK    0
#else
#define SOCKET_TIMEOUT_SEC  2
#endif

/* server state */
enum SERVER_STATE
{
  SERVER_STATE_UNKNOWN = 0,
  SERVER_STATE_DEAD = 1,
  SERVER_STATE_DEREGISTERED = 2,
  SERVER_STATE_STARTED = 3,
  SERVER_STATE_NOT_REGISTERED = 4,
  SERVER_STATE_REGISTERED = 5,
  SERVER_STATE_REGISTERED_AND_TO_BE_STANDBY = 6,
  SERVER_STATE_REGISTERED_AND_ACTIVE = 7,
  SERVER_STATE_REGISTERED_AND_TO_BE_ACTIVE = 8
};

typedef struct t_clt_table T_CLT_TABLE;
struct t_clt_table
{
  SOCKET clt_sock_fd;
  char ip_addr[IP_ADDR_STR_LEN];
};
static void shard_broker_process (void);
static void cleanup (int signo);
static int init_env (void);
#if !defined(WINDOWS)
static int init_proxy_env (void);
#endif /* !WINDOWS */
static int broker_init_shm (void);

static void cas_monitor_worker (T_APPL_SERVER_INFO * as_info_p, int br_index, int as_index, int *busy_uts);
static void psize_check_worker (T_APPL_SERVER_INFO * as_info_p, int br_index, int as_index);

static void proxy_check_worker (int br_index, T_PROXY_INFO * proxy_info_p);

static void proxy_monitor_worker (T_PROXY_INFO * proxy_info_p, int br_index, int proxy_index);

static THREAD_FUNC receiver_thr_f (void *arg);
static THREAD_FUNC dispatch_thr_f (void *arg);
static THREAD_FUNC shard_dispatch_thr_f (void *arg);
static THREAD_FUNC psize_check_thr_f (void *arg);
static THREAD_FUNC cas_monitor_thr_f (void *arg);
static THREAD_FUNC hang_check_thr_f (void *arg);
static THREAD_FUNC proxy_monitor_thr_f (void *arg);
#if !defined(WINDOWS)
static THREAD_FUNC proxy_listener_thr_f (void *arg);
#endif /* !WINDOWS */
static THREAD_FUNC server_monitor_thr_f (void *arg);

static int read_nbytes_from_client (SOCKET sock_fd, char *buf, int size);

#if defined(WIN_FW)
static THREAD_FUNC service_thr_f (void *arg);
static int process_cas_request (int cas_pid, int as_index, SOCKET clt_sock_fd, SOCKET srv_sock_fd);
static int read_from_cas_client (SOCKET sock_fd, char *buf, int size, int as_index, int cas_pid);
#endif

static int write_to_client (SOCKET sock_fd, char *buf, int size);
static int write_to_client_with_timeout (SOCKET sock_fd, char *buf, int size, int timeout_sec);
static int read_from_client (SOCKET sock_fd, char *buf, int size);
static int read_from_client_with_timeout (SOCKET sock_fd, char *buf, int size, int timeout_sec);
static int read_buffer_async (SOCKET sock_fd, char *buf, int size, int timeout);
static int write_buffer_async (SOCKET sock_fd, char *buf, int size, int timeout);
static int run_appl_server (T_APPL_SERVER_INFO * as_info_p, int br_index, int as_index);
static int stop_appl_server (T_APPL_SERVER_INFO * as_info_p, int br_index, int as_index);
static void restart_appl_server (T_APPL_SERVER_INFO * as_info_p, int br_index, int as_index);

static int run_proxy_server (T_PROXY_INFO * proxy_info_p, int br_index, int proxy_index);
static int stop_proxy_server (T_PROXY_INFO * proxy_info_p, int br_index, int proxy_index);
static void restart_proxy_server (T_PROXY_INFO * proxy_info_p, int br_index, int proxy_index);
static SOCKET connect_srv (char *br_name, int as_index);
static int find_idle_cas (void);
static int find_drop_as_index (void);
static int find_add_as_index (void);
static bool broker_add_new_cas (void);

static void check_cas_log (char *br_name, T_APPL_SERVER_INFO * as_info_p, int as_index);
static void check_proxy_log (char *br_name, T_PROXY_INFO * proxy_info_p);
static void check_proxy_access_log (T_PROXY_INFO * proxy_info_p);

static void get_as_sql_log_filename (char *log_filename, int len, char *broker_name, T_APPL_SERVER_INFO * as_info_p,
                     int as_index);
static void get_as_slow_log_filename (char *log_filename, int len, char *broker_name, T_APPL_SERVER_INFO * as_info_p,
                      int as_index);

static CSS_CONN_ENTRY *connect_to_master_for_server_monitor (const char *db_name, const char *db_host);
static int get_server_state_from_master (CSS_CONN_ENTRY * conn, const char *db_name);

static int insert_db_server_check_list (T_DB_SERVER * list_p, int check_list_cnt, const char *db_name,
                    const char *db_host);

#if defined(WINDOWS)
static int get_cputime_sec (int pid);
#endif

static SOCKET sock_fd;
static struct sockaddr_in sock_addr;
static int sock_addr_len;
#if defined(WINDOWS)
static struct sockaddr_in shard_sock_addr;
#else /* WINDOWS */
static SOCKET proxy_sock_fd;
static struct sockaddr_un shard_sock_addr;
#endif /* !WINDOWS */

static T_SHM_BROKER *shm_br = NULL;
static T_SHM_APPL_SERVER *shm_appl;
static T_BROKER_INFO *br_info_p = NULL;
static T_SHM_PROXY *shm_proxy_p = NULL;

static int br_index = -1;
static int br_shard_flag = OFF;

#if defined(WIN_FW)
static int num_thr;
#endif

static pthread_cond_t clt_table_cond;
static pthread_mutex_t clt_table_mutex;
static pthread_mutex_t run_appl_mutex;
static pthread_mutex_t broker_shm_mutex;
static pthread_mutex_t run_proxy_mutex;
static char run_proxy_flag = 0;

static char run_appl_server_flag = 0;

static int current_dropping_as_index = -1;

static int process_flag = 1;

static int num_busy_uts = 0;

static int max_open_fd = 128;

#if defined(WIN_FW)
static int last_job_fetch_time;
static time_t last_session_id = 0;
static T_MAX_HEAP_NODE *session_request_q;
#endif

static int hold_job = 0;

static bool
broker_add_new_cas (void)
{
  int cur_appl_server_num;
  int add_as_index;
  int pid;

  cur_appl_server_num = shm_br->br_info[br_index].appl_server_num;

  /* ADD UTS */
  if (cur_appl_server_num >= shm_br->br_info[br_index].appl_server_max_num)
    {
      return false;
    }

  add_as_index = find_add_as_index ();
  if (add_as_index < 0)
    {
      return false;
    }

  pid = run_appl_server (&(shm_appl->as_info[add_as_index]), br_index, add_as_index);
  if (pid <= 0)
    {
      return false;
    }

  pthread_mutex_lock (&broker_shm_mutex);
  shm_appl->as_info[add_as_index].pid = pid;
  shm_appl->as_info[add_as_index].psize = getsize (pid);
  shm_appl->as_info[add_as_index].psize_time = time (NULL);
  shm_appl->as_info[add_as_index].uts_status = UTS_STATUS_IDLE;
  shm_appl->as_info[add_as_index].service_flag = SERVICE_ON;
  shm_appl->as_info[add_as_index].reset_flag = FALSE;

  memset (&shm_appl->as_info[add_as_index].cas_clt_ip[0], 0x0, sizeof (shm_appl->as_info[add_as_index].cas_clt_ip));
  shm_appl->as_info[add_as_index].cas_clt_port = 0;
  shm_appl->as_info[add_as_index].driver_version[0] = '\0';

  (shm_br->br_info[br_index].appl_server_num)++;
  (shm_appl->num_appl_server)++;
  pthread_mutex_unlock (&broker_shm_mutex);

  return true;
}

static bool
broker_drop_one_cas_by_time_to_kill (void)
{
  int cur_appl_server_num, wait_job_cnt;
  int drop_as_index;
  T_APPL_SERVER_INFO *drop_as_info;

  /* DROP UTS */
  cur_appl_server_num = shm_br->br_info[br_index].appl_server_num;
  wait_job_cnt = shm_appl->job_queue[0].id + hold_job;
  wait_job_cnt -= (cur_appl_server_num - num_busy_uts);

  if (cur_appl_server_num <= shm_br->br_info[br_index].appl_server_min_num || wait_job_cnt > 0)
    {
      return false;
    }

  drop_as_index = find_drop_as_index ();
  if (drop_as_index < 0)
    {
      return false;
    }

  pthread_mutex_lock (&broker_shm_mutex);
  current_dropping_as_index = drop_as_index;
  drop_as_info = &shm_appl->as_info[drop_as_index];
  drop_as_info->service_flag = SERVICE_OFF_ACK;
  pthread_mutex_unlock (&broker_shm_mutex);

  CON_STATUS_LOCK (drop_as_info, CON_STATUS_LOCK_BROKER);
  if (drop_as_info->uts_status == UTS_STATUS_IDLE)
    {
      /* do nothing */
    }
  else if (drop_as_info->cur_keep_con == KEEP_CON_AUTO && drop_as_info->uts_status == UTS_STATUS_BUSY
       && drop_as_info->con_status == CON_STATUS_OUT_TRAN
       && time (NULL) - drop_as_info->last_access_time > shm_br->br_info[br_index].time_to_kill)
    {
      drop_as_info->con_status = CON_STATUS_CLOSE;
    }
  else
    {
      drop_as_info->service_flag = SERVICE_ON;
      drop_as_index = -1;
    }
  CON_STATUS_UNLOCK (drop_as_info, CON_STATUS_LOCK_BROKER);

  if (drop_as_index >= 0)
    {
      pthread_mutex_lock (&broker_shm_mutex);
      (shm_br->br_info[br_index].appl_server_num)--;
      (shm_appl->num_appl_server)--;
      pthread_mutex_unlock (&broker_shm_mutex);

      stop_appl_server (drop_as_info, br_index, drop_as_index);
    }
  current_dropping_as_index = -1;

  return true;
}

#if defined(WINDOWS)
int WINAPI
WinMain (HINSTANCE hInstance,   // handle to current instance
     HINSTANCE hPrevInstance,   // handle to previous instance
     LPSTR lpCmdLine,   // pointer to command line
     int nShowCmd       // show state of window
  )
#else
int
main (int argc, char *argv[])
#endif
{
  pthread_t receiver_thread;
  pthread_t dispatch_thread;
  pthread_t cas_monitor_thread;
  pthread_t psize_check_thread;
  pthread_t hang_check_thread;
  pthread_t server_monitor_thread;
  pthread_t proxy_monitor_thread;
#if !defined(WINDOWS)
  pthread_t proxy_listener_thread;
#endif /* !WINDOWS */

#if defined(WIN_FW)
  pthread_t service_thread;
  int *thr_index;
  int i;
#endif
  int error;

  error = broker_init_shm ();
  if (error)
    {
      goto error1;
    }

  br_shard_flag = br_info_p->shard_flag;

  signal (SIGTERM, cleanup);
  signal (SIGINT, cleanup);
#if !defined(WINDOWS)
  signal (SIGCHLD, SIG_IGN);
  signal (SIGPIPE, SIG_IGN);
#endif

  pthread_cond_init (&clt_table_cond, NULL);
  pthread_mutex_init (&clt_table_mutex, NULL);
  pthread_mutex_init (&run_appl_mutex, NULL);
  pthread_mutex_init (&broker_shm_mutex, NULL);
  if (br_shard_flag == ON)
    {
      pthread_mutex_init (&run_proxy_mutex, NULL);
    }

#if defined(WINDOWS)
  if (wsa_initialize () < 0)
    {
      UW_SET_ERROR_CODE (UW_ER_CANT_CREATE_SOCKET, 0);
      goto error1;
    }
#endif /* WINDOWS */

  if (br_shard_flag == OFF && uw_acl_make (shm_br->br_info[br_index].acl_file) < 0)
    {
      goto error1;
    }

  if (init_env () == -1)
    {
      goto error1;
    }

  if (br_shard_flag == ON)
    {
#if !defined(WINDOWS)
      if (init_proxy_env () == -1)
    {
      goto error1;
    }

      if (broker_init_proxy_conn (br_info_p->num_proxy) < 0)
    {
      goto error1;
    }
#endif /* !WINDOWS */
    }
  else
    {
#if defined(WIN_FW)
      num_thr = shm_br->br_info[br_index].appl_server_max_num;

      thr_index = (int *) malloc (sizeof (int) * num_thr);
      if (thr_index == NULL)
    {
      UW_SET_ERROR_CODE (UW_ER_NO_MORE_MEMORY, 0);
      goto error1;
    }

      /* initialize session request queue. queue size is 1 */
      session_request_q = (T_MAX_HEAP_NODE *) malloc (sizeof (T_MAX_HEAP_NODE) * num_thr);
      if (session_request_q == NULL)
    {
      UW_SET_ERROR_CODE (UW_ER_NO_MORE_MEMORY, 0);
      goto error1;
    }
      for (i = 0; i < num_thr; i++)
    {
      session_request_q[i].clt_sock_fd = INVALID_SOCKET;
    }
#endif
    }

  set_cubrid_file (FID_SQL_LOG_DIR, shm_appl->log_dir);
  set_cubrid_file (FID_SLOW_LOG_DIR, shm_appl->slow_log_dir);

  while (shm_br->br_info[br_index].ready_to_service != true)
    {
      SLEEP_MILISEC (0, 200);
    }

  THREAD_BEGIN (receiver_thread, receiver_thr_f, NULL);

  if (br_shard_flag == ON)
    {
      THREAD_BEGIN (dispatch_thread, shard_dispatch_thr_f, NULL);
    }
  else
    {
      THREAD_BEGIN (dispatch_thread, dispatch_thr_f, NULL);
    }
  THREAD_BEGIN (psize_check_thread, psize_check_thr_f, NULL);
  THREAD_BEGIN (cas_monitor_thread, cas_monitor_thr_f, NULL);
  THREAD_BEGIN (server_monitor_thread, server_monitor_thr_f, NULL);

  if (br_shard_flag == ON)
    {
      THREAD_BEGIN (proxy_monitor_thread, proxy_monitor_thr_f, NULL);
#if !defined(WINDOWS)
      THREAD_BEGIN (proxy_listener_thread, proxy_listener_thr_f, NULL);
#endif /* !WINDOWS */
    }

  if (shm_br->br_info[br_index].monitor_hang_flag)
    {
      THREAD_BEGIN (hang_check_thread, hang_check_thr_f, NULL);
    }

  if (br_shard_flag == ON)
    {
      br_info_p->err_code = 0;  /* DO NOT DELETE!!! : reset error code */

      shard_broker_process ();
    }
  else
    {
#if defined(WIN_FW)
      for (i = 0; i < num_thr; i++)
    {
      thr_index[i] = i;
      THREAD_BEGIN (service_thread, service_thr_f, thr_index + i);
      shm_appl->as_info[i].last_access_time = time (NULL);
      shm_appl->as_info[i].transaction_start_time = (time_t) 0;
      if (i < shm_br->br_info[br_index].appl_server_min_num)
        {
          shm_appl->as_info[i].service_flag = SERVICE_ON;
        }
      else
        {
          shm_appl->as_info[i].service_flag = SERVICE_OFF_ACK;
        }
    }
#endif

      SET_BROKER_OK_CODE ();

      while (process_flag)
    {
      SLEEP_MILISEC (0, 100);

      if (shm_br->br_info[br_index].auto_add_appl_server == OFF)
        {
          continue;
        }

      broker_drop_one_cas_by_time_to_kill ();
    }           /* end of while (process_flag) */
    }               /* end of if (SHARD == OFF) */

error1:
  if (br_shard_flag == ON)
    {
#if !defined(WINDOWS)
      broker_destroy_proxy_conn ();
#endif /* !WINDOWS */
    }

  SET_BROKER_ERR_CODE ();
  return -1;
}

static void
shard_broker_process (void)
{
  int proxy_index, shard_index, i;
  T_PROXY_INFO *proxy_info_p;
  T_SHARD_INFO *shard_info_p;
  T_APPL_SERVER_INFO *as_info_p;

  while (process_flag)
    {
      SLEEP_MILISEC (0, 100);

      if (shm_br->br_info[br_index].auto_add_appl_server == OFF)
    {
      continue;
    }

      /* ADD UTS */
      for (proxy_index = 0; proxy_index < shm_proxy_p->num_proxy; proxy_index++)
    {
      proxy_info_p = shard_shm_find_proxy_info (shm_proxy_p, proxy_index);

      for (shard_index = 0; shard_index < proxy_info_p->num_shard_conn; shard_index++)
        {
          shard_info_p = shard_shm_find_shard_info (proxy_info_p, shard_index);

          if ((shard_info_p->waiter_count > 0) && (shard_info_p->num_appl_server < shard_info_p->max_appl_server))
        {
          for (i = shard_info_p->min_appl_server; i < shard_info_p->max_appl_server; i++)
            {
              as_info_p = &(shm_appl->as_info[i + shard_info_p->as_info_index_base]);
              if (as_info_p->service_flag == SERVICE_OFF)
            {
              int pid;

              as_info_p->uts_status = UTS_STATUS_START;
              as_info_p->cur_sql_log_mode = shm_appl->sql_log_mode;
              as_info_p->cur_slow_log_mode = shm_appl->slow_log_mode;

              pid = run_appl_server (as_info_p, br_index, i + shard_info_p->as_info_index_base);
              if (pid > 0)
                {
                  as_info_p->pid = pid;
                  as_info_p->psize = getsize (pid);
                  as_info_p->psize_time = time (NULL);
                  as_info_p->service_flag = SERVICE_ON;
                  as_info_p->reset_flag = FALSE;

                  (shm_br->br_info[br_index].appl_server_num)++;
                  (shard_info_p->num_appl_server)++;
                  (shm_appl->num_appl_server)++;
                }
              break;
            }
            }
        }
        }
    }
    }
  return;
}

static void
cleanup (int signo)
{
  signal (signo, SIG_IGN);

  process_flag = 0;
#ifdef SOLARIS
  SLEEP_MILISEC (1, 0);
#endif
  CLOSE_SOCKET (sock_fd);
  if (br_shard_flag == ON)
    {
#if !defined(WINDOWS)
      CLOSE_SOCKET (proxy_sock_fd);
#endif /* !WINDOWS */
    }
  exit (0);
}

static bool
di_understand_renewed_error_code (const char *driver_info)
{
#define IS_SET_BIT(C,B) (((C) & (B)) == (B))
  if (!IS_SET_BIT (driver_info[SRV_CON_MSG_IDX_PROTO_VERSION], CAS_PROTO_INDICATOR))
    {
      return false;
    }

  return IS_SET_BIT (driver_info[DRIVER_INFO_FUNCTION_FLAG], BROKER_RENEWED_ERROR_CODE);
}

static void
send_error_to_driver (int sock, int error, char *driver_info)
{
  int write_val;
  int driver_version;

  driver_version = CAS_MAKE_PROTO_VER (driver_info);

  if (error == NO_ERROR)
    {
      write_val = 0;
    }
  else
    {
      if (driver_version == CAS_PROTO_MAKE_VER (PROTOCOL_V2) || di_understand_renewed_error_code (driver_info))
    {
      write_val = htonl (error);
    }
      else
    {
      write_val = htonl (CAS_CONV_ERROR_TO_OLD (error));
    }
    }

  write_to_client (sock, (char *) &write_val, sizeof (int));
}

static const char *cas_client_type_str[] = {
  "UNKNOWN",            /* CAS_CLIENT_NONE */
  "CCI",            /* CAS_CLIENT_CCI */
  "ODBC",           /* CAS_CLIENT_ODBC */
  "JDBC",           /* CAS_CLIENT_JDBC */
  "PHP",            /* CAS_CLIENT_PHP */
  "OLEDB",          /* CAS_CLIENT_OLEDB */
  "INTERNAL_JDBC",      /* CAS_CLIENT_SERVER_SIDE_JDBC */
  "GATEWAY_CCI"         /* CAS_CLIENT_GATEWAY */
};

static THREAD_FUNC
receiver_thr_f (void *arg)
{
  T_SOCKLEN clt_sock_addr_len;
  struct sockaddr_in clt_sock_addr;
  SOCKET clt_sock_fd;
  int job_queue_size;
  T_MAX_HEAP_NODE *job_queue;
  T_MAX_HEAP_NODE new_job;
  int job_count;
  int read_len;
  int one = 1;
  char cas_req_header[SRV_CON_CLIENT_INFO_SIZE];
  char cas_client_type;
  char driver_version;
  T_BROKER_VERSION client_version;
#if defined(LINUX)
  int timeout;
#endif /* LINUX */

  job_queue_size = shm_appl->job_queue_size;
  job_queue = shm_appl->job_queue;
  job_count = 1;

#if !defined(WINDOWS)
  signal (SIGPIPE, SIG_IGN);
#endif

#if defined(LINUX)
  timeout = 5;
  setsockopt (sock_fd, IPPROTO_TCP, TCP_DEFER_ACCEPT, (char *) &timeout, sizeof (timeout));
#endif /* LINUX */

  while (process_flag)
    {
      clt_sock_addr_len = sizeof (clt_sock_addr);
      clt_sock_fd = accept (sock_fd, (struct sockaddr *) &clt_sock_addr, &clt_sock_addr_len);
      if (IS_INVALID_SOCKET (clt_sock_fd))
    {
      continue;
    }

      if (shm_br->br_info[br_index].monitor_hang_flag && shm_br->br_info[br_index].reject_client_flag)
    {
      shm_br->br_info[br_index].reject_client_count++;
      CLOSE_SOCKET (clt_sock_fd);
      continue;
    }

#if !defined(WINDOWS) && defined(ASYNC_MODE)
      if (fcntl (clt_sock_fd, F_SETFL, FNDELAY) < 0)
    {
      CLOSE_SOCKET (clt_sock_fd);
      continue;
    }
#endif

      setsockopt (clt_sock_fd, IPPROTO_TCP, TCP_NODELAY, (char *) &one, sizeof (one));
      ut_set_keepalive (clt_sock_fd);

      cas_client_type = CAS_CLIENT_NONE;

      /* read header */
      read_len = read_nbytes_from_client (clt_sock_fd, cas_req_header, SRV_CON_CLIENT_INFO_SIZE);
      if (read_len < 0)
    {
      CLOSE_SOCKET (clt_sock_fd);
      continue;
    }

      if (strncmp (cas_req_header, "PING", 4) == 0)
    {
      int ret_code = 0;
      CAS_SEND_ERROR_CODE (clt_sock_fd, ret_code);
      CLOSE_SOCKET (clt_sock_fd);
      continue;
    }

      if (strncmp (cas_req_header, "ST", 2) == 0)
    {
      int status = FN_STATUS_NONE;
      int pid, i;
      unsigned int session_id;

      memcpy ((char *) &pid, cas_req_header + 2, 4);
      pid = ntohl (pid);
      memcpy ((char *) &session_id, cas_req_header + 6, 4);
      session_id = ntohl (session_id);

      if (shm_br->br_info[br_index].shard_flag == ON)
        {
          status = FN_STATUS_BUSY;
        }
      else
        {
          for (i = 0; i < shm_br->br_info[br_index].appl_server_max_num; i++)
        {
          if (shm_appl->as_info[i].service_flag == SERVICE_ON && shm_appl->as_info[i].pid == pid)
            {
              if (session_id == shm_appl->as_info[i].session_id)
            {
              status = shm_appl->as_info[i].fn_status;
            }
              break;
            }
        }
        }

      CAS_SEND_ERROR_CODE (clt_sock_fd, status);
      CLOSE_SOCKET (clt_sock_fd);
      continue;
    }

      /*
       * Query cancel message (size in bytes)
       *
       * - For client version 8.4.0 patch 1 or below:
       *   |COMMAND("CANCEL",6)|PID(4)|
       *
       * - For CAS protocol version 1 or above:
       *   |COMMAND("QC",2)|PID(4)|CLIENT_PORT(2)|RESERVED(2)|
       *
       *   CLIENT_PORT can be 0 if the client failed to get its local port.
       */
      else if (strncmp (cas_req_header, "QC", 2) == 0 || strncmp (cas_req_header, "CANCEL", 6) == 0
           || strncmp (cas_req_header, "X1", 2) == 0)
    {
      int ret_code = 0;
#if !defined(WINDOWS)
      int pid, i;
      unsigned short client_port = 0;
#endif

#if !defined(WINDOWS)
      if (cas_req_header[0] == 'Q')
        {
          memcpy ((char *) &pid, cas_req_header + 2, 4);
          memcpy ((char *) &client_port, cas_req_header + 6, 2);
          pid = ntohl (pid);
          client_port = ntohs (client_port);
        }
      else
        {
          memcpy ((char *) &pid, cas_req_header + 6, 4);
          pid = ntohl (pid);
        }

      ret_code = CAS_ER_QUERY_CANCEL;
      if (shm_br->br_info[br_index].shard_flag == OFF)
        {

          for (i = 0; i < shm_br->br_info[br_index].appl_server_max_num; i++)
        {
          if (shm_appl->as_info[i].service_flag == SERVICE_ON && shm_appl->as_info[i].pid == pid
              && shm_appl->as_info[i].uts_status == UTS_STATUS_BUSY)
            {
              if (cas_req_header[0] == 'Q' && client_port > 0
              && shm_appl->as_info[i].cas_clt_port != client_port
              && memcmp (&shm_appl->as_info[i].cas_clt_ip, &clt_sock_addr.sin_addr, 4) != 0)
            {
              continue;
            }

              ret_code = 0;
              kill (pid, SIGUSR1);
              break;
            }
        }
        }
      else
        {
          /* SHARD TODO : not implemented yet */
        }
#endif
      if (cas_req_header[0] == 'X')
        {
          char driver_info[SRV_CON_CLIENT_INFO_SIZE];

          driver_info[SRV_CON_MSG_IDX_PROTO_VERSION] = cas_req_header[2];
          driver_info[SRV_CON_MSG_IDX_FUNCTION_FLAG] = cas_req_header[3];
          send_error_to_driver (clt_sock_fd, ret_code, driver_info);
        }
      else
        {
          ret_code = CAS_CONV_ERROR_TO_OLD (ret_code);
          CAS_SEND_ERROR_CODE (clt_sock_fd, ret_code);
        }
      CLOSE_SOCKET (clt_sock_fd);
      continue;
    }

      cas_client_type = cas_req_header[SRV_CON_MSG_IDX_CLIENT_TYPE];
      if (!(strncmp (cas_req_header, SRV_CON_CLIENT_MAGIC_STR, SRV_CON_CLIENT_MAGIC_LEN) == 0
        || strncmp (cas_req_header, SRV_CON_CLIENT_MAGIC_STR_SSL, SRV_CON_CLIENT_MAGIC_LEN) == 0)
      || cas_client_type < CAS_CLIENT_TYPE_MIN || cas_client_type > CAS_CLIENT_TYPE_MAX)
    {
      send_error_to_driver (clt_sock_fd, CAS_ER_NOT_AUTHORIZED_CLIENT, cas_req_header);
      CLOSE_SOCKET (clt_sock_fd);
      continue;
    }

      if ((IS_SSL_CLIENT (cas_req_header) && shm_br->br_info[br_index].use_SSL == OFF)
      || (!IS_SSL_CLIENT (cas_req_header) && shm_br->br_info[br_index].use_SSL == ON))
    {
      send_error_to_driver (clt_sock_fd, CAS_ER_SSL_TYPE_NOT_ALLOWED, cas_req_header);
      CLOSE_SOCKET (clt_sock_fd);
      continue;
    }

      driver_version = cas_req_header[SRV_CON_MSG_IDX_PROTO_VERSION];
      if (driver_version & CAS_PROTO_INDICATOR)
    {
      /* Protocol version */
      client_version = CAS_PROTO_UNPACK_NET_VER (driver_version);
    }
      else
    {
      /* Build version; major, minor, and patch */
      client_version =
        CAS_MAKE_VER (cas_req_header[SRV_CON_MSG_IDX_MAJOR_VER], cas_req_header[SRV_CON_MSG_IDX_MINOR_VER],
              cas_req_header[SRV_CON_MSG_IDX_PATCH_VER]);
    }

      if (br_shard_flag == ON)
    {
      /* SHARD ONLY SUPPORT client_version.8.2.0 ~ */
      if (client_version < CAS_MAKE_VER (8, 2, 0))
        {
          CAS_SEND_ERROR_CODE (clt_sock_fd, CAS_ER_COMMUNICATION);
          CLOSE_SOCKET (clt_sock_fd);
          continue;
        }
    }

      if (v3_acl != NULL)
    {
      unsigned char ip_addr[4];

      memcpy (ip_addr, &(clt_sock_addr.sin_addr), 4);

      if (uw_acl_check (ip_addr) < 0)
        {
          send_error_to_driver (clt_sock_fd, CAS_ER_NOT_AUTHORIZED_CLIENT, cas_req_header);
          CLOSE_SOCKET (clt_sock_fd);
          continue;
        }
    }

      if (job_queue[0].id == job_queue_size)
    {
      send_error_to_driver (clt_sock_fd, CAS_ER_FREE_SERVER, cas_req_header);
      CLOSE_SOCKET (clt_sock_fd);
      continue;
    }

      if (max_open_fd < clt_sock_fd)
    {
      max_open_fd = clt_sock_fd;
    }

      job_count = (job_count >= JOB_COUNT_MAX) ? 1 : job_count + 1;
      new_job.id = job_count;
      new_job.clt_sock_fd = clt_sock_fd;
      new_job.recv_time = time (NULL);
      new_job.priority = 0;
      new_job.script[0] = '\0';
      new_job.cas_client_type = cas_client_type;
      new_job.port = ntohs (clt_sock_addr.sin_port);
      memcpy (new_job.ip_addr, &(clt_sock_addr.sin_addr), 4);
      strcpy (new_job.prg_name, cas_client_type_str[(int) cas_client_type]);
      new_job.clt_version = client_version;
      memcpy (new_job.driver_info, cas_req_header, SRV_CON_CLIENT_INFO_SIZE);

      while (1)
    {
      pthread_mutex_lock (&clt_table_mutex);
      if (max_heap_insert (job_queue, job_queue_size, &new_job) < 0)
        {
          pthread_mutex_unlock (&clt_table_mutex);
          SLEEP_MILISEC (0, 100);
        }
      else
        {
          pthread_cond_signal (&clt_table_cond);
          pthread_mutex_unlock (&clt_table_mutex);
          break;
        }
    }
    }

#if defined(WINDOWS)
  return;
#else
  return NULL;
#endif
}

static THREAD_FUNC
shard_dispatch_thr_f (void *arg)
{
  T_MAX_HEAP_NODE *job_queue;
  T_MAX_HEAP_NODE cur_job;
  int ip_addr;
#if defined(WINDOWS)
  int proxy_port;
#else
  SOCKET proxy_fd;
  int proxy_status;
  int ret_val;
#endif

  job_queue = shm_appl->job_queue;

  while (process_flag)
    {
      pthread_mutex_lock (&clt_table_mutex);
      if (max_heap_delete (job_queue, &cur_job) < 0)
    {
      pthread_mutex_unlock (&clt_table_mutex);
      SLEEP_MILISEC (0, 30);
      continue;
    }
      pthread_mutex_unlock (&clt_table_mutex);

      max_heap_incr_priority (job_queue);

#if defined(WINDOWS)
      memcpy (&ip_addr, cur_job.ip_addr, 4);
      proxy_port = broker_find_available_proxy (shm_proxy_p, ip_addr, cur_job.clt_version);

      CAS_SEND_ERROR_CODE (cur_job.clt_sock_fd, proxy_port);
      CLOSE_SOCKET (cur_job.clt_sock_fd);
#else /* WINDOWS */
      proxy_fd = broker_find_available_proxy (shm_proxy_p);
      if (proxy_fd != INVALID_SOCKET)
    {
      memcpy (&ip_addr, cur_job.ip_addr, 4);
      ret_val = send_fd (proxy_fd, cur_job.clt_sock_fd, ip_addr, cur_job.driver_info);
      if (ret_val > 0)
        {
          ret_val =
        read_from_client_with_timeout (proxy_fd, (char *) &proxy_status, sizeof (int), SOCKET_TIMEOUT_SEC);
          if (ret_val < 0)
        {
          broker_delete_proxy_conn_by_fd (proxy_fd);
          CLOSE_SOCKET (proxy_fd);
          send_error_to_driver (cur_job.clt_sock_fd, CAS_ER_FREE_SERVER, cur_job.driver_info);
        }
        }
      else
        {
          broker_delete_proxy_conn_by_fd (proxy_fd);
          CLOSE_SOCKET (proxy_fd);

          send_error_to_driver (cur_job.clt_sock_fd, CAS_ER_FREE_SERVER, cur_job.driver_info);
        }
    }
      else
    {
      send_error_to_driver (cur_job.clt_sock_fd, CAS_ER_FREE_SERVER, cur_job.driver_info);
    }

      CLOSE_SOCKET (cur_job.clt_sock_fd);
#endif /* !WINDOWS */
    }
#if defined(WINDOWS)
  return;
#else
  return NULL;
#endif
}

static THREAD_FUNC
dispatch_thr_f (void *arg)
{
  T_MAX_HEAP_NODE *job_queue;
  T_MAX_HEAP_NODE cur_job;
#if !defined(WINDOWS)
  SOCKET srv_sock_fd;
#endif /* !WINDOWS */

  int as_index, i;

  job_queue = shm_appl->job_queue;

  while (process_flag)
    {
      for (i = 0; i < shm_br->br_info[br_index].appl_server_max_num; i++)
    {
      if (shm_appl->as_info[i].service_flag == SERVICE_OFF)
        {
          if (shm_appl->as_info[i].uts_status == UTS_STATUS_IDLE)
        shm_appl->as_info[i].service_flag = SERVICE_OFF_ACK;
        }
    }

      pthread_mutex_lock (&clt_table_mutex);
      if (max_heap_delete (job_queue, &cur_job) < 0)
    {
      struct timespec ts;
      struct timeval tv;
      int r;

      gettimeofday (&tv, NULL);
      ts.tv_sec = tv.tv_sec;
      ts.tv_nsec = (tv.tv_usec + 30000) * 1000;
      if (ts.tv_nsec > 1000000000)
        {
          ts.tv_sec += 1;
          ts.tv_nsec -= 1000000000;
        }
      r = pthread_cond_timedwait (&clt_table_cond, &clt_table_mutex, &ts);
      if (r != 0)
        {
          pthread_mutex_unlock (&clt_table_mutex);
          continue;
        }
      r = max_heap_delete (job_queue, &cur_job);
      assert (r == 0);
    }

      hold_job = 1;
      max_heap_incr_priority (job_queue);
      pthread_mutex_unlock (&clt_table_mutex);

#if !defined (WINDOWS)
    retry:
#endif
      while (1)
    {
      as_index = find_idle_cas ();
      if (as_index < 0)
        {
          if (broker_add_new_cas ())
        {
          continue;
        }
          else
        {
          SLEEP_MILISEC (0, 30);
        }
        }
      else
        {
          break;
        }
    }

      hold_job = 0;

      shm_appl->as_info[as_index].num_connect_requests++;
#if !defined(WIN_FW)
      shm_appl->as_info[as_index].clt_version = cur_job.clt_version;
      memcpy (shm_appl->as_info[as_index].driver_info, cur_job.driver_info, SRV_CON_CLIENT_INFO_SIZE);
      shm_appl->as_info[as_index].cas_client_type = cur_job.cas_client_type;
      memcpy (shm_appl->as_info[as_index].cas_clt_ip, cur_job.ip_addr, 4);
      shm_appl->as_info[as_index].cas_clt_port = cur_job.port;
#if defined(WINDOWS)
      shm_appl->as_info[as_index].uts_status = UTS_STATUS_BUSY_WAIT;
      CAS_SEND_ERROR_CODE (cur_job.clt_sock_fd, shm_appl->as_info[as_index].as_port);
      CLOSE_SOCKET (cur_job.clt_sock_fd);
      shm_appl->as_info[as_index].num_request++;
      shm_appl->as_info[as_index].last_access_time = time (NULL);
      shm_appl->as_info[as_index].transaction_start_time = (time_t) 0;
#else /* WINDOWS */

      srv_sock_fd = connect_srv (shm_br->br_info[br_index].name, as_index);

      if (!IS_INVALID_SOCKET (srv_sock_fd))
    {
      int ip_addr;
      int ret_val;
      int con_status, uts_status;

      con_status = htonl (shm_appl->as_info[as_index].con_status);

      ret_val = write_to_client_with_timeout (srv_sock_fd, (char *) &con_status, sizeof (int), SOCKET_TIMEOUT_SEC);
      if (ret_val != sizeof (int))
        {
          CLOSE_SOCKET (srv_sock_fd);
          goto retry;
        }

      ret_val = read_from_client_with_timeout (srv_sock_fd, (char *) &con_status, sizeof (int), SOCKET_TIMEOUT_SEC);
      if (ret_val != sizeof (int) || ntohl (con_status) != CON_STATUS_IN_TRAN)
        {
          CLOSE_SOCKET (srv_sock_fd);
          goto retry;
        }

      memcpy (&ip_addr, cur_job.ip_addr, 4);
      ret_val = send_fd (srv_sock_fd, cur_job.clt_sock_fd, ip_addr, cur_job.driver_info);
      if (ret_val > 0)
        {
          ret_val =
        read_from_client_with_timeout (srv_sock_fd, (char *) &uts_status, sizeof (int), SOCKET_TIMEOUT_SEC);
        }
      CLOSE_SOCKET (srv_sock_fd);

      if (ret_val < 0)
        {
          send_error_to_driver (cur_job.clt_sock_fd, CAS_ER_FREE_SERVER, cur_job.driver_info);
        }
      else
        {
          shm_appl->as_info[as_index].num_request++;
        }
    }
      else
    {
      goto retry;
    }

      CLOSE_SOCKET (cur_job.clt_sock_fd);
#endif /* ifdef !WINDOWS */
#else /* !WIN_FW */
      session_request_q[as_index] = cur_job;
#endif /* WIN_FW */
    }

#if defined(WINDOWS)
  return;
#else
  return NULL;
#endif
}

#if defined(WIN_FW)
static THREAD_FUNC
service_thr_f (void *arg)
{
  int self_index = *((int *) arg);
  SOCKET clt_sock_fd, srv_sock_fd;
  int ip_addr;
  int cas_pid;
  T_MAX_HEAP_NODE cur_job;

  while (process_flag)
    {
      if (!IS_INVALID_SOCKET (session_request_q[self_index].clt_sock_fd))
    {
      cur_job = session_request_q[self_index];
      session_request_q[self_index].clt_sock_fd = INVALID_SOCKET;
    }
      else
    {
      SLEEP_MILISEC (0, 10);
      continue;
    }

      clt_sock_fd = cur_job.clt_sock_fd;
      memcpy (&ip_addr, cur_job.ip_addr, 4);

      shm_appl->as_info[self_index].clt_major_version = cur_job.clt_major_version;
      shm_appl->as_info[self_index].clt_minor_version = cur_job.clt_minor_version;
      shm_appl->as_info[self_index].clt_patch_version = cur_job.clt_patch_version;
      shm_appl->as_info[self_index].cas_client_type = cur_job.cas_client_type;
      shm_appl->as_info[self_index].close_flag = 0;
      cas_pid = shm_appl->as_info[self_index].pid;

      srv_sock_fd = connect_srv (shm_br->br_info[br_index].name, self_index);
      if (IS_INVALID_SOCKET (srv_sock_fd))
    {
      send_error_to_driver (clt_sock_fd, CAS_ER_FREE_SERVER, cur_job.driver_info);
      shm_appl->as_info[self_index].uts_status = UTS_STATUS_IDLE;
      CLOSE_SOCKET (cur_job.clt_sock_fd);
      continue;
    }
      else
    {
      CAS_SEND_ERROR_CODE (clt_sock_fd, 0);
      shm_appl->as_info[self_index].num_request++;
      shm_appl->as_info[self_index].last_access_time = time (NULL);
      shm_appl->as_info[self_index].transaction_start_time = (time_t) 0;
    }

      process_cas_request (cas_pid, self_index, clt_sock_fd, srv_sock_fd, cur_job.clt_major_version);

      CLOSE_SOCKET (clt_sock_fd);
      CLOSE_SOCKET (srv_sock_fd);
    }

  return;
}
#endif

static int
init_env (void)
{
  char *port;
  int n;
  int one = 1;

  /* get a Unix stream socket */
  sock_fd = socket (AF_INET, SOCK_STREAM, 0);
  if (IS_INVALID_SOCKET (sock_fd))
    {
      UW_SET_ERROR_CODE (UW_ER_CANT_CREATE_SOCKET, errno);
      return (-1);
    }
  if ((setsockopt (sock_fd, SOL_SOCKET, SO_REUSEADDR, (char *) &one, sizeof (one))) < 0)
    {
      UW_SET_ERROR_CODE (UW_ER_CANT_CREATE_SOCKET, errno);
      return (-1);
    }
  if ((port = getenv (PORT_NUMBER_ENV_STR)) == NULL)
    {
      UW_SET_ERROR_CODE (UW_ER_CANT_CREATE_SOCKET, 0);
      return (-1);
    }
  memset (&sock_addr, 0, sizeof (struct sockaddr_in));
  sock_addr.sin_family = AF_INET;
  sock_addr.sin_port = htons ((unsigned short) (atoi (port)));
  sock_addr_len = sizeof (struct sockaddr_in);

  n = INADDR_ANY;
  memcpy (&sock_addr.sin_addr, &n, sizeof (int));

  if (bind (sock_fd, (struct sockaddr *) &sock_addr, sock_addr_len) < 0)
    {
      UW_SET_ERROR_CODE (UW_ER_CANT_BIND, errno);
      return (-1);
    }

#if defined (WINDOWS)
  if (listen (sock_fd, SOMAXCONN_HINT (shm_appl->job_queue_size)) < 0)
#else
  if (listen (sock_fd, shm_appl->job_queue_size) < 0)
#endif
    {
      UW_SET_ERROR_CODE (UW_ER_CANT_BIND, 0);
      return (-1);
    }

  return (0);
}

static int
read_from_client (SOCKET sock_fd, char *buf, int size)
{
  return read_from_client_with_timeout (sock_fd, buf, size, 60);
}

static int
read_from_client_with_timeout (SOCKET sock_fd, char *buf, int size, int timeout_sec)
{
  int len = size;
  int read_len;

  if (IS_INVALID_SOCKET (sock_fd))
    {
      return -1;
    }

#ifdef ASYNC_MODE
  while (size > 0)
    {
      read_len = read_buffer_async (sock_fd, buf, size, timeout_sec);
      if (read_len <= 0)
    {
      return -1;
    }

      buf += read_len;
      size -= read_len;
    }
#else
  read_len = READ_FROM_SOCKET (sock_fd, buf, size);
#endif

  return len;
}

static int
write_to_client (SOCKET sock_fd, char *buf, int size)
{
  return write_to_client_with_timeout (sock_fd, buf, size, 60);
}

static int
write_to_client_with_timeout (SOCKET sock_fd, char *buf, int size, int timeout_sec)
{
  int len = size;
  int write_len = -1;

  if (IS_INVALID_SOCKET (sock_fd))
    {
      return -1;
    }

#ifdef ASYNC_MODE
  while (size > 0)
    {
      write_len = write_buffer_async (sock_fd, buf, size, timeout_sec);

      if (write_len <= 0)
    {
      return -1;
    }

      buf += write_len;
      size -= write_len;
    }
#else
  len = WRITE_TO_SOCKET (sock_fd, buf, size);
#endif

  return len;
}

/*
 * run_appl_server () -
 *   return: pid
 *   as_info_p(in): T_APPL_SERVER_INFO
 *   br_index(in): broker index
 *   proxy_index(in): it's only valid in SHARD! proxy index
 *   shard_index(in): it's only valid in SHARD! shard index
 *   as_index(in): cas index
 *
 * Note: activate CAS
 */
static int
run_appl_server (T_APPL_SERVER_INFO * as_info_p, int br_index, int as_index)
{
  char appl_name[APPL_SERVER_NAME_MAX_SIZE];
  int pid;
  char argv0[128];
#if !defined(WINDOWS)
  int i;
#endif
  char as_id_env_str[32];
  char appl_server_shm_key_env_str[32];

  while (1)
    {
      pthread_mutex_lock (&run_appl_mutex);
      if (run_appl_server_flag)
    {
      pthread_mutex_unlock (&run_appl_mutex);
      SLEEP_MILISEC (0, 100);
      continue;
    }
      else
    {
      run_appl_server_flag = 1;
      pthread_mutex_unlock (&run_appl_mutex);
      break;
    }
    }

  as_info_p->service_ready_flag = FALSE;

#if !defined(WINDOWS)
  signal (SIGCHLD, SIG_IGN);

  /* shard_cas does not have unix-domain socket */
  if (br_shard_flag == OFF)
    {
      char path[BROKER_PATH_MAX];

      ut_get_as_port_name (path, shm_br->br_info[br_index].name, as_index, BROKER_PATH_MAX);
      unlink (path);
    }

  pid = fork ();
  if (pid == 0)
    {
      signal (SIGCHLD, SIG_DFL);

      for (i = 3; i <= max_open_fd; i++)
    {
      close (i);
    }
#endif
      strcpy (appl_name, shm_appl->appl_server_name);

      sprintf (appl_server_shm_key_env_str, "%s=%d", APPL_SERVER_SHM_KEY_STR,
           shm_br->br_info[br_index].appl_server_shm_id);
      putenv (appl_server_shm_key_env_str);

      snprintf (as_id_env_str, sizeof (as_id_env_str), "%s=%d", AS_ID_ENV_STR, as_index);
      putenv (as_id_env_str);

      if (br_shard_flag == ON)
    {
      snprintf (argv0, sizeof (argv0) - 1, "%s_%s_%d_%d_%d", shm_br->br_info[br_index].name, appl_name,
            as_info_p->proxy_id + 1, as_info_p->shard_id, as_info_p->shard_cas_id + 1);
    }
      else
    {
      snprintf (argv0, sizeof (argv0) - 1, "%s_%s_%d", shm_br->br_info[br_index].name, appl_name, as_index + 1);
    }

#if defined(WINDOWS)
      pid = run_child (appl_name);
#else
      execle (appl_name, argv0, NULL, environ);
#endif

#if !defined(WINDOWS)
      exit (0);
    }
#endif

  if (br_shard_flag == ON)
    {
      as_info_p->uts_status = UTS_STATUS_CON_WAIT;
    }

  CON_STATUS_LOCK_DESTROY (as_info_p);
  CON_STATUS_LOCK_INIT (as_info_p);
  if (ut_is_appl_server_ready (pid, &as_info_p->service_ready_flag))
    {
      as_info_p->transaction_start_time = (time_t) 0;
      as_info_p->num_restarts++;
    }
  else
    {
      pid = -1;
    }

  run_appl_server_flag = 0;

  return pid;
}

/*
 * stop_appl_server () -
 *   return: NO_ERROR
 *   as_info_p(in): T_APPL_SERVER_INFO
 *   br_index(in): broker index
 *   as_index(in): cas index
 *
 * Note: inactivate CAS
 */
static int
stop_appl_server (T_APPL_SERVER_INFO * as_info_p, int br_index, int as_index)
{
  ut_kill_as_process (as_info_p->pid, shm_br->br_info[br_index].name, as_index, br_shard_flag);

#if defined(WINDOWS)
  /* [CUBRIDSUS-2068] make the broker sleep for 0.1 sec when stopping the cas in order to prevent communication error
   * occurred on windows. */
  SLEEP_MILISEC (0, 100);
#endif

  as_info_p->pid = 0;
  as_info_p->last_access_time = time (NULL);
  as_info_p->transaction_start_time = (time_t) 0;

  return 0;
}

/*
 * restart_appl_server () -
 *   return: void
 *   as_info_p(in): T_APPL_SERVER_INFO
 *   br_index(in): broker index
 *   as_index(in): cas index
 *
 * Note: inactivate and activate CAS
 */
static void
restart_appl_server (T_APPL_SERVER_INFO * as_info_p, int br_index, int as_index)
{
  int new_pid;

#if defined(WINDOWS)
  ut_kill_as_process (as_info_p->pid, shm_br->br_info[br_index].name, as_info_p->as_id, br_shard_flag);

  /* [CUBRIDSUS-2068] make the broker sleep for 0.1 sec when stopping the cas in order to prevent communication error
   * occurred on windows. */

  SLEEP_MILISEC (0, 100);

  new_pid = run_appl_server (as_info_p, br_index, as_index);
  as_info_p->pid = new_pid;
#else

  as_info_p->psize = getsize (as_info_p->pid);
  if (as_info_p->psize > 1)
    {
      as_info_p->psize_time = time (NULL);
    }
  else
    {
      char pid_file_name[BROKER_PATH_MAX];
      FILE *fp;
      int old_pid;

      ut_get_as_pid_name (pid_file_name, shm_br->br_info[br_index].name, as_index, BROKER_PATH_MAX);

      fp = fopen (pid_file_name, "r");
      if (fp)
    {
      fscanf (fp, "%d", &old_pid);
      fclose (fp);

      as_info_p->psize = getsize (old_pid);
      if (as_info_p->psize > 1)
        {
          as_info_p->pid = old_pid;
          as_info_p->psize_time = time (NULL);
        }
      else
        {
          unlink (pid_file_name);
        }
    }
    }

  if (as_info_p->psize <= 0)
    {
      if (as_info_p->pid > 0)
    {
      ut_kill_as_process (as_info_p->pid, shm_br->br_info[br_index].name, as_index, br_shard_flag);
    }

      new_pid = run_appl_server (as_info_p, br_index, as_index);
      as_info_p->pid = new_pid;
    }
#endif
}

static int
read_nbytes_from_client (SOCKET sock_fd, char *buf, int size)
{
  int total_read_size = 0, read_len;

  while (total_read_size < size)
    {
      read_len = read_from_client (sock_fd, buf + total_read_size, size - total_read_size);
      if (read_len <= 0)
    {
      total_read_size = -1;
      break;
    }
      total_read_size += read_len;
    }
  return total_read_size;
}

static SOCKET
connect_srv (char *br_name, int as_index)
{
  int sock_addr_len;
#if defined(WINDOWS)
  struct sockaddr_in sock_addr;
#else
  struct sockaddr_un sock_addr;
#endif
  SOCKET srv_sock_fd;
  int one = 1;
  char retry_count = 0;

retry:

#if defined(WINDOWS)
  srv_sock_fd = socket (AF_INET, SOCK_STREAM, 0);
  if (IS_INVALID_SOCKET (srv_sock_fd))
    return INVALID_SOCKET;

  memset (&sock_addr, 0, sizeof (struct sockaddr_in));
  sock_addr.sin_family = AF_INET;
  sock_addr.sin_port = htons ((unsigned short) shm_appl->as_info[as_index].as_port);
  memcpy (&sock_addr.sin_addr, shm_br->my_ip_addr, 4);
  sock_addr_len = sizeof (struct sockaddr_in);
#else
  srv_sock_fd = socket (AF_UNIX, SOCK_STREAM, 0);
  if (IS_INVALID_SOCKET (srv_sock_fd))
    return INVALID_SOCKET;

  memset (&sock_addr, 0, sizeof (struct sockaddr_un));
  sock_addr.sun_family = AF_UNIX;

  ut_get_as_port_name (sock_addr.sun_path, br_name, as_index, sizeof (sock_addr.sun_path));

  sock_addr_len = strlen (sock_addr.sun_path) + sizeof (sock_addr.sun_family) + 1;
#endif

  if (connect (srv_sock_fd, (struct sockaddr *) &sock_addr, sock_addr_len) < 0)
    {
      if (retry_count < 1)
    {
      int new_pid;

      ut_kill_as_process (shm_appl->as_info[as_index].pid, shm_br->br_info[br_index].name, as_index, br_shard_flag);

      new_pid = run_appl_server (&(shm_appl->as_info[as_index]), br_index, as_index);
      shm_appl->as_info[as_index].pid = new_pid;
      retry_count++;
      CLOSE_SOCKET (srv_sock_fd);
      goto retry;
    }
      CLOSE_SOCKET (srv_sock_fd);
      return INVALID_SOCKET;
    }

  setsockopt (srv_sock_fd, IPPROTO_TCP, TCP_NODELAY, (char *) &one, sizeof (one));

  return srv_sock_fd;
}

/*
 * cas_monitor_worker () -
 *   return: void
 *   as_info_p(in): T_APPL_SERVER_INFO
 *   br_index(in): broker index
 *   as_index(in): cas index
 *   busy_uts(out): counting UTS_STATUS_BUSY status cas
 *
 * Note: monitoring CAS
 */
static void
cas_monitor_worker (T_APPL_SERVER_INFO * as_info_p, int br_index, int as_index, int *busy_uts)
{
  int new_pid;
  int restart_flag = OFF;
  T_PROXY_INFO *proxy_info_p = NULL;
  T_SHARD_INFO *shard_info_p = NULL;

  if (as_info_p->service_flag != SERVICE_ON)
    {
      return;
    }
  if (as_info_p->uts_status == UTS_STATUS_BUSY)
    {
      (*busy_uts)++;
    }
#if defined(WINDOWS)
  else if (as_info_p->uts_status == UTS_STATUS_BUSY_WAIT)
    {
      if (time (NULL) - as_info_p->last_access_time > 10)
    {
      as_info_p->uts_status = UTS_STATUS_IDLE;
    }
      else
    {
      (*busy_uts)++;
    }
    }
#endif

#if defined(WINDOWS)
  if (shm_appl->use_pdh_flag == TRUE)
    {
      if ((as_info_p->pid == as_info_p->pdh_pid)
      && (as_info_p->pdh_workset > shm_br->br_info[br_index].appl_server_hard_limit))
    {
      as_info_p->uts_status = UTS_STATUS_RESTART;
    }
    }
#else
  if (as_info_p->psize > shm_appl->appl_server_hard_limit)
    {
      as_info_p->uts_status = UTS_STATUS_RESTART;
    }
#endif

/*  if (as_info_p->service_flag != SERVICE_ON)
        continue;  */
  /* check cas process status and restart it */

  if (br_shard_flag == ON
      && (as_info_p->uts_status == UTS_STATUS_BUSY || as_info_p->uts_status == UTS_STATUS_IDLE
      || as_info_p->uts_status == UTS_STATUS_START))
    {
      restart_flag = ON;
    }
  else if (br_shard_flag == OFF && as_info_p->uts_status == UTS_STATUS_BUSY)
    {
      restart_flag = ON;
    }

  if (restart_flag)
    {
#if defined(WINDOWS)
      HANDLE phandle;
      phandle = OpenProcess (SYNCHRONIZE, FALSE, as_info_p->pid);
      if (phandle == NULL)
    {
      restart_appl_server (as_info_p, br_index, as_index);
      as_info_p->uts_status = UTS_STATUS_IDLE;
    }
      else
    {
      CloseHandle (phandle);
    }
#else
      if (kill (as_info_p->pid, 0) < 0)
    {
      restart_appl_server (as_info_p, br_index, as_index);
      as_info_p->uts_status = UTS_STATUS_IDLE;
    }
#endif
    }

  if (as_info_p->uts_status == UTS_STATUS_RESTART)
    {
      stop_appl_server (as_info_p, br_index, as_index);
      new_pid = run_appl_server (as_info_p, br_index, as_index);

      as_info_p->pid = new_pid;
      as_info_p->uts_status = UTS_STATUS_IDLE;
    }
  else if (br_shard_flag == ON && as_info_p->uts_status == UTS_STATUS_STOP)
    {
      proxy_info_p = shard_shm_find_proxy_info (shm_proxy_p, as_info_p->proxy_id);
      shard_info_p = shard_shm_find_shard_info (proxy_info_p, as_info_p->shard_id);
      assert (shard_info_p != NULL);

      (shm_br->br_info[br_index].appl_server_num)--;
      (shard_info_p->num_appl_server)--;
      (shm_appl->num_appl_server)--;

      as_info_p->service_flag = SERVICE_OFF;
      as_info_p->reset_flag = FALSE;

      stop_appl_server (as_info_p, br_index, as_index);

      as_info_p->uts_status = UTS_STATUS_IDLE;
      as_info_p->con_status = CON_STATUS_CLOSE;
      as_info_p->num_request = 0;

      CON_STATUS_LOCK_DESTROY (as_info_p);
    }
}

static THREAD_FUNC
cas_monitor_thr_f (void *ar)
{
  int i, tmp_num_busy_uts;

  while (process_flag)
    {
      tmp_num_busy_uts = 0;
      for (i = 0; i < shm_br->br_info[br_index].appl_server_max_num; i++)
    {
      cas_monitor_worker (&(shm_appl->as_info[i]), br_index, i, &tmp_num_busy_uts);
    }

      num_busy_uts = tmp_num_busy_uts;
      shm_br->br_info[br_index].num_busy_count = num_busy_uts;
      SLEEP_MILISEC (0, 100);
    }

#if !defined(WINDOWS)
  return NULL;
#endif
}

static CSS_CONN_ENTRY *
connect_to_master_for_server_monitor (const char *db_name, const char *db_host)
{
  int port_id;
  unsigned short rid;

  if (sysprm_load_and_init (db_name, NULL, SYSPRM_IGNORE_INTL_PARAMS) != NO_ERROR)
    {
      return NULL;
    }

  port_id = prm_get_master_port_id ();
  if (port_id <= 0)
    {
      return NULL;
    }

  /* timeout : 5000 milliseconds */
  return (__gv_cvar.css_connect_to_master_timeout (db_host, port_id, 5000, &rid));
}

static int
get_server_state_from_master (CSS_CONN_ENTRY * conn, const char *db_name)
{
  unsigned short request_id;
  int error = NO_ERROR;
  int server_state;
  int buffer_size;
  int *buffer = NULL;

  if (conn == NULL)
    {
      return SERVER_STATE_DEAD;
    }

  error = __gv_cvar.css_send_request (conn, GET_SERVER_STATE, &request_id, db_name, (int) strlen (db_name) + 1);
  if (error != NO_ERRORS)
    {
      return SERVER_STATE_DEAD;
    }

  /* timeout : 5000 milliseconds */
  error = __gv_cvar.css_receive_data (conn, request_id, (char **) &buffer, &buffer_size, 5000);
  if (error == NO_ERRORS)
    {
      if (buffer_size == sizeof (int))
    {
      server_state = ntohl (*buffer);
      free_and_init (buffer);

      return server_state;
    }
    }

  if (buffer != NULL)
    {
      free_and_init (buffer);
    }

  return SERVER_STATE_UNKNOWN;
}

static int
insert_db_server_check_list (T_DB_SERVER * list_p, int check_list_cnt, const char *db_name, const char *db_host)
{
  int i;

  for (i = 0; i < check_list_cnt && i < UNUSABLE_DATABASE_MAX; i++)
    {
      if (strcmp (db_name, list_p[i].database_name) == 0 && strcmp (db_host, list_p[i].database_host) == 0)
    {
      return check_list_cnt;
    }
    }

  if (i == UNUSABLE_DATABASE_MAX)
    {
      return UNUSABLE_DATABASE_MAX;
    }

  strncpy_bufsize (list_p[i].database_name, db_name);
  strncpy_bufsize (list_p[i].database_host, db_host);
  list_p[i].state = -1;

  return i + 1;
}

static THREAD_FUNC
server_monitor_thr_f (void *arg)
{
  int i, j, cnt;
  int u_index;
  int check_list_cnt = 0;
  T_APPL_SERVER_INFO *as_info_p;
  T_DB_SERVER *check_list;
  CSS_CONN_ENTRY *conn = NULL;
  DB_INFO *db_info_p = NULL;
  char **preferred_hosts;
  char *unusable_db_name;
  char *unusable_db_host;
  char busy_cas_db_name[SRV_CON_DBNAME_SIZE];

  er_init (NULL, ER_NEVER_EXIT);

  check_list = (T_DB_SERVER *) malloc (sizeof (T_DB_SERVER) * UNUSABLE_DATABASE_MAX);

  while (process_flag)
    {
      if (!shm_appl->monitor_server_flag || br_shard_flag == ON
      || shm_br->br_info[br_index].appl_server != APPL_SERVER_CAS || check_list == NULL)
    {
      shm_appl->unusable_databases_seq = 0;
      memset (shm_appl->unusable_databases_cnt, 0, sizeof (shm_appl->unusable_databases_cnt));
      SLEEP_MILISEC (MONITOR_SERVER_INTERVAL, 0);
      continue;
    }

      /* 1. collect server check list */
      check_list_cnt = 0;
      u_index = shm_appl->unusable_databases_seq % 2;

      for (i = 0; i < shm_appl->unusable_databases_cnt[u_index]; i++)
    {
      unusable_db_name = shm_appl->unusable_databases[u_index][i].database_name;
      unusable_db_host = shm_appl->unusable_databases[u_index][i].database_host;

      check_list_cnt = insert_db_server_check_list (check_list, check_list_cnt, unusable_db_name, unusable_db_host);
    }

      for (i = 0; i < shm_br->br_info[br_index].appl_server_max_num; i++)
    {
      as_info_p = &(shm_appl->as_info[i]);
      if (as_info_p->uts_status == UTS_STATUS_BUSY)
        {
          strncpy (busy_cas_db_name, as_info_p->database_name, SRV_CON_DBNAME_SIZE - 1);

          if (busy_cas_db_name[0] != '\0')
        {
          preferred_hosts = util_split_string (shm_appl->preferred_hosts, ":");
          if (preferred_hosts != NULL)
            {
              for (j = 0; preferred_hosts[j] != NULL; j++)
            {
              check_list_cnt =
                insert_db_server_check_list (check_list, check_list_cnt, busy_cas_db_name,
                             preferred_hosts[j]);
            }

              util_free_string_array (preferred_hosts);
            }

          db_info_p = cfg_find_db (busy_cas_db_name);
          if (db_info_p == NULL || db_info_p->hosts == NULL)
            {
              if (db_info_p)
            {
              cfg_free_directory (db_info_p);
            }
              continue;
            }

          for (j = 0; j < db_info_p->num_hosts; j++)
            {
              check_list_cnt =
            insert_db_server_check_list (check_list, check_list_cnt, busy_cas_db_name, db_info_p->hosts[j]);
            }

          cfg_free_directory (db_info_p);
        }
        }
    }

      /* 2. check server state */
      for (i = 0; i < check_list_cnt; i++)
    {
      conn = connect_to_master_for_server_monitor (check_list[i].database_name, check_list[i].database_host);
      check_list[i].state = get_server_state_from_master (conn, check_list[i].database_name);

      if (conn != NULL)
        {
          __gv_cvar.css_free_conn (conn);
          conn = NULL;
        }
    }

      /* 3. record server state to the shared memory */
      cnt = 0;
      u_index = (shm_appl->unusable_databases_seq + 1) % 2;

      for (i = 0; i < check_list_cnt; i++)
    {
      if (check_list[i].state < SERVER_STATE_REGISTERED && check_list[i].state != SERVER_STATE_UNKNOWN)
        {
          strncpy (shm_appl->unusable_databases[u_index][cnt].database_name, check_list[i].database_name,
               SRV_CON_DBNAME_SIZE - 1);
          strncpy (shm_appl->unusable_databases[u_index][cnt].database_host, check_list[i].database_host,
               CUB_MAXHOSTNAMELEN - 1);
          cnt++;
        }
    }

      shm_appl->unusable_databases_cnt[u_index] = cnt;
      shm_appl->unusable_databases_seq++;

      SLEEP_MILISEC (MONITOR_SERVER_INTERVAL, 0);
    }

  free_and_init (check_list);

  er_final (ER_ALL_FINAL);

#if !defined(WINDOWS)
  return NULL;
#endif
}

static THREAD_FUNC
hang_check_thr_f (void *ar)
{
  unsigned int cur_index;
  int cur_hang_count;
  T_BROKER_INFO *br_info_p;
  time_t cur_time;
  int collect_count_interval;
  int hang_count[NUM_COLLECT_COUNT_PER_INTVL] = { 0, 0, 0, 0 };
  float avg_hang_count;

  int proxy_index, i;
  T_PROXY_INFO *proxy_info_p = NULL;
  T_APPL_SERVER_INFO *as_info_p;

  SLEEP_MILISEC (shm_br->br_info[br_index].monitor_hang_interval, 0);

  br_info_p = &(shm_br->br_info[br_index]);
  cur_hang_count = 0;
  cur_index = 0;
  avg_hang_count = 0.0;
  collect_count_interval = br_info_p->monitor_hang_interval / NUM_COLLECT_COUNT_PER_INTVL;

  while (process_flag)
    {
      cur_time = time (NULL);
      if (br_shard_flag == OFF)
    {
      for (i = 0; i < br_info_p->appl_server_max_num; i++)
        {
          as_info_p = &(shm_appl->as_info[i]);

          if ((as_info_p->service_flag != SERVICE_ON) || as_info_p->claimed_alive_time == 0)
        {
          continue;
        }
          if ((br_info_p->hang_timeout < cur_time - as_info_p->claimed_alive_time))
        {
          cur_hang_count++;
        }
        }
    }
      else
    {
      for (proxy_index = 0; proxy_index < shm_proxy_p->num_proxy; proxy_index++)
        {
          proxy_info_p = shard_shm_find_proxy_info (shm_proxy_p, proxy_index);

          if ((proxy_info_p->service_flag != SERVICE_ON) || (proxy_info_p->claimed_alive_time == 0))
        {
          continue;
        }

          if ((br_info_p->hang_timeout < cur_time - proxy_info_p->claimed_alive_time))
        {
          cur_hang_count++;
        }
        }
    }

      hang_count[cur_index] = cur_hang_count;

      avg_hang_count = ut_get_avg_from_array (hang_count, NUM_COLLECT_COUNT_PER_INTVL);

      if (br_shard_flag == OFF)
    {
      br_info_p->reject_client_flag =
        (avg_hang_count >= (float) br_info_p->appl_server_num * HANG_COUNT_THRESHOLD_RATIO);
    }
      else
    {
      /*
       * reject_client_flag for shard broker
       * does not depend on the current number of proxies.
       * If one proxy hangs for the last 1 min, then
       * it will disable shard_broker no matter how many proxies
       * there are.
       */
      br_info_p->reject_client_flag = (avg_hang_count >= 1);
    }

      cur_index = (cur_index + 1) % NUM_COLLECT_COUNT_PER_INTVL;
      cur_hang_count = 0;

      SLEEP_MILISEC (collect_count_interval, 0);
    }
#if !defined(WINDOWS)
  return NULL;
#endif
}

/*
 * psize_check_worker () -
 *   return: void
 *   as_info_p(in): T_APPL_SERVER_INFO
 *   br_index(in): broker index
 *   proxy_index(in): it's only valid in SHARD! proxy index
 *   shard_index(in): it's only valid in SHARD! shard index
 *   as_index(in): cas index
 *
 * Note: check cas psize and cas log
 */
static void
psize_check_worker (T_APPL_SERVER_INFO * as_info_p, int br_index, int as_index)
{
#if defined(WINDOWS)
  int pid;
  int cpu_time;
  int workset_size;
  float pct_cpu;
#endif

  if (as_info_p->service_flag != SERVICE_ON)
    {
      return;
    }

#if defined(WINDOWS)
  pid = as_info_p->pid;

  cpu_time = get_cputime_sec (pid);
  if (cpu_time < 0)
    {
      as_info_p->cpu_time = 0;
#if 0
      HANDLE hProcess;
      hProcess = OpenProcess (PROCESS_QUERY_INFORMATION, FALSE, pid);
      if (hProcess == NULL)
    {
      pid = 0;
      as_info_p->pid = 0;
      as_info_p->cpu_time = 0;
    }
#endif
    }
  else
    {
      as_info_p->cpu_time = cpu_time;
    }

  if (pdh_get_value (pid, &workset_size, &pct_cpu, NULL) >= 0)
    {
      as_info_p->pdh_pid = pid;
      as_info_p->pdh_workset = workset_size;
      as_info_p->pdh_pct_cpu = pct_cpu;
    }
#else
  as_info_p->psize = getsize (as_info_p->pid);
#if 0
  if (as_info_p->psize < 0 && as_info_p->pid > 0)
    {
      if (kill (as_info_p->pid, 0) < 0 && errno == ESRCH)
    {
      as_info_p->pid = 0;
    }
    }
#endif
#endif /* WINDOWS */

  check_cas_log (shm_br->br_info[br_index].name, as_info_p, as_index);
}

static void
check_proxy_log (char *br_name, T_PROXY_INFO * proxy_info_p)
{
  char log_filepath[BROKER_PATH_MAX];

  if (proxy_info_p->cur_proxy_log_mode != PROXY_LOG_MODE_NONE)
    {
      snprintf (log_filepath, sizeof (log_filepath), "%s/%s_%d.log", shm_appl->proxy_log_dir, br_name,
        proxy_info_p->proxy_id + 1);

      if (access (log_filepath, F_OK) < 0)
    {
      FILE *fp;

      fp = fopen (log_filepath, "a");
      if (fp != NULL)
        {
          fclose (fp);
        }
      proxy_info_p->proxy_log_reset = PROXY_LOG_RESET_REOPEN;
    }
    }

  return;
}

static void
check_proxy_access_log (T_PROXY_INFO * proxy_info_p)
{
  char *access_log_file;
  FILE *fp;

  access_log_file = proxy_info_p->access_log_file;
  if (access (access_log_file, F_OK) < 0)
    {
      fp = fopen (access_log_file, "a");
      if (fp != NULL)
    {
      fclose (fp);
    }
      proxy_info_p->proxy_access_log_reset = PROXY_LOG_RESET_REOPEN;
    }

  return;
}


static void
proxy_check_worker (int br_index, T_PROXY_INFO * proxy_info_p)
{
  check_proxy_log (shm_br->br_info[br_index].name, proxy_info_p);
  check_proxy_access_log (proxy_info_p);

  return;
}

#if defined(WINDOWS)
static int
get_cputime_sec (int pid)
{
  ULARGE_INTEGER ul;
  HANDLE hProcess;
  FILETIME ctime, etime, systime, usertime;
  int cputime = 0;

  if (pid <= 0)
    return 0;

  hProcess = OpenProcess (PROCESS_QUERY_INFORMATION, FALSE, pid);
  if (hProcess == NULL)
    {
      return -1;
    }

  if (GetProcessTimes (hProcess, &ctime, &etime, &systime, &usertime) != 0)
    {
      ul.HighPart = systime.dwHighDateTime + usertime.dwHighDateTime;
      ul.LowPart = systime.dwLowDateTime + usertime.dwLowDateTime;
      cputime = ((int) (ul.QuadPart / 10000000));
    }
  CloseHandle (hProcess);

  return cputime;
}

static THREAD_FUNC
psize_check_thr_f (void *ar)
{
  int workset_size;
  float pct_cpu;
  int cpu_time;
  int br_num_thr;
  int i;
  int proxy_index;

  T_PROXY_INFO *proxy_info_p = NULL;
  T_SHARD_INFO *shard_info_p = NULL;

  if (pdh_init () < 0)
    {
      shm_appl->use_pdh_flag = FALSE;
      return;
    }
  else
    {
      shm_appl->use_pdh_flag = TRUE;
    }

  while (process_flag)
    {
      pdh_collect ();

      if (pdh_get_value (shm_br->br_info[br_index].pid, &workset_size, &pct_cpu, &br_num_thr) < 0)
    {
      shm_br->br_info[br_index].pdh_pct_cpu = 0;
    }
      else
    {
      cpu_time = get_cputime_sec (shm_br->br_info[br_index].pid);
      if (cpu_time >= 0)
        {
          shm_br->br_info[br_index].cpu_time = cpu_time;
        }
      shm_br->br_info[br_index].pdh_workset = workset_size;
      shm_br->br_info[br_index].pdh_pct_cpu = pct_cpu;
      shm_br->br_info[br_index].pdh_num_thr = br_num_thr;
    }

      if (br_shard_flag == ON)
    {
      for (proxy_index = 0; proxy_index < shm_proxy_p->num_proxy; proxy_index++)
        {
          proxy_info_p = shard_shm_find_proxy_info (shm_proxy_p, proxy_index);

          proxy_check_worker (br_index, proxy_info_p);
        }
    }

      for (i = 0; i < shm_br->br_info[br_index].appl_server_max_num; i++)
    {
      psize_check_worker (&(shm_appl->as_info[i]), br_index, i);
    }
      SLEEP_MILISEC (1, 0);
    }
}

#else /* WINDOWS */

static THREAD_FUNC
psize_check_thr_f (void *ar)
{
  int i;
  int proxy_index;

  T_PROXY_INFO *proxy_info_p = NULL;

  while (process_flag)
    {
      if (br_shard_flag == ON)
    {
      for (proxy_index = 0; proxy_index < shm_proxy_p->num_proxy; proxy_index++)
        {
          proxy_info_p = shard_shm_find_proxy_info (shm_proxy_p, proxy_index);

          proxy_check_worker (br_index, proxy_info_p);
        }
    }

      for (i = 0; i < shm_br->br_info[br_index].appl_server_max_num; i++)
    {
      psize_check_worker (&(shm_appl->as_info[i]), br_index, i);
    }

      SLEEP_MILISEC (1, 0);
    }

  return NULL;
}
#endif /* !WINDOWS */

/*
 * check_cas_log () -
 *   return: void
 *   br_name(in): broker name
 *   as_info_p(in): T_APPL_SERVER_INFO
 *   as_index(in): cas index
 * Note: check cas log and recreate
 */
static void
check_cas_log (char *br_name, T_APPL_SERVER_INFO * as_info_p, int as_index)
{
  char log_filename[BROKER_PATH_MAX];

  if (IS_NOT_APPL_SERVER_TYPE_CAS (shm_br->br_info[br_index].appl_server))
    {
      return;
    }

  if (as_info_p->cur_sql_log_mode != SQL_LOG_MODE_NONE)
    {
      get_as_sql_log_filename (log_filename, BROKER_PATH_MAX, br_name, as_info_p, as_index);

      if (access (log_filename, F_OK) < 0)
    {
      FILE *fp;
      fp = fopen (log_filename, "a");
      if (fp != NULL)
        {
          fclose (fp);
        }
      as_info_p->cas_log_reset = CAS_LOG_RESET_REOPEN;
    }
    }

  if (as_info_p->cur_slow_log_mode != SLOW_LOG_MODE_OFF)
    {
      get_as_slow_log_filename (log_filename, BROKER_PATH_MAX, br_name, as_info_p, as_index);

      if (access (log_filename, F_OK) < 0)
    {
      FILE *fp;
      fp = fopen (log_filename, "a");
      if (fp != NULL)
        {
          fclose (fp);
        }
      as_info_p->cas_slow_log_reset = CAS_LOG_RESET_REOPEN;
    }
    }
}

#ifdef WIN_FW
static int
process_cas_request (int cas_pid, int as_index, SOCKET clt_sock_fd, SOCKET srv_sock_fd)
{
  char read_buf[1024];
  int msg_size;
  int read_len;
  int tmp_int;
  char *tmp_p;

  msg_size = SRV_CON_DB_INFO_SIZE;
  while (msg_size > 0)
    {
      read_len = read_from_cas_client (clt_sock_fd, read_buf, msg_size, as_index, cas_pid);
      if (read_len <= 0)
    {
      return -1;
    }
      if (send (srv_sock_fd, read_buf, read_len, 0) < read_len)
    return -1;
      msg_size -= read_len;
    }

  if (recv (srv_sock_fd, (char *) &msg_size, 4, 0) < 4)
    return -1;
  if (write_to_client (clt_sock_fd, (char *) &msg_size, 4) < 0)
    return -1;
  msg_size = ntohl (msg_size);
  while (msg_size > 0)
    {
      read_len = recv (srv_sock_fd, read_buf, (msg_size > sizeof (read_buf) ? sizeof (read_buf) : msg_size), 0);
      if (read_len <= 0)
    {
      return -1;
    }
      if (write_to_client (clt_sock_fd, read_buf, read_len) < 0)
    {
      return -1;
    }
      msg_size -= read_len;
    }

  while (1)
    {
      tmp_int = 4;
      tmp_p = (char *) &msg_size;
      while (tmp_int > 0)
    {
      read_len = read_from_cas_client (clt_sock_fd, tmp_p, tmp_int, as_index, cas_pid);
      if (read_len <= 0)
        {
          return -1;
        }
      tmp_int -= read_len;
      tmp_p += read_len;
    }
      if (send (srv_sock_fd, (char *) &msg_size, 4, 0) < 0)
    {
      return -1;
    }

      msg_size = ntohl (msg_size);
      while (msg_size > 0)
    {
      read_len =
        read_from_cas_client (clt_sock_fd, read_buf, (msg_size > sizeof (read_buf) ? sizeof (read_buf) : msg_size),
                  as_index, cas_pid);
      if (read_len <= 0)
        {
          return -1;
        }
      if (send (srv_sock_fd, read_buf, read_len, 0) < read_len)
        {
          return -1;
        }
      msg_size -= read_len;
    }

      if (recv (srv_sock_fd, (char *) &msg_size, 4, 0) < 4)
    {
      return -1;
    }
      if (write_to_client (clt_sock_fd, (char *) &msg_size, 4) < 0)
    {
      return -1;
    }

      msg_size = ntohl (msg_size);
      while (msg_size > 0)
    {
      read_len = recv (srv_sock_fd, read_buf, (msg_size > sizeof (read_buf) ? sizeof (read_buf) : msg_size), 0);
      if (read_len <= 0)
        {
          return -1;
        }
      if (write_to_client (clt_sock_fd, read_buf, read_len) < 0)
        {
          return -1;
        }
      msg_size -= read_len;
    }

      if (shm_appl->as_info[as_index].close_flag || shm_appl->as_info[as_index].pid != cas_pid)
    {
      break;
    }
    }

  return 0;
}

static int
read_from_cas_client (SOCKET sock_fd, char *buf, int size, int as_index, int cas_pid)
{
  int read_len;
#ifdef ASYNC_MODE
  SELECT_MASK read_mask;
  int nfound;
  int maxfd;
  struct timeval timeout = { 1, 0 };
#endif

retry:

#ifdef ASYNC_MODE
  FD_ZERO (&read_mask);
  FD_SET (sock_fd, (fd_set *) (&read_mask));
  maxfd = sock_fd + 1;
  nfound = select (maxfd, &read_mask, (SELECT_MASK *) 0, (SELECT_MASK *) 0, &timeout);
  if (nfound < 1)
    {
      if (shm_appl->as_info[as_index].close_flag || shm_appl->as_info[as_index].pid != cas_pid)
    {
      return -1;
    }
      goto retry;
    }
#endif

#ifdef ASYNC_MODE
  if (FD_ISSET (sock_fd, (fd_set *) (&read_mask)))
    {
#endif
      read_len = READ_FROM_SOCKET (sock_fd, buf, size);
#ifdef ASYNC_MODE
    }
  else
    {
      return -1;
    }
#endif

  return read_len;
}
#endif

static int
find_idle_cas (void)
{
  int i;
  int idle_cas_id = -1;
  time_t max_wait_time;
  int wait_cas_id;
  time_t cur_time = time (NULL);

  pthread_mutex_lock (&broker_shm_mutex);

  wait_cas_id = -1;
  max_wait_time = 0;

  for (i = 0; i < shm_br->br_info[br_index].appl_server_max_num; i++)
    {
      if (shm_appl->as_info[i].service_flag != SERVICE_ON)
    {
      continue;
    }
      if (shm_appl->as_info[i].uts_status == UTS_STATUS_IDLE
#if !defined (WINDOWS)
      && kill (shm_appl->as_info[i].pid, 0) == 0
#endif
    )
    {
      idle_cas_id = i;
      wait_cas_id = -1;
      break;
    }
      if (shm_br->br_info[br_index].appl_server_num == shm_br->br_info[br_index].appl_server_max_num
      && shm_appl->as_info[i].uts_status == UTS_STATUS_BUSY && shm_appl->as_info[i].cur_keep_con == KEEP_CON_AUTO
      && shm_appl->as_info[i].con_status == CON_STATUS_OUT_TRAN && shm_appl->as_info[i].num_holdable_results < 1
      && shm_appl->as_info[i].cas_change_mode == CAS_CHANGE_MODE_AUTO)
    {
      time_t wait_time = cur_time - shm_appl->as_info[i].last_access_time;
      if (wait_time > max_wait_time || wait_cas_id == -1)
        {
          max_wait_time = wait_time;
          wait_cas_id = i;
        }
    }
    }

  if (wait_cas_id >= 0)
    {
      CON_STATUS_LOCK (&(shm_appl->as_info[wait_cas_id]), CON_STATUS_LOCK_BROKER);
      if (shm_appl->as_info[wait_cas_id].con_status == CON_STATUS_OUT_TRAN
      && shm_appl->as_info[wait_cas_id].num_holdable_results < 1
      && shm_appl->as_info[wait_cas_id].cas_change_mode == CAS_CHANGE_MODE_AUTO)
    {
      idle_cas_id = wait_cas_id;
      shm_appl->as_info[wait_cas_id].con_status = CON_STATUS_CLOSE_AND_CONNECT;
    }
      CON_STATUS_UNLOCK (&(shm_appl->as_info[wait_cas_id]), CON_STATUS_LOCK_BROKER);
    }

#if defined(WINDOWS)
  if (idle_cas_id >= 0)
    {
      HANDLE h_proc;
      h_proc = OpenProcess (SYNCHRONIZE, FALSE, shm_appl->as_info[idle_cas_id].pid);
      if (h_proc == NULL)
    {
      shm_appl->as_info[i].uts_status = UTS_STATUS_RESTART;
      idle_cas_id = -1;
    }
      else
    {
      CloseHandle (h_proc);
    }
    }
#endif

  if (idle_cas_id < 0)
    {
      pthread_mutex_unlock (&broker_shm_mutex);
      return -1;
    }

  shm_appl->as_info[idle_cas_id].uts_status = UTS_STATUS_BUSY;
  pthread_mutex_unlock (&broker_shm_mutex);

  return idle_cas_id;
}

static int
find_drop_as_index (void)
{
  int i, drop_as_index, exist_idle_cas;
  time_t max_wait_time, wait_time;

  pthread_mutex_lock (&broker_shm_mutex);
  if (IS_NOT_APPL_SERVER_TYPE_CAS (shm_br->br_info[br_index].appl_server))
    {
      drop_as_index = shm_br->br_info[br_index].appl_server_num - 1;
      wait_time = time (NULL) - shm_appl->as_info[drop_as_index].last_access_time;
      if (shm_appl->as_info[drop_as_index].uts_status == UTS_STATUS_IDLE
      && wait_time > shm_br->br_info[br_index].time_to_kill)
    {
      pthread_mutex_unlock (&broker_shm_mutex);
      return drop_as_index;
    }
      pthread_mutex_unlock (&broker_shm_mutex);
      return -1;
    }

  drop_as_index = -1;
  max_wait_time = -1;
  exist_idle_cas = 0;

  for (i = shm_br->br_info[br_index].appl_server_max_num - 1; i >= 0; i--)
    {
      if (shm_appl->as_info[i].service_flag != SERVICE_ON)
    continue;

      wait_time = time (NULL) - shm_appl->as_info[i].last_access_time;

      if (shm_appl->as_info[i].uts_status == UTS_STATUS_IDLE)
    {
      if (wait_time > shm_br->br_info[br_index].time_to_kill)
        {
          drop_as_index = i;
          break;
        }
      else
        {
          exist_idle_cas = 1;
          drop_as_index = -1;
        }
    }

      if (shm_appl->as_info[i].uts_status == UTS_STATUS_BUSY && shm_appl->as_info[i].con_status == CON_STATUS_OUT_TRAN
      && shm_appl->as_info[i].num_holdable_results < 1
      && shm_appl->as_info[i].cas_change_mode == CAS_CHANGE_MODE_AUTO && wait_time > max_wait_time
      && wait_time > shm_br->br_info[br_index].time_to_kill && exist_idle_cas == 0)
    {
      max_wait_time = wait_time;
      drop_as_index = i;
    }
    }

  pthread_mutex_unlock (&broker_shm_mutex);

  return drop_as_index;
}

static int
find_add_as_index ()
{
  int i;

  pthread_mutex_lock (&broker_shm_mutex);
  for (i = 0; i < shm_br->br_info[br_index].appl_server_max_num; i++)
    {
      if (shm_appl->as_info[i].service_flag == SERVICE_OFF_ACK && current_dropping_as_index != i)
    {
      pthread_mutex_unlock (&broker_shm_mutex);
      return i;
    }
    }

  pthread_mutex_unlock (&broker_shm_mutex);
  return -1;
}

#if !defined(WINDOWS)
static int
init_proxy_env ()
{
  int len;

  if ((proxy_sock_fd = socket (AF_UNIX, SOCK_STREAM, 0)) < 0)
    {
      return (-1);
    }

  /* FOR DEBUG */
  SHARD_ERR ("<BROKER> listen to unixdoamin:[%s].\n", shm_appl->port_name);

  memset (&shard_sock_addr, 0, sizeof (shard_sock_addr));
  shard_sock_addr.sun_family = AF_UNIX;
  strncpy_bufsize (shard_sock_addr.sun_path, shm_appl->port_name);

#ifdef  _SOCKADDR_LEN       /* 4.3BSD Reno and later */
  len = sizeof (shard_sock_addr.sun_len) + sizeof (shard_sock_addr.sun_family) + strlen (shard_sock_addr.sun_path) + 1;
  shard_sock_addr.sun_len = len;
#else /* vanilla 4.3BSD */
  len = strlen (shard_sock_addr.sun_path) + sizeof (shard_sock_addr.sun_family) + 1;
#endif

  /* bind the name to the descriptor */
  if (bind (proxy_sock_fd, (struct sockaddr *) &shard_sock_addr, len) < 0)
    {
      CLOSE_SOCKET (proxy_sock_fd);
      return (-2);
    }

  if (listen (proxy_sock_fd, 127) < 0)
    {
      /* tell kernel we're a server */
      CLOSE_SOCKET (proxy_sock_fd);
      return (-3);
    }

  return (proxy_sock_fd);
}
#endif /* !WINDOWS */

static int
broker_init_shm (void)
{
  char *p;
  int i;
  int master_shm_key, as_shm_key, port_no, proxy_shm_id;

  p = getenv (MASTER_SHM_KEY_ENV_STR);
  if (p == NULL)
    {
      UW_SET_ERROR_CODE (UW_ER_SHM_OPEN, 0);
      goto return_error;
    }
  parse_int (&master_shm_key, p, 10);
  SHARD_ERR ("<BROKER> MASTER_SHM_KEY_ENV_STR:[%d:%x]\n", master_shm_key, master_shm_key);

  shm_br = (T_SHM_BROKER *) uw_shm_open (master_shm_key, SHM_BROKER, SHM_MODE_ADMIN);
  if (shm_br == NULL)
    {
      UW_SET_ERROR_CODE (UW_ER_SHM_OPEN, 0);
      goto return_error;
    }

  if ((p = getenv (PORT_NUMBER_ENV_STR)) == NULL)
    {
      UW_SET_ERROR_CODE (UW_ER_CANT_CREATE_SOCKET, 0);
      goto return_error;
    }
  parse_int (&port_no, p, 10);
  for (i = 0, br_index = -1; i < shm_br->num_broker; i++)
    {
      if (shm_br->br_info[i].port == port_no)
    {
      br_index = i;
      break;
    }
    }
  if (br_index == -1)
    {
      UW_SET_ERROR_CODE (UW_ER_CANT_CREATE_SOCKET, 0);
      goto return_error;
    }
  br_info_p = &shm_br->br_info[i];

  as_shm_key = br_info_p->appl_server_shm_id;
  SHARD_ERR ("<BROKER> APPL_SERVER_SHM_KEY_STR:[%d:%x]\n", as_shm_key, as_shm_key);

  shm_appl = (T_SHM_APPL_SERVER *) uw_shm_open (as_shm_key, SHM_APPL_SERVER, SHM_MODE_ADMIN);
  if (shm_appl == NULL)
    {
      UW_SET_ERROR_CODE (UW_ER_SHM_OPEN, 0);
      goto return_error;
    }

  if (shm_appl->shard_flag == ON)
    {
      proxy_shm_id = br_info_p->proxy_shm_id;

      shm_proxy_p = (T_SHM_PROXY *) uw_shm_open (proxy_shm_id, SHM_PROXY, SHM_MODE_ADMIN);
      if (shm_proxy_p == NULL)
    {
      UW_SET_ERROR_CODE (UW_ER_SHM_OPEN, 0);
      goto return_error;
    }
    }

  return 0;

return_error:
  /* SHARD TODO : NOT IMPLEMENTED YET */
#if 0
  SET_BROKER_ERR_CODE ();
#endif

  /* SHARD TODO : DETACH SHARED MEMORY */

  return -1;
}

static void
proxy_monitor_worker (T_PROXY_INFO * proxy_info_p, int br_index, int proxy_index)
{
  int new_pid;
#if defined(WINDOWS)
  HANDLE phandle;
#endif /* WINDOWS */

  if (proxy_info_p->service_flag != SERVICE_ON || proxy_info_p->pid < 0)
    {
      return;
    }

#if defined(WINDOWS)
  phandle = OpenProcess (SYNCHRONIZE, FALSE, proxy_info_p->pid);
  if (phandle == NULL)
    {
      restart_proxy_server (proxy_info_p, br_index, proxy_index);
      goto shm_init;
    }
  else
    {
      CloseHandle (phandle);
    }
#else /* WINDOWS */
  if (kill (proxy_info_p->pid, 0) < 0)
    {
      SLEEP_MILISEC (1, 0);
      if (kill (proxy_info_p->pid, 0) < 0)
    {
      restart_proxy_server (proxy_info_p, br_index, proxy_index);
      goto shm_init;
    }
    }
#endif /* !WINDOWS */

  if (proxy_info_p->status == PROXY_STATUS_RESTART)
    {
      stop_proxy_server (proxy_info_p, br_index, proxy_index);
      new_pid = run_proxy_server (proxy_info_p, br_index, proxy_index);
      proxy_info_p->pid = new_pid;
      goto shm_init;
    }

  return;

shm_init:
  proxy_info_p->status = PROXY_STATUS_START;
  proxy_info_p->cur_client = 0;
  proxy_info_p->stmt_waiter_count = 0;
}

static THREAD_FUNC
proxy_monitor_thr_f (void *arg)
{
  int tmp_num_busy_uts;
  int proxy_index;
  T_PROXY_INFO *proxy_info_p = NULL;

  while (process_flag)
    {
      tmp_num_busy_uts = 0;
      for (proxy_index = 0; proxy_index < shm_proxy_p->num_proxy; proxy_index++)
    {
      proxy_info_p = shard_shm_find_proxy_info (shm_proxy_p, proxy_index);

      proxy_monitor_worker (proxy_info_p, br_index, proxy_index);
    }
      SLEEP_MILISEC (0, 100);
    }

#if !defined(WINDOWS)
  return NULL;
#endif
}

#if !defined(WINDOWS)
static THREAD_FUNC
proxy_listener_thr_f (void *arg)
{
  SELECT_MASK allset, rset;
  struct timeval tv;
  struct sockaddr_in proxy_sock_addr;
  T_SOCKLEN proxy_sock_addr_len;
  SOCKET max_fd, client_fd;
  int proxy_id;
  int ret, select_ret;

  while (process_flag)
    {
      FD_ZERO (&allset);
      FD_SET (proxy_sock_fd, &allset);
      broker_set_proxy_fds (&allset);

      rset = allset;

      max_fd = broker_get_proxy_conn_maxfd (proxy_sock_fd);
      tv.tv_sec = 1;
      tv.tv_usec = 0;
      select_ret = select (max_fd, &rset, NULL, NULL, &tv);
      if (select_ret == 0)
    {
      continue;
    }
      else if (select_ret < 0)
    {
      if (errno == EINTR)
        {
          continue;
        }
      continue;
    }

      if (FD_ISSET (proxy_sock_fd, &rset))
    {
      proxy_sock_addr_len = sizeof (proxy_sock_addr);
      client_fd = accept (proxy_sock_fd, (struct sockaddr *) &proxy_sock_addr, &proxy_sock_addr_len);

      ret = broker_add_proxy_conn (client_fd);
      if (ret < 0)
        {
          CLOSE_SOCKET (client_fd);
          client_fd = INVALID_SOCKET;
        }
    }

      while ((client_fd = broker_get_readable_proxy_conn (&rset)) != INVALID_SOCKET)
    {
      ret = read_from_client (client_fd, ((char *) &proxy_id), sizeof (proxy_id));
      if (ret < 0)
        {
          broker_delete_proxy_conn_by_fd (client_fd);

          CLOSE_SOCKET (client_fd);
          client_fd = INVALID_SOCKET;
        }

      proxy_id = htonl (proxy_id);
      ret = broker_register_proxy_conn (client_fd, proxy_id);
      if (ret < 0)
        {
          broker_delete_proxy_conn_by_fd (client_fd);

          CLOSE_SOCKET (client_fd);
          client_fd = INVALID_SOCKET;
        }
    }
    }

  return NULL;
}
#endif /* !WINDOWS */

/*
 * run_proxy_server () -
 *   return: pid
 *   as_info_p(in): T_APPL_SERVER_INFO
 *   br_index(in): broker index
 *   proxy_index(in): it's only valid in SHARD! proxy index
 *
 * Note: activate PROXY
 * it's only use in SHARD.
 */
static int
run_proxy_server (T_PROXY_INFO * proxy_info_p, int br_index, int proxy_index)
{
  const char *proxy_exe_name = NAME_PROXY;
  char proxy_shm_id_env_str[32], proxy_id_env_str[32];
  int pid;
#if !defined(WINDOWS)
  char process_name[APPL_SERVER_NAME_MAX_SIZE];
  int i;
#endif

  while (1)
    {
      pthread_mutex_lock (&run_proxy_mutex);
      if (run_proxy_flag)
    {
      pthread_mutex_unlock (&run_proxy_mutex);
      SLEEP_MILISEC (0, 100);
      continue;
    }
      else
    {
      run_proxy_flag = 1;
      pthread_mutex_unlock (&run_proxy_mutex);
      break;
    }
    }

#if !defined(WINDOWS)
  signal (SIGCHLD, SIG_IGN);
#endif

  proxy_info_p->cur_client = 0;

#if !defined(WINDOWS)
  unlink (proxy_info_p->port_name);

  pid = fork ();
  if (pid == 0)
    {
      signal (SIGCHLD, SIG_DFL);

      for (i = 3; i <= max_open_fd; i++)
    {
      close (i);
    }
#endif

      snprintf (proxy_shm_id_env_str, sizeof (proxy_shm_id_env_str), "%s=%d", PROXY_SHM_KEY_STR,
        shm_br->br_info[br_index].proxy_shm_id);
      putenv (proxy_shm_id_env_str);

      snprintf (proxy_id_env_str, sizeof (proxy_id_env_str), "%s=%d", PROXY_ID_ENV_STR, proxy_index);
      putenv (proxy_id_env_str);

#if !defined(WINDOWS)
      if (snprintf (process_name, sizeof (process_name) - 1, "%s_%s_%d", shm_appl->broker_name, proxy_exe_name,
            proxy_index + 1) < 0)
    {
      assert (false);
      exit (0);
    }
#endif /* !WINDOWS */

#if defined(WINDOWS)
      pid = run_child (proxy_exe_name);
#else
      execle (proxy_exe_name, process_name, NULL, environ);
#endif

#if !defined(WINDOWS)
      exit (0);
    }
#endif

  run_proxy_flag = 0;

  return pid;
}

/*
 * stop_proxy_server () -
 *   return: NO_ERROR
 *   as_info_p(in): T_APPL_SERVER_INFO
 *   br_index(in): broker index
 *   proxy_index(in): it's only valid in SHARD! proxy index
 *
 * Note: inactivate Proxy
 * it's only use in SHARD.
 */
static int
stop_proxy_server (T_PROXY_INFO * proxy_info_p, int br_index, int proxy_index)
{
  ut_kill_proxy_process (proxy_info_p->pid, shm_br->br_info[br_index].name, proxy_index);

#if defined(WINDOWS)
  /* [CUBRIDSUS-2068] make the broker sleep for 0.1 sec when stopping the cas in order to prevent communication error
   * occurred on windows. */
  SLEEP_MILISEC (0, 100);
#else /* WINDOWS */

  broker_delete_proxy_conn_by_proxy_id (proxy_info_p->proxy_id);
#endif /* !WINDOWS */

  proxy_info_p->pid = 0;
  proxy_info_p->cur_client = 0;

  return 0;
}

/*
 * restart_proxy_server () -
 *   return: void
 *   as_info_p(in): T_APPL_SERVER_INFO
 *   br_index(in): broker index
 *   proxy_index(in): it's only valid in SHARD! proxy index
 *
 * Note: inactivate and activate Proxy
 * it's only use in SHARD.
 */
static void
restart_proxy_server (T_PROXY_INFO * proxy_info_p, int br_index, int proxy_index)
{
  int new_pid;

  stop_proxy_server (proxy_info_p, br_index, proxy_index);

  new_pid = run_proxy_server (proxy_info_p, br_index, proxy_index);
  proxy_info_p->pid = new_pid;
  proxy_info_p->num_restarts++;
}

static void
get_as_sql_log_filename (char *log_filename, int len, char *broker_name, T_APPL_SERVER_INFO * as_info_p, int as_index)
{
  int ret;
  char dirname[BROKER_PATH_MAX];

  get_cubrid_file (FID_SQL_LOG_DIR, dirname, BROKER_PATH_MAX);

  if (br_shard_flag == ON)
    {
      ret = snprintf (log_filename, BROKER_PATH_MAX - 1, "%s%s_%d_%d_%d.sql.log", dirname, broker_name,
              as_info_p->proxy_id + 1, as_info_p->shard_id, as_info_p->shard_cas_id + 1);
    }
  else
    {
      ret = snprintf (log_filename, BROKER_PATH_MAX - 1, "%s%s_%d.sql.log", dirname, broker_name, as_index + 1);
    }
  if (ret < 0)
    {
      // bad name
      log_filename[0] = '\0';
    }
}

static void
get_as_slow_log_filename (char *log_filename, int len, char *broker_name, T_APPL_SERVER_INFO * as_info_p, int as_index)
{
  int ret;
  char dirname[BROKER_PATH_MAX];

  get_cubrid_file (FID_SLOW_LOG_DIR, dirname, BROKER_PATH_MAX);

  if (br_shard_flag == ON)
    {
      ret = snprintf (log_filename, BROKER_PATH_MAX - 1, "%s%s_%d_%d_%d.slow.log", dirname, broker_name,
              as_info_p->proxy_id + 1, as_info_p->shard_id, as_info_p->shard_cas_id + 1);
    }
  else
    {
      ret = snprintf (log_filename, BROKER_PATH_MAX - 1, "%s%s_%d.slow.log", dirname, broker_name, as_index + 1);
    }
  if (ret < 0)
    {
      // bad name
      log_filename[0] = '\0';
    }
}

static int
read_buffer_async (SOCKET sock_fd, char *buf, int size, int timeout)
{
  int read_len = -1;
  struct pollfd po[1] = { {0, 0, 0} };
  int ret;

  po[0].fd = sock_fd;
  po[0].events = POLLIN;

  do
    {
      ret = poll (po, 1, timeout * 1000);
    }
  while (ret < 0 && errno == EINTR);

  if (ret < 1)          /* ERROR OR TIMEOUT */
    {
      return -1;
    }

  if (po[0].revents & POLLIN)   /* RECEIVE NEW REQUEST */
    {
      read_len = READ_FROM_SOCKET (sock_fd, buf, size);
    }

  return read_len;
}

static int
write_buffer_async (SOCKET sock_fd, char *buf, int size, int timeout)
{
  int write_len = -1;
  struct pollfd po[1] = { {0, 0, 0} };
  int ret;

  po[0].fd = sock_fd;
  po[0].events = POLLOUT;

  do
    {
      ret = poll (po, 1, timeout * 1000);
    }
  while (ret < 0 && errno == EINTR);

  if (ret < 1)          /* ERROR OR TIMEOUT */
    {
      return -1;
    }

  if (po[0].revents & POLLOUT)  /* RECEIVE NEW REQUEST */
    {
      write_len = WRITE_TO_SOCKET (sock_fd, buf, size);
    }

  return write_len;
}