Skip to content

File shard_proxy_io.c

File List > broker > shard_proxy_io.c

Go to the documentation of this file

/*
 * Copyright 2008 Search Solution Corporation
 * Copyright 2016 CUBRID Corporation
 *
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 *
 */


/*
 * shard_proxy_io.c -
 *
 */

#ident "$Id$"


#include <assert.h>
#include <signal.h>
#include <string.h>
#if defined(LINUX)
#include <sys/epoll.h>
#include <sys/types.h>
#include <sys/socket.h>
#endif /* LINUX */

#include "porting.h"
#include "shard_proxy_io.h"
#include "shard_proxy_handler.h"
#include "shard_proxy_function.h"
#include "cas_protocol.h"
#include "cas_error.h"
#include "broker_cas_cci.h"
#include "shard_shm.h"
#include "broker_acl.h"
#include "system_parameter.h"

#ifndef min
#define min(a,b)    ((a) < (b) ? (a) : (b))
#endif
#ifndef max
#define max(a,b)    ((a) > (b) ? (a) : (b))
#endif

#if defined (SUPPRESS_STRLEN_WARNING)
#define strlen(s1)  ((int) strlen(s1))
#endif /* defined (SUPPRESS_STRLEN_WARNING) */

#if defined(WINDOWS)
#define O_NONBLOCK      FIONBIO
#define HZ              1000
#endif /* WINDOWS */

#define PROC_TYPE_CLIENT        0
#define PROC_TYPE_CAS           1
#define PROC_TYPE_BROKER        2

#define READ_TYPE       1
#define WRITE_TYPE      2

#define CLIENT_READ_ERROR(i)    io_error(i, PROC_TYPE_CLIENT, READ_TYPE)
#define CLIENT_WRITE_ERROR(i)   io_error(i, PROC_TYPE_CLIENT, WRITE_TYPE)
#define CAS_READ_ERROR(i)       io_error(i, PROC_TYPE_CAS, READ_TYPE)
#define CAS_WRITE_ERROR(i)      io_error(i, PROC_TYPE_CAS, WRITE_TYPE)

#define MAX_NUM_NEW_CLIENT  5

#define PROXY_START_PORT    1
#define GET_CLIENT_PORT(broker_port, proxy_index)   (broker_port) + PROXY_START_PORT + (proxy_index)
#define GET_CAS_PORT(broker_port, proxy_index, proxy_max_count) (broker_port) + PROXY_START_PORT + (proxy_max_count) + (proxy_index)

extern T_SHM_APPL_SERVER *shm_as_p;
extern T_SHM_PROXY *shm_proxy_p;
extern T_PROXY_INFO *proxy_info_p;
extern T_SHM_SHARD_USER *shm_user_p;
extern T_SHM_SHARD_CONN *shm_conn_p;
extern int proxy_id;

extern T_PROXY_HANDLER proxy_Handler;
extern T_PROXY_CONTEXT proxy_Context;

typedef T_CAS_IO *(*T_FUNC_FIND_CAS) (int shard_id, int cas_id, int ctx_cid, unsigned int ctx_uid);

extern void proxy_term (void);
extern bool proxy_Keep_running;

extern const char *rel_build_number (void);

static int shard_io_set_fl (int fd, int flags);
#if defined (ENABLE_UNUSED_FUNCTION)
static int shard_io_clr_fl (int fd, int flags);
static int shard_io_setsockbuf (int fd, int size);
#endif /* ENABLE_UNUSED_FUNCTION */

static void proxy_socket_io_clear (T_SOCKET_IO * sock_io_p);
static int proxy_socket_io_initialize (void);
static void proxy_socket_io_destroy (void);
static T_SOCKET_IO *proxy_socket_io_add (SOCKET fd, bool from_cas);
static T_SOCKET_IO *proxy_socket_io_find (SOCKET fd);
static int proxy_socket_io_new_client (SOCKET lsnr_fd);

static int proxy_process_client_register (T_SOCKET_IO * sock_io_p);
static int proxy_process_client_request (T_SOCKET_IO * sock_io_p);
static int proxy_process_client_conn_error (T_SOCKET_IO * sock_io_p);
static int proxy_process_client_write_error (T_SOCKET_IO * sock_io_p);
static int proxy_process_client_read_error (T_SOCKET_IO * sock_io_p);
static int proxy_process_client_message (T_SOCKET_IO * sock_io_p);
static int proxy_process_cas_register (T_SOCKET_IO * sock_io_p);
static int proxy_process_cas_response (T_SOCKET_IO * sock_io_p);
static int proxy_process_cas_conn_error (T_SOCKET_IO * sock_io_p);
static int proxy_process_cas_write_error (T_SOCKET_IO * sock_io_p);
static int proxy_process_cas_read_error (T_SOCKET_IO * sock_io_p);
static int proxy_process_cas_message (T_SOCKET_IO * sock_io_p);
static int proxy_socket_io_write_internal (T_SOCKET_IO * sock_io_p);
static void proxy_socket_io_write_to_cas (T_SOCKET_IO * sock_io_p);
static void proxy_socket_io_write_to_client (T_SOCKET_IO * sock_io_p);

static int proxy_socket_io_read_internal (T_SOCKET_IO * sock_io_p);
static void proxy_socket_io_read_from_cas_next (T_SOCKET_IO * sock_io_p);
static void proxy_socket_io_read_from_cas_first (T_SOCKET_IO * sock_io_p);
static void proxy_socket_io_read_from_cas (T_SOCKET_IO * sock_io_p);
static void proxy_socket_io_read_from_client_next (T_SOCKET_IO * sock_io_p);
static void proxy_socket_io_read_from_client_first (T_SOCKET_IO * sock_io_p);
static void proxy_socket_io_read_from_client (T_SOCKET_IO * sock_io_p);
static void proxy_socket_io_write (T_SOCKET_IO * sock_io_p);
static void proxy_socket_io_write_error (T_SOCKET_IO * sock_io_p);
static void proxy_socket_io_read (T_SOCKET_IO * sock_io_p);
static void proxy_socket_io_read_error (T_SOCKET_IO * sock_io_p);

static int proxy_client_io_initialize (void);
static void proxy_client_io_destroy (void);
static T_CLIENT_IO *proxy_client_io_new (SOCKET fd, char *driver_info);

static int proxy_cas_io_initialize (int shard_id, T_CAS_IO ** cas_io_pp, int size);
static int proxy_shard_io_initialize (void);
static void proxy_shard_io_destroy (void);
static T_SHARD_IO *proxy_shard_io_find (int shard_id);
static T_CAS_IO *proxy_cas_io_new (int shard_id, int cas_id, SOCKET fd);
static void proxy_cas_io_free (int shard_id, int cas_id);
static T_CAS_IO *proxy_cas_io_find_by_fd (int shard_id, int cas_id, SOCKET fd);

static int proxy_client_add_waiter_by_shard (T_SHARD_IO * shard_io_p, int ctx_cid, int ctx_uid, int timeout);
static void proxy_client_check_waiter_and_wakeup (T_SHARD_IO * shard_io_p, T_CAS_IO * cas_io_p);

#if defined(WINDOWS)
static int proxy_io_inet_lsnr (int port);
static int proxy_io_client_lsnr (void);
static SOCKET proxy_io_client_accept (SOCKET lsnr_fd);
#else /* WINDOWS */
static SOCKET proxy_io_connect_to_broker (void);
static int proxy_io_register_to_broker (void);
static int proxy_io_unixd_lsnr (char *unixd_sock_name);
#endif /* !WINDOWS */
static int proxy_io_cas_lsnr (void);
static SOCKET proxy_io_accept (SOCKET lsnr_fd);
static SOCKET proxy_io_cas_accept (SOCKET lsnr_fd);

static void proxy_init_net_buf (T_NET_BUF * net_buf);

static int proxy_io_make_ex_get_int (char *driver_info, char **buffer, int *argv);
static void proxy_set_conn_info (int func_code, int ctx_cid, int ctx_uid, int shard_id, int cas_id);
static T_CAS_IO *proxy_find_idle_cas_by_asc (int shard_id, int cas_id, int ctx_cid, unsigned int ctx_uid);
static T_CAS_IO *proxy_find_idle_cas_by_desc (int shard_id, int cas_id, int ctx_cid, unsigned int ctx_uid);
static T_CAS_IO *proxy_find_idle_cas_by_conn_info (int shard_id, int cas_id, int ctx_cid, unsigned int ctx_uid);
static T_CAS_IO *proxy_cas_alloc_by_shard_and_cas_id (int client_id, int shard_id, int cas_id, int ctx_cid,
                              unsigned int ctx_uid);
static T_CAS_IO *proxy_cas_alloc_anything (int client_id, int shard_id, int cas_id, int ctx_cid, unsigned int ctx_uid,
                       T_FUNC_FIND_CAS function);
static int proxy_check_authorization (T_PROXY_CONTEXT * ctx_p, const char *db_name, const char *db_user,
                      const char *db_passwd);

#if defined(LINUX)
static int proxy_get_max_socket (void);
static int proxy_add_epoll_event (int fd, unsigned int events);
static int proxy_mod_epoll_event (int fd, unsigned int events);
static int proxy_del_epoll_event (int fd);

static int max_Socket = 0;
static int ep_Fd = INVALID_SOCKET;
static struct epoll_event *ep_Event = NULL;
#else /* LINUX */
int maxfd = 0;
fd_set rset, wset, allset, wnewset, wallset;
#endif /* !LINUX */

#if defined(WINDOWS)
int broker_port = 0;
int accept_ip_addr = 0;
SOCKET client_lsnr_fd = INVALID_SOCKET;
#else /* WINDOWS */
SOCKET broker_conn_fd = INVALID_SOCKET;
#endif /* !WINDOWS */
SOCKET cas_lsnr_fd = INVALID_SOCKET;

T_SOCKET_IO_GLOBAL proxy_Socket_io;
T_CLIENT_IO_GLOBAL proxy_Client_io;
T_SHARD_IO_GLOBAL proxy_Shard_io;

/* SHARD ONLY SUPPORT client_version.8.2.0 ~ */
int cas_info_size = CAS_INFO_SIZE;

/***
  THIS FUNCTION IS LOCATED IN ORIGINAL CAS FILES
 ***/
#if 1               /* SHARD TODO -- remove this functions -- tigger */
void
init_msg_header (MSG_HEADER * header)
{
  header->msg_body_size_ptr = (int *) (header->buf);
  header->info_ptr = (char *) (header->buf + MSG_HEADER_MSG_SIZE);

  *(header->msg_body_size_ptr) = 0;
  header->info_ptr[CAS_INFO_STATUS] = CAS_INFO_STATUS_INACTIVE;
  header->info_ptr[CAS_INFO_RESERVED_1] = CAS_INFO_RESERVED_DEFAULT;
  header->info_ptr[CAS_INFO_RESERVED_2] = CAS_INFO_RESERVED_DEFAULT;
  header->info_ptr[CAS_INFO_ADDITIONAL_FLAG] = CAS_INFO_RESERVED_DEFAULT;
}

void
set_data_length (char *buffer, int length)
{
  assert (buffer);

  /* length : first 4 bytes */
  *((int *) buffer) = htonl (length);
  return;
}

int
get_data_length (char *buffer)
{
  int length;

  assert (buffer);

  /* length : first 4 bytes */
  length = *((int *) (buffer));
  return ntohl (length);
}

int
get_msg_length (char *buffer)
{
  return (get_data_length (buffer) + MSG_HEADER_SIZE);
}

static int
get_dbinfo_length (char *driver_info)
{
  T_BROKER_VERSION client_version = CAS_MAKE_PROTO_VER (driver_info);

  if (client_version < CAS_MAKE_VER (8, 2, 0))
    {
      return SRV_CON_DB_INFO_SIZE_PRIOR_8_2_0;
    }
  else if (client_version < CAS_MAKE_VER (8, 4, 0))
    {
      return SRV_CON_DB_INFO_SIZE_PRIOR_8_4_0;
    }
  return SRV_CON_DB_INFO_SIZE;
}

int
net_decode_str (char *msg, int msg_size, char *func_code, void ***ret_argv)
{
  int remain_size = msg_size;
  char *cur_p = msg;
  char *argp;
  int i_val;
  void **argv = NULL;
  int argc = 0;

  *ret_argv = (void **) NULL;

  if (remain_size < 1)
    return CAS_ER_COMMUNICATION;

  *func_code = *cur_p;
  cur_p += 1;
  remain_size -= 1;

  while (remain_size > 0)
    {
      if (remain_size < 4)
    {
      FREE_MEM (argv);
      return CAS_ER_COMMUNICATION;
    }
      argp = cur_p;
      memcpy ((char *) &i_val, cur_p, 4);
      i_val = ntohl (i_val);
      remain_size -= 4;
      cur_p += 4;

      if (remain_size < i_val)
    {
      FREE_MEM (argv);
      return CAS_ER_COMMUNICATION;
    }

      argc++;
      argv = (void **) REALLOC (argv, sizeof (void *) * argc);
      if (argv == NULL)
    return CAS_ER_NO_MORE_MEMORY;

      argv[argc - 1] = argp;

      cur_p += i_val;
      remain_size -= i_val;
    }

  *ret_argv = argv;
  return argc;
}
#endif /* if 1 */

static int
shard_io_set_fl (int fd, int flags)
{               /* flags are file status flags to turn on */
#if defined(WINDOWS)
  u_long argp;

  if (flags == O_NONBLOCK)
    {
      argp = 1;         /* 1:non-blocking, 0:blocking */
      if (ioctlsocket ((SOCKET) fd, FIONBIO, &argp) == SOCKET_ERROR)
    {
      PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to ioctlsocket(%d, FIONBIO, %u). " "(error:%d).", fd, argp,
             SOCKET_ERROR);
      return -1;
    }
    }
#else
  int val;

  if ((val = fcntl (fd, F_GETFL, 0)) < 0)
    {
      PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to fcntl(%d, F_GETFL). (val:%d, errno:%d).", fd, val, errno);
      return -1;
    }

  val |= flags;         /* turn on flags */

  if (fcntl (fd, F_SETFL, val) < 0)
    {
      PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to fcntl(%d, F_SETFL, %d). (errno:%d).", fd, val, errno);
      return -1;
    }
#endif
  return 1;
}

#if defined (ENABLE_UNUSED_FUNCTION)
static int
shard_io_clr_fl (int fd, int flags)
{               /* flags are file status flags to turn on */
#if defined(WINDOWS)
  u_long argp;

  if (flags == O_NONBLOCK)
    {
      argp = 0;         /* 1:non-blocking, 0:blocking */
      if (ioctlsocket ((SOCKET) fd, FIONBIO, &argp) == SOCKET_ERROR)
    {
      PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to ioctlsocket(%d, FIONBIO, %u). (error:%d).", fd, argp,
             SOCKET_ERROR);
      return -1;
    }
    }
#else
  int val;

  if ((val = fcntl (fd, F_GETFL, 0)) < 0)
    {
      PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to fcntl(%d, F_GETFL). (val:%d, errno:%d).", fd, val, errno);
      return -1;
    }

  val &= ~flags;        /* turn off flags */

  if (fcntl (fd, F_SETFL, val) < 0)
    {
      PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to fcntl(%d, F_SETFL, %d). (errno:%d).", fd, val, errno);
      return -1;
    }
#endif
  return 1;
}

static int
shard_io_setsockbuf (int fd, int size)
{
  int n, val;
  socklen_t len;

  val = size;
  len = sizeof (int);
  if (getsockopt (fd, SOL_SOCKET, SO_SNDBUF, (char *) &val, &len) < 0)
    {
      PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to getsockopt(%d, SO_SND_BUF, %d, %d). " "(errno:%d).", fd, val, len,
         errno);
      return -1;
    }
  if (val < size)
    {
      val = size;
      len = sizeof (int);
      setsockopt (fd, SOL_SOCKET, SO_SNDBUF, (char *) &val, len);
    }

  val = size;
  len = sizeof (int);
  if (getsockopt (fd, SOL_SOCKET, SO_RCVBUF, (char *) &val, &len) < 0)
    {
      PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to getsockopt(%d, SO_RCV_BUF, %d, %d). " "(errno:%d).", fd, val, len,
         errno);

      return -1;
    }
  if (val < size)
    {
      val = size;
      len = sizeof (int);
      setsockopt (fd, SOL_SOCKET, SO_RCVBUF, (char *) &val, len);
    }

  return 1;
}
#endif /* ENABLE_UNUSED_FUNCTION */

char *
proxy_dup_msg (char *msg)
{
  char *p;
  int length;

  length = get_msg_length (msg);

  p = (char *) malloc (length * sizeof (char));
  if (p)
    {
      memcpy (p, msg, length);
    }

  return p;
}

void
proxy_set_con_status_in_tran (char *msg)
{
  int pos;

  assert (msg);

  pos = MSG_HEADER_INFO_SIZE + CAS_INFO_STATUS;

  msg[pos] = CAS_INFO_STATUS_ACTIVE;

  return;
}

void
proxy_set_con_status_out_tran (char *msg)
{
  int pos;

  assert (msg);

  pos = MSG_HEADER_INFO_SIZE + CAS_INFO_STATUS;

  msg[pos] = CAS_INFO_STATUS_INACTIVE;

  return;
}

void
proxy_set_force_out_tran (char *msg)
{
  int pos;

  assert (msg);

  pos = MSG_HEADER_INFO_SIZE + CAS_INFO_ADDITIONAL_FLAG;

  msg[pos] |= CAS_INFO_FLAG_MASK_FORCE_OUT_TRAN;

  return;
}

void
proxy_unset_force_out_tran (char *msg)
{
  int pos;

  assert (msg);

  pos = MSG_HEADER_INFO_SIZE + CAS_INFO_ADDITIONAL_FLAG;

  msg[pos] &= ~CAS_INFO_FLAG_MASK_FORCE_OUT_TRAN;

  return;
}

int
proxy_make_net_buf (T_NET_BUF * net_buf, int size, T_BROKER_VERSION client_version)
{
  net_buf_init (net_buf, client_version);

  net_buf->data = (char *) MALLOC (size);
  if (net_buf->data == NULL)
    {
      PROXY_LOG (PROXY_LOG_MODE_ERROR,
         "Not enough virtual memory. " "Failed to alloc net buffer. " "(errno:%d, size:%d).", errno, size);
      return -1;
    }
  net_buf->alloc_size = size;

  return 0;
}

static void
proxy_init_net_buf (T_NET_BUF * net_buf)
{
  MSG_HEADER msg_header;

  assert (net_buf);
  assert (net_buf->data);

  init_msg_header (&msg_header);

  *(msg_header.msg_body_size_ptr) = htonl (net_buf->data_size);

  /* length */
  memcpy (net_buf->data, msg_header.buf, NET_BUF_HEADER_MSG_SIZE);

  /* cas info */
  /* 0:cas status */
  msg_header.info_ptr[CAS_INFO_STATUS] = CAS_INFO_STATUS_INACTIVE;

  /* 3:cci default autocommit */
  msg_header.info_ptr[CAS_INFO_ADDITIONAL_FLAG] &= ~CAS_INFO_FLAG_MASK_AUTOCOMMIT;
  msg_header.info_ptr[CAS_INFO_ADDITIONAL_FLAG] |= (shm_as_p->cci_default_autocommit & CAS_INFO_FLAG_MASK_AUTOCOMMIT);

  msg_header.info_ptr[CAS_INFO_ADDITIONAL_FLAG] &= ~CAS_INFO_FLAG_MASK_NEW_SESSION_ID;

  memcpy (net_buf->data + NET_BUF_HEADER_MSG_SIZE, msg_header.info_ptr, CAS_INFO_SIZE);

  return;
}

static int
proxy_io_make_ex_get_int (char *driver_info, char **buffer, int *argv)
{
  int error;
  T_NET_BUF net_buf;
  T_BROKER_VERSION client_version;

  assert (buffer);
  assert (*buffer == NULL);
  assert (argv != NULL);

  client_version = CAS_MAKE_PROTO_VER (driver_info);

  error = proxy_make_net_buf (&net_buf, SHARD_NET_BUF_ALLOC_SIZE, client_version);
  if (error)
    {
      PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to make net buffer. (error:%d).", error);
      goto error_return;
    }

  proxy_init_net_buf (&net_buf);

  /* error code */
  net_buf_cp_int (&net_buf, 0 /* success */ , NULL);
  /* int arg1 */
  net_buf_cp_int (&net_buf, *argv, NULL);

  *buffer = net_buf.data;
  set_data_length (*buffer, net_buf.data_size);

  net_buf.data = NULL;

  return (net_buf.data_size + MSG_HEADER_SIZE);

error_return:
  *buffer = NULL;

  return -1;
}

/* error */
int
proxy_io_make_error_msg (char *driver_info, char **buffer, int error_ind, int error_code, const char *error_msg,
             char is_in_tran)
{
  int error;
  T_NET_BUF net_buf;
  T_BROKER_VERSION client_version;

  assert (buffer);
  assert (*buffer == NULL);

  client_version = CAS_MAKE_PROTO_VER (driver_info);

  error = proxy_make_net_buf (&net_buf, SHARD_NET_BUF_ALLOC_SIZE, client_version);
  if (error)
    {
      PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to make net buffer. " "(error:%d).", error);
      goto error_return;
    }

  proxy_init_net_buf (&net_buf);

  if (is_in_tran)
    {
      proxy_set_con_status_in_tran (net_buf.data);
    }

  if (error_ind != CAS_NO_ERROR)
    {
      if (client_version >= CAS_MAKE_VER (8, 3, 0))
    {
      /* error indicator */
      net_buf_cp_int (&net_buf, error_ind, NULL);
    }
    }

  error_code = proxy_convert_error_code (error_ind, error_code, driver_info, client_version, PROXY_CONV_ERR_TO_OLD);

  /* error code */
  net_buf_cp_int (&net_buf, error_code, NULL);

  if (error_msg && error_msg[0])
    {
      /* error messgae */
      net_buf_cp_str (&net_buf, error_msg, strlen (error_msg) + 1);
    }

  *buffer = net_buf.data;
  set_data_length (*buffer, net_buf.data_size);

  net_buf.data = NULL;

  PROXY_DEBUG_LOG ("make error to send to the client. " "(error_ind:%d, error_code:%d, errro_msg:%s)", error_ind,
           error_code, (error_msg && error_msg[0]) ? error_msg : "-");

  return (net_buf.data_size + MSG_HEADER_SIZE);

error_return:
  *buffer = NULL;

  return -1;
}

int
proxy_io_make_no_error (char *driver_info, char **buffer)
{
  return proxy_io_make_error_msg (driver_info, buffer, CAS_NO_ERROR, CAS_NO_ERROR, NULL, false);
}

int
proxy_io_make_con_close_ok (char *driver_info, char **buffer)
{
  return proxy_io_make_no_error (driver_info, buffer);
}

int
proxy_io_make_end_tran_ok (char *driver_info, char **buffer)
{
  return proxy_io_make_no_error (driver_info, buffer);
}

int
proxy_io_make_check_cas_ok (char *driver_info, char **buffer)
{
  return proxy_io_make_no_error (driver_info, buffer);
}

int
proxy_io_make_set_db_parameter_ok (char *driver_info, char **buffer)
{
  return proxy_io_make_no_error (driver_info, buffer);
}

int
proxy_io_make_ex_get_isolation_level (char *driver_info, char **buffer, void *argv)
{
  return proxy_io_make_ex_get_int (driver_info, buffer, (int *) argv);
}

int
proxy_io_make_ex_get_lock_timeout (char *driver_info, char **buffer, void *argv)
{
  return proxy_io_make_ex_get_int (driver_info, buffer, (int *) argv);
}

int
proxy_io_make_end_tran_request (char *driver_info, char **buffer, bool commit)
{
  int error;
  T_NET_BUF net_buf;
  unsigned char func_code;
  unsigned char tran_commit;
  T_BROKER_VERSION client_version;

  assert (buffer);
  assert (*buffer == NULL);

  client_version = CAS_MAKE_PROTO_VER (driver_info);
  error = proxy_make_net_buf (&net_buf, SHARD_NET_BUF_ALLOC_SIZE, client_version);
  if (error)
    {
      PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to make net buffer. (error:%d).", error);
      goto error_return;
    }

  proxy_init_net_buf (&net_buf);

  func_code = CAS_FC_END_TRAN;
  tran_commit = (commit) ? CCI_TRAN_COMMIT : CCI_TRAN_ROLLBACK;

  /* function code */
  net_buf_cp_byte (&net_buf, (unsigned char) func_code);

  /* arg1 : commit or rollback */
  net_buf_cp_int (&net_buf, NET_SIZE_BYTE, NULL);
  net_buf_cp_byte (&net_buf, tran_commit);

  *buffer = net_buf.data;
  set_data_length (*buffer, net_buf.data_size);

  net_buf.data = NULL;

  return (net_buf.data_size + MSG_HEADER_SIZE);

error_return:
  *buffer = NULL;

  return -1;
}

#if defined (ENABLE_UNUSED_FUNCTION)
int
proxy_io_make_end_tran_commit (char **buffer)
{
  return proxy_io_make_end_tran_request (buffer, true);
}
#endif /* ENABLE_UNUSED_FUNCTION */

int
proxy_io_make_end_tran_abort (char *driver_info, char **buffer)
{
  return proxy_io_make_end_tran_request (driver_info, buffer, false);
}

int
proxy_io_make_close_req_handle_ok (char *driver_info, char **buffer, bool is_in_tran)
{
  int error;
  char *p;
  T_NET_BUF net_buf;
  T_BROKER_VERSION client_version;

  assert (buffer);
  assert (*buffer == NULL);

  client_version = CAS_MAKE_PROTO_VER (driver_info);
  error = proxy_make_net_buf (&net_buf, SHARD_NET_BUF_ALLOC_SIZE, client_version);
  if (error)
    {
      PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to make net buffer. (error:%d).", error);
      goto error_return;
    }

  proxy_init_net_buf (&net_buf);

  p = (char *) (net_buf.data + MSG_HEADER_MSG_SIZE);
  if (is_in_tran)
    {
      p[CAS_INFO_STATUS] = CAS_INFO_STATUS_ACTIVE;
    }
  else
    {
      p[CAS_INFO_STATUS] = CAS_INFO_STATUS_INACTIVE;
    }

  /* error code */
  net_buf_cp_int (&net_buf, 0 /* success */ , NULL);

  *buffer = net_buf.data;
  set_data_length (*buffer, net_buf.data_size);

  net_buf.data = NULL;

  return (net_buf.data_size + MSG_HEADER_SIZE);

error_return:
  *buffer = NULL;

  return -1;
}

#if defined (ENABLE_UNUSED_FUNCTION)
int
proxy_io_make_close_req_handle_in_tran_ok (char **buffer)
{
  return proxy_io_make_close_req_handle_ok (buffer, true /* in_tran */ );
}
#endif /* ENABLE_UNUSED_FUNCTION */

int
proxy_io_make_close_req_handle_out_tran_ok (char *driver_info, char **buffer)
{
  return proxy_io_make_close_req_handle_ok (driver_info, buffer, false /* out_tran */ );
}

int
proxy_io_make_cursor_close_out_tran_ok (char *driver_info, char **buffer)
{
  return proxy_io_make_close_req_handle_ok (driver_info, buffer, false /* out_tran */ );
}

int
proxy_io_make_get_db_version (char *driver_info, char **buffer)
{
  int error;
  T_NET_BUF net_buf;
  char *p;
  T_BROKER_VERSION client_version;

  assert (buffer);
  assert (*buffer == NULL);

  client_version = CAS_MAKE_PROTO_VER (driver_info);
  error = proxy_make_net_buf (&net_buf, SHARD_NET_BUF_ALLOC_SIZE, client_version);
  if (error)
    {
      PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to make net buffer. (error:%d).", error);
      goto error_return;
    }

  proxy_init_net_buf (&net_buf);

  p = (char *) rel_build_number ();

  net_buf_cp_int (&net_buf, 0 /* ok */ , NULL);
  if (p == NULL)
    {
      net_buf_cp_byte (&net_buf, '\0');
    }
  else
    {
      net_buf_cp_str (&net_buf, p, strlen (p) + 1);
    }

  *buffer = net_buf.data;
  set_data_length (*buffer, net_buf.data_size);

  net_buf.data = NULL;

  return (net_buf.data_size + MSG_HEADER_SIZE);

error_return:
  *buffer = NULL;

  return -1;
}

int
proxy_io_make_client_conn_ok (char *driver_info, char **buffer)
{
  (*buffer) = (char *) malloc (sizeof (int));

  if ((*buffer) == NULL)
    {
      return 0;
    }

  memset ((*buffer), 0, sizeof (int));  /* dummy port id */

  return sizeof (int);
}

int
proxy_io_make_client_proxy_alive (char *driver_info, char **buffer)
{
  int error;
  T_NET_BUF net_buf;
  T_BROKER_VERSION client_version;

  assert (buffer);

  client_version = CAS_MAKE_PROTO_VER (driver_info);
  error = proxy_make_net_buf (&net_buf, MSG_HEADER_SIZE, client_version);
  if (error)
    {
      PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to make net buffer. (error:%d).", error);
      *buffer = NULL;
      return -1;
    }

  net_buf.data_size = 0;
  proxy_init_net_buf (&net_buf);

  *buffer = net_buf.data;
  set_data_length (*buffer, net_buf.data_size);

  net_buf.data = NULL;

  return MSG_HEADER_SIZE;
}

int
proxy_io_make_client_dbinfo_ok (char *driver_info, char **buffer)
{
  char *p;
  char dbms_type;
  int reply_size, reply_nsize;
  int cas_info_size;
  int proxy_pid;
  char broker_info[BROKER_INFO_SIZE];
  T_BROKER_VERSION client_version;
  char oracle_compat_number_behavior = 0;

  assert (buffer);

  memset (broker_info, 0, BROKER_INFO_SIZE);

  client_version = CAS_MAKE_PROTO_VER (driver_info);
  if (DOES_CLIENT_UNDERSTAND_THE_PROTOCOL (client_version, PROTOCOL_V5))
    {
      dbms_type = CAS_PROXY_DBMS_CUBRID;
    }
  else
    {
      dbms_type = CAS_DBMS_CUBRID;
    }

  if (DOES_CLIENT_UNDERSTAND_THE_PROTOCOL (client_version, PROTOCOL_V12))
    {
      oracle_compat_number_behavior = prm_get_bool_value (PRM_ID_ORACLE_COMPAT_NUMBER_BEHAVIOR);
    }

  cas_bi_make_broker_info (broker_info, dbms_type, shm_as_p->statement_pooling, shm_as_p->cci_pconnect,
               oracle_compat_number_behavior);

  if (DOES_CLIENT_UNDERSTAND_THE_PROTOCOL (client_version, PROTOCOL_V4))
    {
      reply_size = CAS_CONNECTION_REPLY_SIZE;
    }
  else if (DOES_CLIENT_UNDERSTAND_THE_PROTOCOL (client_version, PROTOCOL_V3))
    {
      reply_size = CAS_CONNECTION_REPLY_SIZE_V3;
    }
  else
    {
      reply_size = CAS_CONNECTION_REPLY_SIZE_PRIOR_PROTOCOL_V3;
    }

  *buffer = (char *) malloc (PROXY_CONNECTION_REPLY_SIZE (reply_size) * sizeof (char));
  if (*buffer == NULL)
    {
      return -1;
    }

  reply_nsize = htonl (reply_size);
  cas_info_size = htonl (CAS_INFO_SIZE);
  proxy_pid = htonl (getpid ());

  /* length */
  p = *(buffer);
  memcpy (p, &reply_nsize, PROTOCOL_SIZE);
  p += PROTOCOL_SIZE;

  /* cas info */
  memcpy (p, &cas_info_size, CAS_INFO_SIZE);
  p += CAS_INFO_SIZE;

  /* proxy pid */
  memcpy (p, &proxy_pid, CAS_PID_SIZE);
  p += CAS_PID_SIZE;

  /* brokerinfo */
  memcpy (p, broker_info, BROKER_INFO_SIZE);
  p += BROKER_INFO_SIZE;

  /* proxy id */
  if (DOES_CLIENT_UNDERSTAND_THE_PROTOCOL (client_version, PROTOCOL_V4))
    {
      int v = htonl (proxy_id + 1);

      memcpy (p, &v, CAS_PID_SIZE);
      p += CAS_PID_SIZE;
    }

  /* session id */
  if (DOES_CLIENT_UNDERSTAND_THE_PROTOCOL (client_version, PROTOCOL_V3))
    {
      memset ((char *) p, 0, DRIVER_SESSION_SIZE);
      p += DRIVER_SESSION_SIZE;
    }
  else
    {
      memset ((char *) p, 0, SESSION_ID_SIZE);
      p += SESSION_ID_SIZE;
    }

  return PROXY_CONNECTION_REPLY_SIZE (reply_size);
}

int
proxy_io_make_client_acl_fail (char *driver_info, char **buffer)
{
  char err_msg[1024];
  snprintf (err_msg, sizeof (err_msg), "Authorization error.(Address is rejected)");

  return proxy_io_make_error_msg (driver_info, buffer, DBMS_ERROR_INDICATOR, CAS_ER_NOT_AUTHORIZED_CLIENT, err_msg,
                  false);
}

int
proxy_io_make_shard_info (char *driver_info, char **buffer)
{
  int error;
  int length;
  int shard_index;
  T_NET_BUF net_buf;
  T_SHARD_CONN *shard_conn_p;
  T_BROKER_VERSION client_version;

  assert (buffer);
  assert (*buffer == NULL);

  client_version = CAS_MAKE_PROTO_VER (driver_info);
  error = proxy_make_net_buf (&net_buf, MSG_HEADER_SIZE, client_version);
  if (error)
    {
      PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to make net buffer. (error:%d).", error);
      *buffer = NULL;
      return -1;
    }

  net_buf.data_size = 0;
  proxy_init_net_buf (&net_buf);

  /* shard count */
  net_buf_cp_int (&net_buf, proxy_info_p->max_shard, NULL);

  /* N * shard info */
  for (shard_index = 0; shard_index < shm_conn_p->num_shard_conn; shard_index++)
    {
      shard_conn_p = &shm_conn_p->shard_conn[shard_index];

      /* shard id */
      net_buf_cp_int (&net_buf, shard_index, NULL);

      /* shard db name */
      length = strlen (shard_conn_p->db_name) + 1 /* NTS */ ;
      net_buf_cp_int (&net_buf, length, NULL);
      net_buf_cp_str (&net_buf, shard_conn_p->db_name, length);

      /* shard db server */
      length = strlen (shard_conn_p->db_conn_info) + 1 /* NTS */ ;
      net_buf_cp_int (&net_buf, length, NULL);
      net_buf_cp_str (&net_buf, shard_conn_p->db_conn_info, length);
    }

  *buffer = net_buf.data;
  set_data_length (*buffer, net_buf.data_size);

  net_buf.data = NULL;
  return (net_buf.data_size + MSG_HEADER_SIZE);
}

int
proxy_io_make_check_cas (char *driver_info, char **buffer)
{
  int error;
  T_NET_BUF net_buf;
  T_BROKER_VERSION client_version;

  assert (buffer);
  assert (*buffer == NULL);

  client_version = CAS_MAKE_PROTO_VER (driver_info);
  error = proxy_make_net_buf (&net_buf, MSG_HEADER_SIZE, client_version);
  if (error)
    {
      PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to make net buffer. (error:%d).", error);
      *buffer = NULL;
      return -1;
    }

  net_buf.data_size = 0;
  proxy_init_net_buf (&net_buf);

  /* function code */
  net_buf_cp_byte (&net_buf, (unsigned char) CAS_FC_CHECK_CAS);

  *buffer = net_buf.data;
  set_data_length (*buffer, net_buf.data_size);

  proxy_set_force_out_tran (*buffer);

  net_buf.data = NULL;
  return (net_buf.data_size + MSG_HEADER_SIZE);
}

void
proxy_io_buffer_clear (T_IO_BUFFER * io_buffer)
{
  assert (io_buffer);

  io_buffer->length = 0;
  io_buffer->offset = 0;
  FREE_MEM (io_buffer->data);

  return;
}

static void
proxy_socket_io_clear (T_SOCKET_IO * sock_io_p)
{
  assert (sock_io_p);

/* UNUSED */
#if 0
  ENTER_FUNC ();

  PROXY_DEBUG_LOG ("free socket io.(fd:%d,from_cas:%s,shard/cas:%d/%d).\n", sock_io_p->fd,
           (sock_io_p->from_cas == PROXY_EVENT_FROM_CAS) ? "cas" : "client", sock_io_p->id.shard.shard_id,
           sock_io_p->id.shard.cas_id);
#endif

  sock_io_p->fd = INVALID_SOCKET;
  sock_io_p->status = SOCK_IO_IDLE;
  sock_io_p->ip_addr = 0;
  sock_io_p->from_cas = PROXY_EVENT_FROM_CLIENT;
  sock_io_p->id.shard.shard_id = PROXY_INVALID_SHARD;
  sock_io_p->id.shard.cas_id = PROXY_INVALID_CAS;

  if (sock_io_p->read_event)
    {
      proxy_event_free (sock_io_p->read_event);
      sock_io_p->read_event = NULL;
    }

  if (sock_io_p->write_event)
    {
      proxy_event_free (sock_io_p->write_event);
      sock_io_p->write_event = NULL;
    }

/* UNUSED */
#if 0
  EXIT_FUNC ();
#endif
  return;
}


static int
proxy_socket_io_initialize (void)
{
  int i;
  int size;
  T_SOCKET_IO *sock_io_p;

  if (proxy_Socket_io.ent)
    {
      /* alredy initialized */
      return 0;
    }

#if defined(LINUX)
  proxy_Socket_io.max_socket = max_Socket;
#else /* LINUX */
  proxy_Socket_io.max_socket = MAX_FD;
#endif /* !LINUX */
  proxy_Socket_io.cur_socket = 0;

  size = sizeof (T_SOCKET_IO) * proxy_Socket_io.max_socket;
  proxy_Socket_io.ent = (T_SOCKET_IO *) malloc (size);
  if (proxy_Socket_io.ent == NULL)
    {
      PROXY_LOG (PROXY_LOG_MODE_ERROR,
         "Not enough virtual memory. " "Failed to alloc socket entry. " "(errno:%d, size:%d).", errno, size);
      return -1;
    }
  memset (proxy_Socket_io.ent, 0, size);

  for (i = 0; i < proxy_Socket_io.max_socket; i++)
    {
      sock_io_p = &(proxy_Socket_io.ent[i]);

      proxy_socket_io_clear (sock_io_p);
    }

  return 0;
}

static void
proxy_socket_io_destroy (void)
{
  int i;
  T_SOCKET_IO *sock_io_p;

  if (proxy_Socket_io.ent == NULL)
    {
      return;
    }

  for (i = 0; i < proxy_Socket_io.max_socket; i++)
    {
      sock_io_p = &(proxy_Socket_io.ent[i]);

      if (sock_io_p->fd != INVALID_SOCKET)
    {
#if defined(LINUX)
      if (sock_io_p->status != SOCK_IO_CLOSE_WAIT)
        {
          (void) proxy_del_epoll_event (sock_io_p->fd);
        }
#else /* LINUX */
      FD_CLR (sock_io_p->fd, &allset);
      FD_CLR (sock_io_p->fd, &wnewset);
#endif /* !LINUX */
      CLOSE_SOCKET (sock_io_p->fd);
    }
      proxy_socket_io_clear (sock_io_p);
    }

  proxy_Socket_io.max_socket = 0;
  proxy_Socket_io.cur_socket = 0;
  FREE_MEM (proxy_Socket_io.ent);

  return;
}

#if defined(PROXY_VERBOSE_DEBUG)
void
proxy_socket_io_print (bool print_all)
{
  int i;
  int client_id, shard_id, cas_id;
  char *from_cas;
  T_SOCKET_IO *sock_io_p;

  PROXY_LOG (PROXY_LOG_MODE_SCHEDULE_DETAIL, "* SOCKET IO *");
  PROXY_LOG (PROXY_LOG_MODE_SCHEDULE_DETAIL, "%-20s = %d", "max_socket", proxy_Socket_io.max_socket);
  PROXY_LOG (PROXY_LOG_MODE_SCHEDULE_DETAIL, "%-20s = %d", "cur_socket", proxy_Socket_io.cur_socket);

  PROXY_LOG (PROXY_LOG_MODE_SCHEDULE_DETAIL, "%-7s     %-5s %-8s %-16s %-8s %-10s %-10s %-10s", "idx", "fd", "status",
         "ip_addr", "cas", "client_id", "shard_id", "cas_id");
  if (proxy_Socket_io.ent)
    {
      for (i = 0; i < proxy_Socket_io.max_socket; i++)
    {
      sock_io_p = &(proxy_Socket_io.ent[i]);
      if (!print_all && IS_INVALID_SOCKET (sock_io_p->fd))
        {
          continue;
        }

      if (sock_io_p->from_cas)
        {
          client_id = PROXY_INVALID_CAS;
          shard_id = sock_io_p->id.shard.shard_id;
          cas_id = sock_io_p->id.shard.cas_id;
          from_cas = (char *) "cas";
        }
      else
        {
          client_id = sock_io_p->id.client_id;
          shard_id = PROXY_INVALID_SHARD;
          cas_id = PROXY_INVALID_CAS;
          from_cas = (char *) "client";
        }

      PROXY_LOG (PROXY_LOG_MODE_SCHEDULE_DETAIL, "[%-5d]     %-5d %-8d %-16s %-8s %-10d %-10d %-10d", i,
             sock_io_p->fd, sock_io_p->status, inet_ntoa (*((struct in_addr *) &sock_io_p->ip_addr)), from_cas,
             client_id, shard_id, cas_id);

    }
    }

  return;
}
#endif /* PROXY_VERBOSE_DEBUG */

static T_SOCKET_IO *
proxy_socket_io_add (SOCKET fd, bool from_cas)
{
#if defined(LINUX)
  int error;
#endif
  T_SOCKET_IO *sock_io_p;

  assert (proxy_Socket_io.ent);
  if (proxy_Socket_io.ent == NULL)
    {
      return NULL;
    }

  if (fd >= proxy_Socket_io.max_socket)
    {
      PROXY_LOG (PROXY_LOG_MODE_ERROR, "socket fd exceeds max socket fd. (fd:%d, max socket fd:%d).", fd,
         proxy_Socket_io.max_socket);
      return NULL;
    }

  if (proxy_Socket_io.cur_socket >= proxy_Socket_io.max_socket)
    {
      PROXY_LOG (PROXY_LOG_MODE_ERROR,
         "Number of socket entry exceeds max_socket. " "(current_socket:%d, max_socket:%d).",
         proxy_Socket_io.cur_socket, proxy_Socket_io.max_socket);
      return NULL;
    }

  sock_io_p = &(proxy_Socket_io.ent[fd]);

  if (sock_io_p->fd > INVALID_SOCKET || sock_io_p->status != SOCK_IO_IDLE)
    {
      assert (false);
      proxy_Keep_running = false;
      PROXY_LOG (PROXY_LOG_MODE_ERROR, "Receive duplicated socket fd. " "(received socket:%d, status:%d)",
         sock_io_p->fd, sock_io_p->status);
      return NULL;
    }

  shard_io_set_fl (fd, O_NONBLOCK);

#if defined(LINUX)
  error = proxy_add_epoll_event (fd, EPOLLIN | EPOLLOUT);
  if (error < 0)
    {
      CLOSE_SOCKET (fd);
      return NULL;
    }
#else /* LINUX */
  FD_SET (fd, &allset);
  maxfd = max (maxfd, fd);
#endif /* !LINUX */

  sock_io_p->fd = fd;
  sock_io_p->status = SOCK_IO_REG_WAIT;
  sock_io_p->ip_addr = 0;

  sock_io_p->from_cas = from_cas;

  sock_io_p->id.shard.shard_id = PROXY_INVALID_SHARD;
  sock_io_p->id.shard.cas_id = PROXY_INVALID_CAS;

  assert (sock_io_p->read_event == NULL);
  assert (sock_io_p->write_event == NULL);
  sock_io_p->read_event = NULL;
  sock_io_p->write_event = NULL;

  proxy_Socket_io.cur_socket++;

  PROXY_LOG (PROXY_LOG_MODE_SHARD_DETAIL, "New socket io created. (fd:%d).", fd);
#if defined(PROXY_VERBOSE_DEBUG)
  proxy_socket_io_print (false);
#endif /* PROXY_VERBOSE_DEBUG */

  return sock_io_p;
}

int
proxy_socket_io_delete (SOCKET fd)
{
  T_SOCKET_IO *sock_io_p;

  ENTER_FUNC ();

  assert (proxy_Socket_io.ent);
  assert (proxy_Socket_io.cur_socket > 0);

  if (fd >= proxy_Socket_io.max_socket)
    {
      PROXY_LOG (PROXY_LOG_MODE_ERROR, "socket fd exceeds max socket fd. (fd:%d, max_socket_fd:%d).", fd,
         proxy_Socket_io.max_socket);
      EXIT_FUNC ();
      return -1;
    }

  sock_io_p = &(proxy_Socket_io.ent[fd]);

  if (sock_io_p->fd != INVALID_SOCKET)
    {
#if defined(LINUX)
      if (sock_io_p->status != SOCK_IO_CLOSE_WAIT)
    {
      (void) proxy_del_epoll_event (sock_io_p->fd);
    }
#else /* LINUX */
      FD_CLR (sock_io_p->fd, &allset);
      FD_CLR (sock_io_p->fd, &wnewset);
#endif /* !LINUX */

      PROXY_DEBUG_LOG ("Close socket. (fd:%d).", sock_io_p->fd);
      CLOSE_SOCKET (sock_io_p->fd);
    }
  proxy_socket_io_clear (sock_io_p);
  proxy_Socket_io.cur_socket--;

  EXIT_FUNC ();
  return 0;
}

int
proxy_io_set_established_by_ctx (T_PROXY_CONTEXT * ctx_p)
{
  T_CLIENT_IO *cli_io_p = NULL;
  T_SOCKET_IO *sock_io_p = NULL;

  /* find client io */
  cli_io_p = proxy_client_io_find_by_ctx (ctx_p->client_id, ctx_p->cid, ctx_p->uid);
  if (cli_io_p == NULL)
    {
      PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to find client. " "(client_id:%d, context id:%d, context uid:%u).",
         ctx_p->client_id, ctx_p->cid, ctx_p->uid);

      EXIT_FUNC ();
      return -1;
    }

  sock_io_p = proxy_socket_io_find (cli_io_p->fd);
  assert (sock_io_p != NULL);

  sock_io_p->status = SOCK_IO_ESTABLISHED;
  return 0;
}

static T_SOCKET_IO *
proxy_socket_io_find (SOCKET fd)
{
  T_SOCKET_IO *sock_io_p;

  assert (proxy_Socket_io.ent);

  sock_io_p = &(proxy_Socket_io.ent[fd]);
  return (sock_io_p->status == SOCK_IO_IDLE) ? NULL : sock_io_p;
}

int
proxy_socket_set_write_event (T_SOCKET_IO * sock_io_p, T_PROXY_EVENT * event_p)
{
#if defined(LINUX)
  int error;
#endif

  assert (sock_io_p);
  assert (event_p);

  if (sock_io_p->write_event)
    {
      /* the procotol between driver and proxy must be broken */
      goto error_return;
    }

#if defined(LINUX)
  error = proxy_mod_epoll_event (sock_io_p->fd, EPOLLIN | EPOLLOUT);
  if (error < 0)
    {
      goto error_return;
    }
#else /* LINUX */
  FD_SET (sock_io_p->fd, &wnewset);
#endif /* !LINUX */

  event_p->buffer.offset = 0;   // set offset to start of the write buffer
  sock_io_p->write_event = event_p;

  return 0;

error_return:

  if (sock_io_p->write_event)
    {
      proxy_event_free (sock_io_p->write_event);
      sock_io_p->write_event = NULL;
    }
  return -1;
}

static int
proxy_socket_io_new_client (SOCKET lsnr_fd)
{
  int client_ip;
  SOCKET client_fd;
  T_CLIENT_IO *cli_io_p;
  T_SOCKET_IO *sock_io_p;
  T_CLIENT_INFO *client_info_p;
  int proxy_status = 0;
  char driver_info[SRV_CON_CLIENT_INFO_SIZE];
#if !defined (WINDOWS)
  T_PROXY_EVENT *event_p;
  int length;
  int error;
#endif

  proxy_info_p->num_connect_requests++;

#if defined(WINDOWS)
  client_fd = lsnr_fd;
  client_ip = accept_ip_addr;
  memset (driver_info, 0, SRV_CON_CLIENT_INFO_SIZE);
  driver_info[SRV_CON_MSG_IDX_PROTO_VERSION] = CAS_PROTO_UNPACK_NET_VER (PROTOCOL_V4);
  driver_info[SRV_CON_MSG_IDX_FUNCTION_FLAG] = BROKER_RENEWED_ERROR_CODE | BROKER_SUPPORT_HOLDABLE_RESULT;
#else /* WINDOWS */
  client_fd = recv_fd (lsnr_fd, &client_ip, driver_info);
  if (client_fd < 0)
    {
      PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to receive socket fd. " "(lsnf_fd:%d, client_fd:%d).", lsnr_fd,
         client_fd);

      /* shard_broker must be abnormal status */
      proxy_Keep_running = false;
      return -1;
    }

  proxy_status = 0;
  length = WRITESOCKET (lsnr_fd, &proxy_status, sizeof (proxy_status));
  if (length != sizeof (proxy_status))
    {
      PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to send proxy status to broker. " "(lsnr_fd:%d).", lsnr_fd);
      CLOSE_SOCKET (client_fd);
      return -1;
    }
#endif /* !WINDOWS */

  ENTER_FUNC ();

  sock_io_p = proxy_socket_io_add (client_fd, PROXY_IO_FROM_CLIENT);
  if (sock_io_p == NULL)
    {
      PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to add socket entry. (client fd:%d).", client_fd);
      CLOSE_SOCKET (client_fd);
      return -1;
    }

  cli_io_p = proxy_client_io_new (client_fd, driver_info);
  if (cli_io_p == NULL)
    {
      proxy_socket_io_delete (client_fd);
      PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to create client entry. (client fd:%d).", client_fd);
      return -1;
    }

  sock_io_p->ip_addr = client_ip;
  sock_io_p->id.client_id = cli_io_p->client_id;

  /* set shared memory T_CLIENT_INFO */
  client_info_p = shard_shm_get_client_info (proxy_info_p, cli_io_p->client_id);
  client_info_p->client_id = cli_io_p->client_id;
  client_info_p->client_ip = client_ip;
  client_info_p->connect_time = time (NULL);
  memcpy (client_info_p->driver_info, cli_io_p->driver_info, SRV_CON_CLIENT_INFO_SIZE);

#if !defined(WINDOWS)
  /* send client_conn_ok to the client */
  event_p =
    proxy_event_new_with_rsp (cli_io_p->driver_info, PROXY_EVENT_IO_WRITE, PROXY_EVENT_FROM_CLIENT,
                  proxy_io_make_client_conn_ok);
  if (event_p == NULL)
    {
      proxy_socket_io_read_error (sock_io_p);
      EXIT_FUNC ();
      return -1;
    }

  /* set write event to the socket io */
  error = proxy_socket_set_write_event (sock_io_p, event_p);
  if (error)
    {
      PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to set write buffer. " "(fd:%d). event(%s).", client_fd,
         proxy_str_event (event_p));

      proxy_event_free (event_p);
      event_p = NULL;

      proxy_socket_io_read_error (sock_io_p);

      EXIT_FUNC ();
      return -1;
    }
#endif /* !WINDOWS */

  EXIT_FUNC ();

  return 0;
}

static int
proxy_process_client_register (T_SOCKET_IO * sock_io_p)
{
  int error = 0;
  char *db_name = NULL, *db_user = NULL, *db_passwd = NULL;
  char *url = NULL, *db_sessionid = NULL;
  struct timeval client_start_time;
  T_PROXY_CONTEXT *ctx_p;
  T_IO_BUFFER *read_buffer;
  T_PROXY_EVENT *event_p;
  T_CLIENT_INFO *client_info_p;
  unsigned char *ip_addr;
  char len;
  char *driver_info;
  T_BROKER_VERSION client_version;
  char driver_version[SRV_CON_VER_STR_MAX_SIZE];
  char err_msg[256];

  ENTER_FUNC ();

  assert (sock_io_p);
  assert (sock_io_p->read_event);

  driver_info = proxy_get_driver_info_by_fd (sock_io_p);

  gettimeofday (&client_start_time, NULL);

  read_buffer = &(sock_io_p->read_event->buffer);
  assert (read_buffer);     // __FOR_DEBUG

  db_name = read_buffer->data;
  db_name[SRV_CON_DBNAME_SIZE - 1] = '\0';

  ctx_p = proxy_context_find_by_socket_client_io (sock_io_p);
  if (ctx_p == NULL)
    {
      PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to find context for this socket. (fd:%d)", sock_io_p->fd);
      error = -1;
      goto clear_event_and_return;
    }

  if (ctx_p->error_ind != CAS_NO_ERROR)
    {
      /* Skip authorization and process error */
      goto connection_established;
    }

  if (strcmp (db_name, HEALTH_CHECK_DUMMY_DB) == 0)
    {
      PROXY_DEBUG_LOG ("Incoming health check request from client.");
      /* send proxy_alive response to the client */
      event_p =
    proxy_event_new_with_rsp (driver_info, PROXY_EVENT_IO_WRITE, PROXY_EVENT_FROM_CLIENT,
                  proxy_io_make_client_proxy_alive);
      if (event_p == NULL)
    {
      proxy_socket_io_read_error (sock_io_p);
      EXIT_FUNC ();
      return -1;
    }

      /* set write event to the socket io */
      error = proxy_socket_set_write_event (sock_io_p, event_p);
      if (error)
    {
      PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to set write buffer. " "(fd:%d). event(%s).", sock_io_p->fd,
             proxy_str_event (event_p));

      proxy_event_free (event_p);
      event_p = NULL;

      proxy_socket_io_read_error (sock_io_p);
      EXIT_FUNC ();
      return -1;
    }
      goto clear_event_and_return;
    }

  db_user = db_name + SRV_CON_DBNAME_SIZE;
  db_user[SRV_CON_DBUSER_SIZE - 1] = '\0';
  if (db_user[0] == '\0')
    {
      strcpy (db_user, "PUBLIC");
    }

  db_passwd = db_user + SRV_CON_DBUSER_SIZE;
  db_passwd[SRV_CON_DBPASSWD_SIZE - 1] = '\0';

  client_version = CAS_MAKE_PROTO_VER (driver_info);
  if (client_version >= CAS_MAKE_VER (8, 2, 0))
    {
      url = db_passwd + SRV_CON_DBPASSWD_SIZE;
      url[SRV_CON_URL_SIZE - 1] = '\0';
      driver_version[0] = '\0';
      if (DOES_CLIENT_UNDERSTAND_THE_PROTOCOL (client_version, PROTOCOL_V5))
    {
      len = *(url + strlen (url) + 1);
      if (len > 0 && len < SRV_CON_VER_STR_MAX_SIZE)
        {
          memcpy (driver_version, url + strlen (url) + 2, (int) len);
          driver_version[(int) len] = '\0';
        }
      else
        {
          snprintf (driver_version, SRV_CON_VER_STR_MAX_SIZE, "PROTOCOL V%d",
            (int) (CAS_PROTO_VER_MASK & client_version));
        }
    }
      else if (DOES_CLIENT_UNDERSTAND_THE_PROTOCOL (client_version, PROTOCOL_V1))
    {
      char *ver;

      CAS_PROTO_TO_VER_STR (&ver, (int) (CAS_PROTO_VER_MASK & client_version));

      strncpy_bufsize (driver_version, ver);
    }
      else
    {
      snprintf (driver_version, SRV_CON_VER_STR_MAX_SIZE, "%d.%d.%d", CAS_VER_TO_MAJOR (client_version),
            CAS_VER_TO_MINOR (client_version), CAS_VER_TO_PATCH (client_version));
    }
      client_info_p = shard_shm_get_client_info (proxy_info_p, sock_io_p->id.client_id);
      if (client_info_p)
    {
      memcpy (client_info_p->driver_version, driver_version, sizeof (driver_version));
    }
    }

  /* SHARD DO NOT SUPPORT SESSION */
  if (client_version >= CAS_MAKE_VER (8, 4, 0))
    {
      db_sessionid = url + SRV_CON_URL_SIZE;
      db_sessionid[SRV_CON_DBSESS_ID_SIZE - 1] = '\0';
    }

  ip_addr = (unsigned char *) (&sock_io_p->ip_addr);

  /* check acl */
  if (shm_as_p->access_control)
    {
      if (access_control_check_right (shm_as_p, db_name, db_user, ip_addr) < 0)
    {
      proxy_info_p->num_connect_rejected++;

      snprintf (err_msg, sizeof (err_msg), "Authorization error.(Address is rejected)");
      proxy_context_set_error_with_msg (ctx_p, DBMS_ERROR_INDICATOR, CAS_ER_NOT_AUTHORIZED_CLIENT, err_msg);

      PROXY_LOG (PROXY_LOG_MODE_ERROR, "Authentication failure. " "(db_name:[%s], db_user:[%s], db_passwd:[%s]).",
             db_name, db_user, db_passwd);

      if (shm_as_p->access_log == ON)
        {
          proxy_access_log (&client_start_time, sock_io_p->ip_addr, (db_name) ? (const char *) db_name : "-",
                (db_user) ? (const char *) db_user : "-", true);
        }

      goto connection_established;
    }
    }

  error = proxy_check_authorization (ctx_p, db_name, db_user, db_passwd);
  if (error < 0)
    {
      goto connection_established;
    }

  strncpy (ctx_p->database_user, db_user, SRV_CON_DBUSER_SIZE - 1);
  strncpy (ctx_p->database_passwd, db_passwd, SRV_CON_DBPASSWD_SIZE - 1);

  if (proxy_info_p->fixed_shard_user == false)
    {
      event_p =
    proxy_event_new_with_rsp (driver_info, PROXY_EVENT_CLIENT_REQUEST, PROXY_EVENT_FROM_CLIENT,
                  proxy_io_make_check_cas);
      if (event_p == NULL)
    {
      proxy_socket_io_read_error (sock_io_p);

      EXIT_FUNC ();
      return -1;
    }

      proxy_event_set_context (event_p, ctx_p->cid, ctx_p->uid);

      error = shard_queue_enqueue (&proxy_Handler.cli_rcv_q, (void *) event_p);
      if (error)
    {
      PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to queue client event. " "context(%s). event(%s).",
             proxy_str_context (ctx_p), proxy_str_event (event_p));

      proxy_event_free (event_p);
      event_p = NULL;

      proxy_context_free (ctx_p);

      EXIT_FUNC ();
      return -1;
    }

      goto clear_event_and_return;
    }

connection_established:
  if (ctx_p->error_ind != CAS_NO_ERROR)
    {
      /*
       * Process error message if exists.
       * context will be freed after sending error message.
       */
      proxy_context_send_error (ctx_p);
      proxy_context_clear_error (ctx_p);

      ctx_p->free_on_client_io_write = true;
    }
  else
    {
      /* send dbinfo_ok to the client */
      event_p =
    proxy_event_new_with_rsp (driver_info, PROXY_EVENT_IO_WRITE, PROXY_EVENT_FROM_CLIENT,
                  proxy_io_make_client_dbinfo_ok);
      if (event_p == NULL)
    {
      proxy_socket_io_read_error (sock_io_p);
      EXIT_FUNC ();
      return -1;
    }

      /* set write event to the socket io */
      error = proxy_socket_set_write_event (sock_io_p, event_p);
      if (error)
    {
      PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to set write buffer. " "(fd:%d). event(%s).", sock_io_p->fd,
             proxy_str_event (event_p));

      proxy_event_free (event_p);
      event_p = NULL;

      proxy_socket_io_read_error (sock_io_p);
      EXIT_FUNC ();
      return -1;
    }

      ctx_p->is_connected = true;

      sock_io_p->status = SOCK_IO_ESTABLISHED;

      if (shm_as_p->access_log == ON)
    {
      proxy_access_log (&client_start_time, sock_io_p->ip_addr, (db_name) ? (const char *) db_name : "-",
                (db_user) ? (const char *) db_user : "-", true);
    }
    }

clear_event_and_return:
  assert (sock_io_p->read_event);
  proxy_event_free (sock_io_p->read_event);
  sock_io_p->read_event = NULL;

  EXIT_FUNC ();
  return error;
}

static int
proxy_process_client_request (T_SOCKET_IO * sock_io_p)
{
  int error;
  T_PROXY_CONTEXT *ctx_p;
  T_PROXY_EVENT *event_p;

  ENTER_FUNC ();

  assert (sock_io_p);
  assert (sock_io_p->read_event);
  assert (sock_io_p->from_cas == PROXY_EVENT_FROM_CLIENT);

  event_p = sock_io_p->read_event;
  sock_io_p->read_event = NULL;

  ctx_p = proxy_context_find_by_socket_client_io (sock_io_p);
  if (ctx_p == NULL)
    {
      proxy_event_free (event_p);
      event_p = NULL;
      return -1;
    }

  proxy_event_set_type_from (event_p, PROXY_EVENT_CLIENT_REQUEST, PROXY_EVENT_FROM_CLIENT);
  proxy_event_set_context (event_p, ctx_p->cid, ctx_p->uid);

  error = shard_queue_enqueue (&proxy_Handler.cli_rcv_q, (void *) event_p);
  if (error)
    {
      PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to queue client event. " "context(%s). event(%s).",
         proxy_str_context (ctx_p), proxy_str_event (event_p));

      proxy_event_free (event_p);
      event_p = NULL;

      proxy_context_free (ctx_p);

      EXIT_FUNC ();
      return -1;
    }

  EXIT_FUNC ();
  return 0;
}

static int
proxy_process_client_conn_error (T_SOCKET_IO * sock_io_p)
{
  int error = 0;
  T_PROXY_CONTEXT *ctx_p;
  T_PROXY_EVENT *event_p;

  ENTER_FUNC ();

  assert (sock_io_p);
  assert (sock_io_p->fd);
  assert (sock_io_p->from_cas == PROXY_EVENT_FROM_CLIENT);

  sock_io_p->status = SOCK_IO_CLOSE_WAIT;

#if defined(LINUX)
  error = proxy_del_epoll_event (sock_io_p->fd);
  if (error < 0)
    {
      return -1;
    }
#else /* LINUX */
  /* disable socket read/write */
  FD_CLR (sock_io_p->fd, &allset);
  FD_CLR (sock_io_p->fd, &wnewset);
#endif /* !LINUX */

  ctx_p = proxy_context_find_by_socket_client_io (sock_io_p);
  if (ctx_p == NULL)
    {
      return -1;
    }

  event_p = proxy_event_new (PROXY_EVENT_CLIENT_CONN_ERROR, PROXY_EVENT_FROM_CLIENT);
  if (event_p == NULL)
    {
      PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to make new event. (%s, %s).", "PROXY_EVENT_CLIENT_CONN_ERROR",
         "PROXY_EVENT_FROM_CLIENT");

      proxy_context_free (ctx_p);

      EXIT_FUNC ();
      return -1;
    }
  proxy_event_set_context (event_p, ctx_p->cid, ctx_p->uid);

  error = shard_queue_enqueue (&proxy_Handler.cli_rcv_q, (void *) event_p);
  if (error)
    {
      PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to queue client event. " "context(%s). event(%s).",
         proxy_str_context (ctx_p), proxy_str_event (event_p));

      proxy_event_free (event_p);
      event_p = NULL;

      proxy_context_free (ctx_p);

      EXIT_FUNC ();
      return -1;
    }

  EXIT_FUNC ();
  return 0;
}

static int
proxy_process_client_write_error (T_SOCKET_IO * sock_io_p)
{
  int error;

  if (sock_io_p->write_event)
    {
      proxy_event_free (sock_io_p->write_event);
      sock_io_p->write_event = NULL;
    }

  error = proxy_process_client_conn_error (sock_io_p);
  if (error)
    {
      if (sock_io_p->fd != INVALID_SOCKET)
    {
      CLOSE_SOCKET (sock_io_p->fd);
    }
    }

  return error;
}

static int
proxy_process_client_read_error (T_SOCKET_IO * sock_io_p)
{
  int error;

  assert (sock_io_p);

#if defined(LINUX)
  /*
   * If connection error event was triggered by EPOLLERR, EPOLLHUP,
   * there could be no error events.
   */
#else /* LINUX */
  assert (sock_io_p->read_event);
#endif /* !LINUX */
  if (sock_io_p->read_event)
    {
      proxy_event_free (sock_io_p->read_event);
      sock_io_p->read_event = NULL;
    }

  error = proxy_process_client_conn_error (sock_io_p);
  if (error)
    {
      if (sock_io_p->fd != INVALID_SOCKET)
    {
      CLOSE_SOCKET (sock_io_p->fd);
    }
    }

  return error;
}

static int
proxy_process_client_message (T_SOCKET_IO * sock_io_p)
{
  int error = 0;

  assert (sock_io_p);

  switch (sock_io_p->status)
    {
    case SOCK_IO_REG_WAIT:
      error = proxy_process_client_register (sock_io_p);
      break;

    case SOCK_IO_ESTABLISHED:
      error = proxy_process_client_request (sock_io_p);
      break;

    default:
      break;
    }

  return error;
}

static int
proxy_process_cas_register (T_SOCKET_IO * sock_io_p)
{
  int length;
  char *p;
  int shard_id = PROXY_INVALID_SHARD, cas_id = PROXY_INVALID_CAS;
  char func_code;
  T_SHARD_IO *shard_io_p = NULL;
  T_CAS_IO *cas_io_p = NULL;
  T_IO_BUFFER *read_buffer;

  ENTER_FUNC ();

  assert (sock_io_p);
  assert (sock_io_p->read_event);

  read_buffer = &(sock_io_p->read_event->buffer);

  length = get_msg_length (read_buffer->data);
  assert (read_buffer->offset == length);

  /* func code */
  p = (char *) (read_buffer->data + MSG_HEADER_SIZE);
  memcpy (&func_code, p, sizeof (char));
  p += sizeof (char);

  /* shard id */
  memcpy (&shard_id, p, sizeof (int));
  shard_id = ntohl (shard_id);
  p += sizeof (int);

  memcpy (&cas_id, p, sizeof (int));
  cas_id = ntohl (cas_id);

  cas_io_p = proxy_cas_io_new (shard_id, cas_id, sock_io_p->fd);
  if (cas_io_p == NULL)
    {
      PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to create CAS entry. " "(shard_id:%d, cas_id:%d, fd:%d).", shard_id,
         cas_id, sock_io_p->fd);
      goto error_return;
    }
  assert (sock_io_p->from_cas == PROXY_EVENT_FROM_CAS);

  /* fill socket io entry */
  sock_io_p->status = SOCK_IO_ESTABLISHED;
  sock_io_p->id.shard.shard_id = shard_id;
  sock_io_p->id.shard.cas_id = cas_id;

  /* SHARD TODO : !!! check protocol */


  shard_io_p = proxy_shard_io_find (shard_id);
  assert (shard_io_p);

  /* set cas io status */
  cas_io_p->status = CAS_IO_CONNECTED;

  PROXY_DEBUG_LOG ("New CAS registered. (shard_id:%d). CAS(%s).", shard_io_p->shard_id, proxy_str_cas_io (cas_io_p));

  proxy_client_check_waiter_and_wakeup (shard_io_p, cas_io_p);

  /* in this case, we should free event now */
  proxy_event_free (sock_io_p->read_event);
  sock_io_p->read_event = NULL;

  EXIT_FUNC ();
  return 0;

error_return:

  /* in this case, we should free event now */
  proxy_event_free (sock_io_p->read_event);
  sock_io_p->read_event = NULL;

  if (cas_io_p && shard_id >= 0 && cas_id >= 0)
    {
      proxy_cas_io_free (shard_id, cas_id);
    }
  else
    {
      /* cas have to retry register to proxy. */
      proxy_socket_io_delete (sock_io_p->fd);
    }

  EXIT_FUNC ();
  return -1;
}

static int
proxy_process_cas_response (T_SOCKET_IO * sock_io_p)
{
  int error;
  T_PROXY_CONTEXT *ctx_p;
  T_CAS_IO *cas_io_p;
  T_PROXY_EVENT *event_p;

  ENTER_FUNC ();

  assert (sock_io_p);
  assert (sock_io_p->read_event);
  assert (sock_io_p->status != SOCK_IO_IDLE);
  assert (sock_io_p->from_cas == PROXY_EVENT_FROM_CAS);

  event_p = sock_io_p->read_event;
  sock_io_p->read_event = NULL;

  cas_io_p = proxy_cas_io_find_by_fd (sock_io_p->id.shard.shard_id, sock_io_p->id.shard.cas_id, sock_io_p->fd);
  if (cas_io_p == NULL)
    {
      PROXY_LOG (PROXY_LOG_MODE_ERROR, "Unable to find CAS entry. " "(shard_id:%d, cas_id:%d, fd:%d). " "event(%s).",
         sock_io_p->id.shard.shard_id, sock_io_p->id.shard.cas_id, sock_io_p->fd, proxy_str_event (event_p));

      proxy_event_free (event_p);
      event_p = NULL;

      proxy_socket_io_delete (sock_io_p->fd);
      EXIT_FUNC ();
      return -1;
    }

  if (cas_io_p->is_in_tran == false)
    {
      /* in case, cas session timeout !!! */

      PROXY_LOG (PROXY_LOG_MODE_ERROR,
         "Unexpected CAS transaction status. " "(expected tran status:%d). CAS(%s). event(%s).", true,
         proxy_str_cas_io (cas_io_p), proxy_str_event (event_p));

      proxy_event_free (event_p);
      event_p = NULL;

      proxy_cas_io_free (cas_io_p->shard_id, cas_io_p->cas_id);
      EXIT_FUNC ();
      return -1;
    }

  ctx_p = proxy_context_find (cas_io_p->ctx_cid, cas_io_p->ctx_uid);
  if (ctx_p == NULL)
    {
      PROXY_LOG (PROXY_LOG_MODE_ERROR, "Unable to find context. CAS(%s). event(%s).", proxy_str_cas_io (cas_io_p),
         proxy_str_event (event_p));

      proxy_event_free (event_p);
      event_p = NULL;

      proxy_cas_io_free (cas_io_p->shard_id, cas_io_p->cas_id);
      EXIT_FUNC ();
      return -1;
    }

  proxy_event_set_type_from (event_p, PROXY_EVENT_CAS_RESPONSE, PROXY_EVENT_FROM_CAS);
  proxy_event_set_context (event_p, ctx_p->cid, ctx_p->uid);

  error = shard_queue_enqueue (&proxy_Handler.cas_rcv_q, (void *) event_p);
  if (error)
    {
      PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to queue client event. " "context(%s). event(%s).",
         proxy_str_context (ctx_p), proxy_str_event (event_p));

      proxy_event_free (event_p);
      event_p = NULL;

      proxy_context_free (ctx_p);

      EXIT_FUNC ();
      return -1;
    }

  /* in this case, we should detach event from socket io */
  sock_io_p->read_event = NULL;

  EXIT_FUNC ();
  return 0;
}

static int
proxy_process_cas_conn_error (T_SOCKET_IO * sock_io_p)
{
  int error;
  T_PROXY_CONTEXT *ctx_p;
  T_CAS_IO *cas_io_p;
  T_PROXY_EVENT *event_p;

  ENTER_FUNC ();

  assert (sock_io_p);
  assert (sock_io_p->fd != INVALID_SOCKET);
  assert (sock_io_p->from_cas == PROXY_EVENT_FROM_CAS);

  sock_io_p->status = SOCK_IO_CLOSE_WAIT;

#if defined(LINUX)
  error = proxy_del_epoll_event (sock_io_p->fd);
  if (error == -1)
    {
      return -1;
    }
#else /* LINUX */
  /* disable socket read/write */
  FD_CLR (sock_io_p->fd, &allset);
  FD_CLR (sock_io_p->fd, &wnewset);
#endif /* !LINUX */

  cas_io_p = proxy_cas_io_find_by_fd (sock_io_p->id.shard.shard_id, sock_io_p->id.shard.cas_id, sock_io_p->fd);
  if (cas_io_p == NULL)
    {
      PROXY_LOG (PROXY_LOG_MODE_ERROR, "Unable to find CAS entry. " "(shard_id:%d, cas_id:%d, fd:%d). ",
         sock_io_p->id.shard.shard_id, sock_io_p->id.shard.cas_id, sock_io_p->fd);

      proxy_socket_io_delete (sock_io_p->fd);
      EXIT_FUNC ();
      return -1;
    }

  PROXY_DEBUG_LOG ("Detect CAS connection error. CAS(%s).", proxy_str_cas_io (cas_io_p));

  if (cas_io_p->is_in_tran == false)
    {
      /* __FOR_DEBUG */
      assert (cas_io_p->ctx_cid == PROXY_INVALID_CONTEXT);
      assert (cas_io_p->ctx_uid == 0);

      proxy_cas_io_free (cas_io_p->shard_id, cas_io_p->cas_id);

      EXIT_FUNC ();
      return -1;
    }
  cas_io_p->status = CAS_IO_CLOSE_WAIT;


  ctx_p = proxy_context_find (cas_io_p->ctx_cid, cas_io_p->ctx_uid);
  if (ctx_p == NULL)
    {
      PROXY_LOG (PROXY_LOG_MODE_ERROR, "Unable to find context. CAS(%s).", proxy_str_cas_io (cas_io_p));

      proxy_cas_io_free (cas_io_p->shard_id, cas_io_p->cas_id);

      EXIT_FUNC ();
      return -1;
    }

  event_p = proxy_event_new (PROXY_EVENT_CAS_CONN_ERROR, PROXY_EVENT_FROM_CAS);
  if (event_p == NULL)
    {
      PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to make new event. (%s, %s). context(%s).", "PROXY_EVENT_CAS_CONN_ERROR",
         "PROXY_EVENT_FROM_CAS", proxy_str_context (ctx_p));

      proxy_context_free (ctx_p);

      EXIT_FUNC ();
      return -1;
    }
  proxy_event_set_context (event_p, ctx_p->cid, ctx_p->uid);

  error = shard_queue_enqueue (&proxy_Handler.cas_rcv_q, (void *) event_p);
  if (error)
    {
      PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to queue client event. " "context(%s). event(%s).",
         proxy_str_context (ctx_p), proxy_str_event (event_p));
      proxy_event_free (event_p);
      event_p = NULL;

      proxy_context_free (ctx_p);
      EXIT_FUNC ();
      return -1;
    }

  EXIT_FUNC ();
  return 0;
}

int
proxy_process_cas_write_error (T_SOCKET_IO * sock_io_p)
{
  int error;

  if (sock_io_p->write_event)
    {
      proxy_event_free (sock_io_p->read_event);
      sock_io_p->read_event = NULL;
    }

  error = proxy_process_cas_conn_error (sock_io_p);
  if (error)
    {
      if (sock_io_p->fd != INVALID_SOCKET)
    {
      CLOSE_SOCKET (sock_io_p->fd);
    }
    }

  return error;
}

int
proxy_process_cas_read_error (T_SOCKET_IO * sock_io_p)
{
  int error;

  assert (sock_io_p);

#if defined(LINUX)
  /*
   * If connection error event was triggered by EPOLLERR, EPOLLHUP,
   * there could be no error events.
   */
#else /* LINUX */
  assert (sock_io_p->read_event);
#endif /* !LINUX */
  if (sock_io_p->read_event)
    {
      proxy_event_free (sock_io_p->read_event);
      sock_io_p->read_event = NULL;
    }

  error = proxy_process_cas_conn_error (sock_io_p);
  if (error)
    {
      if (sock_io_p->fd != INVALID_SOCKET)
    {
      CLOSE_SOCKET (sock_io_p->fd);
    }
    }

  return error;
}

static int
proxy_process_cas_message (T_SOCKET_IO * sock_io_p)
{
  int error = 0;

  assert (sock_io_p);

  switch (sock_io_p->status)
    {
    case SOCK_IO_REG_WAIT:
      error = proxy_process_cas_register (sock_io_p);
      break;

    case SOCK_IO_ESTABLISHED:
      error = proxy_process_cas_response (sock_io_p);
      break;

    default:
      break;
    }

  return error;
}

static int
proxy_socket_io_write_internal (T_SOCKET_IO * sock_io_p)
{
  int write_len;
  int remain;
  char *p;
  T_IO_BUFFER *send_buffer;

  // __FOR_DEBUG
  assert (sock_io_p);
  assert (sock_io_p->write_event);

  send_buffer = &(sock_io_p->write_event->buffer);

  if (send_buffer->length == send_buffer->offset)
    {
      write_len = 0;
      goto write_end;
    }
  else if (send_buffer->length < send_buffer->offset)
    {
      assert (false);

      write_len = -1;
      goto write_end;
    }

  remain = send_buffer->length - send_buffer->offset;

  p = (char *) (send_buffer->data + send_buffer->offset);

  write_len = WRITESOCKET (sock_io_p->fd, p, remain);
  if (write_len < 0)
    {
#if defined(WINDOWS)
      int error;

      error = WSAGetLastError ();
      if (error == WSAEWOULDBLOCK)
#else
      if ((errno == EWOULDBLOCK) || (errno == EAGAIN) || (errno == EINTR))
#endif
    {
#if defined(LINUX)
      /* Nothing to do. epoll events has not been changed. */
#else /* LINUX */
      FD_SET (sock_io_p->fd, &wnewset);
#endif /* !LINUX */
      return 0;
    }

      goto write_end;
    }

  send_buffer->offset += write_len;

  if (send_buffer->offset < send_buffer->length)
    {
#if defined(LINUX)
      /* Nothing to do. epoll events has not been changed. */
#else /* LINUX */
      FD_SET (sock_io_p->fd, &wnewset);
#endif /* !LINUX */
      return write_len;
    }

write_end:

  proxy_event_free (sock_io_p->write_event);
  sock_io_p->write_event = NULL;

#if defined(LINUX)
  (void) proxy_mod_epoll_event (sock_io_p->fd, EPOLLIN);
#else /* LINUX */
  FD_CLR (sock_io_p->fd, &wnewset);
#endif /* !LINUX */

  return write_len;
}

static void
proxy_socket_io_write_to_cas (T_SOCKET_IO * sock_io_p)
{
  int len;

  assert (sock_io_p);

  len = proxy_socket_io_write_internal (sock_io_p);
  if (len < 0)
    {
      proxy_socket_io_write_error (sock_io_p);
    }

  return;
}

static void
proxy_socket_io_write_to_client (T_SOCKET_IO * sock_io_p)
{
  int len;
  T_PROXY_CONTEXT *ctx_p;
  T_CLIENT_INFO *client_info_p;

  assert (sock_io_p);

  len = proxy_socket_io_write_internal (sock_io_p);
  if (len < 0)
    {
      proxy_socket_io_write_error (sock_io_p);
    }

  ctx_p = proxy_context_find_by_socket_client_io (sock_io_p);
  if (ctx_p == NULL)
    {
      return;
    }

  if (ctx_p->free_on_client_io_write && sock_io_p->write_event == NULL)
    {
      /* init shared memory T_CLIENT_INFO */
      client_info_p = shard_shm_get_client_info (proxy_info_p, sock_io_p->id.client_id);
      shard_shm_init_client_info (client_info_p);

      proxy_context_free (ctx_p);
    }

  return;
}

static int
proxy_socket_io_read_internal (T_SOCKET_IO * sock_io_p)
{
  int error;
  int read_len, remain, total_len;
  char *buffer;
  T_IO_BUFFER *read_buffer;

  assert (sock_io_p);
  assert (sock_io_p->read_event);

read_again:

  read_buffer = &(sock_io_p->read_event->buffer);
  buffer = (char *) (read_buffer->data + read_buffer->offset);
  remain = read_buffer->length - read_buffer->offset;

  read_len = READSOCKET (sock_io_p->fd, buffer, remain);
  if (read_len < 0)
    {
#if defined(WINDOWS)
      error = WSAGetLastError ();
      if ((error == WSAECONNRESET) || (error == WSAECONNABORTED))
    {
      return -1;        /* failed */
    }
      else if (error == WSAEWOULDBLOCK)
#else
      if ((errno == EWOULDBLOCK) || (errno == EAGAIN) || (errno == EINTR))
#endif
    {
      return 0;     /* retry */
    }
      return -1;        /* disconnected */
    }
  else if (read_len == 0)
    {
      return -1;        /* disconnected */
    }

  read_buffer->offset += read_len;

  /* common message header */
  if (read_buffer->length == MSG_HEADER_SIZE && read_buffer->length == read_buffer->offset)
    {
      /* expand buffer to receive payload */
      total_len = get_msg_length (read_buffer->data);
      if (total_len == read_buffer->offset)
    {
      /* no more data */
      return read_buffer->offset;
    }

      error = proxy_event_realloc_buffer (sock_io_p->read_event, total_len);
      if (error)
    {
      PROXY_DEBUG_LOG ("Failed to realloc event buffer. (error:%d).", error);
      proxy_socket_io_read_error (sock_io_p);
      return -1;
    }

      goto read_again;
    }

  return read_len;
}

/* proxy_socket_io_read_internal */
static void
proxy_socket_io_read_from_cas_next (T_SOCKET_IO * sock_io_p)
{
  int error;

  assert (sock_io_p);

  error = proxy_socket_io_read_internal (sock_io_p);
  if (error < 0)
    {
      proxy_socket_io_read_error (sock_io_p);
      return;
    }

  if (proxy_event_io_read_complete (sock_io_p->read_event))
    {
      proxy_process_cas_message (sock_io_p);
    }

  return;
}

static void
proxy_socket_io_read_from_cas_first (T_SOCKET_IO * sock_io_p)
{
  int error;
  int length;

  assert (sock_io_p);
  assert (sock_io_p->read_event);

  length = MSG_HEADER_SIZE;
  error = proxy_event_alloc_buffer (sock_io_p->read_event, length);
  if (error)
    {
      proxy_socket_io_read_error (sock_io_p);
      return;
    }

  proxy_socket_io_read_from_cas_next (sock_io_p);
}

static void
proxy_socket_io_read_from_cas (T_SOCKET_IO * sock_io_p)
{
  T_IO_BUFFER *r_buf;

  assert (sock_io_p);
  assert (sock_io_p->read_event);

  r_buf = &(sock_io_p->read_event->buffer);

  if (r_buf->length == 0)
    {
      proxy_socket_io_read_from_cas_first (sock_io_p);
    }
  else
    {
      proxy_socket_io_read_from_cas_next (sock_io_p);
    }

  return;
}

static void
proxy_socket_io_read_from_client_next (T_SOCKET_IO * sock_io_p)
{
  int error;

  assert (sock_io_p);

  error = proxy_socket_io_read_internal (sock_io_p);
  if (error < 0)
    {
      proxy_socket_io_read_error (sock_io_p);
      return;
    }

  if (proxy_event_io_read_complete (sock_io_p->read_event))
    {
      proxy_process_client_message (sock_io_p);
    }

  return;
}

static void
proxy_socket_io_read_from_client_first (T_SOCKET_IO * sock_io_p)
{
  int error;
  int length;
  char *driver_info;

  assert (sock_io_p);
  assert (sock_io_p->read_event);

  if (sock_io_p->status == SOCK_IO_REG_WAIT)
    {
      driver_info = proxy_get_driver_info_by_fd (sock_io_p);
      length = get_dbinfo_length (driver_info);
    }
  else
    {
      length = MSG_HEADER_SIZE;
    }

  error = proxy_event_alloc_buffer (sock_io_p->read_event, length);
  if (error)
    {
      proxy_socket_io_write_error (sock_io_p);
      return;
    }

  proxy_socket_io_read_from_client_next (sock_io_p);
}

static void
proxy_socket_io_read_from_client (T_SOCKET_IO * sock_io_p)
{
  T_IO_BUFFER *r_buf;

  assert (sock_io_p);
  assert (sock_io_p->read_event);

  r_buf = &(sock_io_p->read_event->buffer);

  if (r_buf->length == 0)
    {
      proxy_socket_io_read_from_client_first (sock_io_p);
    }
  else
    {
      proxy_socket_io_read_from_client_next (sock_io_p);
    }

  return;
}

static void
proxy_socket_io_write (T_SOCKET_IO * sock_io_p)
{
  assert (sock_io_p);

  if (sock_io_p->status == SOCK_IO_CLOSE_WAIT)
    {
      PROXY_DEBUG_LOG ("Unexpected socket status. (fd:%d, status:%d). \n", sock_io_p->fd, sock_io_p->status);

      /*
       * free writer event when sock status is 'close wait'
       */
      if (sock_io_p->write_event)
    {
      proxy_event_free (sock_io_p->write_event);
      sock_io_p->write_event = NULL;
    }

#if defined(LINUX)
      (void) proxy_mod_epoll_event (sock_io_p->fd, EPOLLIN);
#else /* LINUX */
      FD_CLR (sock_io_p->fd, &wnewset);
#endif /* !LINUX */

      return;
    }
  else if (sock_io_p->status == SOCK_IO_IDLE)
    {
      assert (false);
      PROXY_DEBUG_LOG ("Unexpected socket status. (fd:%d, status:%d). \n", sock_io_p->fd, sock_io_p->status);
    }

  if (sock_io_p->write_event == NULL)
    {
#if defined(LINUX)
      (void) proxy_mod_epoll_event (sock_io_p->fd, EPOLLIN);
#else /* LINUX */
      FD_CLR (sock_io_p->fd, &wnewset);
#endif /* !LINUX */

      PROXY_DEBUG_LOG ("Write event couldn't be NULL. (fd:%d, status:%d). \n", sock_io_p->fd, sock_io_p->status);
      return;
    }

  if (sock_io_p->from_cas)
    {
      proxy_socket_io_write_to_cas (sock_io_p);
    }
  else
    {
      proxy_socket_io_write_to_client (sock_io_p);
    }

  return;
}

static void
proxy_socket_io_write_error (T_SOCKET_IO * sock_io_p)
{
  assert (sock_io_p);

  if (sock_io_p->from_cas)
    {
      proxy_process_cas_write_error (sock_io_p);
    }
  else
    {
      proxy_process_client_write_error (sock_io_p);
    }

  return;
}

static void
proxy_socket_io_read (T_SOCKET_IO * sock_io_p)
{
  assert (sock_io_p);

  if (sock_io_p->status == SOCK_IO_CLOSE_WAIT)
    {
      /* this socket connection will be close */

      PROXY_DEBUG_LOG ("Unexpected socket status. " "socket will be closed. " "(fd:%d, status:%d).", sock_io_p->fd,
               sock_io_p->status);
      //
      // proxy_io_buffer_clear (&sock_io_p->recv_buffer);

      // assert (false);
      return;
    }

  if (sock_io_p->read_event == NULL)
    {
      sock_io_p->read_event = proxy_event_new (PROXY_EVENT_IO_READ, sock_io_p->from_cas);
      if (sock_io_p->read_event == NULL)
    {
      assert (false);   /* __FOR_DEBUG */

      proxy_socket_io_read_error (sock_io_p);

      return;
    }
    }

  /* __FOR_DEBUG */
  assert (sock_io_p->read_event->type == PROXY_EVENT_IO_READ);

  if (sock_io_p->from_cas)
    {
      proxy_socket_io_read_from_cas (sock_io_p);
    }
  else
    {
      proxy_socket_io_read_from_client (sock_io_p);
    }

  return;
}

static void
proxy_socket_io_read_error (T_SOCKET_IO * sock_io_p)
{
  assert (sock_io_p);

  if (sock_io_p->from_cas)
    {
      proxy_process_cas_read_error (sock_io_p);
    }
  else
    {
      proxy_process_client_read_error (sock_io_p);
    }

  return;
}

static int
proxy_client_io_initialize (void)
{
  int error;
  int i, size;
  T_CLIENT_IO *client_io_ent_p;

  proxy_Client_io.max_client = shm_proxy_p->max_client;
  proxy_Client_io.cur_client = 0;
  proxy_Client_io.max_context = shm_proxy_p->max_context;

  error = shard_cqueue_initialize (&proxy_Client_io.freeq, proxy_Client_io.max_context);
  if (error)
    {
      PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to initialize client entries. " "(error:%d).", error);
      return -1;
    }

  /* make client io entry */
  size = proxy_Client_io.max_context * sizeof (T_CLIENT_IO);
  proxy_Client_io.ent = (T_CLIENT_IO *) malloc (size);
  if (proxy_Client_io.ent == NULL)
    {
      PROXY_LOG (PROXY_LOG_MODE_ERROR,
         "Not enough virtual memory. " "Failed to alloc client entries. " "(errno:%d, size:%d).", errno, size);
      goto error_return;
    }

  for (i = 0; i < proxy_Client_io.max_context; i++)
    {
      client_io_ent_p = &(proxy_Client_io.ent[i]);

      client_io_ent_p->client_id = i;
      client_io_ent_p->is_busy = false;
      client_io_ent_p->fd = INVALID_SOCKET;
      client_io_ent_p->ctx_cid = PROXY_INVALID_CONTEXT;
      client_io_ent_p->ctx_uid = 0;

      error = shard_cqueue_enqueue (&proxy_Client_io.freeq, (void *) client_io_ent_p);
      if (error)
    {
      PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to initialize client free queue." "(error:%d).", error);
      goto error_return;
    }
    }

  return 0;

error_return:
  proxy_client_io_destroy ();

  return -1;
}

static void
proxy_client_io_destroy (void)
{
  shard_cqueue_destroy (&proxy_Client_io.freeq);
  FREE_MEM (proxy_Client_io.ent);
  proxy_Client_io.max_client = -1;
  proxy_Client_io.cur_client = 0;
}

#if defined(PROXY_VERBOSE_DEBUG)
void
proxy_client_io_print (bool print_all)
{
  int i;
  T_CLIENT_IO *cli_io_p = NULL;

  PROXY_LOG (PROXY_LOG_MODE_SCHEDULE_DETAIL, "* CLIENT IO *\n");
  PROXY_LOG (PROXY_LOG_MODE_SCHEDULE_DETAIL, "%-20s = %d", "max_client", proxy_Client_io.max_client);
  PROXY_LOG (PROXY_LOG_MODE_SCHEDULE_DETAIL, "%-20s = %d", "max_context", proxy_Client_io.max_context);
  PROXY_LOG (PROXY_LOG_MODE_SCHEDULE_DETAIL, "%-20s = %d", "cur_client", proxy_Client_io.cur_client);

  PROXY_LOG (PROXY_LOG_MODE_SCHEDULE_DETAIL, "%-7s    %-10s %-5s %-5s %-10s %-10s", "idx", "client_id", "busy", "fd",
         "context_id", "uid");
  if (proxy_Client_io.ent)
    {
      for (i = 0; i < proxy_Client_io.max_context; i++)
    {
      cli_io_p = &(proxy_Client_io.ent[i]);
      if (!print_all && !cli_io_p->is_busy)
        {
          continue;
        }
      PROXY_LOG (PROXY_LOG_MODE_SCHEDULE_DETAIL, "[%-5d]     %-10d %-5s %-5d %-10d %-10u", i, cli_io_p->client_id,
             (cli_io_p->is_busy) ? "YES" : "NO", cli_io_p->fd, cli_io_p->ctx_cid, cli_io_p->ctx_uid);
    }
    }

  return;
}
#endif /* PROXY_VERBOSE_DEBUG */

char *
proxy_str_client_io (T_CLIENT_IO * cli_io_p)
{
  static char buffer[LINE_MAX];

  if (cli_io_p == NULL)
    {
      return (char *) "-";
    }

  snprintf (buffer, sizeof (buffer), "client_id:%d, is_busy:%s, fd:%d, ctx_cid:%d, ctx_uid:%u", cli_io_p->client_id,
        (cli_io_p->is_busy) ? "Y" : "N", cli_io_p->fd, cli_io_p->ctx_cid, cli_io_p->ctx_uid);

  return (char *) buffer;
}

static T_CLIENT_IO *
proxy_client_io_new (SOCKET fd, char *driver_info)
{
  T_PROXY_CONTEXT *ctx_p;
  T_CLIENT_IO *cli_io_p = NULL;

  cli_io_p = (T_CLIENT_IO *) shard_cqueue_dequeue (&proxy_Client_io.freeq);

  if (cli_io_p)
    {
      cli_io_p->fd = fd;
      cli_io_p->is_busy = true;

      proxy_Client_io.cur_client++;
      proxy_info_p->cur_client = proxy_Client_io.cur_client;

      ctx_p = proxy_context_new ();
      if (ctx_p == NULL)
    {
      PROXY_LOG (PROXY_LOG_MODE_ERROR, "Unable to make new context. client(%s).", proxy_str_client_io (cli_io_p));

      proxy_client_io_free (cli_io_p);
      return NULL;
    }

      cli_io_p->ctx_cid = ctx_p->cid;
      cli_io_p->ctx_uid = ctx_p->uid;
      memcpy (cli_io_p->driver_info, driver_info, SRV_CON_CLIENT_INFO_SIZE);

      ctx_p->client_id = cli_io_p->client_id;

      PROXY_LOG (PROXY_LOG_MODE_SHARD_DETAIL, "New client connected. client(%s).", proxy_str_client_io (cli_io_p));

      if (proxy_Client_io.cur_client > proxy_Client_io.max_client)
    {
      /*
       * Error message would be retured when processing
       * register(db_info) request.
       */
      char err_msg[256];

      snprintf (err_msg, sizeof (err_msg), "Proxy refused client connection. max clients exceeded");

      proxy_context_set_error_with_msg (ctx_p, DBMS_ERROR_INDICATOR, CAS_ER_MAX_CLIENT_EXCEEDED, err_msg);
    }
    }

#if defined(PROXY_VERBOSE_DEBUG)
  proxy_client_io_print (false);
#endif /* PROXY_VERBOSE_DEBUG */

  return cli_io_p;
}

void
proxy_client_io_free (T_CLIENT_IO * cli_io_p)
{
  int error;

  assert (cli_io_p->fd != INVALID_SOCKET);
  assert (cli_io_p->is_busy == true);

  proxy_socket_io_delete (cli_io_p->fd);

  cli_io_p->fd = INVALID_SOCKET;
  cli_io_p->is_busy = false;
  cli_io_p->ctx_cid = PROXY_INVALID_CONTEXT;
  cli_io_p->ctx_uid = 0;
  memset (cli_io_p->driver_info, 0, SRV_CON_CLIENT_INFO_SIZE);

  proxy_Client_io.cur_client--;
  if (proxy_Client_io.cur_client < 0)
    {
      assert (false);
      proxy_Client_io.cur_client = 0;
    }
  proxy_info_p->cur_client = proxy_Client_io.cur_client;

  error = shard_cqueue_enqueue (&proxy_Client_io.freeq, (void *) cli_io_p);
  if (error)
    {
      assert (false);
      return;
    }

  return;
}

void
proxy_client_io_free_by_ctx (int client_id, int ctx_cid, int ctx_uid)
{
  T_CLIENT_IO *cli_io_p;

  cli_io_p = proxy_client_io_find_by_ctx (client_id, ctx_cid, ctx_uid);
  if (cli_io_p == NULL)
    {
      return;
    }

  proxy_client_io_free (cli_io_p);

  return;
}


T_CLIENT_IO *
proxy_client_io_find_by_ctx (int client_id, int ctx_cid, unsigned int ctx_uid)
{
  T_CLIENT_IO *cli_io_p;

  if (client_id < 0 || client_id >= proxy_Client_io.max_context)
    {
      PROXY_LOG (PROXY_LOG_MODE_ERROR, "Invalid client id. (client_id;%d, max_context:%d).", client_id,
         proxy_Client_io.max_context);
      return NULL;
    }

  cli_io_p = &(proxy_Client_io.ent[client_id]);

  if (cli_io_p->ctx_cid != ctx_cid || cli_io_p->ctx_uid != ctx_uid)
    {
      PROXY_LOG (PROXY_LOG_MODE_ERROR, "Unable to find client by context. " "(context id:%d/%d, context uid:%d/%d).",
         cli_io_p->ctx_cid, ctx_cid, cli_io_p->ctx_uid, ctx_uid);
      return NULL;
    }

  return (cli_io_p->is_busy) ? cli_io_p : NULL;
}

T_CLIENT_IO *
proxy_client_io_find_by_fd (int client_id, SOCKET fd)
{
  T_CLIENT_IO *cli_io_p;

  if (client_id < 0 || client_id >= proxy_Client_io.max_context)
    {
      PROXY_LOG (PROXY_LOG_MODE_ERROR, "Invalid client id. (client_id:%d, max_context:%d).", client_id,
         proxy_Client_io.max_context);
      return NULL;
    }

  cli_io_p = &(proxy_Client_io.ent[client_id]);

  /* client io's socket fd must be the same requested fd */
  assert (cli_io_p->fd == fd);

  return (cli_io_p->is_busy) ? cli_io_p : NULL;
}

int
proxy_client_io_write (T_CLIENT_IO * cli_io_p, T_PROXY_EVENT * event_p)
{
  int error;

  T_SOCKET_IO *sock_io_p;

  assert (cli_io_p);
  assert (event_p);

  sock_io_p = proxy_socket_io_find (cli_io_p->fd);
  if (sock_io_p == NULL)
    {
      PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to find socket entry. (fd:%d).", cli_io_p->fd);
      return -1;
    }

  error = proxy_socket_set_write_event (sock_io_p, event_p);
  if (error)
    {
      PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to set write buffer. " "client(%s). event(%s).",
         proxy_str_client_io (cli_io_p), proxy_str_event (event_p));
      return -1;
    }

  return 0;
}

static int
proxy_cas_io_initialize (int shard_id, T_CAS_IO ** cas_io_pp, int size)
{
  int i;
  T_CAS_IO *cas_io_p;
  T_CAS_IO *buffer;

  assert (cas_io_pp);

  buffer = (T_CAS_IO *) malloc (sizeof (T_CAS_IO) * size);
  if (buffer == NULL)
    {
      return -1;
    }

  for (i = 0; i < size; i++)
    {
      cas_io_p = &(buffer[i]);

      cas_io_p->cas_id = i;
      cas_io_p->shard_id = shard_id;
      cas_io_p->is_in_tran = false;
      cas_io_p->ctx_cid = PROXY_INVALID_CONTEXT;
      cas_io_p->ctx_uid = 0;
      cas_io_p->fd = INVALID_SOCKET;
    }

  *cas_io_pp = buffer;

  return 0;
}

int
proxy_cas_io_write (T_CAS_IO * cas_io_p, T_PROXY_EVENT * event_p)
{
  int error;
  T_SOCKET_IO *sock_io_p;

  assert (cas_io_p);
  assert (event_p);

  sock_io_p = proxy_socket_io_find (cas_io_p->fd);
  if (sock_io_p == NULL)
    {
      assert (false);
      PROXY_LOG (PROXY_LOG_MODE_ERROR, "Unable to find socket entry. (fd:%d).", cas_io_p->fd);
      return -1;
    }

  error = proxy_socket_set_write_event (sock_io_p, event_p);
  if (error)
    {
      PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to set write buffer. " "CAS(%s). event(%s).",
         proxy_str_cas_io (cas_io_p), proxy_str_event (event_p));
      return -1;
    }

  return 0;
}

static int
proxy_shard_io_initialize (void)
{
  int error;
  int i;
  T_SHARD_IO *shard_io_p;
  T_SHARD_INFO *shard_info_p;
  int max_appl_server;

  proxy_Shard_io.max_shard = proxy_info_p->max_shard;

  proxy_Shard_io.ent = (T_SHARD_IO *) malloc (sizeof (T_SHARD_IO) * proxy_Shard_io.max_shard);
  if (proxy_Shard_io.ent == NULL)
    {
      return -1;
    }

  shard_info_p = shard_shm_find_shard_info (proxy_info_p, 0);
  max_appl_server = shard_info_p->max_appl_server;

  for (i = 0; i < proxy_Shard_io.max_shard; i++)
    {
      shard_io_p = &(proxy_Shard_io.ent[i]);

      shard_io_p->shard_id = i;
      shard_io_p->max_num_cas = max_appl_server;
      shard_io_p->cur_num_cas = 0;
      shard_io_p->num_cas_in_tran = 0;
      error = shard_queue_initialize (&shard_io_p->waitq);
      if (error)
    {
      goto error_return;
    }

      error = proxy_cas_io_initialize (shard_io_p->shard_id, &shard_io_p->ent, shard_io_p->max_num_cas);
      if (error)
    {
      goto error_return;
    }
    }

  return 0;

error_return:
  proxy_shard_io_destroy ();
  return -1;
}

static void
proxy_shard_io_destroy (void)
{
  int i;
  T_SHARD_IO *shard_io_p;

  for (i = 0; i < proxy_Shard_io.max_shard; i++)
    {
      shard_io_p = &(proxy_Shard_io.ent[i]);

      shard_queue_destroy (&shard_io_p->waitq);
      FREE_MEM (shard_io_p->ent);
    }

  FREE_MEM (proxy_Shard_io.ent);

  return;
}

#if defined(PROXY_VERBOSE_DEBUG)
void
proxy_shard_io_print (bool print_all)
{
  int i, j;
  T_SHARD_IO *shard_io_p;
  T_CAS_IO *cas_io_p;

  PROXY_LOG (PROXY_LOG_MODE_SCHEDULE_DETAIL, "* SHARD IO *");
  PROXY_LOG (PROXY_LOG_MODE_SCHEDULE_DETAIL, "%-20s = %d", "max_shard", proxy_Shard_io.max_shard);

  PROXY_LOG (PROXY_LOG_MODE_SCHEDULE_DETAIL, "%-7s    %-15s %-15s %-15s %-15s", "idx", "shard_id", "max_cas", "cur_cas",
         "in_tran");
  if (proxy_Shard_io.ent)
    {
      for (i = 0; i < proxy_Shard_io.max_shard; i++)
    {
      shard_io_p = &(proxy_Shard_io.ent[i]);

      PROXY_LOG (PROXY_LOG_MODE_SCHEDULE_DETAIL, "[%-5d]     %-15d %-15d %-15d %-15d", i, shard_io_p->shard_id,
             shard_io_p->max_num_cas, shard_io_p->cur_num_cas, shard_io_p->num_cas_in_tran);

      PROXY_LOG (PROXY_LOG_MODE_SCHEDULE_DETAIL, "       <cas>   %-7s    %-10s %-10s %-10s %-10s", "idx", "cas_id",
             "shard_id", "in_tran", "fd");
      if (shard_io_p->ent)
        {
          for (j = 0; j < shard_io_p->max_num_cas; j++)
        {
          cas_io_p = &(shard_io_p->ent[j]);
          if (!print_all && IS_INVALID_SOCKET (cas_io_p->fd))
            {
              continue;
            }

          PROXY_LOG (PROXY_LOG_MODE_SCHEDULE_DETAIL, "               [%-5d]    %-10d %-10d %-10s %-10d", j,
                 cas_io_p->cas_id, cas_io_p->shard_id, (cas_io_p->is_in_tran) ? "YES" : "NO", cas_io_p->fd);
        }
        }
    }
    }

  return;
}
#endif /* PROXY_VERBOSE_DEBUG */

char *
proxy_str_cas_io (T_CAS_IO * cas_io_p)
{
  static char buffer[LINE_MAX];

  if (cas_io_p == NULL)
    {
      return (char *) "-";
    }

  snprintf (buffer, sizeof (buffer),
        "cas_id:%d, shard_id:%d, is_in_tran:%s, " "status:%d, ctx_cid:%d, ctx_uid:%u, fd:%d", cas_io_p->cas_id,
        cas_io_p->shard_id, (cas_io_p->is_in_tran) ? "Y" : "N", cas_io_p->status, cas_io_p->ctx_cid,
        cas_io_p->ctx_uid, cas_io_p->fd);

  return (char *) buffer;
}

static T_SHARD_IO *
proxy_shard_io_find (int shard_id)
{
  T_SHARD_IO *shard_io_p;

  if (shard_id < 0 || shard_id >= proxy_Shard_io.max_shard)
    {
      PROXY_DEBUG_LOG ("Invalid shard id. (shard_id:%d, max_shard:%d).", shard_id, proxy_Shard_io.max_shard);
      return NULL;
    }

  shard_io_p = &(proxy_Shard_io.ent[shard_id]);

  return shard_io_p;
}

static T_CAS_IO *
proxy_cas_io_new (int shard_id, int cas_id, SOCKET fd)
{
  T_SHARD_IO *shard_io_p;
  T_CAS_IO *cas_io_p;

  if (shard_id < 0 || shard_id >= proxy_Shard_io.max_shard)
    {
      PROXY_LOG (PROXY_LOG_MODE_ERROR, "Invalid shard id. (shard_id:%d, max_shard:%d).", shard_id,
         proxy_Shard_io.max_shard);
      return NULL;
    }

  shard_io_p = &(proxy_Shard_io.ent[shard_id]);
  if (cas_id < 0 || cas_id >= shard_io_p->max_num_cas)
    {
      PROXY_LOG (PROXY_LOG_MODE_ERROR, "Invalid CAS id. (cas_id:%d, max_num_cas:%d).", cas_id, shard_io_p->max_num_cas);
      return NULL;
    }

  cas_io_p = &(shard_io_p->ent[cas_id]);

  if (cas_io_p->fd != INVALID_SOCKET || cas_io_p->is_in_tran)
    {
      PROXY_LOG (PROXY_LOG_MODE_ERROR, "Already registered CAS. " "(fd:%d, shard_id:%d, cas_id:%d, max_num_cas:%d).",
         cas_io_p->fd, shard_id, cas_id, shard_io_p->max_num_cas);
      return NULL;
    }
  assert (cas_io_p->fd == INVALID_SOCKET);
  assert (cas_io_p->is_in_tran == false);

  cas_io_p->status = CAS_IO_NOT_CONNECTED;
  cas_io_p->fd = fd;
  cas_io_p->is_in_tran = false;
  cas_io_p->ctx_cid = PROXY_INVALID_CONTEXT;
  cas_io_p->ctx_uid = 0;

  shard_io_p->cur_num_cas++;

  PROXY_LOG (PROXY_LOG_MODE_SHARD_DETAIL, "New CAS connected. CAS(%s).", proxy_str_cas_io (cas_io_p));
#if defined(PROXY_VERBOSE_DEBUG)
  proxy_shard_io_print (false);
#endif /* PROXY_VERBOSE_DEBUG */

  return cas_io_p;
}

/* by socket event */
static void
proxy_cas_io_free (int shard_id, int cas_id)
{
  T_SHARD_IO *shard_io_p;
  T_CAS_IO *cas_io_p;

  ENTER_FUNC ();

  if (shard_id < 0 || shard_id >= proxy_Shard_io.max_shard)
    {
      PROXY_DEBUG_LOG ("Invalid shard id. (shard_id:%d, max_shard:%d).", shard_id, proxy_Shard_io.max_shard);
      EXIT_FUNC ();
      return;
    }

  shard_io_p = &(proxy_Shard_io.ent[shard_id]);
  if (cas_id < 0 || cas_id >= shard_io_p->max_num_cas)
    {
      PROXY_LOG (PROXY_LOG_MODE_ERROR, "Invalid CAS id. (cas_id:%d, max_num_cas:%d).", cas_id, shard_io_p->max_num_cas);
      EXIT_FUNC ();
      return;
    }

  cas_io_p = &(shard_io_p->ent[cas_id]);

  proxy_socket_io_delete (cas_io_p->fd);

  if (cas_io_p->fd != INVALID_SOCKET)
    {
      cas_io_p->fd = INVALID_SOCKET;
      shard_stmt_del_all_srv_h_id_for_shard_cas (cas_io_p->shard_id, cas_io_p->cas_id);
    }

  if (cas_io_p->is_in_tran == true)
    {
      shard_io_p->num_cas_in_tran--;

      PROXY_DEBUG_LOG ("shard/CAS status. (num_cas_in_tran=%d, shard_id=%d).", shard_io_p->num_cas_in_tran,
               shard_io_p->shard_id);

      assert (shard_io_p->num_cas_in_tran >= 0);
      if (shard_io_p->num_cas_in_tran < 0)
    {
      shard_io_p->num_cas_in_tran = 0;
    }
    }


  cas_io_p->is_in_tran = false;
  cas_io_p->ctx_cid = PROXY_INVALID_CONTEXT;
  cas_io_p->ctx_uid = 0;

  cas_io_p->status = CAS_IO_NOT_CONNECTED;
  shard_io_p->cur_num_cas--;

  EXIT_FUNC ();
  return;
}

/* by context */
void
proxy_cas_io_free_by_ctx (int shard_id, int cas_id, int ctx_cid, int unsigned ctx_uid)
{
  T_SHARD_IO *shard_io_p;
  T_CAS_IO *cas_io_p;

  ENTER_FUNC ();

  if (shard_id < 0 || shard_id >= proxy_Shard_io.max_shard)
    {

      PROXY_DEBUG_LOG ("Invalid shard id. (shard_id:%d, max_shard:%d).", shard_id, proxy_Shard_io.max_shard);
      EXIT_FUNC ();
      return;
    }

  shard_io_p = &(proxy_Shard_io.ent[shard_id]);
  if (cas_id < 0 || cas_id >= shard_io_p->max_num_cas)
    {
      PROXY_LOG (PROXY_LOG_MODE_ERROR, "Invalid CAS id. (cas_id:%d, max_num_cas:%d).", cas_id, shard_io_p->max_num_cas);
      EXIT_FUNC ();
      return;
    }

  cas_io_p = &(shard_io_p->ent[cas_id]);

  if (cas_io_p->is_in_tran == false || cas_io_p->ctx_cid != ctx_cid || cas_io_p->ctx_uid != ctx_uid)
    {
      PROXY_LOG (PROXY_LOG_MODE_ERROR, "Unable to find CAS entry. " "(context id:%d, context uid:%d). CAS(%S).",
         ctx_cid, ctx_uid, proxy_str_cas_io (cas_io_p));

      assert (false);

      EXIT_FUNC ();
      return;
    }

  proxy_socket_io_delete (cas_io_p->fd);

  if (cas_io_p->fd != INVALID_SOCKET)
    {
      cas_io_p->fd = INVALID_SOCKET;
      shard_stmt_del_all_srv_h_id_for_shard_cas (cas_io_p->shard_id, cas_io_p->cas_id);
    }

  cas_io_p->is_in_tran = false;
  cas_io_p->status = CAS_IO_NOT_CONNECTED;
  cas_io_p->ctx_cid = PROXY_INVALID_CONTEXT;
  cas_io_p->ctx_uid = 0;

  shard_io_p->cur_num_cas--;
  assert (shard_io_p->cur_num_cas >= 0);
  if (shard_io_p->cur_num_cas < 0)
    {
      shard_io_p->cur_num_cas = 0;
    }

  shard_io_p->num_cas_in_tran--;
  PROXY_DEBUG_LOG ("Shard/CAS status. (num_cas_in_tran=%d, shard_id=%d).", shard_io_p->num_cas_in_tran,
           shard_io_p->shard_id);
  assert (shard_io_p->num_cas_in_tran >= 0);
  if (shard_io_p->num_cas_in_tran < 0)
    {
      shard_io_p->num_cas_in_tran = 0;
    }

  EXIT_FUNC ();
  return;
}

static T_CAS_IO *
proxy_cas_io_find_by_fd (int shard_id, int cas_id, SOCKET fd)
{
  T_SHARD_IO *shard_io_p;
  T_CAS_IO *cas_io_p;

  if (shard_id < 0 || cas_id < 0 || fd == INVALID_SOCKET)
    {
      PROXY_DEBUG_LOG ("Unable to find CAS entry. " "(shard:%d, cas:%d, fd:%d).", shard_id, cas_id, fd);
      return NULL;
    }

  /* __FOR_DEBUG */
  assert (shard_id <= proxy_Shard_io.max_shard);
  shard_io_p = &(proxy_Shard_io.ent[shard_id]);

  /* __FOR_DEBUG */
  assert (cas_id <= shard_io_p->max_num_cas);
  cas_io_p = &(shard_io_p->ent[cas_id]);

  assert (cas_io_p->fd == fd);

  return cas_io_p;
}

T_CAS_IO *
proxy_cas_find_io_by_ctx (int shard_id, int cas_id, int ctx_cid, unsigned int ctx_uid)
{
  /* in case, find cas explicitly */
  T_SHARD_IO *shard_io_p;
  T_CAS_IO *cas_io_p;

  if (0 > shard_id || shard_id >= proxy_Shard_io.max_shard)
    {
      PROXY_DEBUG_LOG ("Invalid shard id. (shard_id:%d, max_shard:%d).", shard_id, proxy_Shard_io.max_shard);
      return NULL;
    }
  shard_io_p = &(proxy_Shard_io.ent[shard_id]);

  if (0 > cas_id || cas_id >= shard_io_p->max_num_cas)
    {
      PROXY_LOG (PROXY_LOG_MODE_ERROR, "Invalid CAS id. (cas_id:%d, max_num_cas:%d).", cas_id, shard_io_p->max_num_cas);
      return NULL;
    }
  cas_io_p = &(shard_io_p->ent[cas_id]);

  if (cas_io_p->is_in_tran == false || cas_io_p->ctx_cid != ctx_cid || cas_io_p->ctx_uid != ctx_uid)
    {
      PROXY_LOG (PROXY_LOG_MODE_ERROR, "Unable to find CAS entry. " "(context id:%d, context uid:%d). CAS(%S).",
         ctx_cid, ctx_uid, proxy_str_cas_io (cas_io_p));
      return NULL;
    }

  return cas_io_p;
}

T_CAS_IO *
proxy_cas_alloc_by_ctx (int client_id, int shard_id, int cas_id, int ctx_cid, unsigned int ctx_uid, int timeout,
            int func_code)
{
  int error;
  T_SHARD_IO *shard_io_p;
  T_CAS_IO *cas_io_p;
  T_CLIENT_INFO *client_info_p;

  unsigned int curr_shard_id = 0;
  static unsigned int last_shard_id = 0;

  /* valid shard id */
  if ((shard_id < 0 && cas_id >= 0) || (shard_id >= proxy_Shard_io.max_shard))
    {
      PROXY_LOG (PROXY_LOG_MODE_ERROR, "Invalid shard/CAS id is requested. " "(shard_id:%d, cas_id:%d). ", shard_id,
         cas_id);

      return NULL;
    }

  if (shard_id >= 0 && cas_id >= 0)
    {
      cas_io_p = proxy_cas_alloc_by_shard_and_cas_id (client_id, shard_id, cas_id, ctx_cid, ctx_uid);
    }
  else
    {
      if (func_code == CAS_FC_CHECK_CAS)
    {
      cas_io_p =
        proxy_cas_alloc_anything (client_id, shard_id, cas_id, ctx_cid, ctx_uid, proxy_find_idle_cas_by_desc);
    }
      else
    {
      cas_io_p =
        proxy_cas_alloc_anything (client_id, shard_id, cas_id, ctx_cid, ctx_uid, proxy_find_idle_cas_by_conn_info);
      if (cas_io_p == NULL)
        {
          cas_io_p =
        proxy_cas_alloc_anything (client_id, shard_id, cas_id, ctx_cid, ctx_uid, proxy_find_idle_cas_by_asc);
        }
    }
    }

  if (cas_io_p == NULL)
    {
      goto set_waiter;
    }

  shard_id = cas_io_p->shard_id;
  cas_id = cas_io_p->cas_id;

  client_info_p = shard_shm_get_client_info (proxy_info_p, client_id);
  if (client_info_p == NULL)
    {
      PROXY_LOG (PROXY_LOG_MODE_ERROR,
         "Unable to find cilent info in shared memory. " "(context id:%d, context uid:%d)", ctx_cid, ctx_uid);
    }
  else
    if (shard_shm_set_as_client_info_with_db_param
    (proxy_info_p, shm_as_p, cas_io_p->shard_id, cas_io_p->cas_id, client_info_p) == false)
    {

      PROXY_LOG (PROXY_LOG_MODE_ERROR, "Unable to find CAS info in shared memory. " "(shard_id:%d, cas_id:%d).",
         cas_io_p->shard_id, cas_io_p->cas_id);
    }

  cas_io_p->is_in_tran = true;
  cas_io_p->ctx_cid = ctx_cid;
  cas_io_p->ctx_uid = ctx_uid;

  proxy_set_conn_info (func_code, ctx_cid, ctx_uid, shard_id, cas_id);

  return cas_io_p;

set_waiter:
  if (shard_id >= 0)
    {
      curr_shard_id = (unsigned int) shard_id;
    }
  else
    {
      curr_shard_id = (unsigned int) (last_shard_id + 1) % proxy_Shard_io.max_shard;

      last_shard_id = curr_shard_id;
    }

  shard_io_p = &(proxy_Shard_io.ent[curr_shard_id]);
  if (shard_io_p->cur_num_cas <= 0)
    {
      PROXY_LOG (PROXY_LOG_MODE_ERROR,
         "Failed to allocate shard/cas. " "No available cas in this shard. "
         "Wait until shard has available cas. " "(cur_num_cas:%d, max_num_cas:%d)", shard_io_p->cur_num_cas,
         shard_io_p->max_num_cas);
    }

  error = proxy_client_add_waiter_by_shard (shard_io_p, ctx_cid, ctx_uid, timeout);
  if (error)
    {
      return NULL;
    }

  return (T_CAS_IO *) (SHARD_TEMPORARY_UNAVAILABLE);
}

void
proxy_cas_release_by_ctx (int shard_id, int cas_id, int ctx_cid, unsigned int ctx_uid)
{
  T_SHARD_IO *shard_io_p;
  T_CAS_IO *cas_io_p;

  ENTER_FUNC ();

  if (0 > shard_id || shard_id >= proxy_Shard_io.max_shard)
    {
      PROXY_DEBUG_LOG ("Invalid shard id. (shard_id:%d, max_shard:%d).", shard_id, proxy_Shard_io.max_shard);
      EXIT_FUNC ();
      return;
    }
  shard_io_p = &(proxy_Shard_io.ent[shard_id]);

  if (0 > cas_id || cas_id >= shard_io_p->max_num_cas)
    {
      PROXY_LOG (PROXY_LOG_MODE_ERROR, "Invalid CAS id. (cas_id:%d, max_num_cas:%d).", cas_id, shard_io_p->max_num_cas);
      EXIT_FUNC ();
      return;
    }
  cas_io_p = &(shard_io_p->ent[cas_id]);

  if (cas_io_p->is_in_tran == false || cas_io_p->ctx_cid != ctx_cid || cas_io_p->ctx_uid != ctx_uid)
    {
      PROXY_LOG (PROXY_LOG_MODE_ERROR, "Unable to find CAS entry. " "(context id:%d, context uid:%d). CAS(%S).",
         ctx_cid, ctx_uid, proxy_str_cas_io (cas_io_p));

      assert (false);

      EXIT_FUNC ();
      return;
    }

  cas_io_p->is_in_tran = false;
  cas_io_p->ctx_cid = PROXY_INVALID_CONTEXT;
  cas_io_p->ctx_uid = 0;

  /* decrease number of cas in transaction */
  shard_io_p->num_cas_in_tran--;

  PROXY_DEBUG_LOG ("Shard status. (num_cas_in_tran=%d, shard_id=%d).", shard_io_p->num_cas_in_tran,
           shard_io_p->shard_id);

  assert (shard_io_p->num_cas_in_tran >= 0);
  if (shard_io_p->num_cas_in_tran < 0)
    {
      shard_io_p->num_cas_in_tran = 0;
    }

  /* check and wakeup shard/cas waiter */
  proxy_client_check_waiter_and_wakeup (shard_io_p, cas_io_p);

  EXIT_FUNC ();
  return;
}

static int
proxy_client_add_waiter_by_shard (T_SHARD_IO * shard_io_p, int ctx_cid, int ctx_uid, int timeout)
{
  int error;
  T_WAIT_CONTEXT *waiter_p;
  T_SHARD_INFO *shard_info_p = NULL;

  ENTER_FUNC ();

  assert (shard_io_p);

  waiter_p = proxy_waiter_new (ctx_cid, ctx_uid, timeout);
  if (waiter_p == NULL)
    {
      EXIT_FUNC ();
      return -1;
    }

  PROXY_DEBUG_LOG ("Context(context id:%d, context uid:%u) " "is waiting on shard(shard_id:%d).", ctx_cid, ctx_uid,
           shard_io_p->shard_id);

  error = shard_queue_ordered_enqueue (&shard_io_p->waitq, (void *) waiter_p, proxy_waiter_comp_fn);
  if (error)
    {
      EXIT_FUNC ();
      return -1;
    }

  shard_info_p = shard_shm_find_shard_info (proxy_info_p, shard_io_p->shard_id);
  if (shard_info_p != NULL)
    {
      shard_info_p->waiter_count++;
      PROXY_DEBUG_LOG ("Add waiter by shard. (waiter_count:%d).", shard_info_p->waiter_count);
    }

  EXIT_FUNC ();
  return 0;
}

static void
proxy_client_check_waiter_and_wakeup (T_SHARD_IO * shard_io_p, T_CAS_IO * cas_io_p)
{
  int error;
  T_WAIT_CONTEXT *waiter_p = NULL;
  T_SHARD_INFO *shard_info_p = NULL;

  ENTER_FUNC ();

  assert (shard_io_p);
  assert (cas_io_p);

  waiter_p = (T_WAIT_CONTEXT *) shard_queue_dequeue (&shard_io_p->waitq);
  while (waiter_p)
    {
      PROXY_DEBUG_LOG ("Wakeup waiter by shard. (shard_id:%d, cas_id:%d).", cas_io_p->shard_id, cas_io_p->cas_id);

      shard_info_p = shard_shm_find_shard_info (proxy_info_p, shard_io_p->shard_id);
      if ((shard_info_p != NULL) && (shard_info_p->waiter_count > 0))
    {
      shard_info_p->waiter_count--;
      PROXY_DEBUG_LOG ("Wakeup context by shard. (waiter_count:%d).", shard_info_p->waiter_count);
    }

      error = proxy_wakeup_context_by_shard (waiter_p, cas_io_p->shard_id, cas_io_p->cas_id);
      if (error)
    {
      PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to wakeup context by shard. " "(error:%d, shard_id:%d, cas_id:%d).",
             error, cas_io_p->shard_id, cas_io_p->cas_id);
      FREE_MEM (waiter_p);
      continue;
    }

      FREE_MEM (waiter_p);
      break;
    }

  EXIT_FUNC ();
  return;
}

#if !defined(WINDOWS)
static SOCKET
proxy_io_connect_to_broker (void)
{
  SOCKET fd;
  int len;

  struct sockaddr_un shard_sock_addr;
  char *port_name;

  if ((port_name = shm_as_p->port_name) == NULL)
    {
      return (-1);
    }

  /* FOR DEBUG */
  PROXY_LOG (PROXY_LOG_MODE_NOTICE, "Connect to broker. (port_name:[%s]).", port_name);

  if ((fd = socket (AF_UNIX, SOCK_STREAM, 0)) < 0)
    {
      return (-1);
    }

  memset (&shard_sock_addr, 0, sizeof (shard_sock_addr));
  shard_sock_addr.sun_family = AF_UNIX;
  strcpy (shard_sock_addr.sun_path, port_name);
#ifdef  _SOCKADDR_LEN       /* 4.3BSD Reno and later */
  len = sizeof (shard_sock_addr.sun_len) + sizeof (shard_sock_addr.sun_family) + strlen (shard_sock_addr.sun_path) + 1;
  shard_sock_addr.sun_len = len;
#else /* vanilla 4.3BSD */
  len = strlen (shard_sock_addr.sun_path) + sizeof (shard_sock_addr.sun_family) + 1;
#endif

  if (connect (fd, (struct sockaddr *) &shard_sock_addr, len) != 0)
    {
      CLOSESOCKET (fd);
      return (-4);
    }

  return fd;
}

static int
proxy_io_register_to_broker (void)
{
  SOCKET fd = INVALID_SOCKET;
  int error;
  int tmp_proxy_id;
  int len;
  int num_retry = 0;

  PROXY_LOG (PROXY_LOG_MODE_NOTICE, "Register to broker. ");

  do
    {
      sleep (1);
      fd = proxy_io_connect_to_broker ();
    }
  while (IS_INVALID_SOCKET (fd) && (num_retry++) < 5);
  if (IS_INVALID_SOCKET (fd))
    {
      PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to connect to broker. (fd:%d).", fd);
      return -1;
    }

  tmp_proxy_id = htonl (proxy_id);
  len = WRITESOCKET (fd, &tmp_proxy_id, sizeof (tmp_proxy_id));
  if (len != sizeof (tmp_proxy_id))
    {
      PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to register to broker. (fd:%d).", fd);
      return -1;
    }

#if defined(LINUX)
  shard_io_set_fl (fd, O_NONBLOCK);

  error = proxy_add_epoll_event (fd, EPOLLIN | EPOLLPRI);
  if (error < 0)
    {
      CLOSE_SOCKET (fd);
      return -1;
    }
#else /* LINUX */
  FD_SET (fd, &allset);
  maxfd = max (maxfd, fd);
#endif /* !LINUX */
  broker_conn_fd = fd;

  return 0;
}
#endif /* !WINDOWS */

#if defined(WINDOWS)
static int
proxy_io_inet_lsnr (int port)
{
  SOCKET fd;
  int len, backlog_size;
  int one = 1;
  struct sockaddr_in shard_sock_addr;

  fd = socket (AF_INET, SOCK_STREAM, 0);
  if (IS_INVALID_SOCKET (fd))
    {
      PROXY_LOG (PROXY_LOG_MODE_ERROR, "Fail to create socket. (Error:%d).", WSAGetLastError ());
      return (-1);
    }
  if ((setsockopt (fd, SOL_SOCKET, SO_REUSEADDR, (char *) &one, sizeof (one))) < 0)
    {
      PROXY_LOG (PROXY_LOG_MODE_ERROR, "Fail to set socket option. (Error:%d).", WSAGetLastError ());
      return (-1);
    }

  memset (&shard_sock_addr, 0, sizeof (struct sockaddr_in));
  shard_sock_addr.sin_family = AF_INET;
  shard_sock_addr.sin_port = htons ((unsigned short) (port));
  len = sizeof (struct sockaddr_in);
  shard_sock_addr.sin_addr.s_addr = htonl (INADDR_ANY);

  /* bind the name to the descriptor */
  if (bind (fd, (struct sockaddr *) &shard_sock_addr, len) < 0)
    {
      CLOSESOCKET (fd);
      return (-2);
    }

  /* SHARD TODO -- modify backlog size to max_client_size or other rule -- tigger */
  backlog_size = 10;
  if (listen (fd, backlog_size) < 0)
    {
      /* tell kernel we're a server */
      CLOSESOCKET (fd);
      return (-3);
    }

  return fd;
}
#else /* WINDOWS */
static int
proxy_io_unixd_lsnr (char *unixd_sock_name)
{
  SOCKET fd;
  int len, backlog_size;
  struct sockaddr_un shard_sock_addr;

  if ((fd = socket (AF_UNIX, SOCK_STREAM, 0)) < 0)
    {
      return (-1);
    }

  memset (&shard_sock_addr, 0, sizeof (shard_sock_addr));
  shard_sock_addr.sun_family = AF_UNIX;
  strcpy (shard_sock_addr.sun_path, unixd_sock_name);

#ifdef  _SOCKADDR_LEN       /* 4.3BSD Reno and later */
  len = sizeof (shard_sock_addr.sun_len) + sizeof (shard_sock_addr.sun_family) + strlen (shard_sock_addr.sun_path) + 1;
  shard_sock_addr.sun_len = len;
#else /* vanilla 4.3BSD */
  len = strlen (shard_sock_addr.sun_path) + sizeof (shard_sock_addr.sun_family) + 1;
#endif

  /* bind the name to the descriptor */
  if (bind (fd, (struct sockaddr *) &shard_sock_addr, len) < 0)
    {
      CLOSESOCKET (fd);
      return (-2);
    }

  /* SHARD TODO -- modify backlog size to max_client_size or other rule -- tigger */
  backlog_size = 10;
  if (listen (fd, backlog_size) < 0)
    {
      /* tell kernel we're a server */
      CLOSESOCKET (fd);
      return (-3);
    }

  return fd;
}
#endif /* !WINDOWS */

static int
proxy_io_cas_lsnr (void)
{
  int error = 0;
  SOCKET fd;

#if defined(WINDOWS)
  int port = GET_CAS_PORT (broker_port, proxy_info_p->proxy_id,
               shm_proxy_p->num_proxy);

  /* FOR DEBUG */
  PROXY_LOG (PROXY_LOG_MODE_NOTICE, "Listen CAS socket. (port number:[%d])", port);

  fd = proxy_io_inet_lsnr (port);
#else /* WINDOWS */
  /* FOR DEBUG */
  PROXY_LOG (PROXY_LOG_MODE_NOTICE, "Listen CAS socket. (port name:[%s])", proxy_info_p->port_name);

  fd = proxy_io_unixd_lsnr (proxy_info_p->port_name);
#endif /* !WINDOWS */

  if (IS_INVALID_SOCKET (fd))
    {
      PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to listen CAS socket. (fd:%d).", fd);
      return -1;        /* FAIELD */
    }

#if defined(LINUX)
  shard_io_set_fl (fd, O_NONBLOCK);

  error = proxy_add_epoll_event (fd, EPOLLIN | EPOLLPRI);
  if (error < 0)
    {
      CLOSE_SOCKET (fd);
      return -1;
    }
#else /* LINUX */
  FD_SET (fd, &allset);
  maxfd = max (maxfd, fd);
#endif /* !LINUX */
  cas_lsnr_fd = fd;

  return 0;         /* SUCCESS */
}

#if defined(WINDOWS)
static int
proxy_io_client_lsnr (void)
{
  SOCKET fd;

  int port = GET_CLIENT_PORT (broker_port, proxy_info_p->proxy_id);

  /* FOR DEBUG */
  PROXY_LOG (PROXY_LOG_MODE_NOTICE, "Listen Client socket. " "(port number:[%d])", port);

  fd = proxy_io_inet_lsnr (port);
  if (IS_INVALID_SOCKET (fd))
    {
      PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to listen Client socket. (fd:%d).", fd);
      return -1;        /* FAIELD */
    }

  proxy_info_p->proxy_port = port;
  client_lsnr_fd = fd;
  FD_SET (client_lsnr_fd, &allset);
  maxfd = max (maxfd, client_lsnr_fd);

  return 0;
}
#endif

static SOCKET
proxy_io_accept (SOCKET lsnr_fd)
{
  socklen_t len;
  SOCKET fd;
#if defined(WINDOWS)
  struct sockaddr_in shard_sock_addr;
#else /* WINDOWS */
  struct sockaddr_un shard_sock_addr;
#endif /* !WINDOWS */

  len = sizeof (shard_sock_addr);
  fd = accept (lsnr_fd, (struct sockaddr *) &shard_sock_addr, &len);
  if (IS_INVALID_SOCKET (fd))
    {
      return (-1);
    }

#if defined(WINDOWS)
  memcpy (&accept_ip_addr, &(shard_sock_addr.sin_addr), 4);
#endif /* WINDOWS */

  return (fd);
}

static SOCKET
proxy_io_cas_accept (SOCKET lsnr_fd)
{
  return proxy_io_accept (lsnr_fd);
}

#if defined(WINDOWS)
static SOCKET
proxy_io_client_accept (SOCKET lsnr_fd)
{
  return proxy_io_accept (lsnr_fd);
}
#endif

int
proxy_io_initialize (void)
{
  int error;

#if defined(LINUX)
  max_Socket = proxy_get_max_socket ();
  assert (max_Socket > PROXY_RESERVED_FD);

  /* create epoll */
  ep_Fd = epoll_create (max_Socket);
  if (ep_Fd == INVALID_SOCKET)
    {
      PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to create epoll fd. (errno=%d[%s])", errno, strerror (errno));
      return -1;
    }

  ep_Event = (epoll_event *) calloc (max_Socket, sizeof (struct epoll_event));
  if (ep_Event == NULL)
    {
      PROXY_LOG (PROXY_LOG_MODE_ERROR, "Not enough virtual memory for epoll event. (error:%d[%s]).", errno,
         strerror (errno));
      return -1;
    }

#else /* LINUX */
  /* clear fds */
  FD_ZERO (&allset);
  FD_ZERO (&wnewset);
#endif /* !LINUX */

#if defined(WINDOWS)
  broker_port = shm_as_p->broker_port;

  if (broker_port <= 0)
    {
      return (-1);
    }

  /* make listener for client */
  error = proxy_io_client_lsnr ();
  if (error)
    {
      PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to initialize Client socket listener. (error:%d).", error);
      return -1;
    }
#else /* WINDOWS */
  /* register to broker */
  error = proxy_io_register_to_broker ();
  if (error)
    {
      PROXY_LOG (PROXY_LOG_MODE_ERROR, "Unable to register to broker. (error:%d).", error);
      return -1;
    }
#endif /* !WINDOWS */

  /* make listener for cas */
  error = proxy_io_cas_lsnr ();
  if (error)
    {
      PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to initialize CAS socket listener. (error:%d).", error);
      return -1;
    }

  error = proxy_socket_io_initialize ();
  if (error)
    {
      PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to initialize socket entries. (error:%d).", error);
      return error;
    }

  /* initialize client io */
  error = proxy_client_io_initialize ();
  if (error)
    {
      PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to initialize client entries. (error:%d).", error);
      return error;
    }

  /* initialize shard/cas io */
  error = proxy_shard_io_initialize ();
  if (error)
    {
      PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to initialize shard/CAS entries. (error:%d).", error);
      return error;
    }

  return 0;
}

void
proxy_io_destroy (void)
{
  proxy_io_close_all_fd ();
  proxy_socket_io_destroy ();
  proxy_client_io_destroy ();
  proxy_shard_io_destroy ();

#if defined (LINUX)
  FREE_MEM (ep_Event);
#endif /* LINUX */

  return;
}

int
proxy_io_close_all_fd (void)
{
  int i;
  T_SOCKET_IO *sock_io_p;

  /* close client/cas fd */
  for (i = 0; i < proxy_Socket_io.max_socket; i++)
    {
      sock_io_p = &(proxy_Socket_io.ent[i]);
      if (sock_io_p->fd == INVALID_SOCKET)
    {
      continue;
    }
      CLOSE_SOCKET (sock_io_p->fd);
    }

  /* close cas listen fd */
  if (cas_lsnr_fd != INVALID_SOCKET)
    {
      CLOSE_SOCKET (cas_lsnr_fd);
    }

#if defined(WINDOWS)
  /* close client listen fd */
  if (client_lsnr_fd != INVALID_SOCKET)
    {
      CLOSE_SOCKET (client_lsnr_fd);
    }
#else /* WINDOWS */
  /* close broker connection fd */
  if (broker_conn_fd != INVALID_SOCKET)
    {
      CLOSE_SOCKET (broker_conn_fd);
    }
#endif /* !WINDOWS */

#if defined(LINUX)
  if (ep_Fd != INVALID_SOCKET)
    {
      CLOSE_SOCKET (ep_Fd);
    }
#endif /* LINUX */
  return 0;
}

int
proxy_io_process (void)
{
  int cas_fd;
  int i;
#if defined(WINDOWS)
  int client_fd;
#endif
  int n;

#if defined(LINUX)
  int sock_fd;
  int timeout;
#else /* LINUX */
  struct timeval tv;
#endif /* !LINUX */

  T_SOCKET_IO *sock_io_p = NULL;

#if defined(LINUX)
  timeout = 1000 / HZ;

  n = epoll_wait (ep_Fd, ep_Event, max_Socket, timeout);
#else /* LINUX */
  rset = allset;
  wset = wallset = wnewset;

  tv.tv_sec = 0;
  tv.tv_usec = 1000000 / HZ;
  n = select (maxfd + 1, &rset, &wset, NULL, &tv);
#endif /* !LINUX */
  if (n < 0)
    {
      if (errno != EINTR)
    {
      perror ("select error");
      return -1;
    }
    }
  else if (n == 0)
    {
      /* print_statistics (); */
      return 0;         /* or -1 */
    }

#if defined(LINUX)
  for (i = 0; i < n; i++)
    {
      if (cas_lsnr_fd == ep_Event[i].data.fd)   /* new cas */
    {
      if ((ep_Event[i].events & EPOLLERR) || (ep_Event[i].events & EPOLLHUP))
        {
          proxy_Keep_running = false;
          return -1;
        }
      else if (ep_Event[i].events & EPOLLIN || ep_Event[i].events & EPOLLPRI)
        {
          cas_fd = proxy_io_cas_accept (cas_lsnr_fd);
          if (cas_fd >= 0)
        {
          if (proxy_socket_io_add (cas_fd, PROXY_IO_FROM_CAS) == NULL)
            {
              PROXY_DEBUG_LOG ("Close socket. (fd:%d). \n", cas_fd);
              CLOSE_SOCKET (cas_fd);
            }
        }
          else
        {
          PROXY_LOG (PROXY_LOG_MODE_ERROR, "Accept socket failure. (fd:%d).", cas_fd);
          return 0; /* or -1 */
        }

        }
    }
      else if (broker_conn_fd == ep_Event[i].data.fd)   /* new client */
    {
      if ((ep_Event[i].events & EPOLLERR) || (ep_Event[i].events & EPOLLHUP))
        {
          proxy_Keep_running = false;
          return -1;
        }
      else if (ep_Event[i].events & EPOLLIN || ep_Event[i].events & EPOLLPRI)
        {
          proxy_socket_io_new_client (broker_conn_fd);
        }
    }
      else
    {
      sock_fd = ep_Event[i].data.fd;
      assert (sock_fd <= proxy_Socket_io.max_socket);

      sock_io_p = &(proxy_Socket_io.ent[sock_fd]);
      if (sock_io_p->fd != sock_fd)
        {
          assert (false);
          continue;
        }

      if ((ep_Event[i].events & EPOLLERR) || (ep_Event[i].events & EPOLLHUP))
        {
          if (ep_Event[i].events & EPOLLIN)
        {
          proxy_socket_io_read (sock_io_p);
        }
          else
        {
          proxy_socket_io_read_error (sock_io_p);
        }
        }
      else
        {
          if (ep_Event[i].events & EPOLLOUT)
        {
          proxy_socket_io_write (sock_io_p);
        }

          if (ep_Event[i].events & EPOLLIN)
        {
          proxy_socket_io_read (sock_io_p);
        }
        }
    }
    }
#else /* LINUX */

  /* new cas */
  if (FD_ISSET (cas_lsnr_fd, &rset))
    {
      cas_fd = proxy_io_cas_accept (cas_lsnr_fd);
      if (cas_fd >= 0)
    {
      if (proxy_socket_io_add (cas_fd, PROXY_IO_FROM_CAS) == NULL)
        {
          PROXY_DEBUG_LOG ("Close socket. (fd:%d). \n", cas_fd);
          CLOSE_SOCKET (cas_fd);
        }
    }
      else
    {
      PROXY_LOG (PROXY_LOG_MODE_ERROR, "Accept socket failure. (fd:%d).", cas_fd);
      return 0;     /* or -1 */
    }
    }

  /* new client */
#if defined(WINDOWS)
  if (FD_ISSET (client_lsnr_fd, &rset))
    {
      client_fd = proxy_io_client_accept (client_lsnr_fd);
      if (client_fd < 0)
    {
      PROXY_LOG (PROXY_LOG_MODE_ERROR, "Accept socket failure. (fd:%d).", client_fd);
      return 0;     /* or -1 */
    }
      proxy_socket_io_new_client (client_fd);
    }
#else /* WINDOWS */
  if (FD_ISSET (broker_conn_fd, &rset))
    {
      proxy_socket_io_new_client (broker_conn_fd);
    }
#endif /* !WINDOWS */

  /* process socket io */
  for (i = 0; i <= maxfd; i++)
    {
      sock_io_p = &(proxy_Socket_io.ent[i]);
      if (IS_INVALID_SOCKET (sock_io_p->fd))
    {
      continue;
    }
      if (!IS_INVALID_SOCKET (sock_io_p->fd) && FD_ISSET (sock_io_p->fd, &wset))
    {
      proxy_socket_io_write (sock_io_p);
    }
      if (!IS_INVALID_SOCKET (sock_io_p->fd) && FD_ISSET (sock_io_p->fd, &rset))
    {
      proxy_socket_io_read (sock_io_p);
    }
    }
#endif /* !LINUX */

  return 0;
}

char *
proxy_get_driver_info_by_ctx (T_PROXY_CONTEXT * ctx_p)
{
  static char dummy_info[SRV_CON_CLIENT_INFO_SIZE] = {
    'C', 'U', 'B', 'R', 'K',
    CAS_CLIENT_JDBC,
    CAS_PROTO_INDICATOR | CURRENT_PROTOCOL,
    (char) BROKER_RENEWED_ERROR_CODE | BROKER_SUPPORT_HOLDABLE_RESULT,
    0,
    0
  };
  T_CLIENT_IO *cli_io_p;

  assert (ctx_p);

  if (ctx_p == NULL)
    {
      return dummy_info;
    }

  cli_io_p = proxy_client_io_find_by_ctx (ctx_p->client_id, ctx_p->cid, ctx_p->uid);
  if (cli_io_p == NULL)
    {
      return dummy_info;
    }

  return cli_io_p->driver_info;

}

char *
proxy_get_driver_info_by_fd (T_SOCKET_IO * sock_io_p)
{
  static char dummy_info[SRV_CON_CLIENT_INFO_SIZE] = {
    'C', 'U', 'B', 'R', 'K',
    CAS_CLIENT_JDBC,
    CAS_PROTO_INDICATOR | CURRENT_PROTOCOL,
    (char) BROKER_RENEWED_ERROR_CODE | BROKER_SUPPORT_HOLDABLE_RESULT,
    0,
    0
  };
  T_CLIENT_IO *cli_io_p;

  assert (sock_io_p);

  if (sock_io_p == NULL)
    {
      return dummy_info;
    }

  cli_io_p = proxy_client_io_find_by_fd (sock_io_p->id.client_id, sock_io_p->fd);
  if (cli_io_p == NULL)
    {
      return dummy_info;
    }

  return cli_io_p->driver_info;

}

void
proxy_available_cas_wait_timer (void)
{
  T_SHARD_IO *shard_io_p;
  T_SHARD_INFO *shard_info_p = NULL;
  T_WAIT_CONTEXT *waiter_p;
  int i, now;

  now = time (NULL);

  for (i = 0; i < proxy_Shard_io.max_shard; i++)
    {
      shard_io_p = &(proxy_Shard_io.ent[i]);
      proxy_waiter_timeout (&shard_io_p->waitq, (shard_info_p != NULL) ? &shard_info_p->waiter_count : NULL, now);

      waiter_p = (T_WAIT_CONTEXT *) shard_queue_peek_value (&shard_io_p->waitq);
      if (waiter_p == NULL)
    {
      shard_info_p = shard_shm_find_shard_info (proxy_info_p, shard_io_p->shard_id);
      if ((shard_info_p != NULL) && (shard_info_p->waiter_count > 0))
        {
          shard_info_p->waiter_count = 0;
          PROXY_DEBUG_LOG ("Clear shard(%d) waiter queue by timer.", shard_io_p->shard_id);
        }
    }
    }

  return;
}

static void
proxy_set_conn_info (int func_code, int ctx_cid, int ctx_uid, int shard_id, int cas_id)
{
  T_PROXY_CONTEXT *ctx_p = NULL;
  T_APPL_SERVER_INFO *as_info_p = NULL;

  ctx_p = proxy_context_find (ctx_cid, ctx_uid);
  assert (ctx_p != NULL);

  as_info_p = shard_shm_get_as_info (proxy_info_p, shm_as_p, shard_id, cas_id);
  assert (as_info_p != NULL);

  if (func_code == CAS_FC_CHECK_CAS)
    {
      as_info_p->force_reconnect = true;
    }

  if (strcasecmp (as_info_p->database_user, ctx_p->database_user) == 0
      && strcmp (as_info_p->database_passwd, ctx_p->database_passwd) == 0)
    {
      return;
    }

  /* this cas will reconnect to database. */
  shard_stmt_del_all_srv_h_id_for_shard_cas (shard_id, cas_id);

  strncpy_bufsize (as_info_p->database_user, ctx_p->database_user);
  strncpy_bufsize (as_info_p->database_passwd, ctx_p->database_passwd);
}

static T_CAS_IO *
proxy_cas_alloc_by_shard_and_cas_id (int client_id, int shard_id, int cas_id, int ctx_cid, unsigned int ctx_uid)
{
  T_SHARD_IO *shard_io_p = NULL;
  T_CAS_IO *cas_io_p = NULL;

  assert (shard_id >= 0 && cas_id >= 0);

  shard_io_p = &(proxy_Shard_io.ent[shard_id]);

  if (0 > cas_id || cas_id >= shard_io_p->max_num_cas)
    {
      PROXY_LOG (PROXY_LOG_MODE_ERROR, "Unable to find available CAS. " "(shard_id:%d, cas_id:%d, max_num_cas:%d).",
         shard_id, cas_id, shard_io_p->max_num_cas);
      return NULL;
    }
  cas_io_p = &(shard_io_p->ent[cas_id]);

  if ((cas_io_p->ctx_cid != PROXY_INVALID_CONTEXT && cas_io_p->ctx_cid != ctx_cid)
      || (cas_io_p->ctx_uid != 0 && cas_io_p->ctx_uid != ctx_uid))
    {
      PROXY_LOG (PROXY_LOG_MODE_ERROR, "Invalid CAS status. " "(context id:%d, context uid:%d). CAS(%s). ", ctx_cid,
         ctx_uid, proxy_str_cas_io (cas_io_p));

      assert (false);
      return NULL;
    }

  if (cas_io_p->status != CAS_IO_CONNECTED)
    {
      PROXY_DEBUG_LOG ("Unexpected CAS status. (context id:%d, context uid:%d). " "CAS(%s). ", ctx_cid, ctx_uid,
               proxy_str_cas_io (cas_io_p));
      return NULL;
    }

  if (cas_io_p->ctx_cid == PROXY_INVALID_CONTEXT)
    {
      shard_io_p->num_cas_in_tran++;

      PROXY_DEBUG_LOG ("Shard IO status. (num_cas_in_tran=%d, shard_id=%d).", shard_io_p->num_cas_in_tran,
               shard_io_p->shard_id);

      assert (shard_io_p->cur_num_cas >= shard_io_p->num_cas_in_tran);
      assert (cas_io_p->is_in_tran == false);
      assert (cas_io_p->ctx_uid == 0);
    }

  return cas_io_p;
}

static T_CAS_IO *
proxy_find_idle_cas_by_asc (int shard_id, int cas_id, int ctx_cid, unsigned int ctx_uid)
{
  int i = 0;
  T_CAS_IO *cas_io_p = NULL;
  T_SHARD_IO *shard_io_p = NULL;

  shard_io_p = &(proxy_Shard_io.ent[shard_id]);

  for (i = 0; i < shard_io_p->cur_num_cas; i++)
    {
      cas_io_p = &(shard_io_p->ent[i]);

      if (cas_io_p->is_in_tran || cas_io_p->status != CAS_IO_CONNECTED)
    {
      continue;
    }

      return cas_io_p;
    }

  return NULL;
}

static T_CAS_IO *
proxy_find_idle_cas_by_desc (int shard_id, int cas_id, int ctx_cid, unsigned int ctx_uid)
{
  int i = 0;
  T_CAS_IO *cas_io_p = NULL;
  T_SHARD_IO *shard_io_p = NULL;

  shard_io_p = &(proxy_Shard_io.ent[shard_id]);

  for (i = shard_io_p->cur_num_cas - 1; i >= 0; i--)
    {
      cas_io_p = &(shard_io_p->ent[i]);

      if (cas_io_p->is_in_tran || cas_io_p->status != CAS_IO_CONNECTED)
    {
      continue;
    }

      return cas_io_p;
    }

  return NULL;
}

static T_CAS_IO *
proxy_find_idle_cas_by_conn_info (int shard_id, int cas_id, int ctx_cid, unsigned int ctx_uid)
{
  int i = 0;
  T_CAS_IO *cas_io_p = NULL;
  T_SHARD_IO *shard_io_p = NULL;
  T_PROXY_CONTEXT *ctx_p = NULL;
  T_APPL_SERVER_INFO *as_info_p = NULL;

  shard_io_p = &(proxy_Shard_io.ent[shard_id]);

  ctx_p = proxy_context_find (ctx_cid, ctx_uid);
  assert (ctx_p != NULL);

  for (i = 0; i < shard_io_p->cur_num_cas; i++)
    {
      as_info_p = shard_shm_get_as_info (proxy_info_p, shm_as_p, shard_id, i);

      cas_io_p = &(shard_io_p->ent[i]);

      if (cas_io_p->is_in_tran || cas_io_p->status != CAS_IO_CONNECTED)
    {
      continue;
    }

      if (strcasecmp (as_info_p->database_user, ctx_p->database_user)
      || strcmp (as_info_p->database_passwd, ctx_p->database_passwd))
    {
      continue;
    }

      return cas_io_p;
    }

  return NULL;
}

static T_CAS_IO *
proxy_cas_alloc_anything (int client_id, int shard_id, int cas_id, int ctx_cid, unsigned int ctx_uid,
              T_FUNC_FIND_CAS function)
{
  int i = 0;
  T_SHARD_IO *shard_io_p = NULL;
  T_CAS_IO *cas_io_p = NULL;
  static int last_shard_id = -1;

  if (shard_id >= 0)
    {
      shard_io_p = &(proxy_Shard_io.ent[shard_id]);

      cas_io_p = function (shard_id, cas_id, ctx_cid, ctx_uid);
    }
  else
    {
      /* select any shard */
      shard_id = last_shard_id;
      for (i = 0; i < proxy_Shard_io.max_shard; i++)
    {
      shard_id = (shard_id + 1) % proxy_Shard_io.max_shard;
      shard_io_p = &(proxy_Shard_io.ent[shard_id]);
      assert (shard_io_p->cur_num_cas >= shard_io_p->num_cas_in_tran);

      if (shard_io_p->cur_num_cas == shard_io_p->num_cas_in_tran)
        {
          continue;
        }

      cas_io_p = function (shard_id, cas_id, ctx_cid, ctx_uid);
      if (cas_io_p != NULL)
        {
          break;
        }
    }

      if (i >= proxy_Shard_io.max_shard)
    {
      PROXY_LOG (PROXY_LOG_MODE_SHARD_DETAIL, "Unable to find avaiable shard. (index:%d, max_shard:%d).", i,
             proxy_Shard_io.max_shard);

      return NULL;
    }

      last_shard_id = shard_id;

      assert (cas_io_p != NULL);
    }

  if (cas_io_p != NULL)
    {
      shard_io_p->num_cas_in_tran++;

      PROXY_DEBUG_LOG ("Shard status. (num_cas_in_tran=%d, shard_id=%d).", shard_io_p->num_cas_in_tran,
               shard_io_p->shard_id);

      assert (shard_io_p->cur_num_cas >= shard_io_p->num_cas_in_tran);
      assert (cas_io_p->is_in_tran == false);
      assert (cas_io_p->ctx_cid == PROXY_INVALID_CONTEXT);
      assert (cas_io_p->ctx_uid == 0);
      assert (cas_io_p->fd != INVALID_SOCKET);
    }

  return cas_io_p;
}

static int
proxy_check_authorization (T_PROXY_CONTEXT * ctx_p, const char *db_name, const char *db_user, const char *db_passwd)
{
  T_SHARD_USER *user_p = NULL;
  char err_msg[256];

  user_p = shard_metadata_get_shard_user (shm_user_p);
  assert (user_p);

  if (strcasecmp (db_name, user_p->db_name))
    {
      goto authorization_error;
    }

  if (proxy_info_p->fixed_shard_user == false)
    {
      return 0;
    }

  if (strcasecmp (db_user, user_p->db_user) || strcmp (db_passwd, user_p->db_password))
    {
      goto authorization_error;
    }

  return 0;

authorization_error:
  snprintf (err_msg, sizeof (err_msg), "Authorization error.");
  proxy_context_set_error_with_msg (ctx_p, DBMS_ERROR_INDICATOR, CAS_ER_NOT_AUTHORIZED_CLIENT, err_msg);

  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Authentication failure. " "(db_name:[%s], db_user:[%s], db_passwd:[%s]).", db_name,
         db_user, db_passwd);

  return -1;
}

int
proxy_convert_error_code (int error_ind, int error_code, char *driver_info, T_BROKER_VERSION client_version,
              bool to_new)
{
  if (error_code >= 0)
    {
      assert (error_code == 0);
      return error_code;
    }

  if (client_version < CAS_MAKE_VER (8, 3, 0))
    {
      if (to_new == PROXY_CONV_ERR_TO_NEW)
    {
      error_code = CAS_CONV_ERROR_TO_NEW (error_code);
    }
      else
    {
      error_code = CAS_CONV_ERROR_TO_OLD (error_code);
    }
    }
  else if (!DOES_CLIENT_MATCH_THE_PROTOCOL (client_version, PROTOCOL_V2)
       && !cas_di_understand_renewed_error_code (driver_info) && error_code != NO_ERROR)
    {
      if (error_ind == CAS_ERROR_INDICATOR || error_code == CAS_ER_NOT_AUTHORIZED_CLIENT)
    {
      if (to_new == PROXY_CONV_ERR_TO_NEW)
        {
          error_code = CAS_CONV_ERROR_TO_NEW (error_code);
        }
      else
        {
          error_code = CAS_CONV_ERROR_TO_OLD (error_code);
        }
    }
    }

  return error_code;
}

int
proxy_context_direct_send_error (T_PROXY_CONTEXT * ctx_p)
{
  T_CLIENT_IO *cli_io_p = NULL;
  T_SOCKET_IO *sock_io_p = NULL;

  cli_io_p = proxy_client_io_find_by_ctx (ctx_p->client_id, ctx_p->cid, ctx_p->uid);
  if (cli_io_p == NULL)
    {
      PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to find client. " "(client_id:%d, context id:%d, context uid:%u).",
         ctx_p->client_id, ctx_p->cid, ctx_p->uid);

      return -1;
    }

  sock_io_p = proxy_socket_io_find (cli_io_p->fd);
  if (sock_io_p == NULL)
    {
      PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to find socket entry. (fd:%d).", cli_io_p->fd);
      return -1;
    }

  proxy_socket_io_write (sock_io_p);
  return 0;
}

#if defined(LINUX)
static int
proxy_get_max_socket (void)
{
  int max_socket = 0;
  T_SHARD_INFO *first_shard_info_p;

  first_shard_info_p = shard_shm_find_shard_info (proxy_info_p, 0);
  assert (first_shard_info_p != NULL);

  max_socket = proxy_info_p->max_context;
  max_socket += (proxy_info_p->max_shard * first_shard_info_p->max_appl_server);

  return max_socket;
}

static int
proxy_add_epoll_event (int fd, unsigned int events)
{
  int error;
  struct epoll_event ep_ev;

  assert (ep_Fd != INVALID_SOCKET);

  memset (&ep_ev, 0, sizeof (struct epoll_event));
  ep_ev.data.fd = fd;
  ep_ev.events = events;
  error = epoll_ctl (ep_Fd, EPOLL_CTL_ADD, fd, &ep_ev);
  if (error == -1)
    {
      PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to add epoll event. (fd:%d), (error=%d[%s])", fd, errno,
         strerror (errno));
      CLOSE_SOCKET (fd);
      return error;
    }

  return error;
}

static int
proxy_mod_epoll_event (int fd, unsigned int events)
{
  int error;
  struct epoll_event ep_ev;

  assert (ep_Fd != INVALID_SOCKET);

  memset (&ep_ev, 0, sizeof (struct epoll_event));
  ep_ev.data.fd = fd;
  ep_ev.events = events;
  error = epoll_ctl (ep_Fd, EPOLL_CTL_MOD, fd, &ep_ev);
  if (error == -1)
    {
      PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to modify epoll event. (fd:%d), (error=%d[%s])", fd, errno,
         strerror (errno));
      CLOSE_SOCKET (fd);
      return error;
    }

  return error;
}

static int
proxy_del_epoll_event (int fd)
{
  int error;
  struct epoll_event ep_ev;

  assert (ep_Fd != INVALID_SOCKET);

  memset (&ep_ev, 0, sizeof (struct epoll_event));
  ep_ev.data.fd = INVALID_SOCKET;
  /* events will be ignored, and it is only for portability */
  ep_ev.events = 0;
  error = epoll_ctl (ep_Fd, EPOLL_CTL_DEL, fd, &ep_ev);
  if (error == -1)
    {
      PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to delete epoll event. (fd:%d), (error=%d[%s])", fd, errno,
         strerror (errno));
      CLOSE_SOCKET (fd);
      return error;
    }

  return error;

}
#endif /* LINUX */