Skip to content

File shard_proxy_handler.c

File List > broker > shard_proxy_handler.c

Go to the documentation of this file

/*
 * Copyright 2008 Search Solution Corporation
 * Copyright 2016 CUBRID Corporation
 *
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 *
 */


/*
 * shard_proxy_handler.c
 */

#ident "$Id$"


#include <assert.h>

#if defined(WINDOWS)
#include "porting.h"
#endif /* WINDOWS */

#include "broker_config.h"
#include "broker_shm.h"
#include "shard_proxy_common.h"
#include "cas_protocol.h"
#include "cas_error.h"
#include "shard_shm.h"

#include "shard_proxy_handler.h"
#include "shard_proxy_io.h"
#include "shard_proxy_queue.h"
#include "shard_parser.h"
#include "shard_proxy_function.h"

#define PROXY_MAX_IGNORE_TIMER_CHECK    10
#define PROXY_TIMER_CHECK_INTERVAL  1   /* sec */

extern T_SHM_APPL_SERVER *shm_as_p;
extern T_SHM_PROXY *shm_proxy_p;
extern T_PROXY_INFO *proxy_info_p;

SP_PARSER_CTX *parser = NULL;

T_PROXY_HANDLER proxy_Handler;
T_PROXY_CONTEXT_GLOBAL proxy_Context;

static void proxy_handler_process_cas_response (T_PROXY_EVENT * event_p);
static int proxy_handler_process_cas_error (T_PROXY_EVENT * event_p);
static void proxy_handler_process_cas_conn_error (T_PROXY_EVENT * event_p);
static void proxy_handler_process_cas_event (T_PROXY_EVENT * event_p);
static void proxy_handler_process_client_request (T_PROXY_EVENT * event_p);
static void proxy_handler_process_client_conn_error (T_PROXY_EVENT * event_p);
static void proxy_handler_process_client_wakeup_by_shard (T_PROXY_EVENT * event_p);
static void proxy_handler_process_client_wakeup_by_statement (T_PROXY_EVENT * event_p);
static void proxy_handler_process_client_event (T_PROXY_EVENT * event_p);

static void proxy_context_clear (T_PROXY_CONTEXT * ctx_p);
static void proxy_context_free_client (T_PROXY_CONTEXT * ctx_p);
static void proxy_context_free_shard (T_PROXY_CONTEXT * ctx_p);

static int proxy_context_initialize (void);
static void proxy_context_destroy (void);

static T_PROXY_CLIENT_FUNC proxy_client_fn_table[] = {
  fn_proxy_client_end_tran, /* fn_end_tran */
  fn_proxy_client_prepare,  /* fn_prepare */
  fn_proxy_client_execute,  /* fn_execute */
  fn_proxy_client_get_db_parameter, /* fn_get_db_parameter */
  fn_proxy_client_set_db_parameter, /* fn_set_db_parameter */
  fn_proxy_client_close_req_handle, /* fn_close_req_handle */
  fn_proxy_client_cursor,   /* fn_cursor */
  fn_proxy_client_fetch,    /* fn_fetch */
  fn_proxy_client_schema_info,  /* fn_schema_info */
  fn_proxy_client_not_supported,    /* fn_oid_get */
  fn_proxy_client_not_supported,    /* fn_oid_put */
  fn_proxy_client_not_supported,    /* fn_deprecated */
  fn_proxy_client_not_supported,    /* fn_deprecated */
  fn_proxy_client_not_supported,    /* fn_deprecated */
  fn_proxy_client_get_db_version,   /* fn_get_db_version */
  fn_proxy_client_not_supported,    /* fn_get_class_num_objs */
  fn_proxy_client_not_supported,    /* fn_oid */
  fn_proxy_client_not_supported,    /* fn_collection */
  fn_proxy_client_not_supported,    /* fn_next_result */
  fn_proxy_client_not_supported,    /* fn_execute_batch */
  fn_proxy_client_execute_array,    /* fn_execute_array */
  fn_proxy_client_not_supported,    /* fn_cursor_update */
  fn_proxy_client_not_supported,    /* fn_get_attr_type_str */
  fn_proxy_client_not_supported,    /* fn_get_query_info */
  fn_proxy_client_not_supported,    /* fn_deprecated */
  fn_proxy_client_not_supported,    /* fn_savepoint */
  fn_proxy_client_not_supported,    /* fn_parameter_info */
  fn_proxy_client_not_supported,    /* fn_xa_prepare */
  fn_proxy_client_not_supported,    /* fn_xa_recover */
  fn_proxy_client_not_supported,    /* fn_xa_end_tran */
  fn_proxy_client_con_close,    /* fn_con_close */
  fn_proxy_client_check_cas,    /* fn_check_cas */
  fn_proxy_client_not_supported,    /* fn_make_out_rs */
  fn_proxy_client_not_supported,    /* fn_get_generated_keys */
  fn_proxy_client_not_supported,    /* fn_lob_new */
  fn_proxy_client_not_supported,    /* fn_lob_write */
  fn_proxy_client_not_supported,    /* fn_lob_read */
  fn_proxy_client_not_supported,    /* fn_end_session */
  fn_proxy_client_not_supported,    /* fn_get_row_count */
  fn_proxy_client_not_supported,    /* fn_get_last_insert_id */
  fn_proxy_client_prepare_and_execute,  /* fn_prepare_and_execute */
  fn_proxy_client_cursor_close, /* fn_cursor_close */
  fn_proxy_get_shard_info   /* fn_get_shard_info */
};


static T_PROXY_CAS_FUNC proxy_cas_fn_table[] = {
  fn_proxy_cas_end_tran,    /* fn_end_tran */
  fn_proxy_cas_prepare,     /* fn_prepare */
  fn_proxy_cas_execute,     /* fn_execute */
  fn_proxy_cas_relay_only,  /* fn_get_db_parameter */
  fn_proxy_cas_relay_only,  /* fn_set_db_parameter */
  fn_proxy_cas_relay_only,  /* fn_close_req_handle */
  fn_proxy_cas_relay_only,  /* fn_cursor */
  fn_proxy_cas_fetch,       /* fn_fetch */
  fn_proxy_cas_schema_info, /* fn_schema_info */
  fn_proxy_cas_relay_only,  /* fn_oid_get */
  fn_proxy_cas_relay_only,  /* fn_oid_put */
  fn_proxy_cas_relay_only,  /* fn_deprecated */
  fn_proxy_cas_relay_only,  /* fn_deprecated */
  fn_proxy_cas_relay_only,  /* fn_deprecated */
  fn_proxy_cas_relay_only,  /* fn_get_db_version */
  fn_proxy_cas_relay_only,  /* fn_get_class_num_objs */
  fn_proxy_cas_relay_only,  /* fn_oid */
  fn_proxy_cas_relay_only,  /* fn_collection */
  fn_proxy_cas_relay_only,  /* fn_next_result */
  fn_proxy_cas_relay_only,  /* fn_execute_batch */
  fn_proxy_cas_execute_array,   /* fn_execute_array */
  fn_proxy_cas_relay_only,  /* fn_cursor_update */
  fn_proxy_cas_relay_only,  /* fn_get_attr_type_str */
  fn_proxy_cas_relay_only,  /* fn_get_query_info */
  fn_proxy_cas_relay_only,  /* fn_deprecated */
  fn_proxy_cas_relay_only,  /* fn_savepoint */
  fn_proxy_cas_relay_only,  /* fn_parameter_info */
  fn_proxy_cas_relay_only,  /* fn_xa_prepare */
  fn_proxy_cas_relay_only,  /* fn_xa_recover */
  fn_proxy_cas_relay_only,  /* fn_xa_end_tran */
  fn_proxy_cas_relay_only,  /* fn_con_close */
  fn_proxy_cas_check_cas,   /* fn_check_cas */
  fn_proxy_cas_relay_only,  /* fn_make_out_rs */
  fn_proxy_cas_relay_only,  /* fn_get_generated_keys */
  fn_proxy_cas_relay_only,  /* fn_lob_new */
  fn_proxy_cas_relay_only,  /* fn_lob_write */
  fn_proxy_cas_relay_only,  /* fn_lob_read */
  fn_proxy_cas_relay_only,  /* fn_end_session */
  fn_proxy_cas_relay_only,  /* fn_get_row_count */
  fn_proxy_cas_relay_only,  /* fn_get_last_insert_id */
  fn_proxy_cas_prepare_and_execute, /* fn_prepare_and_execute */
  fn_proxy_cas_relay_only,  /* fn_cursor_close */
  fn_proxy_cas_relay_only   /* fn_get_shard_info */
};


T_WAIT_CONTEXT *
proxy_waiter_new (int ctx_cid, unsigned int ctx_uid, int timeout)
{
  T_WAIT_CONTEXT *n;

  n = (T_WAIT_CONTEXT *) malloc (sizeof (T_WAIT_CONTEXT));
  if (n)
    {
      n->ctx_cid = ctx_cid;
      n->ctx_uid = ctx_uid;
      if (timeout <= 0)
    {
      n->expire_time = INT_MAX;
    }
      else
    {
      n->expire_time = time (NULL) + timeout;
    }
    }
  return n;
}

void
proxy_waiter_free (T_WAIT_CONTEXT * waiter)
{
  assert (waiter);

  FREE_MEM (waiter);
}

void
proxy_waiter_timeout (T_SHARD_QUEUE * waitq, INT64 * counter, int now)
{
  T_PROXY_CONTEXT *ctx_p;
  T_WAIT_CONTEXT *waiter_p;

  while (1)
    {
      waiter_p = (T_WAIT_CONTEXT *) shard_queue_peek_value (waitq);
      if (waiter_p == NULL)
    {
      break;
    }

      if (waiter_p->expire_time >= now)
    {
      break;
    }

      waiter_p = (T_WAIT_CONTEXT *) shard_queue_dequeue (waitq);
      assert (waiter_p != NULL);

      if (counter != NULL && *counter > 0)
    {
      (*counter)--;
      PROXY_DEBUG_LOG ("Waiter timeout. (counter:%d).", *counter);
    }

      ctx_p = proxy_context_find (waiter_p->ctx_cid, waiter_p->ctx_uid);
      if (ctx_p == NULL)
    {
      /* context was freed already */
      proxy_waiter_free (waiter_p);
      waiter_p = NULL;
      continue;
    }

      proxy_context_timeout (ctx_p);

      proxy_waiter_free (waiter_p);
      waiter_p = NULL;
    }

  return;
}

int
proxy_waiter_comp_fn (const void *arg1, const void *arg2)
{
  T_WAIT_CONTEXT *l, *r;

  l = (T_WAIT_CONTEXT *) arg1;
  r = (T_WAIT_CONTEXT *) arg2;

  return (l->expire_time - r->expire_time);
}

bool
proxy_handler_is_cas_in_tran (int shard_id, int cas_id)
{
  T_APPL_SERVER_INFO *as_info = NULL;

  assert (shard_id >= 0);
  assert (cas_id >= 0);

  as_info = shard_shm_get_as_info (proxy_info_p, shm_as_p, shard_id, cas_id);
  if (as_info)
    {
      if (as_info->con_status == CON_STATUS_IN_TRAN || as_info->num_holdable_results > 0
      || as_info->cas_change_mode == CAS_CHANGE_MODE_KEEP)
    {
      return true;
    }
      else
    {
      return false;
    }
    }

  return false;
}

void
proxy_context_set_error (T_PROXY_CONTEXT * ctx_p, int error_ind, int error_code)
{
  assert (ctx_p);

  assert (ctx_p->error_ind == CAS_NO_ERROR);
  assert (ctx_p->error_code == CAS_NO_ERROR);
  assert (ctx_p->error_msg[0] == '\0');

  ctx_p->error_ind = error_ind;
  ctx_p->error_code = error_code;
  ctx_p->error_msg[0] = '\0';

  return;
}

void
proxy_context_set_error_with_msg (T_PROXY_CONTEXT * ctx_p, int error_ind, int error_code, const char *error_msg)
{
  proxy_context_set_error (ctx_p, error_ind, error_code);
  snprintf (ctx_p->error_msg, sizeof (ctx_p->error_msg), error_msg);

  return;
}

void
proxy_context_clear_error (T_PROXY_CONTEXT * ctx_p)
{
  assert (ctx_p);

  ctx_p->error_ind = CAS_NO_ERROR;
  ctx_p->error_code = CAS_NO_ERROR;
  ctx_p->error_msg[0] = '\0';

  return;
}

int
proxy_context_send_error (T_PROXY_CONTEXT * ctx_p)
{
  int error;
  T_PROXY_EVENT *event_p;
  T_CLIENT_INFO *client_info_p = NULL;
  char *driver_info;

  ENTER_FUNC ();

  assert (ctx_p->error_ind != CAS_NO_ERROR);
  assert (ctx_p->error_code != CAS_NO_ERROR);

  proxy_info_p->num_proxy_error_processed++;

  /* reset request and response timeout */
  client_info_p = shard_shm_get_client_info (proxy_info_p, ctx_p->client_id);
  if (client_info_p != NULL)
    {
      shard_shm_init_client_info_request (client_info_p);
    }

  driver_info = proxy_get_driver_info_by_ctx (ctx_p);

  event_p =
    proxy_event_new_with_error (driver_info, PROXY_EVENT_IO_WRITE, PROXY_EVENT_FROM_CLIENT, proxy_io_make_error_msg,
                ctx_p->error_ind, ctx_p->error_code, ctx_p->error_msg, ctx_p->is_client_in_tran);
  if (event_p == NULL)
    {
      PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to make error message. " "context(%s).", proxy_str_context (ctx_p));
      goto error_return;
    }

  error = proxy_send_response_to_client (ctx_p, event_p);
  if (error)
    {
      PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to send response " "to the client. (error:%d). context(%s). event(%s). ",
         error, proxy_str_context (ctx_p), proxy_str_event (event_p));
      goto error_return;
    }
  event_p = NULL;

  EXIT_FUNC ();
  return 0;

error_return:

  if (event_p)
    {
      proxy_event_free (event_p);
      event_p = NULL;
    }

  ctx_p->free_context = true;
  return -1;
}

static void
proxy_handler_process_cas_response (T_PROXY_EVENT * event_p)
{
  int error = NO_ERROR;
  int error_ind;
  char *response_p;
  T_PROXY_CONTEXT *ctx_p = NULL;
  T_CLIENT_INFO *client_info_p;
  char func_code;

  T_PROXY_CAS_FUNC proxy_cas_fn;

  ENTER_FUNC ();

  assert (event_p->from_cas == PROXY_EVENT_FROM_CAS);

  response_p = event_p->buffer.data;
  assert (response_p);

  ctx_p = proxy_context_find (event_p->cid, event_p->uid);
  if (ctx_p == NULL)
    {
      PROXY_LOG (PROXY_LOG_MODE_ERROR, "Unable to find context. event(%s).", proxy_str_event (event_p));

      proxy_event_free (event_p);
      event_p = NULL;

      EXIT_FUNC ();
      return;
    }

  client_info_p = shard_shm_get_client_info (proxy_info_p, ctx_p->client_id);

  error_ind = proxy_check_cas_error (response_p);
  if (error_ind < 0)
    {
      error = proxy_handler_process_cas_error (event_p);
      if (error)
    {
      proxy_event_free (event_p);
      event_p = NULL;
      goto end;
    }
    }

  func_code = ctx_p->func_code;
  if (func_code <= 0 || func_code >= CAS_FC_MAX)
    {
      PROXY_LOG (PROXY_LOG_MODE_ERROR, "Unsupported function code. " "(func_code:%d). context(%s).", func_code,
         proxy_str_context (ctx_p));
      /*
       * 1*) drop unexpected messages from cas ?
       * 2) free context ?
       */
      proxy_event_free (event_p);
      goto end;
    }

  PROXY_DEBUG_LOG ("process cas response. (func_code:%d, context:%s)", func_code, proxy_str_context (ctx_p));

  proxy_cas_fn = proxy_cas_fn_table[func_code - 1];
  error = proxy_cas_fn (ctx_p, event_p);
  if (error)
    {
      PROXY_LOG (PROXY_LOG_MODE_ERROR, "Error returned. (CAS function, func_code:%d, error:%d).", func_code, error);

      event_p = NULL;
      goto end;
    }
  event_p = NULL;

  ctx_p->is_cas_in_tran = proxy_handler_is_cas_in_tran (ctx_p->shard_id, ctx_p->cas_id);
  ctx_p->is_in_tran = ctx_p->is_cas_in_tran;
  if (ctx_p->is_in_tran == false)
    {
      /* release shard/cas */
      proxy_cas_release_by_ctx (ctx_p->shard_id, ctx_p->cas_id, ctx_p->cid, ctx_p->uid);

      proxy_context_set_out_tran (ctx_p);
      ctx_p->prepared_stmt = NULL;
      ctx_p->stmt_h_id = SHARD_STMT_INVALID_HANDLE_ID;

      if (ctx_p->dont_free_statement == false)
    {
      proxy_context_free_stmt (ctx_p);
    }
    }

  PROXY_DEBUG_LOG ("process cas response end. (func_code:%d, context:%s)", func_code, proxy_str_context (ctx_p));

end:
  ctx_p->func_code = PROXY_INVALID_FUNC_CODE;
  ctx_p->wait_timeout = proxy_info_p->wait_timeout;

  if (ctx_p->free_context)
    {
      proxy_context_free (ctx_p);
    }

  shard_shm_set_client_info_response (client_info_p);

  EXIT_FUNC ();
  return;
}

static int
proxy_handler_process_cas_error (T_PROXY_EVENT * event_p)
{
  T_PROXY_CONTEXT *ctx_p;

  ENTER_FUNC ();

  assert (event_p);

  ctx_p = proxy_context_find (event_p->cid, event_p->uid);
  if (ctx_p == NULL)
    {
      PROXY_LOG (PROXY_LOG_MODE_ERROR, "Unable to find context. event(%s).", proxy_str_event (event_p));

      goto end;
    }

  ctx_p->wait_timeout = proxy_info_p->wait_timeout;
  if (ctx_p->is_in_tran)
    {
      if (ctx_p->func_code)
    {
      /*
       * we should relay error event to the client
       * so, must not call proxy_event_free().
       */
      return 0;
    }
      else
    {
      /* unexpected error, free context */
      /* it may be session timeout error, ... */
      ctx_p->free_context = true;
    }
    }
  else
    {
      /* discard unexpected error */
    }

end:
  proxy_event_free (event_p);

  EXIT_FUNC ();
  return -1;
}

static void
proxy_handler_process_cas_conn_error (T_PROXY_EVENT * event_p)
{
  int error;
  T_PROXY_CONTEXT *ctx_p;

  ENTER_FUNC ();

  assert (event_p->from_cas == PROXY_EVENT_FROM_CAS);

  ctx_p = proxy_context_find (event_p->cid, event_p->uid);
  if (ctx_p == NULL)
    {
      PROXY_LOG (PROXY_LOG_MODE_ERROR, "Unable to find context. event(%s).", proxy_str_event (event_p));
      goto end;
    }

  PROXY_DEBUG_LOG ("Process CAS connection error. context(%s).", proxy_str_context (ctx_p));

  if (ctx_p->is_in_tran)
    {
      if (ctx_p->waiting_event && ctx_p->is_cas_in_tran == false
      && (ctx_p->func_code == CAS_FC_PREPARE || ctx_p->func_code == CAS_FC_EXECUTE
          || ctx_p->func_code == CAS_FC_PREPARE_AND_EXECUTE || ctx_p->func_code == CAS_FC_CHECK_CAS))
    {
      PROXY_DEBUG_LOG ("Context is in_tran status " "and waiting prepare/execute response. "
               "retransmit reqeust to the CAS. context(%s).", proxy_str_context (ctx_p));
      /* release this shard/cas */
      proxy_context_free_shard (ctx_p);

      /* set transaction status 'out_tran' */
      proxy_context_set_out_tran (ctx_p);

      /* retry previous request */
      error = shard_queue_enqueue (&proxy_Handler.cli_ret_q, (void *) ctx_p->waiting_event);
      if (error)
        {
          assert (false);

          proxy_context_free (ctx_p);
          goto end;
        }

      /* reset waiting event */
      ctx_p->waiting_event = NULL;
    }
      else
    {
      PROXY_DEBUG_LOG ("context is in_tran status. " "and function code %d, " "cas_is_in_ran %s, "
               "waiting_event %p.", ctx_p->func_code, (ctx_p->is_cas_in_tran) ? "in_tran" : "out_tran",
               ctx_p->waiting_event);

      /* TODO : send error to the client */
      proxy_context_free (ctx_p);
    }
    }
  else
    {
      PROXY_DEBUG_LOG ("context is out_tran status.");
      proxy_context_free (ctx_p);
    }

end:
  proxy_event_free (event_p);
  event_p = NULL;

  EXIT_FUNC ();
  return;
}

static void
proxy_handler_process_cas_event (T_PROXY_EVENT * event_p)
{
  ENTER_FUNC ();

  assert (event_p);

  switch (event_p->type)
    {
    case PROXY_EVENT_CAS_RESPONSE:
      proxy_handler_process_cas_response (event_p);
      break;

    case PROXY_EVENT_CAS_CONN_ERROR:
      proxy_handler_process_cas_conn_error (event_p);
      break;

    default:
      assert (false);
      break;
    }

  EXIT_FUNC ();
  return;
}

static void
proxy_handler_process_client_request (T_PROXY_EVENT * event_p)
{
  int error = NO_ERROR;
  char *request_p;
  char *payload;
  int length;
  int argc;
  char **argv = NULL;
  char func_code;
  T_PROXY_CONTEXT *ctx_p;
  T_CLIENT_INFO *client_info_p;
  T_BROKER_VERSION client_version;
  char *driver_info;

  T_PROXY_CLIENT_FUNC proxy_client_fn;

  ENTER_FUNC ();

  assert (event_p);
  assert (event_p->from_cas == PROXY_EVENT_FROM_CLIENT);

  ctx_p = proxy_context_find (event_p->cid, event_p->uid);
  if (ctx_p == NULL)
    {
      PROXY_LOG (PROXY_LOG_MODE_ERROR, "Unable to find context. event(%s).", proxy_str_event (event_p));

      proxy_event_free (event_p);

      EXIT_FUNC ();
      return;
    }
  client_info_p = shard_shm_get_client_info (proxy_info_p, ctx_p->client_id);

  request_p = event_p->buffer.data;
  assert (request_p);

  payload = request_p + MSG_HEADER_SIZE;
  length = get_data_length (request_p);

  argc = net_decode_str (payload, length, &func_code, (void ***) (&argv));
  if (argc < 0)
    {
      error = ER_FAILED;

      PROXY_LOG (PROXY_LOG_MODE_ERROR, "Invalid argument. (argc:%d). event(%s).", argc, proxy_str_event (event_p));

      proxy_event_free (event_p);
      goto end;
    }

  if (func_code <= 0 || func_code >= CAS_FC_MAX)
    {
      error = ER_FAILED;

      PROXY_LOG (PROXY_LOG_MODE_ERROR, "Unsupported function code. " "(func_code:%d). context(%s).", func_code,
         proxy_str_context (ctx_p));

      proxy_event_free (event_p);
      goto end;
    }

  /* SHARD TODO : fix cci/jdbc */
  proxy_unset_force_out_tran (request_p);

  driver_info = proxy_get_driver_info_by_ctx (ctx_p);
  client_version = CAS_MAKE_PROTO_VER (driver_info);
  if (DOES_CLIENT_MATCH_THE_PROTOCOL (client_version, PROTOCOL_V2))
    {
      switch (func_code)
    {
    case CAS_FC_PREPARE_AND_EXECUTE:
      func_code = CAS_FC_PREPARE_AND_EXECUTE_FOR_PROTO_V2;
      break;
    case CAS_FC_CURSOR_CLOSE:
      func_code = CAS_FC_CURSOR_CLOSE_FOR_PROTO_V2;
      break;
    default:
      break;
    }
    }

  shard_shm_set_client_info_request (client_info_p, func_code);

  PROXY_LOG (PROXY_LOG_MODE_DEBUG, "process client request. (func_code:%d, context:%s)", func_code,
         proxy_str_context (ctx_p));

  proxy_client_fn = proxy_client_fn_table[func_code - 1];
  error = proxy_client_fn (ctx_p, event_p, argc, argv);
  if (error)
    {
      PROXY_LOG (PROXY_LOG_MODE_ERROR, "Error returned. (client function, func_code:%d, error:%d).", func_code, error);

      event_p = NULL;
      goto end;
    }
  event_p = NULL;

end:

  if (ctx_p->error_ind)
    {
      proxy_context_send_error (ctx_p);
      proxy_context_clear_error (ctx_p);
    }

  if (ctx_p->free_context)
    {
      if (proxy_context_direct_send_error (ctx_p) < 0)
    {
      PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to send error message");
    }
      proxy_context_free (ctx_p);
    }

  if (error)
    {
      shard_shm_init_client_info_request (client_info_p);
    }

  if (argv)
    {
      FREE_MEM (argv);
    }

  EXIT_FUNC ();

  return;
}

static void
proxy_handler_process_client_conn_error (T_PROXY_EVENT * event_p)
{
  int error;
  T_PROXY_CONTEXT *ctx_p;

  ENTER_FUNC ();

  assert (event_p->from_cas == PROXY_EVENT_FROM_CLIENT);

  ctx_p = proxy_context_find (event_p->cid, event_p->uid);
  if (ctx_p == NULL)
    {
      PROXY_LOG (PROXY_LOG_MODE_ERROR, "Unable to find context. event(%s).", proxy_str_event (event_p));

      proxy_event_free (event_p);
      event_p = NULL;
      EXIT_FUNC ();
      return;
    }

  error = fn_proxy_client_conn_error (ctx_p);
  if (error)
    {
      ;
    }

  proxy_event_free (event_p);
  event_p = NULL;

  if (ctx_p->free_context)
    {
      proxy_context_free (ctx_p);
    }

  EXIT_FUNC ();
  return;
}

static void
proxy_handler_process_client_wakeup_by_shard (T_PROXY_EVENT * event_p)
{
  T_PROXY_EVENT *waiting_event;
  T_PROXY_CONTEXT *ctx_p;
  T_CLIENT_INFO *client_info_p;

  ENTER_FUNC ();

  assert (event_p->from_cas == PROXY_EVENT_FROM_CLIENT);

  ctx_p = proxy_context_find (event_p->cid, event_p->uid);
  if (ctx_p == NULL)
    {
      proxy_event_free (event_p);
      event_p = NULL;

      EXIT_FUNC ();
      return;
    }

  /* set in_tran, shard/cas */
  proxy_context_set_in_tran (ctx_p, event_p->shard_id, event_p->cas_id);

  client_info_p = shard_shm_get_client_info (proxy_info_p, ctx_p->client_id);
  if (client_info_p == NULL)
    {
      PROXY_LOG (PROXY_LOG_MODE_ERROR,
         "Unable to find cilent info in shared memory. " "(context id:%d, context uid:%d)", ctx_p->cid,
         ctx_p->uid);
    }
  else
    {
      if (shard_shm_set_as_client_info
      (proxy_info_p, shm_as_p, event_p->shard_id, event_p->cas_id, client_info_p->client_ip,
       client_info_p->driver_info, client_info_p->driver_info) == false)
    {

      PROXY_LOG (PROXY_LOG_MODE_ERROR, "Unable to find CAS info in shared memory. " "(shard_id:%d, cas_id:%d).",
             event_p->shard_id, event_p->cas_id);
    }
      assert (CAS_MAKE_PROTO_VER (client_info_p->driver_info) != 0);
    }

  /* retry */
  waiting_event = ctx_p->waiting_event;
  ctx_p->waiting_event = NULL;

  proxy_handler_process_client_request (waiting_event); /* DO NOT MOVE */

  proxy_event_free (event_p);
  event_p = NULL;

  EXIT_FUNC ();
  return;
}

static void
proxy_handler_process_client_wakeup_by_statement (T_PROXY_EVENT * event_p)
{
  T_PROXY_EVENT *waiting_event;
  T_PROXY_CONTEXT *ctx_p;

  ENTER_FUNC ();

  assert (event_p->from_cas == PROXY_EVENT_FROM_CLIENT);

  ctx_p = proxy_context_find (event_p->cid, event_p->uid);
  if (ctx_p == NULL)
    {
      proxy_event_free (event_p);
      event_p = NULL;

      EXIT_FUNC ();
      return;
    }

  /* retry */
  waiting_event = ctx_p->waiting_event;
  ctx_p->waiting_event = NULL;

  proxy_handler_process_client_request (waiting_event); /* DO NOT MOVE */

  proxy_event_free (event_p);
  event_p = NULL;

  EXIT_FUNC ();
  return;
}

static void
proxy_handler_process_client_event (T_PROXY_EVENT * event_p)
{
  ENTER_FUNC ();

  assert (event_p);

  switch (event_p->type)
    {
    case PROXY_EVENT_CLIENT_REQUEST:
      proxy_handler_process_client_request (event_p);
      break;

    case PROXY_EVENT_CLIENT_CONN_ERROR:
      proxy_handler_process_client_conn_error (event_p);
      break;

    case PROXY_EVENT_CLIENT_WAKEUP_BY_SHARD:
      proxy_handler_process_client_wakeup_by_shard (event_p);
      break;

    case PROXY_EVENT_CLIENT_WAKEUP_BY_STATEMENT:
      proxy_handler_process_client_wakeup_by_statement (event_p);
      break;

    default:
      assert (false);
      break;
    }

  EXIT_FUNC ();
  return;
}

static int
proxy_context_initialize (void)
{
  int error;
  int i, size;

  T_PROXY_CONTEXT *ctx_p;

  static bool is_init = false;

  if (is_init == true)
    {
      return 0;
    }

  proxy_Context.size = shm_proxy_p->max_context;

  error = shard_cqueue_initialize (&proxy_Context.freeq, proxy_Context.size);
  if (error)
    {
      PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to initialize context free queue. " "(error:%d).", error);

      goto error_return;
    }

  size = proxy_Context.size * sizeof (T_PROXY_CONTEXT);
  proxy_Context.ent = (T_PROXY_CONTEXT *) malloc (size);
  if (proxy_Context.ent == NULL)
    {
      PROXY_LOG (PROXY_LOG_MODE_ERROR,
         "Not enough virtual memory. " "Failed to alloc context entries. " "(errno:%d, size:%d).", errno, size);
      goto error_return;
    }
  memset (proxy_Context.ent, 0, size);

  for (i = 0; i < proxy_Context.size; i++)
    {
      ctx_p = &(proxy_Context.ent[i]);

      proxy_context_clear (ctx_p);
      ctx_p->cid = i;

      error = shard_cqueue_enqueue (&proxy_Context.freeq, (void *) ctx_p);
      if (error)
    {
      PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to initialize context free queue entries." "(error:%d).", error);
      goto error_return;
    }
    }

  is_init = true;
  return 0;

error_return:
  return -1;
}

static void
proxy_context_destroy (void)
{
  shard_cqueue_destroy (&proxy_Context.freeq);

  FREE_MEM (proxy_Context.ent);

  return;
}

#if defined(PROXY_VERBOSE_DEBUG)
void
proxy_context_dump_stmt (FILE * fp, T_PROXY_CONTEXT * ctx_p)
{
  T_CONTEXT_STMT *stmt_list_p;

  fprintf (fp, "* STMT_LIST: ");
  for (stmt_list_p = ctx_p->stmt_list; stmt_list_p; stmt_list_p = stmt_list_p->next)
    {
      fprintf (fp, "%-10d ", stmt_list_p->stmt_h_id);
    }
  fprintf (fp, "\n");

  return;
}

void
proxy_context_dump_title (FILE * fp)
{
  fprintf (fp,
       "%-5s %-10s %-10s %-5s %-5s " "%-5s %-5s %-15s " "%-15s %-15s %-15s " "%-10s %-10s %-10s " "%-10s %-10s \n",
       "CID", "UID", "CLIENT", "SHARD", "CAS", "BUSY", "IN_TRAN", "PREPARE_FOR_EXEC", "FREE_ON_END_TRAN",
       "FREE_CONTEXT", "CLIENT_END_TRAN", "FUNC_CODE", "STMT_H_ID", "STMT_HINT_TYPE", "ERROR_IND", "ERROR_CODE");

  return;
}


void
proxy_context_dump (FILE * fp, T_PROXY_CONTEXT * ctx_p)
{
  fprintf (fp,
       "%-5d %-10u %-10d %-5d %-5d " "%-5s %-5s %-15s " "%-15s %-15s %-15s %-15s " "%-10d %-10d %-10d "
       "%-10d %-10d \n", ctx_p->cid, ctx_p->uid, ctx_p->client_id, ctx_p->shard_id, ctx_p->cas_id,
       (ctx_p->is_busy) ? "YES" : "NO", (ctx_p->is_in_tran) ? "YES" : "NO",
       (ctx_p->is_prepare_for_execute) ? "YES" : "NO", (ctx_p->free_on_end_tran) ? "YES" : "NO",
       (ctx_p->free_on_client_io_write) ? "YES" : "NO", (ctx_p->free_context) ? "YES" : "NO",
       (ctx_p->is_client_in_tran) ? "YES" : "NO", ctx_p->func_code, ctx_p->stmt_h_id, ctx_p->stmt_hint_type,
       ctx_p->error_ind, ctx_p->error_code);

  proxy_context_dump_stmt (fp, ctx_p);

  return;
}

void
proxy_context_dump_all (FILE * fp)
{
  T_PROXY_CONTEXT *ctx_p;
  int i;

  fprintf (fp, "\n");
  fprintf (fp, "* %-20s : %d\n", "SIZE", proxy_Context.size);
  fprintf (fp, "\n");

  if (proxy_Context.ent == NULL)
    {
      return;
    }

  proxy_context_dump_title (fp);
  for (i = 0; i < proxy_Context.size; i++)
    {
      ctx_p = &(proxy_Context.ent[i]);
      proxy_context_dump (fp, ctx_p);
    }
  return;
}

void
proxy_context_print (bool print_all)
{
  T_PROXY_CONTEXT *ctx_p;
  int i;

  PROXY_LOG (PROXY_LOG_MODE_SCHEDULE_DETAIL, "* CONTEXT *");
  PROXY_LOG (PROXY_LOG_MODE_SCHEDULE_DETAIL, "%-20s = %d", "size", proxy_Context.size);

  PROXY_LOG (PROXY_LOG_MODE_SCHEDULE_DETAIL, "%-7s    %-5s %-10s %-5s %-5s %-8s " "%-10s %-10s %-10s %-10s %-10s",
         "idx", "cid", "uid", "busy", "wait", "in_tran", "client_id", "shard_id", "cas_id", "func_code", "stmt_id");
  if (proxy_Context.ent)
    {
      for (i = 0; i < proxy_Context.size; i++)
    {
      ctx_p = &(proxy_Context.ent[i]);

      if (!print_all && !ctx_p->is_busy)
        {
          continue;
        }
      PROXY_LOG (PROXY_LOG_MODE_SCHEDULE_DETAIL, "[%-5d]    %-5d %-10u %-5s %-8s " "%-10d %-10d %-10d %-10d %-10d",
             i, ctx_p->cid, ctx_p->uid, (ctx_p->is_busy) ? "YES" : "NO", (ctx_p->is_in_tran) ? "YES" : "NO",
             ctx_p->client_id, ctx_p->shard_id, ctx_p->cas_id, ctx_p->func_code, ctx_p->stmt_h_id);
    }
    }

  return;
}
#endif /* PROXY_VERBOSE_DEBUG */

char *
proxy_str_context (T_PROXY_CONTEXT * ctx_p)
{
  static char buffer[BUFSIZ];

  if (ctx_p == NULL)
    {
      return (char *) "-";
    }

  snprintf (buffer, sizeof (buffer),
        "cid:%d, uid:%u, " "is_busy:%s, is_in_tran:%s, " "is_prepare_for_execute:%s, free_on_end_tran:%s, "
        "free_on_client_io_write:%s, " "free_context:%s, is_client_in_tran:%s, " "is_cas_in_tran:%s, "
        "waiting_event:(%p, %s), " "func_code:%d, stmt_h_id:%d, stmt_hint_type:%d, " "wait_timeout:%d, "
        "client_id:%d, shard_id:%d, cas_id:%d, " "error_ind:%d, error_code:%d, error_msg:[%s] ", ctx_p->cid,
        ctx_p->uid, (ctx_p->is_busy) ? "Y" : "N", (ctx_p->is_in_tran) ? "Y" : "N",
        (ctx_p->is_prepare_for_execute) ? "Y" : "N", (ctx_p->free_on_end_tran) ? "Y" : "N",
        (ctx_p->free_on_client_io_write) ? "Y" : "N", (ctx_p->free_context) ? "Y" : "N",
        (ctx_p->is_client_in_tran) ? "Y" : "N", (ctx_p->is_cas_in_tran) ? "Y" : "N", ctx_p->waiting_event,
        proxy_str_event (ctx_p->waiting_event), ctx_p->func_code, ctx_p->stmt_h_id, ctx_p->stmt_hint_type,
        ctx_p->wait_timeout, ctx_p->client_id, ctx_p->shard_id, ctx_p->cas_id, ctx_p->error_ind, ctx_p->error_code,
        (ctx_p->error_msg[0]) ? ctx_p->error_msg : "-");

  return (char *) buffer;
}


static void
proxy_context_clear (T_PROXY_CONTEXT * ctx_p)
{
  assert (ctx_p);

  ctx_p->uid = 0;
  ctx_p->is_busy = false;
  ctx_p->is_in_tran = false;
  ctx_p->is_prepare_for_execute = false;
  ctx_p->free_on_end_tran = false;
  ctx_p->free_on_client_io_write = false;
  ctx_p->free_context = false;
  ctx_p->is_client_in_tran = false;
  ctx_p->is_cas_in_tran = false;
  ctx_p->waiting_dummy_prepare = false;
  ctx_p->dont_free_statement = false;
  ctx_p->wait_timeout = 0;

  if (ctx_p->waiting_event)
    {
      proxy_event_free (ctx_p->waiting_event);
      ctx_p->waiting_event = NULL;
    }

  ctx_p->func_code = PROXY_INVALID_FUNC_CODE;
  ctx_p->stmt_h_id = SHARD_STMT_INVALID_HANDLE_ID;
  ctx_p->stmt_hint_type = HT_INVAL;

  if (ctx_p->prepared_stmt != NULL)
    {
      if (ctx_p->prepared_stmt->status == SHARD_STMT_STATUS_IN_PROGRESS)
    {
      /* check and wakeup statement waiter */
      shard_stmt_check_waiter_and_wakeup (ctx_p->prepared_stmt);

      /* free statement */
      shard_stmt_free (ctx_p->prepared_stmt);
    }
      else if (ctx_p->prepared_stmt->stmt_type != SHARD_STMT_TYPE_PREPARED)
    {
      /*
       * shcema info server handle can't be shared with other context
       * so, we can free statement at this time.
       */

      shard_stmt_free (ctx_p->prepared_stmt);
    }
    }
  ctx_p->prepared_stmt = NULL;

  ctx_p->is_connected = false;
  ctx_p->database_user[0] = '\0';
  ctx_p->database_passwd[0] = '\0';

  proxy_context_free_stmt (ctx_p);

  ctx_p->client_id = PROXY_INVALID_CLIENT;
  ctx_p->shard_id = PROXY_INVALID_SHARD;
  ctx_p->cas_id = PROXY_INVALID_CAS;

  proxy_context_clear_error (ctx_p);
  return;
}

void
proxy_context_set_in_tran (T_PROXY_CONTEXT * ctx_p, int shard_id, int cas_id)
{
  assert (ctx_p);
  assert (shard_id >= 0);
  assert (cas_id >= 0);

  ctx_p->is_in_tran = true;
  ctx_p->shard_id = shard_id;
  ctx_p->cas_id = cas_id;
  ctx_p->dont_free_statement = false;

  return;
}

void
proxy_context_set_out_tran (T_PROXY_CONTEXT * ctx_p)
{
  assert (ctx_p);

  ctx_p->is_in_tran = false;
  ctx_p->shard_id = PROXY_INVALID_SHARD;
  ctx_p->cas_id = PROXY_INVALID_CAS;

  return;
}


T_PROXY_CONTEXT *
proxy_context_new (void)
{
  T_PROXY_CONTEXT *ctx_p;
  static unsigned int uid = 0;

  ctx_p = (T_PROXY_CONTEXT *) shard_cqueue_dequeue (&proxy_Context.freeq);
  if (ctx_p)
    {
      assert (ctx_p->is_busy == false);
      assert (ctx_p->is_in_tran == false);

      proxy_context_clear (ctx_p);
      ctx_p->uid = (++uid == 0) ? ++uid : uid;
      ctx_p->is_busy = true;
      ctx_p->wait_timeout = proxy_info_p->wait_timeout;

      PROXY_LOG (PROXY_LOG_MODE_SHARD_DETAIL, "New context created. context(%s).", proxy_str_context (ctx_p));
    }

#if defined(PROXY_VERBOSE_DEBUG)
  proxy_context_print (false);
#endif /* PROXY_VERBOSE_DEBUG */

  return ctx_p;
}


static void
proxy_context_free_client (T_PROXY_CONTEXT * ctx_p)
{
  T_CLIENT_INFO *client_info_p = NULL;

  ENTER_FUNC ();

  assert (ctx_p);

  if (ctx_p->client_id < 0)
    {
      PROXY_LOG (PROXY_LOG_MODE_ERROR, "Invalid client identifier. " "(client_id:%d). context(%s).", ctx_p->client_id,
         proxy_str_context (ctx_p));
      EXIT_FUNC ();
      return;
    }

  client_info_p = shard_shm_get_client_info (proxy_info_p, ctx_p->client_id);
  if (client_info_p != NULL)
    {
      shard_shm_init_client_info (client_info_p);
    }

  proxy_client_io_free_by_ctx (ctx_p->client_id, ctx_p->cid, ctx_p->uid);

  EXIT_FUNC ();
  return;
}

static void
proxy_context_free_shard (T_PROXY_CONTEXT * ctx_p)
{
  ENTER_FUNC ();

  assert (ctx_p);

  if (ctx_p->is_in_tran == false)
    {
      EXIT_FUNC ();
      return;
    }

  proxy_cas_io_free_by_ctx (ctx_p->shard_id, ctx_p->cas_id, ctx_p->cid, ctx_p->uid);

  EXIT_FUNC ();
  return;
}

void
proxy_context_free (T_PROXY_CONTEXT * ctx_p)
{
  int error;

  ENTER_FUNC ();

  assert (ctx_p);

  proxy_context_free_shard (ctx_p);
  proxy_context_free_client (ctx_p);
  proxy_context_clear (ctx_p);

  error = shard_cqueue_enqueue (&proxy_Context.freeq, (void *) ctx_p);
  if (error)
    {
      PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to queue context to free queue. (error:%d).", error);
      assert (false);
    }

  EXIT_FUNC ();
  return;
}

#if defined (ENABLE_UNUSED_FUNCTION)
void
proxy_context_free_by_cid (int cid, unsigned int uid)
{
  int error;
  T_PROXY_CONTEXT *ctx_p;

  ctx_p = proxy_context_find (cid, uid);
  if (ctx_p)
    {
      proxy_context_free (ctx_p);
    }
  return;
}
#endif /* ENABLE_UNUSED_FUNCTION */

T_PROXY_CONTEXT *
proxy_context_find (int cid, unsigned int uid)
{
  T_PROXY_CONTEXT *ctx_p;

  ctx_p = &(proxy_Context.ent[cid]);

  return (ctx_p->is_busy && ctx_p->uid == uid) ? ctx_p : NULL;
}

T_PROXY_CONTEXT *
proxy_context_find_by_socket_client_io (T_SOCKET_IO * sock_io_p)
{
  T_PROXY_CONTEXT *ctx_p = NULL;
  T_CLIENT_IO *cli_io_p = NULL;

  assert (sock_io_p);

  cli_io_p = proxy_client_io_find_by_fd (sock_io_p->id.client_id, sock_io_p->fd);
  if (cli_io_p == NULL)
    {
      proxy_socket_io_delete (sock_io_p->fd);

      PROXY_LOG (PROXY_LOG_MODE_ERROR, "Unable to find client entry. " "(client_id:%d, fd:%d).",
         sock_io_p->id.client_id, sock_io_p->fd);
      return NULL;
    }

  ctx_p = proxy_context_find (cli_io_p->ctx_cid, cli_io_p->ctx_uid);
  if (ctx_p == NULL)
    {
      PROXY_LOG (PROXY_LOG_MODE_ERROR, "Unable to find context. client(%s).", proxy_str_client_io (cli_io_p));

      proxy_client_io_free (cli_io_p);
      return NULL;
    }

  return ctx_p;
}

T_CONTEXT_STMT *
proxy_context_find_stmt (T_PROXY_CONTEXT * ctx_p, int stmt_h_id)
{
  T_CONTEXT_STMT *stmt_list_p;

  for (stmt_list_p = ctx_p->stmt_list; stmt_list_p; stmt_list_p = stmt_list_p->next)
    {
      if (stmt_list_p->stmt_h_id != stmt_h_id)
    {
      continue;
    }

      return stmt_list_p;
    }

  return NULL;
}

T_CONTEXT_STMT *
proxy_context_add_stmt (T_PROXY_CONTEXT * ctx_p, T_SHARD_STMT * stmt_p)
{
  int error = 0;
  T_CONTEXT_STMT *stmt_list_p;

  stmt_list_p = proxy_context_find_stmt (ctx_p, stmt_p->stmt_h_id);
  if (stmt_list_p)
    {
      return stmt_list_p;
    }

  stmt_list_p = (T_CONTEXT_STMT *) malloc (sizeof (T_CONTEXT_STMT));
  if (stmt_list_p == NULL)
    {
      PROXY_DEBUG_LOG ("malloc failed. ");
      return NULL;
    }

  error = shard_stmt_pin (stmt_p);
  if (error < 0)
    {
      FREE_MEM (stmt_list_p);

      return NULL;
    }

  stmt_list_p->stmt_h_id = stmt_p->stmt_h_id;

  stmt_list_p->next = ctx_p->stmt_list;
  ctx_p->stmt_list = stmt_list_p;

  PROXY_DEBUG_LOG ("add prepared statement to context. " "(context:(%s), stmt:(%s))", proxy_str_context (ctx_p),
           shard_str_stmt (stmt_p));

  return stmt_list_p;
}

void
proxy_context_free_stmt (T_PROXY_CONTEXT * ctx_p)
{
  T_CONTEXT_STMT *stmt_list_p, *stmt_list_np;
  T_SHARD_STMT *stmt_p;

  for (stmt_list_p = ctx_p->stmt_list; stmt_list_p; stmt_list_p = stmt_list_np)
    {
      stmt_list_np = stmt_list_p->next;

      stmt_p = shard_stmt_find_by_stmt_h_id (stmt_list_p->stmt_h_id);
      if (stmt_p)
    {
      PROXY_DEBUG_LOG ("remove prepared statement from context. " "(context:(%s), stmt:(%s))",
               proxy_str_context (ctx_p), shard_str_stmt (stmt_p));

      shard_stmt_unpin (stmt_p);

      if (stmt_p->num_pinned <= 0 && stmt_p->status == SHARD_STMT_STATUS_INVALID)
        {
          shard_stmt_free (stmt_p);
        }
      else if (stmt_p->stmt_type != SHARD_STMT_TYPE_PREPARED)
        {
          assert (stmt_p->num_pinned == 0);
          shard_stmt_free (stmt_p);
        }
    }

      FREE_MEM (stmt_list_p);
    }

  ctx_p->stmt_list = NULL;

  return;
}

void
proxy_context_timeout (T_PROXY_CONTEXT * ctx_p)
{
  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Context waiter timed out. " "context(%s).", proxy_str_context (ctx_p));

  /* free pending 'prepare/execute' request */
  if (ctx_p->waiting_event)
    {
      proxy_event_free (ctx_p->waiting_event);
      ctx_p->waiting_event = NULL;
    }

  ctx_p->waiting_dummy_prepare = false;
  ctx_p->wait_timeout = proxy_info_p->wait_timeout;

  ctx_p->func_code = PROXY_INVALID_FUNC_CODE;
  ctx_p->stmt_h_id = SHARD_STMT_INVALID_HANDLE_ID;
  ctx_p->stmt_hint_type = HT_INVAL;

  /* if statement owner */
  if (ctx_p->prepared_stmt && ctx_p->prepared_stmt->status == SHARD_STMT_STATUS_IN_PROGRESS)
    {
      /* check and wakeup statement waiter */
      shard_stmt_check_waiter_and_wakeup (ctx_p->prepared_stmt);

      /* free statement */
      shard_stmt_free (ctx_p->prepared_stmt);
    }
  ctx_p->prepared_stmt = NULL;

  /* if shard/cas waiter */
  if (ctx_p->is_in_tran == false)
    {
      if (ctx_p->cas_id != PROXY_INVALID_CAS)
    {
      assert (false);
      PROXY_LOG (PROXY_LOG_MODE_ERROR, "Unexpected transaction status. " "context(%s).", proxy_str_context (ctx_p));

    }
      ctx_p->shard_id = PROXY_INVALID_SHARD;
      ctx_p->cas_id = PROXY_INVALID_CAS;
    }

  proxy_context_set_error_with_msg (ctx_p, CAS_ERROR_INDICATOR, CAS_ER_INTERNAL,
                    "proxy service temporarily unavailable");
  proxy_context_send_error (ctx_p);
  proxy_context_clear_error (ctx_p);

  return;
}

void
proxy_handler_destroy (void)
{
  shard_queue_destroy (&proxy_Handler.cas_rcv_q);
  shard_queue_destroy (&proxy_Handler.cli_ret_q);
  shard_queue_destroy (&proxy_Handler.cli_rcv_q);
  proxy_context_destroy ();
}

int
proxy_handler_initialize (void)
{
  int error;
  static bool is_init = false;

  if (is_init == true)
    {
      return 0;
    }

  error = proxy_context_initialize ();
  if (error)
    {
      goto error_return;
    }

  error = shard_queue_initialize (&proxy_Handler.cas_rcv_q);
  if (error)
    {
      goto error_return;
    }

  error = shard_queue_initialize (&proxy_Handler.cli_ret_q);
  if (error)
    {
      goto error_return;
    }

  error = shard_queue_initialize (&proxy_Handler.cli_rcv_q);
  if (error)
    {
      goto error_return;
    }

  is_init = true;
  return 0;

error_return:
  proxy_handler_destroy ();

  return -1;
}

void
proxy_handler_process (void)
{
  T_PROXY_HANDLER *handler_p;
  void *msg;

  handler_p = &(proxy_Handler);

  /* process cas response message */
  while ((msg = shard_queue_dequeue (&handler_p->cas_rcv_q)) != NULL)
    {
      proxy_handler_process_cas_event ((T_PROXY_EVENT *) msg);
    }

  /* process client retry message */
  while ((msg = shard_queue_dequeue (&handler_p->cli_ret_q)) != NULL)
    {
      proxy_handler_process_client_event ((T_PROXY_EVENT *) msg);
    }

  /* process client request message */
  while ((msg = shard_queue_dequeue (&handler_p->cli_rcv_q)) != NULL)
    {
      proxy_handler_process_client_event ((T_PROXY_EVENT *) msg);
    }

  return;
}

int
proxy_wakeup_context_by_shard (T_WAIT_CONTEXT * waiter_p, int shard_id, int cas_id)
{
  int error;
  T_CAS_IO *cas_io_p;
  T_PROXY_EVENT *event_p = NULL;
  T_PROXY_CONTEXT *ctx_p = NULL;

  assert (waiter_p);
  assert (shard_id >= 0);
  assert (cas_id >= 0);

  ctx_p = proxy_context_find (waiter_p->ctx_cid, waiter_p->ctx_uid);
  if (ctx_p == NULL)
    {
      PROXY_LOG (PROXY_LOG_MODE_ERROR, "Unable to find context. " "(context id:%d, context uid:%d).", waiter_p->ctx_cid,
         waiter_p->ctx_uid);
      goto error_return;
    }

  PROXY_DEBUG_LOG ("wakeup context(cid:%d, uid:%u) by shard(%d)/cas(%d).", ctx_p->cid, ctx_p->uid, shard_id, cas_id);

  cas_io_p =
    proxy_cas_alloc_by_ctx (ctx_p->client_id, shard_id, cas_id, waiter_p->ctx_cid, waiter_p->ctx_uid,
                ctx_p->wait_timeout, ctx_p->func_code);
  if (cas_io_p == NULL)
    {
      PROXY_DEBUG_LOG ("failed to proxy_cas_alloc_by_ctx. " "(shard_id:%d, cas_id:%d, ctx_cid:%d, ctx_uid:%d", shard_id,
               cas_id, waiter_p->ctx_cid, waiter_p->ctx_uid);
      goto error_return;
    }
  if (cas_io_p == (T_CAS_IO *) SHARD_TEMPORARY_UNAVAILABLE)
    {
      assert (false);
      goto error_return;
    }

  proxy_context_set_in_tran (ctx_p, cas_io_p->shard_id, cas_io_p->cas_id);

  event_p = proxy_event_new (PROXY_EVENT_CLIENT_WAKEUP_BY_SHARD, PROXY_EVENT_FROM_CLIENT);
  if (event_p == NULL)
    {
      PROXY_DEBUG_LOG ("failed to proxy_event_new.");
      goto error_return;
    }
  proxy_event_set_context (event_p, waiter_p->ctx_cid, waiter_p->ctx_uid);
  proxy_event_set_shard (event_p, cas_io_p->shard_id, cas_io_p->cas_id);

  error = shard_queue_enqueue (&proxy_Handler.cli_ret_q, (void *) event_p);
  if (error)
    {
      assert (false);

      proxy_event_free (event_p);
      event_p = NULL;

      PROXY_DEBUG_LOG ("failed to shard_queue_enqueue.");
      goto error_return;
    }

  return 0;

error_return:
  if (ctx_p)
    {
      proxy_context_free (ctx_p);
    }
  return -1;
}

int
proxy_wakeup_context_by_statement (T_WAIT_CONTEXT * waiter_p)
{
  int error;
  T_PROXY_EVENT *event_p;

  assert (waiter_p);

  event_p = proxy_event_new (PROXY_EVENT_CLIENT_WAKEUP_BY_STATEMENT, PROXY_EVENT_FROM_CLIENT);
  if (event_p == NULL)
    {
      PROXY_DEBUG_LOG ("failed to proxy_event_new.");
      return -1;
    }
  proxy_event_set_context (event_p, waiter_p->ctx_cid, waiter_p->ctx_uid);

  error = shard_queue_enqueue (&proxy_Handler.cli_ret_q, (void *) event_p);
  if (error)
    {
      PROXY_DEBUG_LOG ("failed to shard_queue_enqueue.");
      assert (false);
      return -1;
    }

  return 0;
}

T_PROXY_EVENT *
proxy_event_new (unsigned int type, int from_cas)
{
  T_PROXY_EVENT *event_p;

  event_p = (T_PROXY_EVENT *) malloc (sizeof (T_PROXY_EVENT));
  if (event_p)
    {
      event_p->type = type;
      event_p->from_cas = from_cas;

      event_p->cid = PROXY_INVALID_CONTEXT;
      event_p->uid = 0;
      event_p->shard_id = PROXY_INVALID_SHARD;
      event_p->cas_id = PROXY_INVALID_CAS;

      memset (((void *) &event_p->buffer), 0, sizeof (event_p->buffer));
    }

  return event_p;
}

T_PROXY_EVENT *
proxy_event_dup (T_PROXY_EVENT * event_p)
{
  T_PROXY_EVENT *new_event_p;

  new_event_p = (T_PROXY_EVENT *) malloc (sizeof (T_PROXY_EVENT));
  if (new_event_p)
    {
      memcpy ((void *) new_event_p, (void *) event_p, offsetof (T_PROXY_EVENT, buffer));

      new_event_p->buffer.length = event_p->buffer.length;
      new_event_p->buffer.offset = event_p->buffer.offset;
      new_event_p->buffer.data = (char *) malloc (event_p->buffer.length * sizeof (char));
      if (new_event_p->buffer.data == NULL)
    {
      FREE_MEM (new_event_p);
      return NULL;
    }
      memcpy ((void *) new_event_p->buffer.data, (void *) event_p->buffer.data, event_p->buffer.length);
    }

  return new_event_p;
}

T_PROXY_EVENT *
proxy_event_new_with_req (char *driver_info, unsigned int type, int from, T_PROXY_EVENT_FUNC req_func)
{
  T_PROXY_EVENT *event_p;
  char *msg = NULL;
  int length;

  event_p = proxy_event_new (type, from);
  if (event_p == NULL)
    {
      return NULL;
    }

  length = req_func (driver_info, &msg);
  if (length <= 0)
    {
      proxy_event_free (event_p);
      return NULL;
    }

  proxy_event_set_buffer (event_p, msg, length);

  return event_p;
}

T_PROXY_EVENT *
proxy_event_new_with_rsp (char *driver_info, unsigned int type, int from, T_PROXY_EVENT_FUNC resp_func)
{
  return proxy_event_new_with_req (driver_info, type, from, resp_func);
}

T_PROXY_EVENT *
proxy_event_new_with_rsp_ex (char *driver_info, unsigned int type, int from, T_PROXY_EVENT_FUNC_EX resp_func,
                 void *argv)
{
  T_PROXY_EVENT *event_p;
  char *msg = NULL;
  int length;

  event_p = proxy_event_new (type, from);
  if (event_p == NULL)
    {
      return NULL;
    }

  length = resp_func (driver_info, &msg, argv);
  if (length <= 0)
    {
      proxy_event_free (event_p);
      return NULL;
    }

  proxy_event_set_buffer (event_p, msg, length);

  return event_p;
}

T_PROXY_EVENT *
proxy_event_new_with_error (char *driver_info, unsigned int type, int from,
                int (*err_func) (char *driver_info, char **buffer, int error_ind, int error_code,
                         const char *error_msg, char is_in_tran), int error_ind, int error_code,
                const char *error_msg, char is_in_tran)
{
  T_PROXY_EVENT *event_p;
  char *msg = NULL;
  int length;

  event_p = proxy_event_new (type, from);
  if (event_p == NULL)
    {
      return NULL;
    }

  length = err_func (driver_info, &msg, error_ind, error_code, error_msg, is_in_tran);
  if (length <= 0)
    {
      proxy_event_free (event_p);
      return NULL;
    }

  proxy_event_set_buffer (event_p, msg, length);

  return event_p;
}

int
proxy_event_alloc_buffer (T_PROXY_EVENT * event_p, unsigned int size)
{
  assert (event_p);

  event_p->buffer.length = size;
  event_p->buffer.offset = 0;
  event_p->buffer.data = (char *) malloc (size * sizeof (char));

  return (event_p->buffer.data != NULL) ? 0 : -1;
}

int
proxy_event_realloc_buffer (T_PROXY_EVENT * event_p, unsigned int size)
{
  char *old_data;

  assert (event_p);
  assert (event_p->buffer.data);

  old_data = event_p->buffer.data;

  assert (event_p->buffer.length < (int) size);

  event_p->buffer.data = (char *) realloc (event_p->buffer.data, size * sizeof (char));
  if (event_p->buffer.data == NULL)
    {
      event_p->buffer.data = old_data;
      return -1;
    }
  event_p->buffer.length = size;

  return 0;
}

void
proxy_event_set_buffer (T_PROXY_EVENT * event_p, char *data, unsigned int size)
{
  assert (event_p);
  assert (data != NULL);

  event_p->buffer.length = size;
  event_p->buffer.offset = 0;
  event_p->buffer.data = data;
}

void
proxy_event_set_type_from (T_PROXY_EVENT * event_p, unsigned int type, int from_cas)
{
  assert (event_p);

  event_p->type = type;
  event_p->from_cas = from_cas;

  return;
}

void
proxy_event_set_context (T_PROXY_EVENT * event_p, int cid, unsigned int uid)
{
  assert (event_p);

  event_p->cid = cid;
  event_p->uid = uid;

  return;
}


void
proxy_event_set_shard (T_PROXY_EVENT * event_p, int shard_id, int cas_id)
{
  assert (event_p);

  event_p->shard_id = shard_id;
  event_p->cas_id = cas_id;

  return;
}

bool
proxy_event_io_read_complete (T_PROXY_EVENT * event_p)
{
  assert (event_p);
  assert (event_p->type == PROXY_EVENT_IO_READ);

  return (event_p->buffer.length != 0 && event_p->buffer.length == event_p->buffer.offset) ? true : false;
}

#if defined (ENABLE_UNUSED_FUNCTION)
bool
proxy_event_io_write_complete (T_PROXY_EVENT * event_p)
{
  assert (event_p);
  assert (event_p->type == PROXY_EVENT_IO_WRITE);

  return (event_p->buffer.length != 0 && event_p->buffer.length == event_p->buffer.offset) ? true : false;
}
#endif /* ENABLE_UNUSED_FUNCTION */

void
proxy_event_free (T_PROXY_EVENT * event_p)
{
  assert (event_p);

  proxy_io_buffer_clear (&event_p->buffer);
  FREE_MEM (event_p);

  return;
}

char *
proxy_str_event (T_PROXY_EVENT * event_p)
{
  static char buffer[LINE_MAX];

  if (event_p == NULL)
    {
      return (char *) "-";
    }

  snprintf (buffer, sizeof (buffer), "type:%d, from_cas:%s, cid:%d, uid:%u, shard_id:%d, cas_id:%d", event_p->type,
        (event_p->from_cas) ? "Y" : "N", event_p->cid, event_p->uid, event_p->shard_id, event_p->cas_id);

  return (char *) buffer;
}

void
proxy_timer_process (void)
{
  static int num_called = 0;
  static int old = 0;
  int now, diff_time;

  num_called++;
  num_called = (num_called % PROXY_MAX_IGNORE_TIMER_CHECK);
  if (num_called != 0)
    {
      return;
    }

  now = time (NULL);
  diff_time = now - old;
  if (diff_time < PROXY_TIMER_CHECK_INTERVAL)
    {
      return;
    }

  shard_statement_wait_timer ();
  proxy_available_cas_wait_timer ();

  old = now;

  return;
}

char *
shard_str_sqls (char *sql)
{
  static char buffer[LINE_MAX];
  size_t len;
  size_t head_len, ws_len, tail_len;
  char *from, *to;

  if (sql == NULL)
    {
      return (char *) "-";
    }

  len = strlen (sql);
  if (len < (int) sizeof (buffer))
    {
      return sql;
    }

  ws_len = 32;
  head_len = sizeof (buffer) / 2;
  tail_len = sizeof (buffer) - head_len - ws_len;

  /* head */
  memcpy (buffer, sql, head_len);

  /* ws */
  *(buffer + head_len) = '\n';
  memset ((buffer + head_len + 1), (int) '.', ws_len - 2);
  *(buffer + head_len + ws_len - 1) = '\n';

  /* tail */
  to = (sql + len);
  from = (to - tail_len);
  memcpy ((buffer + head_len + ws_len), from, tail_len);

  buffer[LINE_MAX - 1] = '\0';  /* bulletproof */

  return buffer;
}