Skip to content

File shard_statement.c

File List > broker > shard_statement.c

Go to the documentation of this file

/*
 * Copyright 2008 Search Solution Corporation
 * Copyright 2016 CUBRID Corporation
 *
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 *
 */


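/*
 * shard_statement.c - proxy statement pool: entry allocation with LRU reuse,
 *                     lookup by SQL text or statement handle id, and
 *                     per shard/CAS server handle id bookkeeping
 */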

#ident "$Id$"


#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <assert.h>

#include "cas_common.h"
#include "shard_proxy.h"
#include "shard_proxy_common.h"
#include "shard_statement.h"
#include "shard_shm.h"

#if defined (SUPPRESS_STRLEN_WARNING)
/* Wrap strlen() so that every call in this file yields an int instead of size_t. */
#define strlen(s1)  ((int) strlen(s1))
#endif /* defined (SUPPRESS_STRLEN_WARNING) */

/* Objects defined outside this file. */
extern T_SHM_SHARD_KEY *shm_key_p;
extern T_PROXY_INFO *proxy_info_p;

/* LRU list of unpinned statements (shard_Stmt.lru .. shard_Stmt.mru). */
static int shard_stmt_lru_insert (T_SHARD_STMT * stmt_p);
static int shard_stmt_lru_delete (T_SHARD_STMT * stmt_p);
static T_SHARD_STMT *shard_stmt_get_lru (void);

/* Access to the per (shard, CAS) server handle id slots of a statement. */
static int *shard_stmt_pos_srv_h_id (T_SHARD_STMT * stmt_p, int shard_id, int cas_id);
static int shard_stmt_find_srv_h_id (T_SHARD_STMT * stmt_p, int shard_id, int cas_id);
static int shard_stmt_set_srv_h_id (T_SHARD_STMT * stmt_p, int shard_id, int cas_id, int srv_h_id);

/* Linear scan for an entry whose status is SHARD_STMT_STATUS_UNUSED. */
static T_SHARD_STMT *shard_stmt_find_unused (void);

/* Common allocator behind the shard_stmt_new_*() entry points. */
static T_SHARD_STMT *shard_stmt_new_internal (int stmt_type, char *sql_stmt, int ctx_cid, unsigned int ctx_uid,
                          T_BROKER_VERSION client_version);

/* Helpers used by shard_stmt_rewrite_sql(). */
static int shard_stmt_change_shard_val_to_id (char **sql_stmt, const char **buf, char appl_server);
static char *shard_stmt_write_buf_to_sql (char *sql_stmt, const char *buf, int length, bool is_to_upper,
                      char appl_server);

static T_BROKER_VERSION shard_stmt_make_protocol_version (T_BROKER_VERSION client_version);
/* Maintenance of shard_Stmt.stmt_map (SQL text -> statement entry). */
static void shard_stmt_put_statement_to_map (const char *sql_stmt, T_SHARD_STMT * stmt_p);
static void shard_stmt_del_statement_from_map (T_SHARD_STMT * stmt_p);
static void shard_stmt_set_status (int stmt_h_id, int status);

/* Global state of the statement pool: entries, limits, LRU list ends and SQL map. */
T_SHARD_STMT_GLOBAL shard_Stmt;

/* Upper bound for an entry's num_alloc counter before it is reset to 0 (see shard_stmt_new_internal). */
int shard_Stmt_max_num_alloc = SHARD_STMT_MAX_NUM_ALLOC;

/*
 * shard_stmt_lru_insert - link a statement at the MRU end of the LRU list
 *   return: 0 on success, -1 when the statement is pinned or already linked
 *   stmt_p(in/out): statement entry to append
 */
static int
shard_stmt_lru_insert (T_SHARD_STMT * stmt_p)
{
  T_SHARD_STMT *tail_p;
  bool is_pinned = (stmt_p->num_pinned != 0);
  bool is_linked = (stmt_p->lru_prev != NULL || stmt_p->lru_next != NULL);

  if (is_pinned || is_linked)
    {
      PROXY_DEBUG_LOG ("Invalid statement status. statement(%s).", shard_str_stmt (stmt_p));

      assert (false);
      return -1;
    }

  /* the current MRU entry (if any) becomes our predecessor */
  tail_p = shard_Stmt.mru;
  stmt_p->lru_prev = tail_p;
  stmt_p->lru_next = NULL;

  if (tail_p == NULL)
    {
      /* list was empty: the new entry is also the LRU end */
      shard_Stmt.lru = stmt_p;
    }
  else
    {
      tail_p->lru_next = stmt_p;
    }

  shard_Stmt.mru = stmt_p;

  return 0;
}

/*
 * shard_stmt_lru_delete - unlink a statement from the LRU list
 *   return: 0 on success, -1 when the entry's links are inconsistent with
 *           the list ends (entry is not actually on the list)
 *   stmt_p(in/out): statement entry to unlink
 */
static int
shard_stmt_lru_delete (T_SHARD_STMT * stmt_p)
{
  T_SHARD_STMT *before_p;
  T_SHARD_STMT *after_p;

  ENTER_FUNC ();

  before_p = stmt_p->lru_prev;
  after_p = stmt_p->lru_next;

  /* an entry without a neighbour on one side must be the list end on that side */
  if ((before_p == NULL && shard_Stmt.lru != stmt_p) || (after_p == NULL && shard_Stmt.mru != stmt_p))
    {
      PROXY_DEBUG_LOG ("Invalid statement lru list. " "(stmt_p:%p, lru:%p, mru:%p). " "statement(%s).", stmt_p,
		       shard_Stmt.lru, shard_Stmt.mru, shard_str_stmt (stmt_p));

      assert (false);

      EXIT_FUNC ();
      return -1;
    }

  if (after_p != NULL)
    {
      after_p->lru_prev = before_p;
    }
  else
    {
      shard_Stmt.mru = before_p;
    }

  if (before_p != NULL)
    {
      before_p->lru_next = after_p;
    }
  else
    {
      shard_Stmt.lru = after_p;
    }

  stmt_p->lru_next = NULL;
  stmt_p->lru_prev = NULL;

  EXIT_FUNC ();
  return 0;
}

/*
 * shard_stmt_get_lru - peek at the LRU end of the list without unlinking it
 *   return: entry stored in shard_Stmt.lru, or NULL when the list is empty
 */
static T_SHARD_STMT *
shard_stmt_get_lru (void)
{
  T_SHARD_STMT *oldest_p = shard_Stmt.lru;

  return oldest_p;
}

/*
 * shard_stmt_pos_srv_h_id - locate the server handle id slot of a (shard, CAS) pair
 *   return: pointer into stmt_p->srv_h_id_ent, or NULL when shard_id/cas_id
 *           is outside [0, max_num_shard) / [0, num_cas_per_shard)
 *   stmt_p(in): statement entry owning the slot table
 *   shard_id(in): shard index
 *   cas_id(in): CAS index within the shard
 *
 * Note: slots are laid out row-major, one row of num_cas_per_shard slots per shard.
 */
static int *
shard_stmt_pos_srv_h_id (T_SHARD_STMT * stmt_p, int shard_id, int cas_id)
{
  int pos;

  if ((shard_id < 0 || shard_Stmt.max_num_shard <= shard_id) || (cas_id < 0 || shard_Stmt.num_cas_per_shard <= cas_id))
    {
      PROXY_DEBUG_LOG ("Invalid statement shard/CAS id. " "(shard_id:%d, cas_id:%d, max_num_shard:%d, "
		       "num_cas_per_shard:%d). statement(%s).", shard_id, cas_id, shard_Stmt.max_num_shard,
		       shard_Stmt.num_cas_per_shard, shard_str_stmt (stmt_p));

      assert (false);
      return NULL;
    }

  pos = (shard_id * shard_Stmt.num_cas_per_shard) + cas_id;

  return &(stmt_p->srv_h_id_ent[pos]);
}

/*
 * shard_stmt_find_srv_h_id - read the server handle id stored for a (shard, CAS) pair
 *   return: stored slot value, or -1 when the pair is out of range
 *   stmt_p(in): statement entry
 *   shard_id(in): shard index
 *   cas_id(in): CAS index within the shard
 */
static int
shard_stmt_find_srv_h_id (T_SHARD_STMT * stmt_p, int shard_id, int cas_id)
{
  int *slot_p = shard_stmt_pos_srv_h_id (stmt_p, shard_id, cas_id);

  return (slot_p != NULL) ? *slot_p : -1;
}

/*
 * shard_stmt_set_srv_h_id - store a server handle id for a (shard, CAS) pair
 *   return: 0 on success, -1 when the pair is out of range
 *   stmt_p(in/out): statement entry
 *   shard_id(in): shard index
 *   cas_id(in): CAS index within the shard
 *   srv_h_id(in): value to store
 */
static int
shard_stmt_set_srv_h_id (T_SHARD_STMT * stmt_p, int shard_id, int cas_id, int srv_h_id)
{
  int *slot_p = shard_stmt_pos_srv_h_id (stmt_p, shard_id, cas_id);

  if (slot_p != NULL)
    {
      *slot_p = srv_h_id;
      return 0;
    }

  return -1;
}

/*
 * shard_stmt_make_protocol_version - collapse a client version into the
 *                                    protocol bucket kept in a statement entry
 *   return: one of CAS_MAKE_VER (8, 2, 0), CAS_MAKE_VER (8, 3, 0),
 *           CAS_PROTO_MAKE_VER (PROTOCOL_V1) or CAS_PROTO_CURRENT_VER
 *   client_version(in): version reported by the client
 *
 * Note: the result is only used as the client_version of shard_Stmt entries.
 */
static T_BROKER_VERSION
shard_stmt_make_protocol_version (T_BROKER_VERSION client_version)
{
  T_BROKER_VERSION bucket;

  if (client_version < CAS_MAKE_VER (8, 3, 0))
    {
      /* old protocol */
      bucket = CAS_MAKE_VER (8, 2, 0);
    }
  else if (client_version < CAS_MAKE_VER (8, 4, 0))
    {
      /* error indicator protocol added */
      bucket = CAS_MAKE_VER (8, 3, 0);
    }
  else if (client_version <= CAS_PROTO_MAKE_VER (PROTOCOL_V1))
    {
      /* prepare result info added */
      bucket = CAS_PROTO_MAKE_VER (PROTOCOL_V1);
    }
  else
    {
      /* columns meta-data is sent with the execute result; this is the current protocol */
      bucket = CAS_PROTO_CURRENT_VER;
    }

  return bucket;
}

/*
 * shard_stmt_find_by_sql - look up a usable prepared statement by SQL text
 *   return: matching statement entry, or NULL when none qualifies
 *   sql_stmt(in): SQL text used as key of shard_Stmt.stmt_map
 *   db_user(in): database user; compared case-insensitively
 *   client_version(in): client version; compared after bucketing with
 *                       shard_stmt_make_protocol_version()
 *
 * Note: entries whose status is UNUSED or INVALID are skipped.
 */
T_SHARD_STMT *
shard_stmt_find_by_sql (char *sql_stmt, const char *db_user, T_BROKER_VERSION client_version)
{
  T_BROKER_VERSION wanted_version = shard_stmt_make_protocol_version (client_version);
  T_SHARD_STMT *cand_p;

  /* walk the chain of entries registered under the same SQL text */
  for (cand_p = (T_SHARD_STMT *) mht_get (shard_Stmt.stmt_map, sql_stmt); cand_p != NULL; cand_p = cand_p->hash_next)
    {
      if (cand_p->stmt_type != SHARD_STMT_TYPE_PREPARED)
	{
	  /* only prepared statements are expected in the map */
	  assert (false);
	  continue;
	}

      if (cand_p->status == SHARD_STMT_STATUS_UNUSED || cand_p->status == SHARD_STMT_STATUS_INVALID
	  || cand_p->client_version != wanted_version || strcasecmp (db_user, cand_p->database_user) != 0)
	{
	  continue;
	}

      assert (strcmp (sp_get_sql_stmt (cand_p->parser), sql_stmt) == 0);

      return cand_p;
    }

  return NULL;
}

/*
 * shard_stmt_find_by_stmt_h_id - look up a statement entry by its handle id
 *   return: entry whose stmt_h_id equals the argument and whose status is
 *           not UNUSED, otherwise NULL
 *   stmt_h_id(in): statement handle id
 *
 * Note: the entry index is stmt_h_id modulo max_num_stmt (handle ids are
 *       built as num_alloc * max_num_stmt + index in shard_stmt_new_internal).
 *       Negative ids and an empty/uninitialized pool are rejected up front:
 *       a negative remainder would otherwise index far outside stmt_ent and
 *       a zero max_num_stmt would divide by zero.
 */
T_SHARD_STMT *
shard_stmt_find_by_stmt_h_id (int stmt_h_id)
{
  T_SHARD_STMT *stmt_p = NULL;
  int slot;

  if (stmt_h_id < 0 || shard_Stmt.max_num_stmt <= 0 || shard_Stmt.stmt_ent == NULL)
    {
      return NULL;
    }

  slot = stmt_h_id % shard_Stmt.max_num_stmt;

  stmt_p = &(shard_Stmt.stmt_ent[slot]);
  if (stmt_p->status != SHARD_STMT_STATUS_UNUSED && stmt_p->stmt_h_id == stmt_h_id)
    {
      return stmt_p;
    }

  return NULL;
}

/*
 * shard_stmt_find_unused - find the first entry whose status is UNUSED
 *   return: free entry, or NULL when every entry is in use
 */
static T_SHARD_STMT *
shard_stmt_find_unused (void)
{
  int idx;

  for (idx = 0; idx < shard_Stmt.max_num_stmt; idx++)
    {
      if (shard_Stmt.stmt_ent[idx].status == SHARD_STMT_STATUS_UNUSED)
	{
	  return &(shard_Stmt.stmt_ent[idx]);
	}
    }

  return NULL;
}

/*
 * shard_stmt_pin - take a reference on a statement
 *   return: 0 on success, negative on failure
 *   stmt_p(in/out): statement entry
 *
 * Note: a statement that is currently on the LRU list is unlinked from it;
 *       if unlinking fails the pin count is rolled back.
 */
int
shard_stmt_pin (T_SHARD_STMT * stmt_p)
{
  bool on_lru_list;
  int rc;

  if (stmt_p->status == SHARD_STMT_STATUS_UNUSED)
    {
      PROXY_LOG (PROXY_LOG_MODE_ERROR, "Invalid statement status. statement(%s).", shard_str_stmt (stmt_p));
      assert (false);
      return -1;
    }

  stmt_p->num_pinned += 1;

  on_lru_list = (shard_Stmt.lru == stmt_p) || (stmt_p->lru_prev || stmt_p->lru_next);
  if (!on_lru_list)
    {
      return 0;
    }

  PROXY_DEBUG_LOG ("Pin statement. statement(%s).", shard_str_stmt (stmt_p));

  rc = shard_stmt_lru_delete (stmt_p);
  if (rc < 0)
    {
      /* undo the pin taken above */
      stmt_p->num_pinned -= 1;
    }

  return rc;
}

/*
 * shard_stmt_unpin - drop a reference on a statement
 *   return: 0 on success, negative on failure
 *   stmt_p(in/out): statement entry
 *
 * Note: when the pin count reaches zero the statement is appended to the
 *       LRU list; if that fails the pin count is restored to 1.
 */
int
shard_stmt_unpin (T_SHARD_STMT * stmt_p)
{
  int rc;

  if (stmt_p->status == SHARD_STMT_STATUS_UNUSED)
    {
      PROXY_LOG (PROXY_LOG_MODE_ERROR, "Invalid statement status. statement(%s).", shard_str_stmt (stmt_p));
      assert (false);
      return -1;
    }

  stmt_p->num_pinned -= 1;
  assert (stmt_p->num_pinned >= 0);

  if (stmt_p->num_pinned > 0)
    {
      /* still referenced by somebody else */
      return 0;
    }

  /* clamp in case the count went negative in a non-debug build */
  stmt_p->num_pinned = 0;

  rc = shard_stmt_lru_insert (stmt_p);
  if (rc < 0)
    {
      stmt_p->num_pinned += 1;
    }

  return rc;
}

/*
 * shard_stmt_check_waiter_and_wakeup - release statement ownership and wake
 *                                      every context queued on the statement
 *   return: none
 *   stmt_p(in/out): statement entry
 *
 * Note: each dequeued waiter is freed here after the wakeup attempt,
 *       whether or not the wakeup succeeded.
 */
void
shard_stmt_check_waiter_and_wakeup (T_SHARD_STMT * stmt_p)
{
  T_WAIT_CONTEXT *waiter_p = NULL;
  int wakeup_rc;

  ENTER_FUNC ();

  assert (stmt_p);

  /* clear context owner */
  stmt_p->ctx_uid = 0;
  stmt_p->ctx_cid = PROXY_INVALID_CONTEXT;

  for (;;)
    {
      waiter_p = (T_WAIT_CONTEXT *) shard_queue_dequeue (&stmt_p->waitq);
      if (waiter_p == NULL)
	{
	  break;
	}

      assert (stmt_p->stmt_type == SHARD_STMT_TYPE_PREPARED);

      if (proxy_info_p->stmt_waiter_count > 0)
	{
	  proxy_info_p->stmt_waiter_count--;
	}

      PROXY_DEBUG_LOG ("Wakeup context by statement. statement(%s) waiter_count(%d).", shard_str_stmt (stmt_p),
		       proxy_info_p->stmt_waiter_count);

      wakeup_rc = proxy_wakeup_context_by_statement (waiter_p);
      if (wakeup_rc)
	{
	  PROXY_LOG (PROXY_LOG_MODE_ERROR,
		     "Failed to wakeup context by statement. "
		     "(error:%d, context id:%d, context uid:%d). statement(%s).", wakeup_rc, waiter_p->ctx_cid,
		     waiter_p->ctx_uid, shard_str_stmt (stmt_p));

	  assert (false);
	}

      FREE_MEM (waiter_p);
    }

  EXIT_FUNC ();
}

/*
 * shard_stmt_new_internal - allocate and set up a statement entry
 *   return: new entry in IN_PROGRESS status, or NULL on failure
 *   stmt_type(in): SHARD_STMT_TYPE_PREPARED, _SCHEMA_INFO or _EXCLUSIVE
 *   sql_stmt(in): SQL text; must be NULL for SCHEMA_INFO, non-NULL otherwise
 *   ctx_cid(in): id of the owning proxy context
 *   ctx_uid(in): uid of the owning proxy context
 *   client_version(in): client version; must be 0 for SCHEMA_INFO
 *
 * Note: an UNUSED entry is taken if one exists; otherwise the entry at the
 *       LRU end is freed and reused. When neither is available (everything
 *       is pinned) the call fails. PREPARED statements are also registered
 *       in the SQL map.
 */
static T_SHARD_STMT *
shard_stmt_new_internal (int stmt_type, char *sql_stmt, int ctx_cid, unsigned int ctx_uid,
			 T_BROKER_VERSION client_version)
{
  int error;
  int i, num_cas;
  T_SHARD_STMT *stmt_p = NULL;
  T_PROXY_CONTEXT *ctx_p = NULL;

  assert ((stmt_type != SHARD_STMT_TYPE_SCHEMA_INFO && sql_stmt != NULL)
	  || (stmt_type == SHARD_STMT_TYPE_SCHEMA_INFO && sql_stmt == NULL && client_version == 0));

  ctx_p = proxy_context_find (ctx_cid, ctx_uid);
  assert (ctx_p != NULL);
  if (ctx_p == NULL)
    {
      /* without this check a non-debug build would dereference NULL below */
      PROXY_LOG (PROXY_LOG_MODE_ERROR, "Unable to find context. (context id:%d, context uid:%u).", ctx_cid, ctx_uid);
      return NULL;
    }

  assert (stmt_type != SHARD_STMT_TYPE_PREPARED
	  || shard_stmt_find_by_sql (sql_stmt, ctx_p->database_user, client_version) == NULL);

  num_cas = shard_Stmt.max_num_shard * shard_Stmt.num_cas_per_shard;

  stmt_p = shard_stmt_find_unused ();
  if (stmt_p)
    {
      assert (stmt_p->num_pinned == 0);
      assert (stmt_p->lru_next == NULL && stmt_p->lru_prev == NULL);

      if (!(stmt_p->num_pinned == 0) || !(stmt_p->lru_next == NULL && stmt_p->lru_prev == NULL))
	{
	  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Invalid statement LRU. statement(%s).", shard_str_stmt (stmt_p));
	  return NULL;
	}
    }
  else
    {
      /* no free entry: recycle the least recently used unpinned statement */
      stmt_p = shard_stmt_get_lru ();
      if (stmt_p)
	{
	  shard_stmt_free (stmt_p);
	}
      else
	{
	  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Exceeds max prepared statement. ");
	  return NULL;
	}
    }

  /* stmt_p is non-NULL from here on */

  if ((int) stmt_p->num_alloc >= shard_Stmt_max_num_alloc)
    {
      stmt_p->num_alloc = 0;
    }
  /* handle id = allocation generation * pool size + entry index */
  stmt_p->stmt_h_id = (stmt_p->num_alloc * shard_Stmt.max_num_stmt) + stmt_p->index;

  stmt_p->status = SHARD_STMT_STATUS_IN_PROGRESS;
  stmt_p->stmt_type = stmt_type;
  if (stmt_p->stmt_type != SHARD_STMT_TYPE_SCHEMA_INFO)
    {
      stmt_p->client_version = shard_stmt_make_protocol_version (client_version);

      stmt_p->parser = sp_create_parser (sql_stmt);
      if (stmt_p->parser == NULL)
	{
	  shard_stmt_free (stmt_p);
	  return NULL;
	}
    }
  else
    {
      stmt_p->client_version = 0;
      assert (stmt_p->parser == NULL);
      stmt_p->parser = NULL;
    }

  stmt_p->ctx_cid = ctx_cid;
  stmt_p->ctx_uid = ctx_uid;
  strncpy_bufsize (stmt_p->database_user, ctx_p->database_user);

  stmt_p->num_pinned = 0;
  stmt_p->lru_prev = NULL;
  stmt_p->lru_next = NULL;

  /* __FOR_DEBUG */
  assert (stmt_p->request_buffer_length == 0);
  assert (stmt_p->request_buffer == NULL);
  assert (stmt_p->reply_buffer_length == 0);
  assert (stmt_p->reply_buffer == NULL);

  stmt_p->request_buffer_length = 0;
  stmt_p->request_buffer = NULL;
  stmt_p->reply_buffer_length = 0;
  stmt_p->reply_buffer = NULL;

  for (i = 0; i < num_cas; i++)
    {
      stmt_p->srv_h_id_ent[i] = SHARD_STMT_INVALID_HANDLE_ID;
    }

  error = shard_queue_initialize (&stmt_p->waitq);
  if (error)
    {
      assert (false);

      shard_stmt_free (stmt_p);
      return NULL;
    }

  stmt_p->num_alloc += 1;

  if (stmt_p->stmt_type == SHARD_STMT_TYPE_PREPARED)
    {
      shard_stmt_put_statement_to_map (sql_stmt, stmt_p);
    }

  return stmt_p;
}

/*
 * shard_stmt_new_prepared_stmt - allocate a statement of type SHARD_STMT_TYPE_PREPARED
 *   return: new statement entry, or NULL on failure
 *   sql_stmt(in): SQL text
 *   ctx_cid(in): id of the owning proxy context
 *   ctx_uid(in): uid of the owning proxy context
 *   client_version(in): client version
 */
T_SHARD_STMT *
shard_stmt_new_prepared_stmt (char *sql_stmt, int ctx_cid, unsigned int ctx_uid, T_BROKER_VERSION client_version)
{
  return shard_stmt_new_internal (SHARD_STMT_TYPE_PREPARED, sql_stmt, ctx_cid, ctx_uid, client_version);
}

/*
 * shard_stmt_new_schema_info - allocate a statement of type SHARD_STMT_TYPE_SCHEMA_INFO
 *   return: new statement entry, or NULL on failure
 *   ctx_cid(in): id of the owning proxy context
 *   ctx_uid(in): uid of the owning proxy context
 *
 * Note: schema-info statements carry no SQL text and client version 0.
 */
T_SHARD_STMT *
shard_stmt_new_schema_info (int ctx_cid, unsigned int ctx_uid)
{
  return shard_stmt_new_internal (SHARD_STMT_TYPE_SCHEMA_INFO, NULL, ctx_cid, ctx_uid, 0);
}

/*
 * shard_stmt_new_exclusive - allocate a statement of type SHARD_STMT_TYPE_EXCLUSIVE
 *   return: new statement entry, or NULL on failure
 *   sql_stmt(in): SQL text
 *   ctx_cid(in): id of the owning proxy context
 *   ctx_uid(in): uid of the owning proxy context
 *   client_version(in): client version
 */
T_SHARD_STMT *
shard_stmt_new_exclusive (char *sql_stmt, int ctx_cid, unsigned int ctx_uid, T_BROKER_VERSION client_version)
{
  return shard_stmt_new_internal (SHARD_STMT_TYPE_EXCLUSIVE, sql_stmt, ctx_cid, ctx_uid, client_version);
}

/*
 * shard_stmt_destroy - release every resource owned by the statement pool
 *   return: none
 *
 * Note: destroys the SQL map, then for each entry releases its parser,
 *       request/reply buffers, server handle id table and wait queue, and
 *       finally frees the entry array. The map pointer and the LRU list
 *       ends are reset so that no stale pointer survives and a repeated
 *       call does not destroy the map twice.
 */
void
shard_stmt_destroy (void)
{
  T_SHARD_STMT *stmt_p = NULL;
  int i;

  if (shard_Stmt.stmt_map)
    {
      mht_destroy (shard_Stmt.stmt_map);
      shard_Stmt.stmt_map = NULL;
    }

  if (shard_Stmt.stmt_ent)
    {
      for (i = 0; i < shard_Stmt.max_num_stmt; i++)
	{
	  stmt_p = &(shard_Stmt.stmt_ent[i]);

	  stmt_p->stmt_h_id = SHARD_STMT_INVALID_HANDLE_ID;
	  stmt_p->status = SHARD_STMT_STATUS_UNUSED;

	  stmt_p->ctx_cid = PROXY_INVALID_CONTEXT;
	  stmt_p->ctx_uid = 0;

	  stmt_p->num_pinned = 0;
	  if ((shard_Stmt.lru == stmt_p) || (stmt_p->lru_prev || stmt_p->lru_next))
	    {
	      PROXY_DEBUG_LOG ("Delete statement from lru. " "statement(%p, %s). ", stmt_p, shard_str_stmt (stmt_p));

	      shard_stmt_lru_delete (stmt_p);
	    }
	  stmt_p->lru_prev = NULL;
	  stmt_p->lru_next = NULL;

	  if (stmt_p->parser)
	    {
	      sp_destroy_parser (stmt_p->parser);
	    }
	  stmt_p->parser = NULL;

	  FREE_MEM (stmt_p->request_buffer);
	  stmt_p->request_buffer_length = 0;

	  FREE_MEM (stmt_p->reply_buffer);
	  stmt_p->reply_buffer_length = 0;

	  FREE_MEM (stmt_p->srv_h_id_ent);

	  shard_queue_destroy (&stmt_p->waitq);
	}

      FREE_MEM (shard_Stmt.stmt_ent);
    }

  /* the entries these pointed at are gone */
  shard_Stmt.lru = NULL;
  shard_Stmt.mru = NULL;

  shard_Stmt.max_num_stmt = 0;
  shard_Stmt.max_num_shard = 0;
  shard_Stmt.num_cas_per_shard = 0;

  return;
}

/*
 * shard_stmt_free - return a statement entry to the UNUSED state
 *   return: none
 *   stmt_p(in/out): statement entry to reset
 *
 * Note: the entry itself and its srv_h_id_ent table stay allocated; only
 *       the parser, the request/reply buffers and the wait queue are
 *       released. num_alloc and index are left untouched.
 */
void
shard_stmt_free (T_SHARD_STMT * stmt_p)
{
  int num_cas;
  int i;

  ENTER_FUNC ();

  assert (stmt_p);

  num_cas = shard_Stmt.max_num_shard * shard_Stmt.num_cas_per_shard;

  /* only PREPARED statements are registered in the SQL map; remove before the fields are reset */
  if (stmt_p->stmt_type == SHARD_STMT_TYPE_PREPARED)
    {
      shard_stmt_del_statement_from_map (stmt_p);
    }

  stmt_p->stmt_h_id = SHARD_STMT_INVALID_HANDLE_ID;
  stmt_p->status = SHARD_STMT_STATUS_UNUSED;
  stmt_p->stmt_type = -1;

  stmt_p->client_version = 0;

  stmt_p->ctx_cid = PROXY_INVALID_CONTEXT;
  stmt_p->ctx_uid = 0;
  stmt_p->database_user[0] = '\0';

  stmt_p->num_pinned = 0;
  /* unlink from the LRU list if the entry is on it (list head or has a neighbour) */
  if ((shard_Stmt.lru == stmt_p) || (stmt_p->lru_prev || stmt_p->lru_next))
    {
      PROXY_DEBUG_LOG ("Delete statement from lru. " "statement(%p, %s).", stmt_p, shard_str_stmt (stmt_p));

      shard_stmt_lru_delete (stmt_p);
    }
  stmt_p->lru_prev = NULL;
  stmt_p->lru_next = NULL;
  stmt_p->hash_next = NULL;
  stmt_p->hash_prev = NULL;

  if (stmt_p->parser)
    {
      sp_destroy_parser (stmt_p->parser);
    }
  stmt_p->parser = NULL;

  FREE_MEM (stmt_p->request_buffer);
  stmt_p->request_buffer_length = 0;

  FREE_MEM (stmt_p->reply_buffer);
  stmt_p->reply_buffer_length = 0;


  /* forget every server handle id saved for this statement */
  for (i = 0; i < num_cas; i++)
    {
      stmt_p->srv_h_id_ent[i] = SHARD_STMT_INVALID_HANDLE_ID;
    }

  shard_queue_destroy (&stmt_p->waitq);

  EXIT_FUNC ();
  return;
}


/*
 * shard_stmt_find_srv_h_id_for_shard_cas - get the server handle id saved
 *                                          for a statement on a (shard, CAS)
 *   return: stored server handle id, or -1 when the statement is unknown or
 *           no handle id has been saved for that pair
 *   stmt_h_id(in): statement handle id
 *   shard_id(in): shard index
 *   cas_id(in): CAS index within the shard
 */
int
shard_stmt_find_srv_h_id_for_shard_cas (int stmt_h_id, int shard_id, int cas_id)
{
  T_SHARD_STMT *stmt_p = shard_stmt_find_by_stmt_h_id (stmt_h_id);
  int saved_id;

  if (stmt_p == NULL)
    {
      PROXY_LOG (PROXY_LOG_MODE_ERROR, "Unable to find statement. (stmt_h_id:%d).", stmt_h_id);
      return -1;
    }

  saved_id = shard_stmt_find_srv_h_id (stmt_p, shard_id, cas_id);
  if (saved_id != SHARD_STMT_INVALID_HANDLE_ID)
    {
      return saved_id;
    }

  PROXY_LOG (PROXY_LOG_MODE_SHARD_DETAIL,
	     "Unable to find saved statement handle id. " "(shard_id:%d, cas_id:%d). stmt(%s).", shard_id, cas_id,
	     shard_str_stmt (stmt_p));
  return -1;
}

/*
 * shard_stmt_add_srv_h_id_for_shard_cas - save the server handle id of a
 *                                         statement for a (shard, CAS)
 *   return: 0 on success, -1 on failure
 *   stmt_h_id(in): statement handle id
 *   shard_id(in): shard index
 *   cas_id(in): CAS index within the shard
 *   srv_h_id(in): server handle id to save
 *
 * Note: a statement still IN_PROGRESS is marked complete and its waiters
 *       are woken up before the handle id is stored.
 */
int
shard_stmt_add_srv_h_id_for_shard_cas (int stmt_h_id, int shard_id, int cas_id, int srv_h_id)
{
  T_SHARD_STMT *stmt_p = shard_stmt_find_by_stmt_h_id (stmt_h_id);
  int rc;

  if (stmt_p == NULL)
    {
      PROXY_LOG (PROXY_LOG_MODE_ERROR, "Unable to find statement. (stmt_h_id:%d).", stmt_h_id);
      return -1;
    }

  if (stmt_p->status == SHARD_STMT_STATUS_IN_PROGRESS)
    {
      shard_stmt_set_status_complete (stmt_h_id);

      /* check and wakeup statement waiter */
      shard_stmt_check_waiter_and_wakeup (stmt_p);
    }

  rc = shard_stmt_set_srv_h_id (stmt_p, shard_id, cas_id, srv_h_id);
  if (rc != 0)
    {
      PROXY_LOG (PROXY_LOG_MODE_ERROR,
		 "Unable to save statement handle id. " "(shard_id:%d, cas_id:%d, srv_h_id:%d). stmt(%s).", shard_id,
		 cas_id, srv_h_id, shard_str_stmt (stmt_p));
      return -1;
    }

  PROXY_DEBUG_LOG ("save statement handle id. " "(shard_id:%d, cas_id:%d, srv_h_id:%d). statement(%s).", shard_id,
		   cas_id, srv_h_id, shard_str_stmt (stmt_p));

  return 0;
}

/*
 * shard_stmt_del_srv_h_id_for_shard_cas - forget the server handle id saved
 *                                         for a statement on a (shard, CAS)
 *   return: none
 *   stmt_h_id(in): statement handle id
 *   shard_id(in): shard index
 *   cas_id(in): CAS index within the shard
 *
 * Note: an unknown statement is logged and ignored; an out-of-range pair is
 *       ignored as well.
 */
void
shard_stmt_del_srv_h_id_for_shard_cas (int stmt_h_id, int shard_id, int cas_id)
{
  int *srv_h_id_p;
  T_SHARD_STMT *stmt_p = NULL;

  ENTER_FUNC ();

  stmt_p = shard_stmt_find_by_stmt_h_id (stmt_h_id);
  if (stmt_p == NULL)
    {
      PROXY_LOG (PROXY_LOG_MODE_ERROR, "Unable to find statement. (stmt_h_id:%d).", stmt_h_id);

      /* keep ENTER_FUNC/EXIT_FUNC paired on every return path */
      EXIT_FUNC ();
      return;
    }

  srv_h_id_p = shard_stmt_pos_srv_h_id (stmt_p, shard_id, cas_id);
  if (srv_h_id_p)
    {
      *srv_h_id_p = SHARD_STMT_INVALID_HANDLE_ID;
    }

  EXIT_FUNC ();
  return;
}

/*
 * shard_stmt_del_all_srv_h_id_for_shard_cas - forget the server handle ids
 *                                             saved for one (shard, CAS) in
 *                                             every statement that is in use
 *   return: none
 *   shard_id(in): shard index
 *   cas_id(in): CAS index within the shard
 */
void
shard_stmt_del_all_srv_h_id_for_shard_cas (int shard_id, int cas_id)
{
  T_SHARD_STMT *entry_p = NULL;
  int *slot_p;
  int idx;

  ENTER_FUNC ();

  for (idx = 0; idx < shard_Stmt.max_num_stmt; idx++)
    {
      entry_p = &(shard_Stmt.stmt_ent[idx]);
      if (entry_p->status != SHARD_STMT_STATUS_UNUSED)
	{
	  slot_p = shard_stmt_pos_srv_h_id (entry_p, shard_id, cas_id);
	  if (slot_p != NULL)
	    {
	      *slot_p = SHARD_STMT_INVALID_HANDLE_ID;
	    }
	}
    }

  EXIT_FUNC ();
}

/*
 * shard_stmt_set_hint_list - parse the statement's SQL and validate its hints
 *   return: 1 on success, -1 on failure
 *   stmt_p(in/out): statement entry; must not be of type SCHEMA_INFO
 *
 * Note: fails when parsing fails or when any hint has a type other than
 *       HT_KEY, HT_ID or HT_VAL.
 */
int
shard_stmt_set_hint_list (T_SHARD_STMT * stmt_p)
{
  SP_PARSER_HINT *hint_p = NULL;
  SP_HINT_TYPE hint_type = HT_NONE;
  int parse_rc;

  assert (stmt_p->stmt_type != SHARD_STMT_TYPE_SCHEMA_INFO);
  if (stmt_p->stmt_type == SHARD_STMT_TYPE_SCHEMA_INFO)
    {
      PROXY_LOG (PROXY_LOG_MODE_ERROR, "Unexpected statement type. (expect:%d, current:%d).", SHARD_STMT_TYPE_PREPARED,
		 stmt_p->stmt_type);
      return -1;
    }

  parse_rc = sp_parse_sql (stmt_p->parser);
  if (parse_rc < 0)
    {
      PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to parse sql statement. " "(error:%d). stmt(%s).", parse_rc,
		 shard_str_stmt (stmt_p));
      return -1;
    }

  for (hint_p = sp_get_first_hint (stmt_p->parser); hint_p != NULL; hint_p = sp_get_next_hint (hint_p))
    {
      hint_type = hint_p->hint_type;

      /* supported hint types: HT_KEY, HT_ID and HT_VAL */
      if (hint_type == HT_KEY || hint_type == HT_ID || hint_type == HT_VAL)
	{
	  continue;
	}

      PROXY_LOG (PROXY_LOG_MODE_ERROR, "Unsupported hint type. (hint_type:%d).", hint_type);
      return -1;
    }

  return 1;
}

/*
 * shard_stmt_get_hint_type - report the type of the statement's first hint
 *   return: hint type of the first hint, HT_NONE when the statement has no
 *           hint, HT_INVAL when stmt_p or its parser is NULL, -1 when the
 *           statement is of type SCHEMA_INFO
 *   stmt_p(in): statement entry
 */
int
shard_stmt_get_hint_type (T_SHARD_STMT * stmt_p)
{
  SP_PARSER_HINT *first_hint_p = NULL;

  if (stmt_p == NULL)
    {
      PROXY_LOG (PROXY_LOG_MODE_ERROR, "Invalid statement. Statement couldn't be NULL.");
      return HT_INVAL;
    }

  assert (stmt_p->stmt_type != SHARD_STMT_TYPE_SCHEMA_INFO);
  if (stmt_p->stmt_type == SHARD_STMT_TYPE_SCHEMA_INFO)
    {
      PROXY_LOG (PROXY_LOG_MODE_ERROR, "Unexpected statement type. (expect:%d, current:%d).", SHARD_STMT_TYPE_PREPARED,
		 stmt_p->stmt_type);
      return -1;
    }

  if (stmt_p->parser == NULL)
    {
      PROXY_LOG (PROXY_LOG_MODE_ERROR, "Invalid parser. Statement parser couldn't be NULL.");
      return HT_INVAL;
    }

  first_hint_p = sp_get_first_hint (stmt_p->parser);

  return (first_hint_p == NULL) ? HT_NONE : first_hint_p->hint_type;
}

/*
 * shard_stmt_save_prepare_request - keep a copy of the prepare request in the
 *                                   statement, substituting the rewritten SQL
 *                                   when the statement carries a shard_val hint
 *   return: 0 on success, -1 on allocation/duplication failure
 *   stmt_p(in/out): statement entry; request_buffer and request_buffer_length are set
 *   has_shard_val_hint(in): true when orgzd_sql must replace the SQL text of the request
 *   prepare_req(in/out): prepare request message; replaced (old one freed) only
 *                        when the message had to grow
 *   prepare_req_len(out): updated only together with *prepare_req
 *   argv_sql_stmt(in/out): position of the SQL argument inside *prepare_req
 *   argv_remainder(in): position of the first argument following the SQL argument
 *   orgzd_sql(in): rewritten SQL text (NUL-terminated)
 *
 * NOTE(review): argv_sql_stmt and argv_remainder are assumed to point into the
 *               buffer *prepare_req (the offsets below are computed that way) -
 *               confirm against the caller.
 */
int
shard_stmt_save_prepare_request (T_SHARD_STMT * stmt_p, bool has_shard_val_hint, char **prepare_req,
                 int *prepare_req_len, char *argv_sql_stmt, char *argv_remainder, char *orgzd_sql)
{
  int sql_size;
  int orgzd_sql_size, n_orgzd_sql_size;
  int req_msg_size;
  int remainder_size;
  int expand_size;
  unsigned int offset;
  char *prepare_req_header;
  char *tmp_prepare_req;
  char *sql_stmt;
  char *cur_p;

  prepare_req_header = *(prepare_req);

  /* only sql_size is used below; sql_stmt itself is not referenced again */
  net_arg_get_str (&sql_stmt, &sql_size, argv_sql_stmt);

  if (has_shard_val_hint == true)
    {
      req_msg_size = get_msg_length (prepare_req_header);
      /* size of the rewritten SQL including its terminating NUL */
      orgzd_sql_size = strlen (orgzd_sql) + 1;

      /* expand_size: number of extra bytes needed when the rewritten SQL is longer than the original */
      if (sql_size >= orgzd_sql_size)
    {
      expand_size = 0;
    }
      else
    {
      expand_size = orgzd_sql_size - sql_size;
    }


      if (expand_size == 0)
    {
      /* replace organized sql */
      /* in place, just after the NET_SIZE_INT length field; the length field itself is not rewritten */
      memcpy ((argv_sql_stmt + NET_SIZE_INT), orgzd_sql, orgzd_sql_size);
      /* falls through to the plain duplication at the end of the function */
    }
      else
    {
      /* rewritten SQL does not fit: build a larger message in the statement's own buffer */
      stmt_p->request_buffer = (char *) malloc (req_msg_size + expand_size);

      if (stmt_p->request_buffer == NULL)
        {
          PROXY_LOG (PROXY_LOG_MODE_ERROR, "Malloc failed.");
          return -1;
        }

      /* 1. append header and function code */
      offset = argv_sql_stmt - prepare_req_header;
      memcpy (stmt_p->request_buffer, prepare_req_header, offset);

      /* 2. append argv_sql_stmt : sql statement */
      /* 2.1 set length */
      cur_p = (char *) stmt_p->request_buffer + offset;
      /* length is stored in network byte order */
      n_orgzd_sql_size = htonl (orgzd_sql_size);
      memcpy (cur_p, &n_orgzd_sql_size, NET_SIZE_INT);

      /* 2.2 set organized sql string */
      cur_p += NET_SIZE_INT;
      memcpy (cur_p, orgzd_sql, orgzd_sql_size);

      /* 3. append argv_remainder ~ : the rest argvs */
      cur_p += orgzd_sql_size;
      remainder_size = (req_msg_size) - (argv_remainder - prepare_req_header);
      memcpy (cur_p, argv_remainder, remainder_size);

      /* 4. set length */
      stmt_p->request_buffer_length = (req_msg_size + expand_size);
      set_data_length ((char *) stmt_p->request_buffer, stmt_p->request_buffer_length - MSG_HEADER_SIZE);

      /* hand the caller its own copy of the enlarged message */
      tmp_prepare_req = proxy_dup_msg ((char *) stmt_p->request_buffer);
      if (tmp_prepare_req == NULL)
        {
          /* stmt_p->request_buffer stays allocated here; it is released by shard_stmt_free() */
          PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to duplicate prepare request.");
          return -1;
        }

      FREE_MEM (*prepare_req);
      *prepare_req = tmp_prepare_req;
      *prepare_req_len = stmt_p->request_buffer_length;

      assert (stmt_p->request_buffer_length == get_msg_length (tmp_prepare_req));

      return 0;
    }
    }

  /* no hint, or the rewritten SQL was patched in place: keep a plain duplicate of the request */
  stmt_p->request_buffer = proxy_dup_msg (prepare_req_header);
  if (stmt_p->request_buffer == NULL)
    {
      PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to duplicate prepare request.");
      return -1;
    }
  stmt_p->request_buffer_length = get_msg_length (prepare_req_header);

  return 0;
}

#if defined (PROXY_VERBOSE_DEBUG)
/*
 * shard_stmt_dump_title - print the column header line used by shard_stmt_dump()
 *   return: none
 *   fp(in): output stream
 */
void
shard_stmt_dump_title (FILE * fp)
{
  static const char *const column_names[] = {
    "INDEX", "NUSED", "STMT_H_ID", "ST", "NUM_PINNED", "LRU_NEXT", "LRU_PREV"
  };

  fprintf (fp, "[%-10s][%-5s] %-10s %-2s %-10s %-10s %-10s\n", column_names[0], column_names[1], column_names[2],
	   column_names[3], column_names[4], column_names[5], column_names[6]);
}

/*
 * shard_stmt_dump - print one statement entry as a row matching
 *                   shard_stmt_dump_title()
 *   return: none
 *   fp(in): output stream
 *   stmt_p(in): statement entry to print
 */
void
shard_stmt_dump (FILE * fp, T_SHARD_STMT * stmt_p)
{
  /* %p requires an argument of type void *, so the links are cast explicitly */
  fprintf (fp, "[%-10u][%-5u] %-10d %-2d %-10d %-10p %-10p\n", stmt_p->index, stmt_p->num_alloc, stmt_p->stmt_h_id,
	   stmt_p->status, stmt_p->num_pinned, (void *) stmt_p->lru_next, (void *) stmt_p->lru_prev);

  return;
}

/*
 * shard_stmt_dump_all - print the pool limits followed by one row per
 *                       statement entry
 *   return: none
 *   fp(in): output stream
 */
void
shard_stmt_dump_all (FILE * fp)
{
  int idx;

  fprintf (fp, "\n");
  fprintf (fp, "* %-20s : %d\n", "MAX_NUM_STMT", shard_Stmt.max_num_stmt);
  fprintf (fp, "* %-20s : %d\n", "MAX_NUM_SHARD", shard_Stmt.max_num_shard);
  fprintf (fp, "* %-20s : %d\n", "NUM_CAS_PER_SHARD", shard_Stmt.num_cas_per_shard);
  fprintf (fp, "\n");

  shard_stmt_dump_title (fp);

  idx = 0;
  while (idx < shard_Stmt.max_num_stmt)
    {
      shard_stmt_dump (fp, &(shard_Stmt.stmt_ent[idx]));
      idx++;
    }
}
#endif /* PROXY_VERBOSE_DEBUG */

/*
 * shard_str_stmt - format a statement entry for log messages
 *   return: "-" when stmt_p is NULL, otherwise a pointer to a function-static
 *           buffer holding the formatted text
 *   stmt_p(in): statement entry to describe
 *
 * Note: the returned buffer is shared by all calls; a later call overwrites
 *       the text of an earlier one, so the result must be consumed before
 *       calling again.
 */
char *
shard_str_stmt (T_SHARD_STMT * stmt_p)
{
  static char buffer[BUFSIZ];
  const char *sql_text = "-";

  if (stmt_p == NULL)
    {
      return (char *) "-";
    }

  if (stmt_p->parser != NULL && stmt_p->parser->sql_stmt != NULL)
    {
      sql_text = shard_str_sqls (stmt_p->parser->sql_stmt);
    }

  /* %p requires an argument of type void *, so the list links are cast explicitly */
  snprintf (buffer, sizeof (buffer),
	    "index:%u, num_alloc:%u, " "stmt_h_id:%d, status:%d, " "stmt_type:%d, " "context id:%d, context uid:%d, "
	    "num pinned:%d, " "lru_next:%p, lru_prev:%p, " "hash_next:%p, hash_prev:%p, " "db_user:%s, sql_stmt:[%s]",
	    stmt_p->index, stmt_p->num_alloc, stmt_p->stmt_h_id, stmt_p->status, stmt_p->stmt_type, stmt_p->ctx_cid,
	    stmt_p->ctx_uid, stmt_p->num_pinned, (void *) stmt_p->lru_next, (void *) stmt_p->lru_prev,
	    (void *) stmt_p->hash_next, (void *) stmt_p->hash_prev, stmt_p->database_user, sql_text);

  return (char *) buffer;
}

/*
 * shard_stmt_initialize - allocate and set up the statement pool
 *   return: 0 on success, -1 on failure
 *   initial_size(in): number of statement entries; must be positive
 *
 * Note: may succeed only once per process; later calls fail.
 *       On failure the resources allocated so far are not released here;
 *       they remain reachable through shard_Stmt, which is what
 *       shard_stmt_destroy() walks.
 * NOTE(review): shard_Stmt_max_num_alloc is raised with MAX() to
 *       (INT_MAX / max_num_stmt) - 1; if SHARD_STMT_MAX_NUM_ALLOC can exceed
 *       that value for a large pool, the handle id computation in
 *       shard_stmt_new_internal() could overflow - confirm the intended bound.
 */
int
shard_stmt_initialize (int initial_size)
{
  T_SHARD_STMT *stmt_p;
  T_SHARD_INFO *shard_info_p;
  int mem_size;
  int handle_mem_size;
  int num_cas;
  int i, j;

  static bool is_init = false;

  if (is_init)
    {
      PROXY_LOG (PROXY_LOG_MODE_ERROR, "Statement have already been initialized.");
      return -1;
    }

  /* a non-positive size would divide by zero below; an oversized one would overflow mem_size */
  if (initial_size <= 0 || (size_t) initial_size > (size_t) INT_MAX / sizeof (T_SHARD_STMT))
    {
      PROXY_LOG (PROXY_LOG_MODE_ERROR, "Invalid number of statement entries. (size:%d).", initial_size);
      return -1;
    }

  shard_info_p = shard_shm_find_shard_info (proxy_info_p, 0);
  if (shard_info_p == NULL)
    {
      PROXY_LOG (PROXY_LOG_MODE_ERROR, "Unable to find shard info. (shard_id:%d).", 0);
      return -1;
    }

  mem_size = (int) (initial_size * sizeof (T_SHARD_STMT));
  shard_Stmt.stmt_ent = (T_SHARD_STMT *) malloc (mem_size);
  if (shard_Stmt.stmt_ent == NULL)
    {
      PROXY_LOG (PROXY_LOG_MODE_ERROR,
		 "Not enough virtual memory. " "Failed to alloc statement entries. " "(errno:%d, size:%d).", errno,
		 mem_size);

      return -1;
    }
  /* every entry starts zeroed: status, links, buffers and wait queue */
  memset (shard_Stmt.stmt_ent, 0, mem_size);

  shard_Stmt.max_num_stmt = initial_size;
  shard_Stmt.max_num_shard = proxy_info_p->max_shard;
  shard_Stmt.num_cas_per_shard = shard_info_p->max_appl_server;
  shard_Stmt.mru = NULL;
  shard_Stmt.lru = NULL;

  shard_Stmt_max_num_alloc = MAX (shard_Stmt_max_num_alloc, (INT_MAX / shard_Stmt.max_num_stmt) - 1);

  shard_Stmt.stmt_map =
    mht_create ("Proxy statement pool", shard_Stmt.max_num_stmt, mht_1strhash, mht_compare_strings_are_equal);
  if (shard_Stmt.stmt_map == NULL)
    {
      PROXY_LOG (PROXY_LOG_MODE_ERROR, "Not enough virtual memory. " "Failed to alloc map of statement. " "(errno:%d).",
		 errno);

      return -1;
    }

  num_cas = shard_Stmt.max_num_shard * shard_Stmt.num_cas_per_shard;
  handle_mem_size = (int) (num_cas * sizeof (int));

  for (i = 0; i < shard_Stmt.max_num_stmt; i++)
    {
      stmt_p = &(shard_Stmt.stmt_ent[i]);

      stmt_p->index = i;
      stmt_p->ctx_cid = PROXY_INVALID_CONTEXT;
      stmt_p->ctx_uid = 0;
      stmt_p->database_user[0] = '\0';

      stmt_p->client_version = 0;

      /* one server handle id slot per (shard, CAS) pair */
      stmt_p->srv_h_id_ent = (int *) malloc (handle_mem_size);
      if (stmt_p->srv_h_id_ent == NULL)
	{
	  PROXY_LOG (PROXY_LOG_MODE_ERROR,
		     "Not enough virtual memory. " "Failed to alloc statement handle id entries. "
		     "(errno:%d, size:%d).", errno, handle_mem_size);

	  return -1;
	}
      for (j = 0; j < num_cas; j++)
	{
	  stmt_p->srv_h_id_ent[j] = SHARD_STMT_INVALID_HANDLE_ID;
	}
    }

  is_init = true;

  return 0;
}

/*
 * shard_stmt_rewrite_sql - build a copy of the SQL text with leading white
 *                          space removed and shard_val hints rewritten
 *   return: newly allocated SQL text (the caller frees it), or NULL when
 *           sql_stmt is NULL, memory allocation fails or a shard_val hint
 *           cannot be converted
 *   has_shard_val_hint(out): set to true when at least one HT_VAL hint was
 *                            rewritten; left untouched otherwise
 *   sql_stmt(in): original SQL text
 *   appl_server(in): passed through to the writer helpers
 *
 * Note: every "/ * +" hint opener is copied; when the hint that follows is of
 *       type HT_VAL it is replaced through shard_stmt_change_shard_val_to_id(),
 *       otherwise scanning resumes right after the opener. All other
 *       characters are copied one at a time through
 *       shard_stmt_write_buf_to_sql().
 * NOTE(review): the output buffer is sized strlen (sql_stmt) + 2; this
 *       presumes the text written for a rewritten hint is never longer than
 *       the text it replaces - verify against
 *       shard_stmt_change_shard_val_to_id().
 */
char *
shard_stmt_rewrite_sql (bool * has_shard_val_hint, char *sql_stmt, char appl_server)
{
  int error = NO_ERROR;
  const char *p = NULL;
  const char *next_p = NULL;
  char *q = NULL;
  char *organized_sql_stmt = NULL;
  SP_HINT_TYPE hint_type = HT_NONE;

  if (sql_stmt == NULL)
    {
      return NULL;
    }

  /* <ctype.h> functions are only defined for unsigned char values and EOF */
  while (*sql_stmt && isspace ((unsigned char) *sql_stmt))
    {
      sql_stmt++;
    }

  organized_sql_stmt = (char *) malloc (sizeof (char) * (strlen (sql_stmt) + 2));
  if (organized_sql_stmt == NULL)
    {
      return NULL;
    }

  p = sql_stmt;
  q = organized_sql_stmt;

  while (*(p))
    {
      if (*(p) == '/' && *(p + 1) == '*' && *(p + 2) == '+')
	{
	  q = shard_stmt_write_buf_to_sql (q, "/*+", 3, true, appl_server);
	  p = p + 3;
	  next_p = p;
	  p = sp_get_hint_type (p, &hint_type);
	  if (hint_type == HT_VAL)
	    {
	      error = shard_stmt_change_shard_val_to_id (&q, &p, appl_server);
	      if (error != NO_ERROR)
		{
		  free (organized_sql_stmt);
		  organized_sql_stmt = NULL;

		  return NULL;
		}

	      *(has_shard_val_hint) = true;
	    }
	  else
	    {
	      /* not a shard_val hint: copy its text verbatim from just after the opener */
	      p = next_p;
	    }
	}
      else
	{
	  q = shard_stmt_write_buf_to_sql (q, p, 1, true, appl_server);
	  p += 1;
	}
    }

  *(q++) = '\0';		/* NTS */

  return organized_sql_stmt;
}

/*
 * shard_stmt_change_shard_val_to_id - emit a shard_id hint in place of a
 *                                     shard_val hint
 *
 * return: NO_ERROR on success; ER_SP_OUT_OF_MEMORY when the parser hint
 *         cannot be created; the error reported by sp_get_hint_arg or
 *         sp_check_end_of_hint; ER_SP_INVALID_HINT when the hint value
 *         cannot be resolved to a shard key range or the replacement text
 *         cannot be formatted
 * sql_stmt(in/out): write cursor into the rewritten SQL; on success it is
 *                   advanced past the emitted "shard_id(N)" text and the
 *                   comment terminator that follows it
 * buf(in/out): read cursor into the source SQL; advanced by sp_get_hint_arg
 *              and sp_check_end_of_hint
 * appl_server(in): forwarded unchanged to shard_stmt_write_buf_to_sql
 *
 * Note: only the first shard key (shm_key_p->shard_key[0]) is consulted.
 *       Nothing is written to *sql_stmt on any error path.
 */
static int
shard_stmt_change_shard_val_to_id (char **sql_stmt, const char **buf, char appl_server)
{
  int error = NO_ERROR;
  SP_PARSER_HINT *hint_p = NULL;
  T_SHARD_KEY *key_p = NULL;
  T_SHARD_KEY_RANGE *range_p = NULL;
  const char *key_column;
  int shard_key_id = -1;
  int written = 0;
  char shard_key_id_string[16] = { '\0' };

  hint_p = sp_create_parser_hint ();
  if (hint_p == NULL)
    {
      error = ER_SP_OUT_OF_MEMORY;
      goto FINALLY;
    }

  hint_p->hint_type = HT_VAL;

  /* consume the hint argument, then the end of the hint comment */
  *buf = sp_get_hint_arg (*buf, hint_p, &error);
  if (error != NO_ERROR)
    {
      goto FINALLY;
    }

  *buf = sp_check_end_of_hint (*buf, &error);
  if (error != NO_ERROR)
    {
      goto FINALLY;
    }

  key_p = (T_SHARD_KEY *) (&(shm_key_p->shard_key[0]));
  key_column = key_p->key_column;

  shard_key_id = proxy_find_shard_id_by_hint_value (&hint_p->arg, key_column);
  if (shard_key_id < 0)
    {
      error = ER_SP_INVALID_HINT;
      goto FINALLY;
    }

  range_p = shard_metadata_find_shard_range (shm_key_p, key_column, shard_key_id);
  if (range_p == NULL)
    {
      error = ER_SP_INVALID_HINT;

      PROXY_LOG (PROXY_LOG_MODE_ERROR, "Unable to find shard key range. " "(key:[%s], key_id:%d).", key_column,
		 shard_key_id);

      goto FINALLY;
    }

  /*
   * snprintf truncates silently. A truncated result would lose the closing
   * comment terminator and turn the rest of the statement into a comment,
   * so treat truncation (or an output error) as a failure instead of
   * emitting a malformed hint.
   *
   * NOTE(review): the buffer is deliberately left at 16 bytes because the
   * capacity of the buffer behind *sql_stmt is not visible from here;
   * enlarging it would allow longer writes into that buffer - confirm the
   * caller's sizing before raising the limit.
   */
  written = snprintf (shard_key_id_string, sizeof (shard_key_id_string), "shard_id(%d)*/", range_p->shard_id);
  if (written < 0 || written >= (int) sizeof (shard_key_id_string))
    {
      error = ER_SP_INVALID_HINT;

      PROXY_LOG (PROXY_LOG_MODE_ERROR, "Unable to format shard id hint. " "(key:[%s], shard_id:%d).", key_column,
		 range_p->shard_id);

      goto FINALLY;
    }

  /* written equals the string length once truncation has been ruled out */
  *sql_stmt = shard_stmt_write_buf_to_sql (*sql_stmt, shard_key_id_string, written, true, appl_server);

FINALLY:
  if (hint_p != NULL)
    {
      sp_free_parser_hint (hint_p);
      free (hint_p);
    }
  return error;
}

/*
 * shard_stmt_write_buf_to_sql - append raw bytes to the SQL being rewritten
 *
 * return: pointer just past the last byte written (sql_stmt itself when
 *         length is zero or negative)
 * sql_stmt(out): destination write cursor
 * buf(in): source bytes
 * length(in): number of bytes to copy from buf
 * is_to_upper(in): currently unused; bytes are copied verbatim
 * appl_server(in): currently unused
 *
 * Note: no terminating NUL is appended and no bounds check is performed;
 *       the caller must guarantee room for length bytes.
 */
char *
shard_stmt_write_buf_to_sql (char *sql_stmt, const char *buf, int length, bool is_to_upper, char appl_server)
{
  int remaining;

  for (remaining = length; remaining > 0; remaining--)
    {
      *sql_stmt = *buf;
      sql_stmt++;
      buf++;
    }

  return sql_stmt;
}

/*
 * shard_statement_wait_timer - run waiter timeout processing for all
 *                              statement entries that are in use
 *
 * return: none
 *
 * Note: the current time is sampled once and the same value is passed to
 *       proxy_waiter_timeout for every entry whose status is not
 *       SHARD_STMT_STATUS_UNUSED.
 */
void
shard_statement_wait_timer (void)
{
  int idx;
  int now = time (NULL);

  for (idx = 0; idx < shard_Stmt.max_num_stmt; idx++)
    {
      T_SHARD_STMT *entry_p = &(shard_Stmt.stmt_ent[idx]);

      if (entry_p->status != SHARD_STMT_STATUS_UNUSED)
	{
	  proxy_waiter_timeout (&entry_p->waitq, &proxy_info_p->stmt_waiter_count, now);
	}
    }
}

/*
 * shard_stmt_set_status_invalid - set a statement's status to
 *                                 SHARD_STMT_STATUS_INVALID
 *
 * return: none
 * stmt_h_id(in): statement handle id, forwarded to shard_stmt_set_status
 */
void
shard_stmt_set_status_invalid (int stmt_h_id)
{
  const int new_status = SHARD_STMT_STATUS_INVALID;

  shard_stmt_set_status (stmt_h_id, new_status);
}

/*
 * shard_stmt_set_status_complete - set a statement's status to
 *                                  SHARD_STMT_STATUS_COMPLETE
 *
 * return: none
 * stmt_h_id(in): statement handle id, forwarded to shard_stmt_set_status
 */
void
shard_stmt_set_status_complete (int stmt_h_id)
{
  const int new_status = SHARD_STMT_STATUS_COMPLETE;

  shard_stmt_set_status (stmt_h_id, new_status);
}

/*
 * shard_stmt_put_statement_to_map - register a statement under its SQL text
 *
 * return: none
 * sql_stmt(in): SQL text used to look up an existing entry in the map
 * stmt_p(in/out): statement to register; its hash_prev/hash_next links are
 *                 rewritten
 *
 * Note: when the map already holds an entry for sql_stmt, stmt_p is appended
 *       to the end of that entry's hash_next chain and the map itself is
 *       left untouched. Otherwise stmt_p is inserted into the map, keyed by
 *       the SQL text returned by sp_get_sql_stmt for its parser, and becomes
 *       a single-element chain.
 */
static void
shard_stmt_put_statement_to_map (const char *sql_stmt, T_SHARD_STMT * stmt_p)
{
  T_SHARD_STMT *tail_p;

  tail_p = (T_SHARD_STMT *) mht_get (shard_Stmt.stmt_map, sql_stmt);
  if (tail_p != NULL)
    {
      /* walk to the last statement already chained under this SQL text */
      for (; tail_p->hash_next != NULL; tail_p = tail_p->hash_next)
	{
	  ;
	}

      tail_p->hash_next = stmt_p;
      stmt_p->hash_prev = tail_p;
      stmt_p->hash_next = NULL;
      return;
    }

  /* first statement for this SQL text: it becomes the map entry itself */
  mht_put (shard_Stmt.stmt_map, sp_get_sql_stmt (stmt_p->parser), stmt_p);
  stmt_p->hash_prev = NULL;
  stmt_p->hash_next = NULL;

  PROXY_DEBUG_LOG ("SHARD_STMT PUT : %s", shard_str_stmt (stmt_p));
}

/*
 * shard_stmt_del_statement_from_map - unlink a statement from the SQL map
 *
 * return: none
 * stmt_p(in): statement to remove
 *
 * Note: the map is looked up with the SQL text of stmt_p's parser. Nothing
 *       happens when the map has no entry for that text. If an entry exists
 *       but stmt_p is not on its chain, the function asserts and returns.
 *       A statement in the middle or at the end of a chain is simply
 *       unlinked; when the chain head is removed, the map entry is removed
 *       and, if a successor exists, the successor is re-inserted as the new
 *       head. stmt_p's own hash_prev/hash_next fields are not reset.
 */
static void
shard_stmt_del_statement_from_map (T_SHARD_STMT * stmt_p)
{
  const char *sql_key = sp_get_sql_stmt (stmt_p->parser);
  T_SHARD_STMT *cursor_p;
  T_SHARD_STMT *before_p;
  T_SHARD_STMT *after_p;

  cursor_p = (T_SHARD_STMT *) mht_get (shard_Stmt.stmt_map, sql_key);
  if (cursor_p == NULL)
    {
      return;
    }

  /* make sure stmt_p really is on the chain stored under this key */
  while (cursor_p != stmt_p)
    {
      cursor_p = cursor_p->hash_next;
      if (cursor_p == NULL)
	{
	  assert (false);
	  return;
	}
    }

  before_p = stmt_p->hash_prev;
  if (before_p != NULL)
    {
      /* interior or tail element: bypass it in the doubly linked chain */
      after_p = stmt_p->hash_next;

      before_p->hash_next = after_p;
      if (after_p != NULL)
	{
	  after_p->hash_prev = before_p;
	}
      return;
    }

  /* chain head: drop the map entry, then promote the successor if any */
  mht_rem (shard_Stmt.stmt_map, sql_key, NULL, NULL);

  after_p = stmt_p->hash_next;
  if (after_p != NULL)
    {
      after_p->hash_prev = NULL;
      mht_put (shard_Stmt.stmt_map, sp_get_sql_stmt (after_p->parser), after_p);
    }
}

/*
 * shard_stmt_set_status - store a new status on the statement identified by
 *                         a statement handle id
 *
 * return: none
 * stmt_h_id(in): statement handle id passed to shard_stmt_find_by_stmt_h_id
 * status(in): status value to store
 *
 * Note: asserts and returns without changes when the handle cannot be
 *       resolved. Moving from SHARD_STMT_STATUS_INVALID to
 *       SHARD_STMT_STATUS_COMPLETE trips an assertion, but the status is
 *       still written when assertions are compiled out.
 */
static void
shard_stmt_set_status (int stmt_h_id, int status)
{
  T_SHARD_STMT *target_p = shard_stmt_find_by_stmt_h_id (stmt_h_id);

  if (target_p == NULL)
    {
      assert (false);
      return;
    }

  /* an invalidated statement is not expected to become complete again */
  if (status == SHARD_STMT_STATUS_COMPLETE && target_p->status == SHARD_STMT_STATUS_INVALID)
    {
      assert (false);
    }

  target_p->status = status;
}