Skip to content

File connection_sr.c

File List > connection > connection_sr.c

Go to the documentation of this file

/*
 * Copyright 2008 Search Solution Corporation
 * Copyright 2016 CUBRID Corporation
 *
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 *
 */

/*
 * connection_sr.c - Client/Server connection list management
 */

#ident "$Id$"

#include "config.h"

#if defined (WINDOWS)
#include <io.h>
#endif
#include <filesystem>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <sys/types.h>
#include <assert.h>

#if defined(WINDOWS)
#include <winsock2.h>
#include <windows.h>
#else /* WINDOWS */
#include <sys/time.h>
#include <sys/ioctl.h>
#include <sys/uio.h>
#include <sys/socket.h>
#include <netinet/in.h>
#endif /* WINDOWS */

#if defined(_AIX)
#include <sys/select.h>
#endif /* _AIX */

#if defined(SOLARIS)
#include <sys/filio.h>
#include <netdb.h>
#endif /* SOLARIS */

#if defined(SOLARIS) || defined(LINUX)
#include <unistd.h>
#endif /* SOLARIS || LINUX */

#include "porting.h"
#include "error_manager.h"
#include "connection_globals.h"
#include "filesys.hpp"
#include "filesys_temp.hpp"
#include "memory_alloc.h"
#include "environment_variable.h"
#include "system_parameter.h"
#include "critical_section.h"
#include "log_manager.h"
#include "object_representation.h"
#include "connection_error.h"
#include "log_impl.h"
#include "session.h"
#if defined(WINDOWS)
#include "wintcp.h"
#else /* WINDOWS */
#include "tcp.h"
#endif /* WINDOWS */
#include "connection_context.hpp"
#include "connection_worker.hpp"
#include "connection_sr.h"
#include "server_support.h"
#include "thread_manager.hpp"   // for thread_get_thread_entry_info
// XXX: SHOULD BE THE LAST INCLUDE HEADER
#include "memory_wrapper.hpp"

/*
 * TRACE() - packet tracing helper
 *   string(in): printf-style format passed to er_log_debug
 *   arg(in): the single argument consumed by the format
 *
 * Note: expands to nothing unless PACKET_TRACE is defined. The traced variant
 *       is a do/while(0) statement WITHOUT a trailing semicolon, so that
 *       "if (x) TRACE (...); else ..." parses as a single statement; the
 *       caller supplies the semicolon.
 */
#ifdef PACKET_TRACE
#define TRACE(string, arg)                          \
    do {                                            \
        er_log_debug (ARG_FILE_LINE, string, arg);  \
    }                                               \
    while (0)
#else /* PACKET_TRACE */
#define TRACE(string, arg)
#endif /* PACKET_TRACE */

/*
 * CSS_QUEUE_SEARCH_ARG - argument bundle for searching a list of
 *                        CSS_QUEUE_ENTRY items.
 *
 * NOTE(review): no user of this type is visible in this part of the file; the
 * per-field notes are inferred from the field names and must be confirmed
 * against the code that fills them in.
 */
typedef struct queue_search_arg
{
  CSS_QUEUE_ENTRY *entry_ptr;	/* presumably receives the matching entry - TODO confirm */
  int key;			/* presumably the key being searched for - TODO confirm */
  int remove_entry;		/* presumably non-zero to unlink the match - TODO confirm */
} CSS_QUEUE_SEARCH_ARG;

/*
 * CSS_WAIT_QUEUE_SEARCH_ARG - argument bundle for searching a list of
 *                             CSS_WAIT_QUEUE_ENTRY items.
 *
 * NOTE(review): no user of this type is visible in this part of the file; the
 * per-field notes are inferred from the field names and must be confirmed
 * against the code that fills them in.
 */
typedef struct wait_queue_search_arg
{
  CSS_WAIT_QUEUE_ENTRY *entry_ptr;	/* presumably receives the matching entry - TODO confirm */
  unsigned int key;		/* presumably the key being searched for - TODO confirm */
  int remove_entry;		/* presumably non-zero to unlink the match - TODO confirm */
} CSS_WAIT_QUEUE_SEARCH_ARG;

/* value of the PRM_ID_CSS_MAX_CLIENTS system parameter */
#define NUM_NORMAL_CLIENTS (prm_get_integer_value(PRM_ID_CSS_MAX_CLIENTS))

/* name given to the per-connection rmutexes created in css_init_conn_list () */
#define RMUTEX_NAME_CONN_ENTRY "CONN_ENTRY"

/* css_get_next_client_id () wraps the id counter back to 1 when it reaches this value */
static const int CSS_MAX_CLIENT_ID = INT_MAX - 1;

/* last client id handed out; protected by css_Client_id_lock */
static int css_Client_id = 0;
static pthread_mutex_t css_Client_id_lock = PTHREAD_MUTEX_INITIALIZER;
/* serializes updates of the css_Conn_rules counters */
static pthread_mutex_t css_Conn_rule_lock = PTHREAD_MUTEX_INITIALIZER;
/* head of the singly linked list of unused connection entries */
static CSS_CONN_ENTRY *css_Free_conn_anchor = NULL;
/* number of entries on the free list */
static int css_Num_free_conn = 0;
static int css_Num_current_client = 0;  /* entries currently handed out by css_make_conn () */
static int css_Num_max_client = 101;    /* default max_clients + 1 for conn with master */
static int css_Num_max_conn = 202;  /* size of css_Conn_array. There must be extra conn entries so that
                       a new conn can still be issued when none is free, even if the number
                       of actually connected clients is lower than css_Num_max_client */

/* backing array of all connection entries, allocated by css_init_conn_list () */
CSS_CONN_ENTRY *css_Conn_array = NULL;
/* head of the singly linked list of active connection entries */
CSS_CONN_ENTRY *css_Active_conn_anchor = NULL;
/* number of entries on the active list */
static int css_Num_active_conn = 0;

SYNC_RWLOCK css_Rwlock_active_conn_anchor;
SYNC_RWLOCK css_Rwlock_free_conn_anchor;

static LAST_ACCESS_STATUS *css_Access_status_anchor = NULL;
int css_Num_access_user = 0;

/* executable path and argv of this server; copied into the registration record by css_set_proc_register () */
static char css_Server_exec_path[PATH_MAX];
static char **css_Server_argv;

/* index of a connection entry inside css_Conn_array */
#define CSS_CONN_IDX(conn_arg) ((conn_arg) - css_Conn_array)

/* format/argument pairs used when logging the state of the free list */
#define CSS_FREE_CONN_MSG "Free count = %d, head = %d"
#define CSS_FREE_CONN_ARGS css_Num_free_conn, CSS_CONN_IDX (css_Free_conn_anchor)

/* format/argument pairs used when logging the state of the active list */
#define CSS_ACTIVE_CONN_MSG "Active count = %d, head = %d"
#define CSS_ACTIVE_CONN_ARGS css_Num_active_conn, CSS_CONN_IDX (css_Active_conn_anchor)

/*
 * Forward declarations of the file-local helpers.
 */

/* connection entry allocation and client id management */
static int css_get_next_client_id (void);
static CSS_CONN_ENTRY *css_common_connect (CSS_CONN_ENTRY * conn, unsigned short *rid, const char *host_name,
                       int connect_type, const char *server_name, int server_name_length, int port);
static void css_dealloc_conn (CSS_CONN_ENTRY * conn);

static unsigned int css_make_eid (unsigned short entry_id, unsigned short rid);

/* per-connection pools of queue entries */
static CSS_QUEUE_ENTRY *css_claim_queue_entry (CSS_CONN_ENTRY * conn);
static void css_retire_queue_entry (CSS_CONN_ENTRY * conn, CSS_QUEUE_ENTRY * entry);
static void css_free_queue_entry_list (CSS_CONN_ENTRY * conn);

/* per-connection pools of wait queue entries */
static CSS_WAIT_QUEUE_ENTRY *css_claim_wait_queue_entry (CSS_CONN_ENTRY * conn);
static void css_retire_wait_queue_entry (CSS_CONN_ENTRY * conn, CSS_WAIT_QUEUE_ENTRY * entry);
static void css_free_wait_queue_list (CSS_CONN_ENTRY * conn);

/* queue entry construction, lookup and removal */
static CSS_QUEUE_ENTRY *css_make_queue_entry (CSS_CONN_ENTRY * conn, unsigned int key, char *buffer,
                          int size, int rc, int transid, int invalidate_snapshot, int db_error);
static void css_free_queue_entry (CSS_CONN_ENTRY * conn, CSS_QUEUE_ENTRY * entry);
static CSS_QUEUE_ENTRY *css_find_queue_entry (CSS_LIST * list, unsigned int key);
static CSS_QUEUE_ENTRY *css_find_and_remove_queue_entry (CSS_LIST * list, unsigned int key);
static CSS_WAIT_QUEUE_ENTRY *css_make_wait_queue_entry (CSS_CONN_ENTRY * conn, unsigned int key, char **buffer,
                            int *size, int *rc);
static CSS_WAIT_QUEUE_ENTRY *css_add_wait_queue_entry (CSS_CONN_ENTRY * conn, CSS_LIST * list,
                               unsigned short request_id, char **buffer, int *buffer_size,
                               int *rc);
static void css_process_close_packet (CSS_CONN_ENTRY * conn);
static void clear_wait_queue_entry_and_free_buffer (THREAD_ENTRY * thrdp, CSS_CONN_ENTRY * conn, unsigned short rid,
                            char **bufferp, int *sizep);
static int css_return_queued_data_timeout (CSS_CONN_ENTRY * conn, unsigned short rid, char **buffer, int *bufsize,
                       int *rc, int waitsec);

/* queueing of incoming packets by type */
static void css_queue_data_packet (CSS_CONN_ENTRY * conn, unsigned short request_id, const NET_HEADER * header,
                   THREAD_ENTRY ** wait_thrd);
static void css_queue_error_packet (CSS_CONN_ENTRY * conn, unsigned short request_id, const NET_HEADER * header);
static css_error_code css_queue_command_packet (CSS_CONN_ENTRY * conn, unsigned short request_id,
                        const NET_HEADER * header, int size);
static bool css_is_valid_request_id (CSS_CONN_ENTRY * conn, unsigned short request_id);

static css_error_code css_queue_packet (CSS_CONN_ENTRY * conn, int type, unsigned short request_id,
                    const NET_HEADER * header, int size);
static int css_remove_and_free_queue_entry (void *data, void *arg);
static int css_remove_and_free_wait_queue_entry (void *data, void *arg);

/* connection rule counters; callers hold css_Conn_rule_lock */
static int css_increment_num_conn_internal (CSS_CONN_RULE_INFO * conn_rule_info);
static void css_decrement_num_conn_internal (CSS_CONN_RULE_INFO * conn_rule_info);

/*
 * get_next_client_id() -
 *   return: client id
 */
static int
css_get_next_client_id (void)
{
  static bool overflow = false;
  int next_client_id, rv, i;
  bool retry;

  rv = pthread_mutex_lock (&css_Client_id_lock);
  if (rv != 0)
    {
      er_set_with_oserror (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_CSS_PTHREAD_MUTEX_LOCK, 0);
      return ER_FAILED;
    }

  do
    {
      css_Client_id++;
      if (css_Client_id == CSS_MAX_CLIENT_ID)
    {
      css_Client_id = 1;
      overflow = true;
    }

      retry = false;
      for (i = 0; overflow && i < css_Num_max_conn; i++)
    {
      if (css_Conn_array[i].client_id == css_Client_id)
        {
          retry = true;
          break;
        }
    }
    }
  while (retry);

  next_client_id = css_Client_id;

  rv = pthread_mutex_unlock (&css_Client_id_lock);
  if (rv != 0)
    {
      er_set_with_oserror (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_CSS_PTHREAD_MUTEX_UNLOCK, 0);
      return ER_FAILED;
    }

  return next_client_id;
}

/*
 * css_initialize_conn() - initialize connection entry
 *   return: NO_ERROR, or ER_CSS_CONN_INIT when no client id could be obtained
 *           or one of the packet queues could not be initialized
 *   conn(in/out): connection entry to (re)initialize
 *   fd(in): socket descriptor stored in the entry
 *
 * Note: every plain field is reset BEFORE the first step that can fail.
 *       css_make_conn () reacts to a failure by calling css_free_conn (),
 *       which reads fields such as client_type and session_p; resetting them
 *       first guarantees it never acts on values left over from the previous
 *       user of the entry.
 */
int
css_initialize_conn (CSS_CONN_ENTRY * conn, SOCKET fd)
{
  int err;
  size_t i;
  CSS_LIST *queues[] = {
    &conn->request_queue,
    &conn->data_queue,
    &conn->data_wait_queue,
    &conn->abort_queue,
    &conn->buffer_queue,
    &conn->error_queue
  };

  conn->fd = fd;
  conn->request_id = 0;
  conn->status = CONN_OPEN;
  conn->set_tran_index (NULL_TRAN_INDEX);
  conn->init_pending_request ();
  conn->init_working_task ();
  conn->invalidate_snapshot = 1;
  conn->in_method = false;
  conn->db_error = 0;
  conn->in_transaction = false;
  conn->in_flashback = false;
  conn->reset_on_commit = false;
  conn->stop_talk = false;
  conn->ignore_repl_delay = false;
  conn->stop_phase = THREAD_STOP_WORKERS_EXCEPT_LOGWR;
  conn->version_string = NULL;

  /* ignore connection handler thread */
  conn->free_queue_list = NULL;
  conn->free_queue_count = 0;

  conn->free_wait_queue_list = NULL;
  conn->free_wait_queue_count = 0;

  conn->session_id = DB_EMPTY_SESSION;
#if defined(SERVER_MODE)
  conn->worker = nullptr;
  conn->context = nullptr;
  conn->session_p = NULL;
  conn->client_type = DB_CLIENT_TYPE_UNKNOWN;
#endif

  /* first fallible step: on failure the previous client_id is left in place */
  err = css_get_next_client_id ();
  if (err < 0)
    {
      return ER_CSS_CONN_INIT;
    }
  conn->client_id = err;

  /* the queues are initialized in declaration order; stop at the first failure */
  for (i = 0; i < sizeof (queues) / sizeof (queues[0]); i++)
    {
      err = css_initialize_list (queues[i], 0);
      if (err != NO_ERROR)
	{
	  return ER_CSS_CONN_INIT;
	}
    }

  return NO_ERROR;
}

/*
 * css_prepare_shutdown_conn() - first step of closing a connection entry
 *   return: void
 *   conn(in/out): connection entry about to be closed
 *
 * Note: clears the stop_talk and in_flashback flags of the entry and gives
 *       its client slot back by decrementing css_Num_current_client while
 *       holding the free-conn anchor exclusively.
 */
void
css_prepare_shutdown_conn (CSS_CONN_ENTRY * conn)
{
  int rc;

  conn->in_flashback = false;
  conn->stop_talk = false;

  START_EXCLUSIVE_ACCESS_FREE_CONN_ANCHOR (rc);

  --css_Num_current_client;
  assert (css_Num_current_client >= 0);

  END_EXCLUSIVE_ACCESS_FREE_CONN_ANCHOR (rc);
}

/*
 * css_shutdown_conn() - close connection entry
 *   return: void
 *   conn(in/out): connection entry to close
 *
 * Note: runs under conn->rmutex. The socket is shut down if it is still
 *       valid; an entry in CONN_OPEN or CONN_CLOSING state becomes
 *       CONN_CLOSED and has its version string, unexpected packets and
 *       packet queues released. The pooled queue entries and (SERVER_MODE)
 *       the session reference are released regardless of the state.
 */
void
css_shutdown_conn (CSS_CONN_ENTRY * conn)
{
  int rc;
  bool needs_close;

  rc = rmutex_lock (NULL, &conn->rmutex);
  assert (rc == NO_ERROR);

  if (!IS_INVALID_SOCKET (conn->fd))
    {
      /* if this is the PC, it also shuts down Winsock */
      css_shutdown_socket (conn->fd);
      conn->fd = INVALID_SOCKET;
    }

  needs_close = (conn->status == CONN_OPEN || conn->status == CONN_CLOSING);
  if (needs_close)
    {
      conn->status = CONN_CLOSED;
      conn->stop_phase = THREAD_STOP_WORKERS_EXCEPT_LOGWR;
      if (conn->version_string)
	{
	  free_and_init (conn->version_string);
	}

      css_remove_all_unexpected_packets (conn);

      css_finalize_list (&conn->request_queue);
      css_finalize_list (&conn->data_queue);
      css_finalize_list (&conn->data_wait_queue);
      css_finalize_list (&conn->abort_queue);
      css_finalize_list (&conn->buffer_queue);
      css_finalize_list (&conn->error_queue);
    }

  /* release the pooled (retired) queue entries */
  if (conn->free_queue_list != NULL)
    {
      assert (conn->free_queue_count > 0);
      css_free_queue_entry_list (conn);
    }

  if (conn->free_wait_queue_list != NULL)
    {
      assert (conn->free_wait_queue_count > 0);
      css_free_wait_queue_list (conn);
    }

#if defined(SERVER_MODE)
  /* drop the reference this connection holds on its session */
  if (conn->session_p)
    {
      session_state_decrease_ref_count (NULL, conn->session_p);
      conn->session_p = NULL;
      conn->session_id = DB_EMPTY_SESSION;
    }
#endif

  rc = rmutex_unlock (NULL, &conn->rmutex);
  assert (rc == NO_ERROR);
}

/*
 * css_init_conn_list() - initialize connection list
 *   return: NO_ERROR if success, or error code
 *
 * Note: refreshes css_Num_max_client on every call, but allocates the
 *       connection array only once (a second call returns NO_ERROR without
 *       touching the array). The array holds css_Num_max_client * 2 entries,
 *       all of which start on the free list.
 *       On failure everything set up so far is undone: both anchor rwlocks,
 *       the rmutexes of every entry already initialized, and the array
 *       itself.
 */
int
css_init_conn_list (void)
{
  int i = 0, j, err;
  bool rmutex_only = false;	/* entry i owns an rmutex but not yet a cmutex */
  CSS_CONN_ENTRY *conn;

  css_init_conn_rules ();

  css_Num_max_client = css_get_max_conn () + NUM_MASTER_CHANNEL;

  if (css_Conn_array != NULL)
    {
      return NO_ERROR;
    }

  err = rwlock_initialize (CSS_RWLOCK_ACTIVE_CONN_ANCHOR, CSS_RWLOCK_ACTIVE_CONN_ANCHOR_NAME);
  if (err != NO_ERROR)
    {
      ASSERT_ERROR ();
      return err;
    }

  err = rwlock_initialize (CSS_RWLOCK_FREE_CONN_ANCHOR, CSS_RWLOCK_FREE_CONN_ANCHOR_NAME);
  if (err != NO_ERROR)
    {
      ASSERT_ERROR ();
      (void) rwlock_finalize (CSS_RWLOCK_ACTIVE_CONN_ANCHOR);
      return err;
    }

  /*
   * allocate NUM_MASTER_CHANNEL + the total number of
   *  conn entries
   */
  css_Num_max_conn = css_Num_max_client * 2;
  css_Conn_array = (CSS_CONN_ENTRY *) malloc (sizeof (CSS_CONN_ENTRY) * (css_Num_max_conn));
  if (css_Conn_array == NULL)
    {
      er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_OUT_OF_VIRTUAL_MEMORY, 1,
	      sizeof (CSS_CONN_ENTRY) * (css_Num_max_conn));
      err = ER_OUT_OF_VIRTUAL_MEMORY;
      goto error;
    }

  /* initialize all CSS_CONN_ENTRY */
  for (i = 0; i < css_Num_max_conn; i++)
    {
      conn = &css_Conn_array[i];
      conn->idx = i;
      err = css_initialize_conn (conn, -1);
      if (err != NO_ERROR)
	{
	  er_set_with_oserror (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_CSS_CONN_INIT, 0);
	  err = ER_CSS_CONN_INIT;
	  goto error;
	}

      err = rmutex_initialize (&conn->rmutex, RMUTEX_NAME_CONN_ENTRY);
      if (err != NO_ERROR)
	{
	  er_set_with_oserror (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_CSS_CONN_INIT, 0);
	  err = ER_CSS_CONN_INIT;
	  goto error;
	}
      rmutex_only = true;

      err = rmutex_initialize (&conn->cmutex, RMUTEX_NAME_CONN_ENTRY);
      if (err != NO_ERROR)
	{
	  er_set_with_oserror (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_CSS_CONN_INIT, 0);
	  err = ER_CSS_CONN_INIT;
	  goto error;
	}
      rmutex_only = false;

      /* chain the entries in array order; the last one terminates the list */
      if (i < css_Num_max_conn - 1)
	{
	  conn->next = &css_Conn_array[i + 1];
	}
      else
	{
	  conn->next = NULL;
	}
    }

  /* initialize active conn list, used for stopping all threads */
  css_Active_conn_anchor = NULL;
  css_Free_conn_anchor = &css_Conn_array[0];
  css_Num_current_client = 0;
  css_Num_free_conn = css_Num_max_conn;

  return NO_ERROR;

error:
  (void) rwlock_finalize (CSS_RWLOCK_ACTIVE_CONN_ANCHOR);
  (void) rwlock_finalize (CSS_RWLOCK_FREE_CONN_ANCHOR);

  if (css_Conn_array != NULL)
    {
      /* entry i failed part-way: it may own an rmutex without a cmutex */
      if (rmutex_only)
	{
	  (void) rmutex_finalize (&css_Conn_array[i].rmutex);
	}

      /* entries [0, i) were fully initialized */
      for (j = 0; j < i; j++)
	{
	  (void) rmutex_finalize (&css_Conn_array[j].rmutex);
	  (void) rmutex_finalize (&css_Conn_array[j].cmutex);
	}

      free_and_init (css_Conn_array);
    }

  return err;
}

/*
 * css_final_conn_list() - free connection list
 *   return: void
 */
void
css_final_conn_list (void)
{
  CSS_CONN_ENTRY *conn, *next;
  int i;

  if (css_Active_conn_anchor != NULL)
    {
      for (conn = css_Active_conn_anchor; conn != NULL; conn = next)
    {
      next = conn->next;
      css_shutdown_conn (conn);
      css_dealloc_conn (conn);

      css_Num_active_conn--;
      assert (css_Num_active_conn >= 0);
    }

      css_Active_conn_anchor = NULL;
    }

  assert (css_Num_active_conn == 0);
  assert (css_Active_conn_anchor == NULL);

  if (css_Conn_array != NULL)
    {
      for (i = 0; i < css_Num_max_conn; i++)
    {
      conn = &css_Conn_array[i];

#if defined(SERVER_MODE)
      assert (conn->idx == i);
#endif
      (void) rmutex_finalize (&conn->rmutex);
      (void) rmutex_finalize (&conn->cmutex);
    }

      free_and_init (css_Conn_array);

      (void) rwlock_finalize (CSS_RWLOCK_ACTIVE_CONN_ANCHOR);
      (void) rwlock_finalize (CSS_RWLOCK_FREE_CONN_ANCHOR);
    }
}

/*
 * css_make_conn() - take a connection entry from the free list and
 *                   initialize it; the entry is NOT inserted into the
 *                   active conn list
 *   return: new connection entry, or NULL when the client limit is reached,
 *           the free list is empty, or the entry could not be initialized
 *   fd(in): socket descriptor
 */
CSS_CONN_ENTRY *
css_make_conn (SOCKET fd)
{
  CSS_CONN_ENTRY *entry = NULL;
  int rc;

  START_EXCLUSIVE_ACCESS_FREE_CONN_ANCHOR (rc);

  assert (css_Num_current_client <= css_Num_max_client);

  if (css_Num_current_client < css_Num_max_client && css_Free_conn_anchor != NULL)
    {
      /* pop the head of the free list */
      entry = css_Free_conn_anchor;
      css_Free_conn_anchor = entry->next;
      entry->next = NULL;

      css_Num_free_conn--;
      css_Num_current_client++;

      CSS_LOG_STACK ("css_make_conn: conn = %d, " CSS_FREE_CONN_MSG, CSS_CONN_IDX (entry), CSS_FREE_CONN_ARGS);
    }

  assert (css_Num_free_conn >= 0);
  assert (css_Num_current_client <= css_Num_max_client);

  END_EXCLUSIVE_ACCESS_FREE_CONN_ANCHOR (rc);

  if (entry == NULL)
    {
      return NULL;
    }

  if (css_initialize_conn (entry, fd) != NO_ERROR)
    {
      /* give back the client slot and the entry itself */
      css_prepare_shutdown_conn (entry);
      css_free_conn (entry);
      er_set_with_oserror (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_CSS_CONN_INIT, 0);
      return NULL;
    }

  return entry;
}

/*
 * css_insert_into_active_conn_list() - push a connection entry onto the
 *                                      head of the active conn list
 *   return: void
 *   conn(in): connection entry to insert
 *
 * Note: the list and its counter are updated while holding the active-conn
 *       anchor exclusively.
 */
void
css_insert_into_active_conn_list (CSS_CONN_ENTRY * conn)
{
  int rc;

  START_EXCLUSIVE_ACCESS_ACTIVE_CONN_ANCHOR (rc);

  /* logged before linking, so the message shows the previous list state */
  CSS_LOG_STACK ("css_insert_into_active_conn_list conn = %d, prev " CSS_ACTIVE_CONN_MSG, CSS_CONN_IDX (conn),
		 CSS_ACTIVE_CONN_ARGS);

  conn->next = css_Active_conn_anchor;
  css_Active_conn_anchor = conn;

  ++css_Num_active_conn;

  assert (css_Num_active_conn > 0);
  assert (css_Num_active_conn <= css_Num_max_conn);

  END_EXCLUSIVE_ACCESS_ACTIVE_CONN_ANCHOR (rc);
}

/*
 * css_dealloc_conn() - push a connection entry back onto the head of the
 *                      free conn list
 *   return: void
 *   conn(in): connection entry to release
 *
 * Note: the list and its counter are updated while holding the free-conn
 *       anchor exclusively.
 */
static void
css_dealloc_conn (CSS_CONN_ENTRY * conn)
{
  int rc;

  START_EXCLUSIVE_ACCESS_FREE_CONN_ANCHOR (rc);

  /* logged before linking, so the message shows the previous list state */
  CSS_LOG_STACK ("css_dealloc_conn conn = %d, prev " CSS_FREE_CONN_MSG, CSS_CONN_IDX (conn), CSS_FREE_CONN_ARGS);

  conn->next = css_Free_conn_anchor;
  css_Free_conn_anchor = conn;

  ++css_Num_free_conn;

  assert (css_Num_free_conn > 0);
  assert (css_Num_free_conn <= css_Num_max_conn);

  END_EXCLUSIVE_ACCESS_FREE_CONN_ANCHOR (rc);
}

/*
 * css_get_num_free_conn() - number of entries on the free conn list
 *   return: current value of css_Num_free_conn
 *
 * Note: the counter is read without taking the free-conn anchor lock.
 */
int
css_get_num_free_conn (void)
{
  const int free_count = css_Num_free_conn;

  return free_count;
}

/*
 * css_increment_num_conn_internal() - account for one more connection under
 *                                     the given connection rule
 *   return: NO_ERROR, or ER_CSS_CLIENTS_EXCEEDED when no connection is left
 *   conn_rule_info(in/out): rule entry whose counter is to be incremented
 *
 * Note: CR_NORMAL_ONLY uses the entry's own counter only.
 *       CR_NORMAL_FIRST first tries the normal-only entry and falls back to
 *       its own (reserved) counter.
 *       CR_RESERVED_FIRST uses its own counter up to max_num_conn, then
 *       additionally takes a normal connection and counts it on both
 *       entries.
 */
static int
css_increment_num_conn_internal (CSS_CONN_RULE_INFO * conn_rule_info)
{
  CSS_CONN_RULE_INFO *normal_rule = &css_Conn_rules[CSS_CR_NORMAL_ONLY_IDX];
  int status = NO_ERROR;

  switch (conn_rule_info->rule)
    {
    case CR_NORMAL_ONLY:
      if (conn_rule_info->num_curr_conn != conn_rule_info->max_num_conn)
	{
	  conn_rule_info->num_curr_conn++;
	}
      else
	{
	  status = ER_CSS_CLIENTS_EXCEEDED;
	}
      break;

    case CR_NORMAL_FIRST:
      /* tries to use a normal conn first */
      if (css_increment_num_conn_internal (normal_rule) == NO_ERROR)
	{
	  break;
	}

      /* if normal conns are all occupied, uses a reserved conn */
      if (conn_rule_info->num_curr_conn != conn_rule_info->max_num_conn)
	{
	  conn_rule_info->num_curr_conn++;
	  assert (conn_rule_info->num_curr_conn <= conn_rule_info->max_num_conn);
	}
      else
	{
	  status = ER_CSS_CLIENTS_EXCEEDED;
	}
      break;

    case CR_RESERVED_FIRST:
      /* tries to use a reserved conn first */
      if (conn_rule_info->num_curr_conn < conn_rule_info->max_num_conn)
	{
	  conn_rule_info->num_curr_conn++;
	  break;
	}

      /* uses a normal conn if no reserved conn is available */
      if (css_increment_num_conn_internal (normal_rule) == NO_ERROR)
	{
	  /* also increments its own conn counter */
	  conn_rule_info->num_curr_conn++;
	  assert (conn_rule_info->num_curr_conn <= (normal_rule->max_num_conn + conn_rule_info->max_num_conn));
	}
      else
	{
	  status = ER_CSS_CLIENTS_EXCEEDED;
	}
      break;

    default:
      assert (false);
      break;
    }

  return status;
}

/*
 * css_decrement_num_conn_internal() - account for one released connection
 *                                     under the given connection rule
 *   return: void
 *   conn_rule_info(in/out): rule entry of the client releasing a connection
 *
 * Note: a CR_NORMAL_ONLY release is first credited to a CR_NORMAL_FIRST
 *       entry that currently uses reserved connections, if there is one;
 *       in that case the function returns right away.
 */
static void
css_decrement_num_conn_internal (CSS_CONN_RULE_INFO * conn_rule_info)
{
  CSS_CONN_RULE_INFO *normal_rule = &css_Conn_rules[CSS_CR_NORMAL_ONLY_IDX];
  int rule_idx;

  switch (conn_rule_info->rule)
    {
    case CR_NORMAL_ONLY:
      /* When a normal client decrements the counter, it should first check that other normal-first-reserved-last
       * clients need to take the released connection first. */
      rule_idx = 1;
      while (rule_idx < css_Conn_rules_size)
	{
	  CSS_CONN_RULE_INFO *candidate = &css_Conn_rules[rule_idx];

	  if (candidate->rule == CR_NORMAL_FIRST && candidate->num_curr_conn > 0)
	    {
	      candidate->num_curr_conn--;

	      return;
	    }
	  rule_idx++;
	}
      conn_rule_info->num_curr_conn--;
      break;

    case CR_NORMAL_FIRST:
      if (conn_rule_info->num_curr_conn <= 0)
	{
	  /* decrements normal conn counter if no reserved conn is in use */
	  css_decrement_num_conn_internal (normal_rule);
	}
      else
	{
	  /* decrements reserved conn counter first if exists */
	  conn_rule_info->num_curr_conn--;
	}
      break;

    case CR_RESERVED_FIRST:
      /* decrements normal conn counter if exists */
      if (conn_rule_info->num_curr_conn > conn_rule_info->max_num_conn)
	{
	  css_decrement_num_conn_internal (normal_rule);
	}
      /* also decrements its own conn counter */
      conn_rule_info->num_curr_conn--;
      break;

    default:
      assert (false);
      break;
    }

  assert (conn_rule_info->num_curr_conn >= 0);
}

/*
 * css_increment_num_conn() - increment a connection counter
 * and check if a client can take its connection
 *   return: NO_ERROR, or the error reported by the matching rule; NO_ERROR
 *           as well when no rule matches the client type
 *   client_type(in): a type of a client trying
 *   to take the connection
 *
 * Note: only the first rule whose check_client_type_fn accepts the client
 *       type is used; its counter is updated under css_Conn_rule_lock.
 */
int
css_increment_num_conn (BOOT_CLIENT_TYPE client_type)
{
  int rule_idx = 0;
  int status = NO_ERROR;

  /* locate the first rule that applies to this client type */
  while (rule_idx < css_Conn_rules_size && !css_Conn_rules[rule_idx].check_client_type_fn (client_type))
    {
      rule_idx++;
    }

  if (rule_idx < css_Conn_rules_size)
    {
      pthread_mutex_lock (&css_Conn_rule_lock);
      status = css_increment_num_conn_internal (&css_Conn_rules[rule_idx]);
      pthread_mutex_unlock (&css_Conn_rule_lock);
    }

  return status;
}

/*
 * css_decrement_num_conn() - decrement a connection counter
 *   return: void
 *   client_type(in): a type of a client trying
 *   to release the connection
 *
 * Note: does nothing for DB_CLIENT_TYPE_UNKNOWN. Otherwise only the first
 *       rule whose check_client_type_fn accepts the client type is used; its
 *       counter is updated under css_Conn_rule_lock.
 */
void
css_decrement_num_conn (BOOT_CLIENT_TYPE client_type)
{
  int rule_idx = 0;

  if (client_type == DB_CLIENT_TYPE_UNKNOWN)
    {
      return;
    }

  /* locate the first rule that applies to this client type */
  while (rule_idx < css_Conn_rules_size && !css_Conn_rules[rule_idx].check_client_type_fn (client_type))
    {
      rule_idx++;
    }

  if (rule_idx < css_Conn_rules_size)
    {
      pthread_mutex_lock (&css_Conn_rule_lock);
      css_decrement_num_conn_internal (&css_Conn_rules[rule_idx]);
      pthread_mutex_unlock (&css_Conn_rule_lock);
    }
}

/*
 * css_free_conn() - destroy all connection related structures, and free conn
 *                   entry, delete from css_Active_conn_anchor list
 *   return: void
 *   conn(in): connection entry will be free
 *
 * Note: the whole operation runs while holding the active-conn anchor
 *       exclusively. An entry that is not on the active list is still shut
 *       down and released.
 *       The connection rule counter is released BEFORE the entry is put back
 *       on the free list: once css_dealloc_conn () has run, another thread
 *       may obtain the entry through css_make_conn () and re-initialize it,
 *       so conn->client_type must not be read after that point.
 */
void
css_free_conn (CSS_CONN_ENTRY * conn)
{
  CSS_CONN_ENTRY *p, *prev = NULL, *next;
  int r;

  START_EXCLUSIVE_ACCESS_ACTIVE_CONN_ANCHOR (r);

  /* find and remove from active conn list */
  for (p = css_Active_conn_anchor; p != NULL; p = next)
    {
      next = p->next;

      if (p == conn)
	{
	  if (prev == NULL)
	    {
	      css_Active_conn_anchor = next;
	    }
	  else
	    {
	      prev->next = next;
	    }

	  css_Num_active_conn--;

	  assert (css_Num_active_conn >= 0);
	  assert (css_Num_active_conn < css_Num_max_conn);

	  CSS_LOG_STACK ("css_free_conn - removed conn = %d from " CSS_ACTIVE_CONN_MSG, CSS_CONN_IDX (conn),
			 CSS_ACTIVE_CONN_ARGS);

	  break;
	}

      prev = p;
    }

  if (p == NULL)
    {
      CSS_LOG_STACK ("css_free_conn - not found conn = %p in " CSS_ACTIVE_CONN_MSG, conn, CSS_ACTIVE_CONN_ARGS);
    }

  css_shutdown_conn (conn);
  /* must precede css_dealloc_conn (): the entry is not ours any more afterwards */
  css_decrement_num_conn (conn->client_type);
  css_dealloc_conn (conn);

  END_EXCLUSIVE_ACCESS_ACTIVE_CONN_ANCHOR (r);
}

/*
 * css_print_conn_entry_info() - print connection entry information to stderr
 *   return: void
 *   conn(in): connection entry
 *
 * Note: prints one line with the entry address, its next pointer, idx, fd,
 *       request id, transaction index and client id. The pointers are cast
 *       to void * because that is the argument type %p requires.
 */
void
css_print_conn_entry_info (CSS_CONN_ENTRY * conn)
{
  fprintf (stderr,
	   "CONN_ENTRY: %p, next(%p), idx(%d),fd(%lld),request_id(%d),transaction_id(%d),client_id(%d)\n",
	   (void *) conn, (void *) conn->next, conn->idx, (long long) conn->fd, conn->request_id,
	   conn->get_tran_index (), conn->client_id);
}

/*
 * css_print_conn_list() - print active connection list to stderr
 *   return: void
 */
void
css_print_conn_list (void)
{
  CSS_CONN_ENTRY *conn, *next;
  int i, r;

  if (css_Active_conn_anchor != NULL)
    {
      START_SHARED_ACCESS_ACTIVE_CONN_ANCHOR (r);

      fprintf (stderr, "active conn list (%d)\n", css_Num_active_conn);

      for (conn = css_Active_conn_anchor, i = 0; conn != NULL; conn = next, i++)
    {
      next = conn->next;
      css_print_conn_entry_info (conn);
    }

      assert (i == css_Num_active_conn);

      END_SHARED_ACCESS_ACTIVE_CONN_ANCHOR (r);
    }
}

/*
 * css_print_free_conn_list() - print free connection list to stderr
 *   return: void
 */
void
css_print_free_conn_list (void)
{
  CSS_CONN_ENTRY *conn, *next;
  int i, r;

  if (css_Free_conn_anchor != NULL)
    {
      START_SHARED_ACCESS_FREE_CONN_ANCHOR (r);

      fprintf (stderr, "free conn list (%d)\n", css_Num_free_conn);

      for (conn = css_Free_conn_anchor, i = 0; conn != NULL; conn = next, i++)
    {
      next = conn->next;
      css_print_conn_entry_info (conn);
    }

      assert (i == css_Num_free_conn);

      END_SHARED_ACCESS_FREE_CONN_ANCHOR (r);
    }
}

/*
 * css_common_connect() - actually try to make a connection to a server.
 *   return: conn if the request was sent, or NULL
 *   conn(in/out): connection entry will be connected
 *   rid(out): request id
 *   host_name(in): host name of server
 *   connect_type(in): request type sent to the peer
 *   server_name(in): payload of the request
 *   server_name_length(in): size of the payload
 *   port(in): port to connect to
 *
 * Note: conn->fd is assigned only when the socket could be opened. The
 *       socket is not closed here on a later failure; it stays in conn for
 *       the caller to release.
 */
static CSS_CONN_ENTRY *
css_common_connect (CSS_CONN_ENTRY * conn, unsigned short *rid,
		    const char *host_name, int connect_type, const char *server_name, int server_name_length, int port)
{
  SOCKET sock = css_tcp_client_open ((char *) host_name, port);

  if (IS_INVALID_SOCKET (sock))
    {
      return NULL;
    }

  conn->fd = sock;

  if (css_send_magic (conn) != NO_ERRORS)
    {
      return NULL;
    }

  if (css_send_request (conn, connect_type, rid, server_name, server_name_length) != NO_ERRORS)
    {
      return NULL;
    }

  return conn;
}

/*
 * css_set_proc_register() - make a server proc register.
 *   return: void
 *   server_name(in): server name to copy into the register
 *   server_name_length(in): number of bytes of server_name to copy
 *   proc_register(out): register to fill in
 *
 * Note: stores the server name, this process id, css_Server_exec_path and
 *       the blank-separated css_Server_argv. The argument string is cut off
 *       where the args buffer is full, and left untouched when
 *       css_Server_argv has not been set.
 *       NOTE(review): server_name_length is not checked against the capacity
 *       of proc_register->server_name - confirm that callers bound it.
 */
static void
css_set_proc_register (const char *server_name, int server_name_length, CSS_SERVER_PROC_REGISTER * proc_register)
{
  char *p, *last;
  char **argv;
  int written;

  memcpy (proc_register->server_name, server_name, server_name_length);
  proc_register->server_name_length = server_name_length;
  proc_register->pid = getpid ();
  strncpy_bufsize (proc_register->exec_path, css_Server_exec_path);

  if (css_Server_argv == NULL)
    {
      return;
    }

  p = (char *) proc_register->args;
  last = p + proc_register->CSS_SERVER_MAX_SZ_PROC_ARGS;
  for (argv = css_Server_argv; *argv != NULL && p < last; argv++)
    {
      written = snprintf (p, (size_t) (last - p), "%s ", *argv);
      if (written < 0 || written >= last - p)
	{
	  /* output error, or truncated because the buffer is full; never move p past the buffer */
	  break;
	}
      p += written;
    }
}

/*
 * css_connect_to_master_server() - Connect to the master from the server.
 *   return: connection entry if success, or NULL
 *   master_port_id(in): port of the master on the local host
 *   server_name(in): name + version
 *   name_length(in): length of server_name
 *
 * Note: the connection entry is created with INVALID_SOCKET and only
 *       receives a descriptor once css_common_connect () has opened one.
 *       Every failure path ends in css_free_conn (), which shuts down
 *       conn->fd if it is valid; seeding the entry with descriptor 0 would
 *       make a failed connect shut down descriptor 0 of the process.
 *
 *       Depending on the answer of the master, either the connection to the
 *       master itself is returned (SERVER_REQUEST_ACCEPTED_NEW), or a new
 *       entry wrapping the datagram socket obtained through the temporary
 *       path sent to the master (SERVER_REQUEST_ACCEPTED, not on Windows).
 */
CSS_CONN_ENTRY *
css_connect_to_master_server (int master_port_id, const char *server_name, int name_length)
{
  char hname[CUB_MAXHOSTNAMELEN];
  CSS_CONN_ENTRY *conn;
  unsigned short rid;
  int response, response_buff;
  int server_port_id;
  int connection_protocol;
#if !defined(WINDOWS)
  std::string pname;
  int datagram_fd, socket_fd;
#endif
  const char *data;
  int data_length;
  CSS_SERVER_PROC_REGISTER proc_register = CSS_SERVER_PROC_REGISTER_INITIALIZER;

  if (GETHOSTNAME (hname, CUB_MAXHOSTNAMELEN) != 0)
    {
      return NULL;
    }

  /* no socket yet: css_common_connect () stores the real descriptor on success */
  conn = css_make_conn (INVALID_SOCKET);
  if (conn == NULL)
    {
      er_set_with_oserror (ER_ERROR_SEVERITY, ARG_FILE_LINE, ERR_CSS_ERROR_DURING_SERVER_CONNECT, 1, server_name);
      return NULL;
    }

  /* select the connection protocol */

  //  TODO : When supporting the Windows environment, It will be modified to send the same data
  //  (proc_register) for the Windows protocol (SERVER_REQUEST_NEW) as well.

  if (css_Server_use_new_connection_protocol)
    {
      // Windows
      connection_protocol = SERVER_REQUEST_NEW;
      data = server_name;
      data_length = name_length;
    }
  else
    {
      // Linux and Unix
      connection_protocol = SERVER_REQUEST_FROM_SERVER;
      css_set_proc_register (server_name, name_length, &proc_register);
      data = (const char *) &proc_register;
      data_length = sizeof (proc_register);
    }

  if (css_common_connect (conn, &rid, hname, connection_protocol, data, data_length, master_port_id) == NULL)
    {
      goto fail_end;
    }

  if (css_readn (conn->fd, (char *) &response_buff, sizeof (int), -1) != sizeof (int))
    {
      goto fail_end;
    }

  response = ntohl (response_buff);
  TRACE ("css_connect_to_master_server received %d as response from master\n", response);

  switch (response)
    {
    case SERVER_ALREADY_EXISTS:
      er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ERR_CSS_SERVER_ALREADY_EXISTS, 1, server_name);
      goto fail_end;

    case SERVER_REQUEST_ACCEPTED_NEW:
      /*
       * Master requests a new-style connect, must go get
       * our port id and set up our connection socket.
       * For drivers, we don't need a connection socket and we
       * don't want to allocate a bunch of them.  Let a flag variable
       * control whether or not we actually create one of these.
       */
      if (css_Server_inhibit_connection_socket)
	{
	  server_port_id = -1;
	}
      else
	{
	  server_port_id = css_open_server_connection_socket ();
	}

      response = htonl (server_port_id);
      css_net_send (conn, (char *) &response, sizeof (int), -1);
      /* this connection remains our only contact with the master */
      return conn;

    case SERVER_REQUEST_ACCEPTED:
#if defined(WINDOWS)
      /* PC's can't handle this style of connection at all */
      er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ERR_CSS_ERROR_DURING_SERVER_CONNECT, 1, server_name);
      goto fail_end;
#else /* WINDOWS */
      /* send the "pathname" for the datagram */
      /* be sure to open the datagram first.  */
      pname = filesys::temp_directory_path ();
      pname += "/cubrid_tcp_setup_server" + std::to_string (getpid ());
      (void) unlink (pname.c_str ());	// make sure file is deleted

      if (!css_tcp_setup_server_datagram (pname.c_str (), &socket_fd))
	{
	  (void) unlink (pname.c_str ());
	  er_set_with_oserror (ER_ERROR_SEVERITY, ARG_FILE_LINE, ERR_CSS_ERROR_DURING_SERVER_CONNECT, 1, server_name);
	  goto fail_end;
	}
      if (css_send_data (conn, rid, pname.c_str (), pname.length () + 1) != NO_ERRORS)
	{
	  (void) unlink (pname.c_str ());
	  close (socket_fd);
	  er_set_with_oserror (ER_ERROR_SEVERITY, ARG_FILE_LINE, ERR_CSS_ERROR_DURING_SERVER_CONNECT, 1, server_name);
	  goto fail_end;
	}
      if (!css_tcp_listen_server_datagram (socket_fd, &datagram_fd))
	{
	  (void) unlink (pname.c_str ());
	  close (socket_fd);
	  er_set_with_oserror (ER_ERROR_SEVERITY, ARG_FILE_LINE, ERR_CSS_ERROR_DURING_SERVER_CONNECT, 1, server_name);
	  goto fail_end;
	}
      // success
      (void) unlink (pname.c_str ());
      css_free_conn (conn);
      close (socket_fd);
      return (css_make_conn (datagram_fd));
#endif /* WINDOWS */

    default:
      /* any other response is treated as a failed connect */
      break;
    }

fail_end:
  css_free_conn (conn);
  return NULL;
}

/*
 * css_find_conn_by_tran_index() - look up the active connection bound to a
 *                                 transaction index
 *   return: matching connection entry, or NULL when there is none
 *   tran_index(in): transaction index to look for
 *
 * Note: the active connection list is walked under shared access; the entry
 *       is handed back after that access has been released.
 */
CSS_CONN_ENTRY *
css_find_conn_by_tran_index (int tran_index)
{
  CSS_CONN_ENTRY *found = NULL;
  int r;

  if (css_Active_conn_anchor == NULL)
    {
      return NULL;
    }

  START_SHARED_ACCESS_ACTIVE_CONN_ANCHOR (r);

  found = css_Active_conn_anchor;
  while (found != NULL && found->get_tran_index () != tran_index)
    {
      found = found->next;
    }

  END_SHARED_ACCESS_ACTIVE_CONN_ANCHOR (r);

  return found;
}

/*
 * css_find_conn_from_fd() - look up the active connection that uses a socket
 *   return: matching connection entry, or NULL when there is none
 *   fd(in): socket descriptor to look for
 *
 * Note: the active connection list is walked under shared access; the entry
 *       is handed back after that access has been released.
 */
CSS_CONN_ENTRY *
css_find_conn_from_fd (SOCKET fd)
{
  CSS_CONN_ENTRY *found = NULL;
  int r;

  if (css_Active_conn_anchor == NULL)
    {
      return NULL;
    }

  START_SHARED_ACCESS_ACTIVE_CONN_ANCHOR (r);

  found = css_Active_conn_anchor;
  while (found != NULL && found->fd != fd)
    {
      found = found->next;
    }

  END_SHARED_ACCESS_ACTIVE_CONN_ANCHOR (r);

  return found;
}

/*
 * css_get_session_ids_for_active_connections () - get active session ids
 * return : NO_ERROR, or ER_FAILED on a NULL argument or allocation failure
 * session_ids (out)  : receives a malloc'ed array of session ids, or NULL
 *                      when there is no active connection or on error
 * count (out)        : receives the number of ids stored in *session_ids
 *                      (0 when *session_ids is NULL)
 */
int
css_get_session_ids_for_active_connections (SESSION_ID ** session_ids, int *count)
{
  CSS_CONN_ENTRY *conn = NULL;
  SESSION_ID *sessions_p = NULL;
  int i = 0, r;

  /* both output arguments are written on every path, so both must be valid */
  assert (session_ids != NULL && count != NULL);
  if (session_ids == NULL || count == NULL)
    {
      if (session_ids != NULL)
        {
          *session_ids = NULL;
        }
      if (count != NULL)
        {
          *count = 0;
        }
      return ER_FAILED;
    }

  *session_ids = NULL;
  *count = 0;

  if (css_Active_conn_anchor == NULL)
    {
      return NO_ERROR;
    }

  START_SHARED_ACCESS_ACTIVE_CONN_ANCHOR (r);

  sessions_p = (SESSION_ID *) malloc (css_Num_active_conn * sizeof (SESSION_ID));
  if (sessions_p == NULL)
    {
      er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_OUT_OF_VIRTUAL_MEMORY, 1, css_Num_active_conn * sizeof (SESSION_ID));
      END_SHARED_ACCESS_ACTIVE_CONN_ANCHOR (r);
      return ER_FAILED;
    }

  /* never write past the array that was sized from css_Num_active_conn */
  for (conn = css_Active_conn_anchor; conn != NULL && i < css_Num_active_conn; conn = conn->next)
    {
      sessions_p[i] = conn->session_id;
      i++;
    }

  END_SHARED_ACCESS_ACTIVE_CONN_ANCHOR (r);

  /* report exactly the number of ids that were stored */
  *count = i;
  *session_ids = sessions_p;

  return NO_ERROR;
}

/*
 * css_shutdown_conn_by_tran_index() - shutdown connection having given
 *                                     transaction id
 *   return: error code
 *   tran_index(in): transaction id
 *   wait_time(in): wait time (-1: infinite, 0: no wait, : wait time)
 */
int
css_shutdown_conn_by_tran_index (int tran_index, int wait_time)
{
  CSS_CONN_ENTRY *conn = NULL;
  int error = ER_FAILED;
  int r;

  if (css_Active_conn_anchor != NULL)
    {
      START_EXCLUSIVE_ACCESS_ACTIVE_CONN_ANCHOR (r);

      for (conn = css_Active_conn_anchor; conn != NULL; conn = conn->next)
    {
      if (conn->get_tran_index () == tran_index)
        {
          if (conn->status == CONN_OPEN)
        {
          conn->status = CONN_CLOSING;

          css_request_shutdown_conn (conn,
                         static_cast < uint8_t >
                         (cubconn::connection::ignore_level::DONT_IGNORE), false, wait_time);

          error = NO_ERROR;
        }
          break;
        }
    }

      END_EXCLUSIVE_ACCESS_ACTIVE_CONN_ANCHOR (r);
    }

  return error;
}

/*
 * css_get_request_id() - hand out the next usable request id
 *   return: request id, or 0 when every candidate was rejected
 *   conn(in): connection entry
 *
 * Note: candidates are taken by advancing conn->request_id (0 is always
 *       skipped) until css_is_valid_request_id() accepts one or the counter
 *       comes back to the value it had on entry.
 */
unsigned short
css_get_request_id (CSS_CONN_ENTRY * conn)
{
  unsigned short start_rid;
  unsigned short found_rid = 0;
  bool found = false;
  int r;

  r = rmutex_lock (NULL, &conn->rmutex);
  assert (r == NO_ERROR);

  start_rid = conn->request_id;

  for (;;)
    {
      /* step to the next candidate; id 0 is never handed out */
      conn->request_id++;
      if (conn->request_id == 0)
        {
          conn->request_id++;
        }

      if (conn->request_id == start_rid)
        {
          /* wrapped all the way around without finding a usable id */
          break;
        }

      if (css_is_valid_request_id (conn, conn->request_id))
        {
          found_rid = conn->request_id;
          found = true;
          break;
        }
    }

  r = rmutex_unlock (NULL, &conn->rmutex);
  assert (r == NO_ERROR);

  if (!found)
    {
      /* Should never reach this point */
      er_set (ER_WARNING_SEVERITY, ARG_FILE_LINE, ERR_CSS_REQUEST_ID_FAILURE, 0);
      return (0);
    }

  return found_rid;
}

/*
 * css_read_header() - read one network header and apply it to the connection
 *   return: result of css_net_read_header() (NO_ERRORS or RECORD_TRUNCATED),
 *           or CONNECTION_CLOSED when the connection has stopped talking,
 *           the read failed, or a CLOSE_TYPE header was received
 *   conn(in): connection entry
 *   local_header(in): storage that receives the header
 *
 * Note: It is a blocking read. On success the transaction index, db_error,
 *       invalidate_snapshot and in_method fields of conn are updated from
 *       the header.
 */
int
css_read_header (CSS_CONN_ENTRY * conn, const NET_HEADER * local_header)
{
  int header_size = sizeof (NET_HEADER);
  int status = 0;
  unsigned short hdr_flags = 0;

  if (conn->stop_talk == true)
    {
      return CONNECTION_CLOSED;
    }

  status = css_net_read_header (conn, (char *) local_header, &header_size, -1);
  if (status == NO_ERRORS)
    {
      if (ntohl (local_header->type) == CLOSE_TYPE)
        {
          return CONNECTION_CLOSED;
        }
    }
  else if (status != RECORD_TRUNCATED)
    {
      return CONNECTION_CLOSED;
    }

  /* mirror the header fields into the connection state */
  conn->set_tran_index (ntohl (local_header->transaction_id));
  conn->db_error = (int) ntohl (local_header->db_error);

  hdr_flags = ntohs (local_header->flags);
  conn->invalidate_snapshot = (hdr_flags & NET_HEADER_FLAG_INVALIDATE_SNAPSHOT) ? 1 : 0;
  conn->in_method = (hdr_flags & NET_HEADER_FLAG_METHOD_MODE) ? true : false;

  return status;
}

/*
 * css_receive_request() - fetch the next queued client request
 *   return: whatever css_return_queued_request() returns
 *   conn(in): connection entry
 *   rid(out): request id
 *   request(out): request
 *   buffer_size(out): request data size
 */
int
css_receive_request (CSS_CONN_ENTRY * conn, unsigned short *rid, int *request, int *buffer_size)
{
  int status;

  status = css_return_queued_request (conn, rid, request, buffer_size);

  return status;
}

/*
 * css_receive_data() - receive the data that belongs to a request
 *   return: status stored by css_return_queued_data_timeout(), or
 *           NO_DATA_AVAILABLE when the status slot cannot be allocated
 *   conn(in): connection entry
 *   req_id(in): request id
 *   buffer(out): buffer for data
 *   buffer_size(out): buffer size
 *   timeout(in): passed through to css_return_queued_data_timeout()
 *
 * Note: this is a blocking read.
 */
int
css_receive_data (CSS_CONN_ENTRY * conn, unsigned short req_id, char **buffer, int *buffer_size, int timeout)
{
  int *status_p;
  int status;

  /* the status slot must be heap memory; a stack variable must not be used here */
  status_p = (int *) malloc (sizeof (int));
  if (status_p == NULL)
    {
      return NO_DATA_AVAILABLE;
    }

  (void) css_return_queued_data_timeout (conn, req_id, buffer, buffer_size, status_p, timeout);
  status = *status_p;

  free_and_init (status_p);

  return status;
}

/*
 * css_return_eid_from_conn() - build the enquiry id for a request on a
 *                              connection
 *   return: enquiry id made from conn->idx and rid
 *   conn(in): connection entry
 *   rid(in): request id
 */
unsigned int
css_return_eid_from_conn (CSS_CONN_ENTRY * conn, unsigned short rid)
{
  const unsigned short entry_id = (unsigned short) conn->idx;

  return css_make_eid (entry_id, rid);
}

/*
 * css_make_eid() - make enquiry id
 *   return: enquiry id; entry_id occupies the upper 16 bits and rid the
 *           lower 16 bits
 *   entry_id(in): connection entry id
 *   rid(in): request id
 */
static unsigned int
css_make_eid (unsigned short entry_id, unsigned short rid)
{
  /* shift in unsigned arithmetic so that entry ids with the top bit set never shift into a sign bit */
  return (((unsigned int) entry_id) << 16) | (unsigned int) rid;
}

/* CSS_CONN_ENTRY's queues related functions */

/*
 * css_claim_queue_entry() - pop a queue entry off the connection's free list
 *   return: detached entry (its next pointer is cleared), or NULL when the
 *           free list is empty
 *   conn(in): connection entry
 */
static CSS_QUEUE_ENTRY *
css_claim_queue_entry (CSS_CONN_ENTRY * conn)
{
  CSS_QUEUE_ENTRY *head;

  assert (conn != NULL);

  head = conn->free_queue_list;
  if (head != NULL)
    {
      /* unlink the head and keep the counter in step with the list */
      conn->free_queue_list = head->next;
      head->next = NULL;

      --conn->free_queue_count;
      assert (conn->free_queue_count >= 0);
    }

  return head;
}

/*
 * css_retire_queue_entry() - push a queue entry onto the connection's free
 *                            list for later reuse
 *   return: void
 *   conn(in): connection entry
 *   entry(in): CSS_QUEUE_ENTRY * to be retired
 *
 * Note: the entry's buffer is not touched here.
 */
static void
css_retire_queue_entry (CSS_CONN_ENTRY * conn, CSS_QUEUE_ENTRY * entry)
{
  CSS_QUEUE_ENTRY *old_head;

  assert (conn != NULL && entry != NULL);

  /* the retired entry becomes the new head of the free list */
  old_head = conn->free_queue_list;
  entry->next = old_head;
  conn->free_queue_list = entry;

  ++conn->free_queue_count;
  assert (conn->free_queue_count > 0);
}

/*
 * css_free_queue_entry_list() - release every entry kept on the connection's
 *                               free queue list
 *   return: void
 *   conn(in): connection entry
 */
static void
css_free_queue_entry_list (CSS_CONN_ENTRY * conn)
{
  CSS_QUEUE_ENTRY *cur, *following;

  assert (conn != NULL);

  for (cur = conn->free_queue_list; cur != NULL; cur = following)
    {
      /* remember the successor before the node is released */
      following = cur->next;

      free (cur);
      conn->free_queue_count--;
    }

  conn->free_queue_list = NULL;
  assert (conn->free_queue_count == 0);
}

/*
 * css_claim_wait_queue_entry() - pop a wait queue entry off the connection's
 *                                free list
 *   return: detached entry (its next pointer is cleared), or NULL when the
 *           free list is empty
 *   conn(in): connection entry
 */
static CSS_WAIT_QUEUE_ENTRY *
css_claim_wait_queue_entry (CSS_CONN_ENTRY * conn)
{
  CSS_WAIT_QUEUE_ENTRY *head;

  assert (conn != NULL);

  head = conn->free_wait_queue_list;
  if (head != NULL)
    {
      /* unlink the head and keep the counter in step with the list */
      conn->free_wait_queue_list = head->next;
      head->next = NULL;

      --conn->free_wait_queue_count;
      assert (conn->free_wait_queue_count >= 0);
    }

  return head;
}

/*
 * css_retire_wait_queue_entry() - push a wait queue entry onto the
 *                                 connection's free list for later reuse
 *   return: void
 *   conn(in): connection entry
 *   entry(in): CSS_WAIT_QUEUE_ENTRY * to be retired
 */
static void
css_retire_wait_queue_entry (CSS_CONN_ENTRY * conn, CSS_WAIT_QUEUE_ENTRY * entry)
{
  CSS_WAIT_QUEUE_ENTRY *old_head;

  assert (conn != NULL && entry != NULL);

  /* the retired entry becomes the new head of the free list */
  old_head = conn->free_wait_queue_list;
  entry->next = old_head;
  conn->free_wait_queue_list = entry;

  ++conn->free_wait_queue_count;
  assert (conn->free_wait_queue_count > 0);
}

/*
 * css_free_wait_queue_list() - release every entry kept on the connection's
 *                              free wait queue list
 *   return: void
 *   conn(in): connection entry
 */
static void
css_free_wait_queue_list (CSS_CONN_ENTRY * conn)
{
  CSS_WAIT_QUEUE_ENTRY *cur, *following;

  assert (conn != NULL);

  for (cur = conn->free_wait_queue_list; cur != NULL; cur = following)
    {
      /* remember the successor before the node is released */
      following = cur->next;

      free (cur);
      conn->free_wait_queue_count--;
    }

  conn->free_wait_queue_list = NULL;
  assert (conn->free_wait_queue_count == 0);
}

/*
 * css_make_queue_entry() - make queue entry
 *   return: initialized queue entry, or NULL when no entry could be
 *           allocated
 *   conn(in): connection entry; supplies the free list and the current
 *             in_method flag
 *   key(in): key stored in the entry
 *   buffer(in): buffer stored in the entry (the pointer is stored, not
 *               copied)
 *   size(in): size stored in the entry
 *   rc(in): return code stored in the entry
 *   transid(in): transaction id stored in the entry
 *   invalidate_snapshot(in): invalidate-snapshot flag stored in the entry
 *   db_error(in): db error stored in the entry
 */
static CSS_QUEUE_ENTRY *
css_make_queue_entry (CSS_CONN_ENTRY * conn, unsigned int key, char *buffer,
                      int size, int rc, int transid, int invalidate_snapshot, int db_error)
{
  CSS_QUEUE_ENTRY *p;

  /* reuse a retired entry when one is available, otherwise allocate */
  if (conn->free_queue_list != NULL)
    {
      p = css_claim_queue_entry (conn);
    }
  else
    {
      p = (CSS_QUEUE_ENTRY *) malloc (sizeof (CSS_QUEUE_ENTRY));
    }

  if (p == NULL)
    {
      return NULL;
    }

  /* claimed entries already have next cleared; do the same for fresh ones */
  p->next = NULL;
  p->key = key;
  p->buffer = buffer;
  p->size = size;
  p->rc = rc;
  p->transaction_id = transid;
  p->invalidate_snapshot = invalidate_snapshot;
  /*
   * in_method is copied back into the connection when the entry is
   * dequeued, so it must not be left uninitialized (fresh entry) or stale
   * (recycled entry). Take the flag currently recorded on the connection.
   */
  p->in_method = conn->in_method;
  p->db_error = db_error;

  return p;
}

/*
 * css_free_queue_entry() - release a queue entry and the buffer it holds
 *   return: void
 *   conn(in): connection entry
 *   entry(in): queue entry; NULL is accepted and ignored
 *
 * Note: a non-NULL buffer is handed to conn->release_packet() and the entry
 *       itself is put back on the connection's free list.
 */
static void
css_free_queue_entry (CSS_CONN_ENTRY * conn, CSS_QUEUE_ENTRY * entry)
{
  if (entry != NULL)
    {
      if (entry->buffer != NULL)
        {
          conn->release_packet (entry->buffer);
        }

      css_retire_queue_entry (conn, entry);
    }
}

/*
 * css_add_queue_entry() - build a queue entry and append it to a list
 *   return: NO_ERRORS on success, CANT_ALLOC_BUFFER when the entry cannot be
 *           created or cannot be added to the list
 *   conn(in): connection entry
 *   list(in): queue list
 *   request_id(in): request id, used as the entry key
 *   buffer(in): buffer stored in the entry
 *   buffer_size(in): size stored in the entry
 *   rc(in): return code stored in the entry
 *   transid(in): transaction id stored in the entry
 *   invalidate_snapshot(in): invalidate-snapshot flag stored in the entry
 *   db_error(in): db error stored in the entry
 *
 * Note: on failure the buffer is not released here.
 */
css_error_code
css_add_queue_entry (CSS_CONN_ENTRY * conn, CSS_LIST * list, unsigned short request_id, char *buffer, int buffer_size,
                     int rc, int transid, int invalidate_snapshot, int db_error)
{
  CSS_QUEUE_ENTRY *entry;

  entry = css_make_queue_entry (conn, request_id, buffer, buffer_size, rc, transid, invalidate_snapshot, db_error);
  if (entry == NULL)
    {
      return CANT_ALLOC_BUFFER;
    }

  if (css_add_list (list, entry) != NO_ERROR)
    {
      /* give the unused entry back to the free list */
      css_retire_queue_entry (conn, entry);
      return CANT_ALLOC_BUFFER;
    }

  return NO_ERRORS;
}

/*
 * css_find_queue_entry_by_key() - find queue entry using key
 *   return: status of traverse
 *   data(in): queue entry
 *   user(in): search argument
 */
static int
css_find_queue_entry_by_key (void *data, void *user)
{
  CSS_QUEUE_SEARCH_ARG *arg = (CSS_QUEUE_SEARCH_ARG *) user;
  CSS_QUEUE_ENTRY *p = (CSS_QUEUE_ENTRY *) data;

  if (p->key == arg->key)
    {
      arg->entry_ptr = p;
      if (arg->remove_entry)
    {
      return TRAV_STOP_DELETE;
    }
      else
    {
      return TRAV_STOP;
    }
    }

  return TRAV_CONT;
}

/*
 * css_find_queue_entry() - look up a queue entry by key, leaving it in the
 *                          list
 *   return: matching queue entry, or NULL when none matches
 *   list(in): queue list
 *   key(in): key
 */
static CSS_QUEUE_ENTRY *
css_find_queue_entry (CSS_LIST * list, unsigned int key)
{
  CSS_QUEUE_SEARCH_ARG search;

  search.key = key;
  search.remove_entry = 0;      /* look only, do not unlink */
  search.entry_ptr = NULL;

  css_traverse_list (list, css_find_queue_entry_by_key, &search);

  return search.entry_ptr;
}

/*
 * css_find_and_remove_queue_entry() - look up a queue entry by key and ask
 *                                     the traversal to remove it
 *   return: matching queue entry, or NULL when none matches
 *   list(in): queue list
 *   key(in): key
 */
static CSS_QUEUE_ENTRY *
css_find_and_remove_queue_entry (CSS_LIST * list, unsigned int key)
{
  CSS_QUEUE_SEARCH_ARG search;

  search.key = key;
  search.remove_entry = 1;      /* unlink the match from the list */
  search.entry_ptr = NULL;

  css_traverse_list (list, css_find_queue_entry_by_key, &search);

  return search.entry_ptr;
}

/*
 * css_make_wait_queue_entry() - make a wait queue entry for the calling
 *                               thread
 *   return: initialized wait queue entry, or NULL when no entry could be
 *           allocated
 *   conn(in): connection entry
 *   key(in): key stored in the entry
 *   buffer(out): location the entry records for the data buffer
 *   size(out): location the entry records for the data size
 *   rc(out): location the entry records for the return code
 *
 * Note: the entry records the thread returned by
 *       thread_get_thread_entry_info().
 */
static CSS_WAIT_QUEUE_ENTRY *
css_make_wait_queue_entry (CSS_CONN_ENTRY * conn, unsigned int key, char **buffer, int *size, int *rc)
{
  CSS_WAIT_QUEUE_ENTRY *entry = NULL;

  /* reuse a retired entry when one is available, otherwise allocate */
  if (conn->free_wait_queue_list == NULL)
    {
      entry = (CSS_WAIT_QUEUE_ENTRY *) malloc (sizeof (CSS_WAIT_QUEUE_ENTRY));
    }
  else
    {
      entry = css_claim_wait_queue_entry (conn);
    }

  if (entry != NULL)
    {
      entry->key = key;
      entry->buffer = buffer;
      entry->size = size;
      entry->rc = rc;
      entry->thrd_entry = thread_get_thread_entry_info ();
    }

  return entry;
}

/*
 * css_free_wait_queue_entry() - release a wait queue entry
 *   return: void
 *   conn(in): connection entry
 *   entry(in): wait queue entry; NULL is accepted and ignored
 *
 * Note: when the entry still refers to a thread, that thread is woken with
 *       THREAD_CSS_QUEUE_RESUMED before the entry is put back on the
 *       connection's free list.
 */
void
css_free_wait_queue_entry (CSS_CONN_ENTRY * conn, CSS_WAIT_QUEUE_ENTRY * entry)
{
  THREAD_ENTRY *waiter;

  if (entry == NULL)
    {
      return;
    }

  waiter = entry->thrd_entry;
  if (waiter != NULL)
    {
      thread_lock_entry (waiter);

      assert (waiter->resume_status == THREAD_CSS_QUEUE_SUSPENDED);
      thread_wakeup_already_had_mutex (waiter, THREAD_CSS_QUEUE_RESUMED);

      thread_unlock_entry (waiter);
    }

  css_retire_wait_queue_entry (conn, entry);
}

/*
 * css_add_wait_queue_entry() - build a wait queue entry and append it to a
 *                              list
 *   return: the queued wait queue entry, or NULL when it cannot be created
 *           or cannot be added to the list
 *   conn(in): connection entry
 *   list(in): wait queue list
 *   request_id(in): request id, used as the entry key
 *   buffer(out): location recorded for the data buffer
 *   buffer_size(out): location recorded for the data size
 *   rc(out): location recorded for the return code
 */
static CSS_WAIT_QUEUE_ENTRY *
css_add_wait_queue_entry (CSS_CONN_ENTRY * conn, CSS_LIST * list, unsigned short request_id, char **buffer,
                          int *buffer_size, int *rc)
{
  CSS_WAIT_QUEUE_ENTRY *entry;

  entry = css_make_wait_queue_entry (conn, request_id, buffer, buffer_size, rc);
  if (entry != NULL && css_add_list (list, entry) != NO_ERROR)
    {
      /* could not be queued: give the entry back to the free list */
      css_retire_wait_queue_entry (conn, entry);
      entry = NULL;
    }

  return entry;
}

/*
 * find_wait_queue_entry_by_key() - find wait queue entry using key
 *   return: status of traverse
 *   data(in): wait queue entry
 *   user(in): search argument
 */
static int
find_wait_queue_entry_by_key (void *data, void *user)
{
  CSS_WAIT_QUEUE_SEARCH_ARG *arg = (CSS_WAIT_QUEUE_SEARCH_ARG *) user;
  CSS_WAIT_QUEUE_ENTRY *p = (CSS_WAIT_QUEUE_ENTRY *) data;

  if (p->key == arg->key)
    {
      arg->entry_ptr = p;
      if (arg->remove_entry)
    {
      return TRAV_STOP_DELETE;
    }
      else
    {
      return TRAV_STOP;
    }
    }

  return TRAV_CONT;
}

/*
 * css_find_and_remove_wait_queue_entry() - look up a wait queue entry by key
 *                                          and ask the traversal to remove it
 *   return: matching wait queue entry, or NULL when none matches
 *   list(in): wait queue list
 *   key(in): key to match
 */
CSS_WAIT_QUEUE_ENTRY *
css_find_and_remove_wait_queue_entry (CSS_LIST * list, unsigned int key)
{
  CSS_WAIT_QUEUE_SEARCH_ARG search;

  search.key = key;
  search.remove_entry = 1;      /* unlink the match from the list */
  search.entry_ptr = NULL;

  css_traverse_list (list, find_wait_queue_entry_by_key, &search);

  return search.entry_ptr;
}


/*
 * css_queue_packet() - queue packet
 *   return: NO_ERRORS on success, CONNECTION_CLOSED when the connection has
 *           stopped talking, WRONG_PACKET_TYPE for an unknown packet type,
 *           or the error reported by css_queue_command_packet()
 *   conn(in): connection entry
 *   type(in): packet type
 *   request_id(in): request id
 *   header(in): network header
 *   size(in): packet size
 *
 * Note: the transaction index, db_error, invalidate_snapshot and in_method
 *       fields of conn are updated from the header before the packet is
 *       dispatched. conn->rmutex is held for the whole call and released on
 *       every return path.
 */
static css_error_code
css_queue_packet (CSS_CONN_ENTRY * conn, int type, unsigned short request_id, const NET_HEADER * header, int size)
{
  THREAD_ENTRY *wait_thrd = NULL, *p, *next;
  unsigned short flags = 0;
  int r;
  int transaction_id, db_error, invalidate_snapshot;
  css_error_code rc = NO_ERRORS;

  transaction_id = ntohl (header->transaction_id);
  db_error = (int) ntohl (header->db_error);
  flags = ntohs (header->flags);
  invalidate_snapshot = flags & NET_HEADER_FLAG_INVALIDATE_SNAPSHOT ? 1 : 0;
  bool in_method = flags & NET_HEADER_FLAG_METHOD_MODE ? true : false;

  r = rmutex_lock (NULL, &conn->rmutex);
  assert (r == NO_ERROR);

  if (conn->stop_talk)
    {
      r = rmutex_unlock (NULL, &conn->rmutex);
      assert (r == NO_ERROR);
      return CONNECTION_CLOSED;
    }

  conn->set_tran_index (transaction_id);
  conn->db_error = db_error;
  conn->invalidate_snapshot = invalidate_snapshot;
  conn->in_method = in_method;

  switch (type)
    {
    case CLOSE_TYPE:
      css_process_close_packet (conn);
      break;

    case ABORT_TYPE:
      css_process_abort_packet (conn, request_id);
      break;

    case DATA_TYPE:
      css_queue_data_packet (conn, request_id, header, &wait_thrd);
      break;

    case ERROR_TYPE:
      css_queue_error_packet (conn, request_id, header);
      break;

    case COMMAND_TYPE:
      rc = css_queue_command_packet (conn, request_id, header, size);
      if (rc != NO_ERRORS)
        {
          r = rmutex_unlock (NULL, &conn->rmutex);
          assert (r == NO_ERROR);
          return rc;
        }
      break;

    default:
      CSS_TRACE2 ("Asked to queue an unknown packet id = %d.\n", type);
      assert (false);
      /* do not leave the connection mutex locked on this early return */
      r = rmutex_unlock (NULL, &conn->rmutex);
      assert (r == NO_ERROR);
      return WRONG_PACKET_TYPE;
    }

  /* wake up the threads collected by css_queue_data_packet() */
  p = wait_thrd;
  while (p != NULL)
    {
      thread_lock_entry (p);

      assert (p->resume_status == THREAD_CSS_QUEUE_SUSPENDED || p->resume_status == THREAD_CSECT_WRITER_SUSPENDED);
      next = p->next_wait_thrd;
      p->next_wait_thrd = NULL;

      /* When the resume_status is THREAD_CSS_QUEUE_SUSPENDED, it means the data waiting thread is still waiting on the
       * data queue. Otherwise, in case of THREAD_CSECT_WRITER_SUSPENDED, it means that the thread was timed out, is
       * trying to clear its queue buffer (see clear_wait_queue_entry_and_free_buffer function), and waiting for its
       * conn->csect. We don't need to wakeup the thread for this case. We may send useless signal for it, but it may
       * bring other anomalies: the thread may sleep on another resources which we don't know at this moment. */
      if (p->resume_status == THREAD_CSS_QUEUE_SUSPENDED)
        {
          thread_wakeup_already_had_mutex (p, THREAD_CSS_QUEUE_RESUMED);
        }

      thread_unlock_entry (p);
      p = next;
    }

  r = rmutex_unlock (NULL, &conn->rmutex);
  assert (r == NO_ERROR);

  return NO_ERRORS;
}

/*
 * css_process_close_packet() - process a close packet
 *   return: void
 *   conn(in): connection entry
 *
 * Note: a still-valid socket is shut down and conn->fd is reset to
 *       INVALID_SOCKET; the connection status always becomes CONN_CLOSED.
 */
static void
css_process_close_packet (CSS_CONN_ENTRY * conn)
{
  const bool has_socket = !IS_INVALID_SOCKET (conn->fd);

  if (has_socket)
    {
      css_shutdown_socket (conn->fd);
      conn->fd = INVALID_SOCKET;
    }

  conn->status = CONN_CLOSED;
}

/*
 * css_process_abort_packet() - process an abort packet
 *   return: void
 *   conn(in): connection entry
 *   request_id(in): request id being aborted
 *
 * Note: any queued request and queued data for the id are removed and
 *       released, then the id is recorded in the abort queue unless it is
 *       already there.
 */
void
css_process_abort_packet (CSS_CONN_ENTRY * conn, unsigned short request_id)
{
  CSS_QUEUE_ENTRY *pending;

  /* drop the queued request, if any */
  pending = css_find_and_remove_queue_entry (&conn->request_queue, request_id);
  if (pending != NULL)
    {
      css_free_queue_entry (conn, pending);
    }

  /* drop the queued data, if any */
  pending = css_find_and_remove_queue_entry (&conn->data_queue, request_id);
  if (pending != NULL)
    {
      css_free_queue_entry (conn, pending);
    }

  if (css_find_queue_entry (&conn->abort_queue, request_id) != NULL)
    {
      /* already marked as aborted */
      return;
    }

  css_add_queue_entry (conn, &conn->abort_queue, request_id, NULL, 0,
                       NO_ERRORS, conn->get_tran_index (), conn->invalidate_snapshot, conn->db_error);
}

/*
 * css_queue_data_packet() - queue data packet
 *   return: void
 *   conn(in): connection entry
 *   request_id(in): request id
 *   header(in): network header; buffer_size gives the data size
 *   wake_thrd(in/out): list of threads to wake up; a thread found waiting
 *                      for this request id is appended to it
 *
 * Note: the data is received into the buffer registered in the buffer queue
 *       for this request id when there is one, otherwise into a newly
 *       allocated buffer. It is then handed to the waiting thread, or stored
 *       in the data queue when nobody waits. When the received buffer
 *       cannot be stored in the data queue it is freed here.
 */
static void
css_queue_data_packet (CSS_CONN_ENTRY * conn, unsigned short request_id,
                       const NET_HEADER * header, THREAD_ENTRY ** wake_thrd)
{
  THREAD_ENTRY *thrd = NULL, *last = NULL;
  CSS_QUEUE_ENTRY *buffer_entry;
  CSS_WAIT_QUEUE_ENTRY *data_wait = NULL;
  char *buffer = NULL;
  int rc;
  int size;                     /* size to be read */

  /* setup wake_thrd. hmm.. consider recursion */
  if (*wake_thrd != NULL)
    {
      /* find the tail so that a new waiter can be appended */
      last = *wake_thrd;
      while (last->next_wait_thrd != NULL)
        {
          last = last->next_wait_thrd;
        }
    }

  size = ntohl (header->buffer_size);
  /* check if user have given a buffer */
  buffer_entry = css_find_and_remove_queue_entry (&conn->buffer_queue, request_id);
  if (buffer_entry != NULL)
    {
      /* compare data and buffer size. if different? something wrong!!! */
      if (size > buffer_entry->size)
        {
          size = buffer_entry->size;
        }
      /* take the buffer out of the entry so that freeing the entry keeps it */
      buffer = buffer_entry->buffer;
      buffer_entry->buffer = NULL;

      css_free_queue_entry (conn, buffer_entry);
    }
  else if (size == 0)
    {
      buffer = NULL;
    }
  else
    {
      buffer = (char *) malloc (size);
    }

  /*
   * check if there exists thread waiting for data.
   * Add to wake_thrd list.
   */
  data_wait = css_find_and_remove_wait_queue_entry (&conn->data_wait_queue, request_id);

  if (data_wait != NULL)
    {
      thrd = data_wait->thrd_entry;
      thrd->next_wait_thrd = NULL;
      if (last == NULL)
        {
          *wake_thrd = thrd;
        }
      else
        {
          last->next_wait_thrd = thrd;
        }
      last = thrd;
    }

  /* receive data into buffer and queue data if there's no waiting thread */
  if (buffer != NULL)
    {
      rc = css_net_recv (conn, buffer, &size, -1);
      if (rc == NO_ERRORS || rc == RECORD_TRUNCATED)
        {
          if (!css_is_request_aborted (conn, request_id))
            {
              if (data_wait == NULL)
                {
                  /* if waiter not exists, add to data queue */
                  if (css_add_queue_entry (conn, &conn->data_queue, request_id, buffer, size, rc,
                                           conn->get_tran_index (), conn->invalidate_snapshot,
                                           conn->db_error) == NO_ERRORS)
                    {
                      /* the data queue now holds the buffer */
                      return;
                    }
                  /* could not be queued: fall through so that the buffer is freed below */
                }
              else
                {
                  *data_wait->buffer = buffer;
                  *data_wait->size = size;
                  *data_wait->rc = rc;
                  /* clear thrd_entry so that freeing the entry does not wake the thread here */
                  data_wait->thrd_entry = NULL;
                  css_free_wait_queue_entry (conn, data_wait);
                  return;
                }
            }
        }
      /* if error occurred */
      free_and_init (buffer);
    }
  else
    {
      rc = CANT_ALLOC_BUFFER;
      css_read_remaining_bytes (conn, sizeof (int) + size);
      if (!css_is_request_aborted (conn, request_id))
        {
          if (data_wait == NULL)
            {
              /* no buffer is attached, so there is nothing to release if this fails */
              (void) css_add_queue_entry (conn, &conn->data_queue, request_id, NULL,
                                          0, rc, conn->get_tran_index (), conn->invalidate_snapshot,
                                          conn->db_error);
              return;
            }
        }
    }

  /* if error was occurred, setup error status */
  if (data_wait != NULL)
    {
      *data_wait->buffer = NULL;
      *data_wait->size = 0;
      *data_wait->rc = rc;
    }
}

/*
 * css_queue_error_packet() - queue error packet
 *   return: void
 *   conn(in): connection entry
 *   request_id(in): request id
 *   header(in): network header; buffer_size gives the data size
 *
 * Note: the error data is received into a newly allocated buffer and stored
 *       in the error queue unless the request has been aborted. The buffer
 *       is freed here whenever it does not end up in the error queue.
 */
static void
css_queue_error_packet (CSS_CONN_ENTRY * conn, unsigned short request_id, const NET_HEADER * header)
{
  char *buffer;
  int rc;
  int size;

  size = ntohl (header->buffer_size);
  buffer = (char *) malloc (size);

  if (buffer != NULL)
    {
      rc = css_net_recv (conn, buffer, &size, -1);
      if (rc == NO_ERRORS || rc == RECORD_TRUNCATED)
        {
          if (!css_is_request_aborted (conn, request_id))
            {
              if (css_add_queue_entry (conn, &conn->error_queue, request_id,
                                       buffer, size, rc, conn->get_tran_index (), conn->invalidate_snapshot,
                                       conn->db_error) == NO_ERRORS)
                {
                  /* the error queue now holds the buffer */
                  return;
                }
              /* could not be queued: fall through so that the buffer is freed below */
            }
        }
      free_and_init (buffer);
    }
  else
    {
      rc = CANT_ALLOC_BUFFER;
      css_read_remaining_bytes (conn, sizeof (int) + size);
      if (!css_is_request_aborted (conn, request_id))
        {
          /* no buffer is attached, so there is nothing to release if this fails */
          (void) css_add_queue_entry (conn, &conn->error_queue, request_id, NULL, 0,
                                      rc, conn->get_tran_index (), conn->invalidate_snapshot, conn->db_error);
        }
    }
}

/*
 * css_queue_command_packet() - queue a command packet
 *   return: NO_ERRORS on success or when the request is already aborted,
 *           CANT_ALLOC_BUFFER when the header copy cannot be allocated,
 *           otherwise the error reported while queuing the request or
 *           while reading and queuing the packet that follows it
 *   conn(in): connection entry
 *   request_id(in): request id
 *   header(in): network header
 *   size(in): packet size
 *
 * Note: a copy of the header is stored in the request queue. When the header
 *       announces a buffer, the next header is read and passed to
 *       css_queue_packet().
 */
static css_error_code
css_queue_command_packet (CSS_CONN_ENTRY * conn, unsigned short request_id, const NET_HEADER * header, int size)
{
  NET_HEADER data_header = DEFAULT_HEADER_DATA;
  NET_HEADER *header_copy;
  css_error_code status = NO_ERRORS;

  assert (!conn->stop_talk);

  if (css_is_request_aborted (conn, request_id))
    {
      // ignore
      return NO_ERRORS;
    }

  header_copy = (NET_HEADER *) malloc (sizeof (NET_HEADER));
  if (header_copy == NULL)
    {
      assert (false);
      return CANT_ALLOC_BUFFER;
    }

  memcpy ((char *) header_copy, (char *) header, sizeof (NET_HEADER));

  status = css_add_queue_entry (conn, &conn->request_queue, request_id, (char *) header_copy, size, NO_ERRORS,
                                conn->get_tran_index (), conn->invalidate_snapshot, conn->db_error);
  if (status != NO_ERRORS)
    {
      /* the copy was not queued, so it is still ours to release */
      free (header_copy);
      return status;
    }

  if (ntohl (header->buffer_size) <= 0)
    {
      // a request without a buffer, e.g, NET_SERVER_LOG_CHECKPOINT, NET_SERVER_TM_SERVER_ABORT.
      return NO_ERRORS;
    }

  status = (css_error_code) css_read_header (conn, &data_header);
  if (status != NO_ERRORS)
    {
      // what to do?
      return status;
    }

  return css_queue_packet (conn, (int) ntohl (data_header.type), (unsigned short) ntohl (data_header.request_id),
                           &data_header, sizeof (NET_HEADER));
}

/*
 * css_is_request_aborted() - tell whether a request id is recorded in the
 *                            connection's abort queue
 *   return: true if aborted, or false
 *   conn(in): connection entry
 *   request_id(in): request id
 */
bool
css_is_request_aborted (CSS_CONN_ENTRY * conn, unsigned short request_id)
{
  const CSS_QUEUE_ENTRY *aborted = css_find_queue_entry (&conn->abort_queue, request_id);

  return aborted != NULL;
}

/*
 * css_return_queued_request() - get request from queue
 *   return: NO_ERRORS when a request was dequeued, NO_DATA_AVAILABLE when
 *           the request queue is empty, CONNECTION_CLOSED when the
 *           connection is not open
 *   conn(in): connection entry
 *   rid(out): request id
 *   request(out): request (function code taken from the queued header)
 *   buffer_size(out): request buffer size
 *
 * Note: the transaction index, invalidate_snapshot, in_method and db_error
 *       fields of conn are restored from the dequeued entry.
 */
int
css_return_queued_request (CSS_CONN_ENTRY * conn, unsigned short *rid, int *request, int *buffer_size)
{
  CSS_QUEUE_ENTRY *p;
  NET_HEADER *buffer;
  int rc, r;

  r = rmutex_lock (NULL, &conn->rmutex);
  assert (r == NO_ERROR);

  if (conn->status == CONN_OPEN)
    {
      p = (CSS_QUEUE_ENTRY *) css_remove_list_from_head (&conn->request_queue);
      if (p != NULL)
        {
          *rid = p->key;

          /* the entry's buffer holds the queued copy of the request header */
          buffer = (NET_HEADER *) p->buffer;

          *request = ntohs (buffer->function_code);
          *buffer_size = ntohl (buffer->buffer_size);

          conn->set_tran_index (p->transaction_id);
          conn->invalidate_snapshot = p->invalidate_snapshot;
          conn->in_method = p->in_method;
          conn->db_error = p->db_error;

          css_free_queue_entry (conn, p);
          rc = NO_ERRORS;
        }
      else
        {
          rc = NO_DATA_AVAILABLE;
        }
    }
  else
    {
      /* report an error code here, not the CONN_CLOSED connection status */
      rc = CONNECTION_CLOSED;
    }

  r = rmutex_unlock (NULL, &conn->rmutex);
  assert (r == NO_ERROR);
  return rc;
}

/*
 * clear_wait_queue_entry_and_free_buffer () - remove data_wait_queue entry when completing or aborting
 *                                             to receive buffer from data_wait_queue.
 *   return: void
 *   thrdp(in): thread that was waiting for the data
 *   conn(in): connection entry
 *   rid(in): request id
 *   bufferp(in/out): data buffer; reset to NULL when the buffer is released here
 *   sizep(in): data buffer size (not used)
 *
 * Note: when the wait entry is still queued it is removed and retired
 *       without waking the thread. When it is already gone and a buffer has
 *       been delivered, that buffer is released.
 */
static void
clear_wait_queue_entry_and_free_buffer (THREAD_ENTRY * thrdp, CSS_CONN_ENTRY * conn, unsigned short rid, char **bufferp,
                                        int *sizep)
{
  CSS_WAIT_QUEUE_ENTRY *data_wait;
  int r;

  r = rmutex_lock (thrdp, &conn->rmutex);
  assert (r == NO_ERROR);

  /* check the deadlock related problem */
  data_wait = css_find_and_remove_wait_queue_entry (&conn->data_wait_queue, rid);

  /* data_wait might be always not NULL except the actual connection close */
  if (data_wait)
    {
      assert (data_wait->thrd_entry == thrdp);  /* it must be me */
      /* clear thrd_entry so that freeing the entry does not try to wake this thread */
      data_wait->thrd_entry = NULL;
      css_free_wait_queue_entry (conn, data_wait);
    }
  else
    {
      /* connection_handler_thread may proceed ahead of me right after timeout has happened. If the case, we must free
       * the buffer. */
      if (*bufferp != NULL)
        {
          thrdp->conn_entry->release_packet (*bufferp);
          /* do not leave the caller with a pointer to the released buffer */
          *bufferp = NULL;
        }
    }

  r = rmutex_unlock (thrdp, &conn->rmutex);
  assert (r == NO_ERROR);
}

/*
 * css_return_queued_data_timeout() - get the data of a request from the data queue, waiting for it if it has not
 *                                    arrived yet
 *   return: NO_ERRORS when data is handed back to the caller; otherwise one of NO_DATA_AVAILABLE (the wait ended
 *           for a reason other than a queue resume), TIMEDOUT_ON_QUEUE, CONNECTION_CLOSED or CANT_ALLOC_BUFFER
 *   conn(in): connection entry
 *   rid(in): request id
 *   buffer(out): data buffer; NULL when no data is returned
 *   bufsize(out): size of the returned data; -1 when no data is returned
 *   rc(out): return code; taken from the queued data entry when the data is already queued. While waiting, its
 *            address is registered in the wait queue entry together with buffer and bufsize.
 *   waitsec(in): timeout in seconds; a negative value waits without a timeout
 */
static int
css_return_queued_data_timeout (CSS_CONN_ENTRY * conn, unsigned short rid,
                                char **buffer, int *bufsize, int *rc, int waitsec)
{
  CSS_QUEUE_ENTRY *data_entry, *buffer_entry;
  CSS_WAIT_QUEUE_ENTRY *data_wait;

  int r;

  /* enter the critical section of this connection */
  r = rmutex_lock (NULL, &conn->rmutex);
  assert (r == NO_ERROR);

  *buffer = NULL;
  *bufsize = -1;

  /* if conn is closed or to be closed, return CONNECTION_CLOSED */
  if (conn->status == CONN_OPEN)
    {
      /* look up the data queue first to see if the required data is arrived and queued already */
      data_entry = css_find_and_remove_queue_entry (&conn->data_queue, rid);

      if (data_entry)
        {
          /* look up the buffer queue to see if the user provided the receive data buffer */
          buffer_entry = css_find_and_remove_queue_entry (&conn->buffer_queue, rid);

          if (buffer_entry)
            {
              /* copy the received data to the user provided buffer area; the copy is skipped only when the data
               * already sits in the user buffer with the same size */
              *buffer = buffer_entry->buffer;
              *bufsize = MIN (data_entry->size, buffer_entry->size);
              if (*buffer != data_entry->buffer || *bufsize != data_entry->size)
                {
                  memcpy (*buffer, data_entry->buffer, *bufsize);
                }

              /* destroy the buffer queue entry; its buffer now belongs to the caller */
              buffer_entry->buffer = NULL;
              css_free_queue_entry (conn, buffer_entry);
            }
          else
            {
              /* set the buffer to point to the data queue entry; detach it so that freeing the entry below does
               * not touch the buffer handed to the caller */
              *buffer = data_entry->buffer;
              *bufsize = data_entry->size;
              data_entry->buffer = NULL;
            }

          /* set return code, transaction id, and error code */
          *rc = data_entry->rc;
          conn->set_tran_index (data_entry->transaction_id);
          conn->invalidate_snapshot = data_entry->invalidate_snapshot;
          conn->in_method = data_entry->in_method;
          conn->db_error = data_entry->db_error;

          css_free_queue_entry (conn, data_entry);

          r = rmutex_unlock (NULL, &conn->rmutex);
          assert (r == NO_ERROR);

          return NO_ERRORS;
        }
      else
        {
          THREAD_ENTRY *thrd;

          /* no data queue entry means that the data is not arrived yet; wait until the data arrives */
          *rc = NO_DATA_AVAILABLE;

          /* lock thread entry before enqueue an entry to data wait queue in order to prevent being woken up by
           * 'css_queue_packet()' before this thread suspends */
          thrd = thread_get_thread_entry_info ();
          thread_lock_entry (thrd);
          /* make a data wait queue entry */
          data_wait = css_add_wait_queue_entry (conn, &conn->data_wait_queue, rid, buffer, bufsize, rc);
          if (data_wait)
            {
              /* exit the critical section before to be suspended */
              r = rmutex_unlock (NULL, &conn->rmutex);
              assert (r == NO_ERROR);

              /* fall to the thread sleep until the socket listener 'css_server_thread()' receives and enqueues the
               * data */
              if (waitsec < 0)
                {
                  /* negative timeout: wait without a deadline */
                  thread_suspend_wakeup_and_unlock_entry (thrd, THREAD_CSS_QUEUE_SUSPENDED);

                  if (thrd->resume_status != THREAD_CSS_QUEUE_RESUMED)
                    {
                      assert (thrd->resume_status == THREAD_RESUME_DUE_TO_INTERRUPT);

                      clear_wait_queue_entry_and_free_buffer (thrd, conn, rid, buffer, bufsize);
                      *buffer = NULL;
                      *bufsize = -1;
                      return NO_DATA_AVAILABLE;
                    }
                  else
                    {
                      assert (thrd->resume_status == THREAD_CSS_QUEUE_RESUMED);
                    }
                }
              else
                {
                  int wait_r;
                  struct timespec abstime;

                  /* absolute deadline; keep the arithmetic in time_t so that the current time is not truncated
                   * to int */
                  abstime.tv_sec = time (NULL) + waitsec;
                  abstime.tv_nsec = 0;

                  wait_r = thread_suspend_timeout_wakeup_and_unlock_entry (thrd, &abstime, THREAD_CSS_QUEUE_SUSPENDED);

                  if (wait_r == ER_CSS_PTHREAD_COND_TIMEDOUT)
                    {
                      clear_wait_queue_entry_and_free_buffer (thrd, conn, rid, buffer, bufsize);
                      *rc = TIMEDOUT_ON_QUEUE;
                      *buffer = NULL;
                      *bufsize = -1;
                      return TIMEDOUT_ON_QUEUE;
                    }
                  else if (thrd->resume_status != THREAD_CSS_QUEUE_RESUMED)
                    {
                      assert (thrd->resume_status == THREAD_RESUME_DUE_TO_INTERRUPT);

                      clear_wait_queue_entry_and_free_buffer (thrd, conn, rid, buffer, bufsize);
                      *buffer = NULL;
                      *bufsize = -1;
                      return NO_DATA_AVAILABLE;
                    }
                  else
                    {
                      assert (thrd->resume_status == THREAD_CSS_QUEUE_RESUMED);
                    }
                }

              /* resumed through the queue but nothing was delivered: report the connection as closed */
              if (*buffer == NULL || *bufsize < 0)
                {
                  return CONNECTION_CLOSED;
                }

              /* NOTE(review): NO_ERRORS is still returned on this path even though *rc is CONNECTION_CLOSED and
               * the helper may release *buffer - confirm that callers check *rc before using the buffer. */
              if (*rc == CONNECTION_CLOSED)
                {
                  clear_wait_queue_entry_and_free_buffer (thrd, conn, rid, buffer, bufsize);
                }

              return NO_ERRORS;
            }
          else
            {
              /* oops! error! unlock thread entry */
              thread_unlock_entry (thrd);
              /* allocation error */
              *rc = CANT_ALLOC_BUFFER;
            }
        }
    }
  else
    {
      /* conn->status == CONN_CLOSED || CONN_CLOSING; the connection was closed */
      *rc = CONNECTION_CLOSED;
    }

  /* exit the critical section */
  r = rmutex_unlock (NULL, &conn->rmutex);
  assert (r == NO_ERROR);
  return *rc;
}

/*
 * css_return_queued_data() - get the data of a request from the data queue, waiting without a timeout
 *   return: result of css_return_queued_data_timeout()
 *   conn(in): connection entry
 *   rid(in): request id
 *   buffer(out): data buffer
 *   bufsize(out): buffer size
 *   rc(out): return code
 */
int
css_return_queued_data (CSS_CONN_ENTRY * conn, unsigned short rid, char **buffer, int *bufsize, int *rc)
{
  /* a negative wait time makes the timeout variant wait without a deadline */
  const int no_timeout = -1;

  return css_return_queued_data_timeout (conn, rid, buffer, bufsize, rc, no_timeout);
}

/*
 * css_return_queued_error() - take the error queued for a request out of the error queue
 *   return: 1 if an error entry was found for the request, 0 otherwise
 *   conn(in): connection entry
 *   request_id(in): request id
 *   buffer(out): error data buffer; written only when an entry is found
 *   buffer_size(out): size of the error data; written only when an entry is found
 *   rc(out): db_error stored in the entry; written only when an entry is found
 */
int
css_return_queued_error (CSS_CONN_ENTRY * conn, unsigned short request_id, char **buffer, int *buffer_size, int *rc)
{
  CSS_QUEUE_ENTRY *error_entry;
  int found = 0;
  int lock_rc;

  lock_rc = rmutex_lock (NULL, &conn->rmutex);
  assert (lock_rc == NO_ERROR);

  error_entry = css_find_and_remove_queue_entry (&conn->error_queue, request_id);
  if (error_entry != NULL)
    {
      found = 1;

      *rc = error_entry->db_error;
      *buffer_size = error_entry->size;
      *buffer = error_entry->buffer;

      /* the buffer now belongs to the caller; detach it before the entry is freed */
      error_entry->buffer = NULL;
      css_free_queue_entry (conn, error_entry);
    }

  lock_rc = rmutex_unlock (NULL, &conn->rmutex);
  assert (lock_rc == NO_ERROR);

  return found;
}

/*
 * css_is_valid_request_id() - check whether a request id is valid, i.e. not found in any of the queues
 *   return: true if no entry with this id exists in the data, request, abort and error queues; false otherwise
 *   conn(in): connection entry
 *   request_id(in): request id
 */
static bool
css_is_valid_request_id (CSS_CONN_ENTRY * conn, unsigned short request_id)
{
  /* the queues are probed in order and the search stops at the first one that holds the id */
  return (css_find_queue_entry (&conn->data_queue, request_id) == NULL
          && css_find_queue_entry (&conn->request_queue, request_id) == NULL
          && css_find_queue_entry (&conn->abort_queue, request_id) == NULL
          && css_find_queue_entry (&conn->error_queue, request_id) == NULL);
}

/*
 * css_remove_unexpected_packets() - remove and free the entries queued for a request id
 *   return: void
 *   conn(in): connection entry
 *   request_id(in): request id
 *
 * Note: only the request, data and error queues are searched. The result of each lookup is passed straight to
 *       css_free_queue_entry (), also when nothing was found. conn->rmutex is not taken here.
 */
void
css_remove_unexpected_packets (CSS_CONN_ENTRY * conn, unsigned short request_id)
{
  CSS_QUEUE_ENTRY *stale;

  stale = css_find_and_remove_queue_entry (&conn->request_queue, request_id);
  css_free_queue_entry (conn, stale);

  stale = css_find_and_remove_queue_entry (&conn->data_queue, request_id);
  css_free_queue_entry (conn, stale);

  stale = css_find_and_remove_queue_entry (&conn->error_queue, request_id);
  css_free_queue_entry (conn, stale);
}

/*
 * css_queue_user_data_buffer() - queue a user provided receive buffer for a request
 *   return: NO_ERRORS if nothing had to be queued, otherwise the result of css_add_queue_entry ()
 *   conn(in): connection entry
 *   request_id(in): request id
 *   size(in): buffer size
 *   buffer(in): buffer; nothing is queued when it is NULL
 */
int
css_queue_user_data_buffer (CSS_CONN_ENTRY * conn, unsigned short request_id, int size, char *buffer)
{
  int status = NO_ERRORS;
  int lock_rc;

  lock_rc = rmutex_lock (NULL, &conn->rmutex);
  assert (lock_rc == NO_ERROR);

  /* a buffer is registered only when one is given and the request has not been aborted */
  if (buffer != NULL && !css_is_request_aborted (conn, request_id))
    {
      status = css_add_queue_entry (conn, &conn->buffer_queue, request_id, buffer,
                                    size, NO_ERRORS, conn->get_tran_index (), conn->invalidate_snapshot,
                                    conn->db_error);
    }

  lock_rc = rmutex_unlock (NULL, &conn->rmutex);
  assert (lock_rc == NO_ERROR);

  return status;
}

/*
 * css_remove_and_free_queue_entry() - free queue entry
 *   return: status if traverse
 *   data(in): connection entry
 *   arg(in): queue entry
 */
static int
css_remove_and_free_queue_entry (void *data, void *arg)
{
  css_free_queue_entry ((CSS_CONN_ENTRY *) arg, (CSS_QUEUE_ENTRY *) data);
  return TRAV_CONT_DELETE;
}

/*
 * css_remove_and_free_wait_queue_entry() - free wait queue entry
 *   return: status if traverse
 *   data(in): connection entry
 *   arg(in): wait queue entry
 */
static int
css_remove_and_free_wait_queue_entry (void *data, void *arg)
{
  css_free_wait_queue_entry ((CSS_CONN_ENTRY *) arg, (CSS_WAIT_QUEUE_ENTRY *) data);
  return TRAV_CONT_DELETE;
}

/*
 * css_remove_all_unexpected_packets() - free every entry of the request, data, data wait, abort and error queues
 *   return: void
 *   conn(in): connection entry
 *
 * Note: the whole traversal runs while holding conn->rmutex. The buffer queue is not traversed here.
 */
void
css_remove_all_unexpected_packets (CSS_CONN_ENTRY * conn)
{
  int lock_rc;

  lock_rc = rmutex_lock (NULL, &conn->rmutex);
  assert (lock_rc == NO_ERROR);

  /* plain queues share one callback; the data wait queue holds wait entries and needs its own */
  css_traverse_list (&conn->request_queue, css_remove_and_free_queue_entry, conn);
  css_traverse_list (&conn->data_queue, css_remove_and_free_queue_entry, conn);
  css_traverse_list (&conn->data_wait_queue, css_remove_and_free_wait_queue_entry, conn);
  css_traverse_list (&conn->abort_queue, css_remove_and_free_queue_entry, conn);
  css_traverse_list (&conn->error_queue, css_remove_and_free_queue_entry, conn);

  lock_rc = rmutex_unlock (NULL, &conn->rmutex);
  assert (lock_rc == NO_ERROR);
}

/*
 * css_set_user_access_status() - set user access status information
 *   return: void
 *   db_user(in):
 *   host(in):
 *   program_name(in):
 */
void
css_set_user_access_status (const char *db_user, const char *host, const char *program_name)
{
  LAST_ACCESS_STATUS *access = NULL;

  assert (db_user != NULL);
  assert (host != NULL);
  assert (program_name != NULL);

  csect_enter (NULL, CSECT_ACCESS_STATUS, INF_WAIT);

  for (access = css_Access_status_anchor; access != NULL; access = access->next)
    {
      if (strcmp (access->db_user, db_user) == 0)
    {
      break;
    }
    }

  if (access == NULL)
    {
      access = (LAST_ACCESS_STATUS *) malloc (sizeof (LAST_ACCESS_STATUS));
      if (access == NULL)
    {
      /* if memory allocation fail, just ignore and return */
      csect_exit (NULL, CSECT_ACCESS_STATUS);
      return;
    }
      css_Num_access_user++;

      memset (access, 0, sizeof (LAST_ACCESS_STATUS));

      access->next = css_Access_status_anchor;
      css_Access_status_anchor = access;

      strncpy (access->db_user, db_user, sizeof (access->db_user) - 1);
    }

  access->time = time (NULL);
  strncpy (access->host, host, sizeof (access->host) - 1);
  strncpy (access->program_name, program_name, sizeof (access->program_name) - 1);

  csect_exit (NULL, CSECT_ACCESS_STATUS);

  return;
}

/*
 * css_get_user_access_status() - get user access status informations
 *   return: void
 *   num_user(in):
 *   access_status_array(out):
 */
void
css_get_user_access_status (int num_user, LAST_ACCESS_STATUS ** access_status_array)
{
  int i = 0;
  LAST_ACCESS_STATUS *access = NULL;

  csect_enter_as_reader (NULL, CSECT_ACCESS_STATUS, INF_WAIT);

  for (access = css_Access_status_anchor; (access != NULL && i < num_user); access = access->next, i++)
    {
      access_status_array[i] = access;
    }

  csect_exit (NULL, CSECT_ACCESS_STATUS);

  return;
}

/*
 * css_free_user_access_status() - free all user access status information
 *   return: void
 */
void
css_free_user_access_status (void)
{
  LAST_ACCESS_STATUS *access = NULL;

  csect_enter (NULL, CSECT_ACCESS_STATUS, INF_WAIT);

  while (css_Access_status_anchor != NULL)
    {
      access = css_Access_status_anchor;
      css_Access_status_anchor = access->next;

      free_and_init (access);
    }

  css_Num_access_user = 0;

  csect_exit (NULL, CSECT_ACCESS_STATUS);

  return;
}

/*
 * css_set_exec_path () - remember the executable path of the server
 *   return: none
 *
 *   exec_path(in): path to copy into css_Server_exec_path
 *
 * Note: at most sizeof (css_Server_exec_path) - 1 bytes are copied; the last byte of the destination is never
 *       written here.
 *       NOTE(review): termination of a too long path relies on that last byte already being zero - confirm.
 */
void
css_set_exec_path (char *exec_path)
{
  const size_t copy_limit = sizeof (css_Server_exec_path) - 1;

  assert (exec_path != NULL);

  strncpy (css_Server_exec_path, exec_path, copy_limit);
}

/*
 * css_get_exec_path () - get the executable path stored by css_set_exec_path ()
 *   return: css_Server_exec_path
 */
char *
css_get_exec_path (void)
{
  char *stored_path = css_Server_exec_path;

  assert (css_Server_exec_path != NULL);

  return stored_path;
}

/*
 * css_set_argv () - remember the argument vector of the server
 *   return: none
 *
 *   argv(in): argument vector; only the pointer is stored, nothing is copied
 */
void
css_set_argv (char **argv)
{
  assert (argv != NULL);

  /* keep a reference only; the vector itself is not duplicated here */
  css_Server_argv = argv;
}

/*
 * css_get_argv () - get the argument vector stored by css_set_argv ()
 *   return: css_Server_argv; asserted to be non-NULL
 */
char **
css_get_argv (void)
{
  char **stored_argv = css_Server_argv;

  assert (stored_argv != NULL);

  return stored_argv;
}

/*
 * css_request_shutdown_conn () - ask the worker of a connection to shut the client down
 *   return: void
 *   conn(in): connection entry
 *   ignore(in): converted to cubconn::connection::ignore_level and stored in the request
 *   retry(in): stored in the request
 *   wait_time(in): forwarded to enqueue_and_notify ()
 *
 * Note: conn->cmutex is taken to look at conn->worker and conn->context. If either is already cleared, the
 *       request is dropped and the mutex is released here. Otherwise this function does not unlock the mutex
 *       itself; the unlock is wrapped in a callback handed to enqueue_and_notify ().
 *       NOTE(review): presumably the worker invokes the callback once the message is queued - confirm, including
 *       what happens to the mutex when enqueue_and_notify () fails.
 */
void
css_request_shutdown_conn (css_conn_entry * conn, uint8_t ignore, bool retry, int wait_time)
{
  cubconn::connection::worker::message shutdown_msg;
  int lock_rc;

  assert (conn);

  shutdown_msg.type = cubconn::connection::worker::message_type::SHUTDOWN_CLIENT;
  shutdown_msg.conn = conn;
  shutdown_msg.ignore = static_cast < cubconn::connection::ignore_level > (ignore);
  shutdown_msg.retry = retry;

  /* lock to access worker and context */
  lock_rc = rmutex_lock (NULL, &conn->cmutex);
  assert (lock_rc == NO_ERROR);

  if (conn->worker == nullptr || conn->context == nullptr)
    {
      _er_log_debug (__FILE__, __LINE__,
                     "css_request_shutdown_conn: worker already cleared for conn = %p, fd = %d\n", (void *) conn,
                     conn->fd);

      /* nothing to notify; release the mutex on this path ourselves */
      lock_rc = rmutex_unlock (NULL, &conn->cmutex);
      assert (lock_rc == NO_ERROR);

      return;
    }

  /* callback that releases the mutex taken above */
  auto release_cmutex =[conn] ()noexcept {
    rmutex_unlock (NULL, &conn->cmutex);
  };

  if (!conn->worker->enqueue_and_notify (cubconn::connection::worker::queue_type::LAZY,
                                         std::move (shutdown_msg), release_cmutex, wait_time))
    {
      assert_release (false);
    }
}

/*
 * css_request_release_packet () - ask the worker of a connection to release a packet buffer
 *   return: void
 *   conn(in): connection entry
 *   buffer(in): packet buffer to release
 *
 * Note: the request is queued on the IMMEDIATE queue while holding conn->cmutex. If the worker or the context
 *       of the connection is already cleared, only a debug message is logged and nothing is queued.
 */
void
css_request_release_packet (css_conn_entry * conn, void *buffer)
{
  cubconn::connection::worker::message release_msg;
  int lock_rc;

  assert (conn && buffer);

  release_msg.type = cubconn::connection::worker::message_type::RELEASE_PACKET;
  release_msg.conn = conn;
  /* the size of the buffer is not known at this point, so 0 is recorded */
  release_msg.packet.emplace_back ((std::byte *) buffer, 0);

  /* lock to access worker and context */
  lock_rc = rmutex_lock (NULL, &conn->cmutex);
  assert (lock_rc == NO_ERROR);

  if (conn->worker == nullptr || conn->context == nullptr)
    {
      _er_log_debug (__FILE__, __LINE__,
                     "css_request_release_packet: worker already cleared for conn = %p, fd = %d, buffer = %p\n",
                     (void *) conn, conn->fd, buffer);

      /* nothing to queue; just leave the critical section */
      lock_rc = rmutex_unlock (NULL, &conn->cmutex);
      assert (lock_rc == NO_ERROR);

      return;
    }

  conn->worker->enqueue (cubconn::connection::worker::queue_type::IMMEDIATE, std::move (release_msg));

  /* unlock */
  lock_rc = rmutex_unlock (NULL, &conn->cmutex);
  assert (lock_rc == NO_ERROR);
}

/*
 * css_wakeup_handler () - notify the worker that handles a connection
 *   return: void
 *   conn(in): connection entry
 *
 * Note: conn->worker and conn->context are checked while holding conn->cmutex. If either is already cleared,
 *       only a debug message is logged. That message is written before the mutex is released, in the same way
 *       as css_request_shutdown_conn () and css_request_release_packet () do, so that conn->fd is read under
 *       the same lock as the check it reports on.
 */
void
css_wakeup_handler (css_conn_entry * conn)
{
  int r;

  assert (conn);

  /* lock to access worker and context */
  r = rmutex_lock (NULL, &conn->cmutex);
  assert (r == NO_ERROR);

  if (conn->worker == nullptr || conn->context == nullptr)
    {
      _er_log_debug (__FILE__, __LINE__, "css_wakeup_handler: worker already cleared for conn = %p, fd = %d\n",
                     (void *) conn, conn->fd);

      /* unlock */
      r = rmutex_unlock (NULL, &conn->cmutex);
      assert (r == NO_ERROR);

      return;
    }

  if (!conn->worker->notify ())
    {
      assert_release (false);
    }

  /* unlock */
  r = rmutex_unlock (NULL, &conn->cmutex);
  assert (r == NO_ERROR);
}