/* NOTE: non-code documentation-browser residue removed from the top of this file. */

/*
 *
 * Copyright 2016 CUBRID Corporation
 *
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 *
 */

/*
 * connection_worker.cpp
 */

#include <algorithm>
#include <array>
#include <atomic>
#include <chrono>
#include <cstdint>
#include <cstdio>
#include <cstring>
#include <functional>
#include <iostream>
#include <thread>

#include <unistd.h>
#include <sys/eventfd.h>
#include <sys/timerfd.h>
#include <sys/epoll.h>
#include <time.h>

#include "resources.hpp"
#include "porting.h"
#include "tcp.h"
#include "network.h"
#include "network_interface_sr.h"
#include "server_support.h"
#include "connection_pool.hpp"
#include "connection_sr.h"
#include "connection_defs.h"
#include "connection_worker.hpp"
#include "buffer.hpp"
#include "thread_manager.hpp"
#include "error_manager.h"

// XXX: SHOULD BE THE LAST INCLUDE HEADER
#include "memory_wrapper.hpp"

/* Compile-time trace switch: flip "#if 0" to "#if 1" to route er_log_conn to
 * er_log_debug; otherwise every er_log_conn call site compiles away entirely
 * (its arguments are not evaluated). */
#if 0
#define er_log_conn(...) er_log_debug (__VA_ARGS__)
#else
#define er_log_conn(...)
#endif

#if !defined (NDEBUG)
/* Debug-only global sequence number: each enqueued message gets a unique id
 * so traces across workers can be correlated. */
std::atomic<uint64_t> message_counter (0);
#endif

/* Trace and perform a state transition: sets ctx->sel.m_state to state::x. */
#define NEXT_STATE(ctx, sel, x) do { \
    er_log_conn (__FILE__, __LINE__, "fd = %d, set state = %d\n", ctx->m_conn ? ctx->m_conn->fd : -1, state::x); \
    (ctx->sel.m_state = state::x); \
} while (0)

namespace cubconn::connection
{
  /* Register this worker type with the connection framework; the lambda
   * supplies the worker-pool size from the PRM_ID_CSS_MAX_CONNECTION_WORKER
   * system parameter at registration time. */
  REGISTER_CONNECTION (connection_worker, []()
  {
    return prm_get_integer_value (PRM_ID_CSS_MAX_CONNECTION_WORKER);
  });

  /*
   * worker - construct a connection worker bound to a pool/coordinator pair.
   *
   * pool (in):    owning connection pool
   * coord (in):   coordinator used for statistics / context hand-back
   * watcher (in): thread watcher shared with the pool
   * core (in):    CPU core hint for this worker
   * index (in):   worker index within the pool
   *
   * Creates the eventfd (wakeup) and timerfd (periodic callbacks), registers
   * both with the epoll set, installs the default periodic timers, and only
   * then launches the worker thread.  Any setup failure is fatal
   * (assert_release).
   */
  worker::worker (pool *pool, std::shared_ptr<coordinator> coord, std::shared_ptr<thread_watcher> watcher,
          std::size_t core, std::size_t index) :
    m_parent (pool),
    m_coordinator (coord),
    m_watcher (watcher),
    m_core (core),
    m_status (status::READY),
    m_stop (false),
    m_entry (nullptr),
    m_index (index),
    m_has_retry (false)
  {
    std::size_t i;

    m_context.reserve (256);

    /* limiter: per-connection receive/send budgets from system parameters */
    m_recv_budget = static_cast<size_t> (prm_get_integer_value (PRM_ID_CSS_RECV_BUDGET_PER_CONNECTION));
    m_send_budget = static_cast<size_t> (prm_get_integer_value (PRM_ID_CSS_SEND_BUDGET_PER_CONNECTION));
    m_exhausted.reserve (128);

    /* notifier: non-blocking, close-on-exec wakeup and timer descriptors */
    m_eventfd = eventfd (0, EFD_NONBLOCK | EFD_CLOEXEC);
    m_timerfd = timerfd_create (CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
    if (m_eventfd < 0 || m_timerfd < 0)
      {
    er_log_conn (__FILE__, __LINE__, "connection::worker: failed to create fd. %s\n", strerror (errno));
    assert_release (false);
      }

    if (!this->eventfd_register (m_eventfd) ||
    !this->eventfd_register (m_timerfd))
      {
    er_log_conn (__FILE__, __LINE__, "connection::worker: failed to register fd\n");
    assert_release (false);
      }

    /* timer: clear all handler slots, then install the default periodic jobs */
    for (i = 0; i < static_cast<std::size_t> (timer_type::TYPE_COUNT); i++)
      {
    m_timer_handler[i].valid = false;
    m_timer_handler[i].latency = timer_latency::NA;
    m_timer_handler[i].function = nullptr;
    m_timer_handler[i].last_time = this->get_time_ns (CLOCK_MONOTONIC);
      }
    if (!this->eventfd_addtimer (timer_type::HIBERNATE, timer_latency::MEDIUM_LATENCY,
                 std::bind (&worker::hibernate_check, this)))
      {
    er_log_conn (__FILE__, __LINE__, "connection::worker: failed to add timer\n");
    assert_release (false);
      }
    if (!this->eventfd_addtimer (timer_type::STATISTICS, timer_latency::MEDIUM_LATENCY,
                 std::bind (&worker::statistics_metrics_to_coordinator, this)))
      {
    er_log_conn (__FILE__, __LINE__, "connection::worker: failed to add timer\n");
    assert_release (false);
      }
    if (!this->eventfd_addtimer (timer_type::HA, timer_latency::HIGH_LATENCY,
                 std::bind (&worker::ha_close_all_connections, this)))
      {
    er_log_conn (__FILE__, __LINE__, "connection::worker: failed to add timer\n");
    assert_release (false);
      }

    /* request queue: zero the per-queue element counters */
    for (i = 0; i < static_cast<std::size_t> (queue_type::TYPE_COUNT); i++)
      {
    m_queue_size[i].store (0, std::memory_order_relaxed);
      }

    /* start the worker thread last, after all state above is initialized */
    m_thread = std::thread (&worker::attach, this);
  }

  /*
   * ~worker - join the worker thread before closing its descriptors.
   *
   * The thread must be joined first: closing the eventfd/timerfd while the
   * loop is still polling them would hand it dead descriptors.  By teardown
   * time every context must already have been returned to the pool.
   */
  worker::~worker ()
  {
    if (m_thread.joinable ())
      {
    m_thread.join ();
      }
    ::close (m_eventfd);
    ::close (m_timerfd);

    assert (m_context.size () == 0);
  }

  /*
   * enqueue - push a request message onto one of this worker's queues.
   *
   * type (in): destination queue (e.g. IMMEDIATE / LAZY)
   * item (in): message; ownership moves into the queue
   *
   * The size counter is incremented with release ordering so that a consumer
   * observing the new count also observes the pushed element.
   */
  void worker::enqueue (queue_type type, message &&item)
  {
    /* every message must carry a usable target: a live fd, a context with a
     * live fd, an explicit id, or be one of the fd-less control messages */
    assert ((item.conn ? (item.conn->fd != -1) : false) ||
        (item.ctx ? (item.ctx->m_conn ? item.ctx->m_conn->fd != -1 : false) : false) ||
        (item.id > 0) ||
        (item.type == message_type::START || item.type == message_type::SHUTDOWN ||
         item.type == message_type::HIBERNATE || item.type == message_type::AWAKEN));

#if !defined (NDEBUG)
    item.message_id = message_counter++;
    /* capture the trace fields BEFORE std::move below; the original code read
     * `item` after it had been moved into the queue (use-after-move) */
    const auto trace_message_id = item.message_id;
    const auto trace_message_type = item.type;
    (void) trace_message_id;
    (void) trace_message_type;
#endif

    m_queue[static_cast<std::size_t> (type)].push (std::move (item));
    m_queue_size[static_cast<std::size_t> (type)].fetch_add (1, std::memory_order_release);

#if !defined (NDEBUG)
    er_log_conn (__FILE__, __LINE__,
         "enqueued message_id = %lld, request_type = %d to the worker index = %d, queue_type = %d\n", trace_message_id,
         trace_message_type, m_index, type);
#endif
  }

  /*
   * notify - wake this worker's event loop by writing to its eventfd.
   *
   * Returns true when the wakeup is delivered, including the EAGAIN case
   * where the eventfd counter is already saturated (the worker is then
   * guaranteed to wake anyway); false on any unrecoverable write failure.
   */
  bool worker::notify ()
  {
    const std::uint64_t one = 1;

    for (;;)
      {
	const ssize_t written = ::write (m_eventfd, &one, sizeof (one));

	if (written == sizeof (one))
	  {
	    /* delivered */
	    break;
	  }

	if (written >= 0)
	  {
	    /* zero or short write on an eventfd: treat as failure */
	    return false;
	  }

	if (errno == EINTR)
	  {
	    /* interrupted by a signal - retry */
	    continue;
	  }
	if (errno == EAGAIN)
	  {
	    /* counter already at maximum: a wakeup is already pending */
	    break;
	  }

	return false;
      }

    er_log_conn (__FILE__, __LINE__, "requested to wake up the worker index = %d\n", m_index);
    return true;
  }

  /*
   * enqueue_and_notify - enqueue a message, wake the worker, and optionally
   * block until the worker reports the request complete.
   *
   * type (in):      destination queue
   * item (in):      message; ownership moves into the queue
   * func (in):      optional callback invoked right after notify (both on
   *                 success and on notify failure)
   * wait_time (in): 0 = do not wait; < 0 = wait forever; > 0 = wait at most
   *                 that many seconds
   *
   * Returns false only when the eventfd wakeup could not be delivered.
   */
  bool worker::enqueue_and_notify (queue_type type, message &&item, std::function<void ()> func,
                   int wait_time)
  {
    /* wait_time is implemented only for these message types:
     *   START
     *   SHUTDOWN_CLIENT
     *   SEND_PACKET
     * to wait on any other message type, first implement a waiter for it */

    assert (!wait_time ||
        (item.type == message_type::START ||
         item.type == message_type::SHUTDOWN_CLIENT ||
         item.type == message_type::SEND_PACKET));

    std::shared_ptr<message_blocker> handle;
    std::unique_lock<std::mutex> lock;

    if (wait_time)
      {
    /* acquire the lock to prevent a lost wakeup: the worker cannot signal
     * `done` until we are inside cv.wait below */
    handle = std::make_shared<message_blocker> ();
    handle->done = false;

    lock = std::unique_lock<std::mutex> (handle->m);
    item.waiter_handle = handle;
      }

    this->enqueue (type, std::move (item));
    if (!this->notify ())
      {
    if (func)
      {
        func ();
      }
    return false;
      }

    if (func)
      {
    func ();
      }

    if (wait_time)
      {
    if (wait_time < 0)
      {
        handle->cv.wait (lock, [&] { return handle->done; });
      }
    else
      {
        /* a timeout falls through silently; callers treat it as best-effort */
        handle->cv.wait_for (lock, std::chrono::seconds (wait_time), [&] { return handle->done; });
      }
      }

    return true;
  }

  /*
   * get_time_ns - read the given clock and return it in nanoseconds.
   *
   * type (in): clock id (e.g. CLOCK_MONOTONIC, CLOCK_THREAD_CPUTIME_ID)
   *
   * Returns 0 when clock_gettime fails.
   */
  uint64_t worker::get_time_ns (clockid_t type)
  {
    struct timespec ts;

    if (clock_gettime (type, &ts) == -1)
      {
    /* report the clock that actually failed; the old message hard-coded
     * CLOCK_MONOTONIC even when another clock was requested */
    er_log_conn (__FILE__, __LINE__, "clock_gettime (clock id = %d) failed: %s\n", static_cast<int> (type),
         strerror (errno));
    return 0;
      }

    return static_cast<uint64_t> (ts.tv_sec) * 1000000000ULL + ts.tv_nsec;
  }

  /* Hand the connection's pending request off to the server worker thread
   * pool for execution. */
  void worker::push_task_into_worker_pool (context *ctx)
  {
    /* push new task into worker pool */
    css_push_server_task (*ctx->m_conn);
  }

  /*
   * purge_stale_contexts - detach every context queued in m_removed_context
   * from this worker's bookkeeping and send the batch back to the
   * coordinator for recycling (RETURN_TO_POOL).
   *
   * handle_connection_close only marks contexts removed; the actual release
   * happens in the coordinator after this hand-off.
   */
  void worker::purge_stale_contexts ()
  {
    coordinator::message message;

    message.type = coordinator::message_type::RETURN_TO_POOL;

    /* remove from the context list */
    for (context *ctx : m_removed_context)
      {
    if (m_context.erase (ctx) == 0)
      {
        er_log_conn (__FILE__, __LINE__, "connection::worker->purge_stale_contexts: context not found\n");
        continue;
      }
    m_exhausted.erase (ctx->m_id);
      }

    /* move the whole batch into the message instead of copying it (the old
     * code copied the vector up front); the local list is emptied for the
     * next round */
    message.resource = std::move (m_removed_context);
    m_removed_context.clear ();

    m_coordinator->enqueue (std::move (message));
    if (!m_coordinator->notify ())
      {
    assert_release (false);
      }
  }

  /*
   * wakeup_blocked_worker - release a thread blocked in enqueue_and_notify.
   *
   * handle (in): blocker shared with the waiting thread; null means no one
   *              is waiting and the call is a no-op.
   */
  void worker::wakeup_blocked_worker (std::shared_ptr<message_blocker> handle)
  {
    if (!handle)
      {
	return;
      }

    /* hold the mutex while setting `done` so the waiter cannot miss the
     * notification between its predicate check and going to sleep */
    std::lock_guard<std::mutex> guard (handle->m);
    handle->done = true;
    handle->cv.notify_one ();
  }

  /*
   * is_wait_required - must connection close wait for boot_client_register?
   *
   * The CDC connection is exempt; every other connection requires the wait.
   */
  bool worker::is_wait_required (context *ctx)
  {
    /* only the CDC connection (identified by its fd) skips the wait */
    return ctx->m_conn->fd != cdc_Gl.conn.fd;
  }

  /*
   * has_remaining_tasks - does a closing context still have outbound data
   * that must be flushed before teardown?
   */
  bool worker::has_remaining_tasks (context *ctx)
  {
    /* drain the immediate queue first: it may hold a request that releases
     * memory owned by ctx */
    this->handle_message_queue_by_index (queue_type::IMMEDIATE);

    const bool must_flush = (ctx->m_ignore < ignore_level::IGNORE_ALL)
			    && !ctx->m_send.m_transmitter.empty ();

    if (must_flush)
      {
	er_log_conn (ARG_FILE_LINE,
		     "connection::worker->handle_connection_close: retry shutdown (conn %p): send buffer not empty\n", ctx->m_conn);
      }

    return must_flush;
  }

  /*
   * start_connection_close - bind this worker's thread entry to the closing
   * connection so server-side shutdown routines run under its identity.
   *
   * ctx (in): context being closed
   *
   * Caller must hold m_entry->tran_index_lock.
   *
   * Returns { tran_index, client_id } on success, or { -1, -1 } when the
   * client has not finished boot_client_register yet (caller must wait or
   * retry; the thread info is left untouched in that case).
   */
  std::pair<int, int> worker::start_connection_close (context *ctx)
  {
    css_conn_entry *conn;
    int tran_index, client_id;

    conn = ctx->m_conn;
    tran_index = conn->get_tran_index ();
    if (tran_index == NULL_TRAN_INDEX)
      {
    /* the connected client does not yet finished boot_client_register */
    /* retry */
    return { -1, -1 };
      }
    client_id = conn->client_id;
    m_entry->conn_entry = ctx->m_conn;
    css_set_thread_info (m_entry, client_id, 0, tran_index, NET_SERVER_SHUTDOWN);

    return { tran_index, client_id };
  }

  /*
   * end_connection_close - undo start_connection_close: detach this worker's
   * thread entry from the connection and restore its running status.
   */
  void worker::end_connection_close ()
  {
    pthread_mutex_lock (&m_entry->tran_index_lock);

    css_set_thread_info (m_entry, -1, 0, -1, -1);
    m_entry->conn_entry = NULL;
    m_entry->m_status = cubthread::entry::status::TS_RUN;

    pthread_mutex_unlock (&m_entry->tran_index_lock);
  }

  /*
   * handle_connection_close - tear down one client connection.
   *
   * ctx (in):    context to close
   * retry (in):  true when this is a re-queued close attempt (skips the
   *              worker wakeup that was already done on the first attempt)
   * handle (in): optional blocker of a thread waiting for the close to finish
   *
   * The close proceeds in stages: mark the connection CLOSING, adopt its
   * identity on this worker's thread entry, stop its sessions/workers, and -
   * once no active workers or pending sends remain - remove it from epoll,
   * clear its resources, shut the socket, and mark the context removed for
   * lazy recycling.  If workers or pending sends remain, the close request is
   * re-enqueued on the LAZY queue and retried on the QUEUE timer.
   *
   * Returns true on completion or successful re-queue; the false path only
   * occurs if re-arming the retry timer fails.
   */
  bool worker::handle_connection_close (context *ctx, bool retry, std::shared_ptr<message_blocker> handle)
  {
    std::chrono::time_point<std::chrono::steady_clock> start, end;
    message request;
    int tran_index, client_id;
    int status;

    if (ctx->m_removed)
      {
    /* wake up the thread blocked until this request is complete */
    this->wakeup_blocked_worker (handle);

    er_log_conn (__FILE__, __LINE__, "connection::worker->handle_connection_close: already removed context. conn = %p\n",
             ctx->m_conn);
    return true;
      }

    assert_release (ctx->m_conn);

    /* change status */

    rmutex_lock (m_entry, &ctx->m_conn->rmutex);
    if (ctx->m_conn->status == CONN_OPEN)
      {
    ctx->m_conn->status = CONN_CLOSING;
      }
    status = ctx->m_conn->status;
    rmutex_unlock (m_entry, &ctx->m_conn->rmutex);

    /* get tran index and client id */

    /* the thread_p is only accessible from this connection thread (myself), but  */
    /* the entry may be accessed through unknown path because it is in the thread */
    /* manager's entry list. */
    pthread_mutex_lock (&m_entry->tran_index_lock);

    std::tie (tran_index, client_id) = this->start_connection_close (ctx);
    if (tran_index < 0 && client_id < 0)
      {
    if (this->is_wait_required (ctx))
      {
        pthread_mutex_unlock (&m_entry->tran_index_lock);

        er_log_conn (__FILE__, __LINE__,
             "connection::worker->handle_connection_close: wait for transaction index. conn = %p, fd = %d\n", ctx->m_conn,
             ctx->m_conn->fd);

        /* the connected client does not yet finished boot_client_register */
        /* this case is unusual */
        /* DO NOT RETRY. retrying may result in duplicate shutdown client requests */
        thread_sleep (50);

        pthread_mutex_lock (&m_entry->tran_index_lock);

        tran_index = ctx->m_conn->get_tran_index ();
        client_id = ctx->m_conn->client_id;
      }
      }

    pthread_mutex_unlock (&m_entry->tran_index_lock);

    /* stop the sessions associated with conn */

    css_end_server_request (ctx->m_conn);
    /* avoid infinite waiting with xtran_wait_server_active_trans() */
    m_entry->m_status = cubthread::entry::status::TS_CHECK;
    if (ctx->m_conn->session_p != NULL)
      {
    ssession_stop_attached_threads (m_entry, ctx->m_conn->session_p);
      }

    if (!retry)
      {
    /* interrupt and wake up (only on the first attempt; a retry already did this) */

    net_server_wakeup_workers (m_entry, tran_index, client_id);
      }

    /* retry until the worker related to the connection is complete */

    if (net_server_active_workers (m_entry, ctx->m_conn, tran_index, client_id) > 0)
      {
    er_log_conn (__FILE__, __LINE__,
             "connection::worker->handle_connection_close: net_server_active_workers. conn = %p, fd = %d\n", ctx->m_conn,
             ctx->m_conn->fd);
    goto retry;
      }

    /* check if there is any remaining task */

    if (this->has_remaining_tasks (ctx))
      {
    er_log_conn (__FILE__, __LINE__,
             "connection::worker->handle_connection_close: has_remaining_tasks. conn = %p, fd = %d\n", ctx->m_conn,
             ctx->m_conn->fd);
    goto retry;
      }

    /* this context has no remaining tasks */

    _er_log_debug (ARG_FILE_LINE,
           "handle_connection_close: conn %p { fd %d status %d transaction_id %d db_error %d stop_talk %d stop_phase %d }\n",
           ctx->m_conn, ctx->m_conn->fd, status, tran_index, ctx->m_conn->db_error, ctx->m_conn->stop_talk,
           ctx->m_conn->stop_phase);

    /* remove and close */

    m_events.remove_descriptor (ctx->m_conn->fd);
    if (tran_index != NULL_TRAN_INDEX)
      {
    net_server_conn_down (m_entry, tran_index);
      }

    this->end_connection_close ();

    /* clear resource */

    ctx->m_send.m_transmitter.clear ();

    start = std::chrono::steady_clock::now ();

    rmutex_lock (m_entry, &ctx->m_conn->cmutex);

    ctx->m_conn->worker = nullptr;
    ctx->m_conn->context = nullptr;

    rmutex_unlock (m_entry, &ctx->m_conn->cmutex);

    /* record how long we were blocked on cmutex */
    end = std::chrono::steady_clock::now ();
    m_stats.add (statistics::worker::BLOCKED_RMUTEX,
         std::chrono::duration_cast<std::chrono::microseconds> (end - start).count ());

    /* close the socket */
    css_shutdown_socket (ctx->m_conn->fd);
    ctx->m_conn->fd = INVALID_SOCKET;

    /* wake up the thread blocked until this request is complete */
    this->wakeup_blocked_worker (handle);
    this->wakeup_blocked_worker (ctx->m_send.m_blocker);

    /* any sessions that are not cleared (e.g. cdc, flashback) should be handled here */
    css_prepare_shutdown_conn (ctx->m_conn);

    /* mark deleted and lazily release this */
    ctx->m_removed = true;
    m_removed_context.push_back (ctx);

    m_stats.sub (statistics::worker::CLIENT_NUM, 1);

    return true;

retry:
    er_log_conn (__FILE__, __LINE__, "connection::worker->handle_connection_close: retry fd = %d, ignore = %d, conn = %p\n",
         ctx->m_conn ? ctx->m_conn->fd : -1, ctx->m_ignore, ctx->m_conn);

    this->end_connection_close ();

    request.type = message_type::SHUTDOWN_CLIENT;
    request.conn = ctx->m_conn;
    request.ignore = ctx->m_ignore;
    request.retry = true;
    request.waiter_handle = handle;

    /* this request must be handled lazily */
    this->enqueue (queue_type::LAZY, std::move (request));

    /* lazily notified via the QUEUE timer rather than an immediate wakeup */
    m_has_retry = true;

    return this->eventfd_addtimer (
           timer_type::QUEUE,
           timer_latency::LOW_LATENCY,
           std::bind (&worker::handle_message_queue, this)
       );
  }

  bool worker::statistics_metrics_to_coordinator ()
  {
    coordinator::message message;

    message.type = coordinator::message_type::STATISTICS;

    message.statistics.cpu_time_ns = get_time_ns (CLOCK_THREAD_CPUTIME_ID);
    message.statistics.time_ns = get_time_ns (CLOCK_MONOTONIC);
    message.statistics.worker.first = m_index;
    message.statistics.worker.second = m_stats;
    message.statistics.contexts.reserve (m_context.size ());
    for (context *ctx : m_context)
      {
    message.statistics.contexts.emplace_back (ctx->m_id, ctx->m_stats);
      }

    /* just enqueue */
    m_coordinator->enqueue (std::move (message));

    return true;
  }

  /*
   * hibernate_check - periodic timer callback: once the worker is hibernating
   * and has no contexts left, stop the timer and reset the statistics.
   * Always returns true (the timer framework treats false as fatal).
   */
  bool worker::hibernate_check ()
  {
    const bool fully_idle = (m_status == status::HIBERNATING) && m_context.empty ();

    if (!fully_idle)
      {
	/* still active or still owning contexts - keep checking */
	return true;
      }

    if (!this->eventfd_stoptimer ())
      {
	er_log_conn (__FILE__, __LINE__, "connection::worker->hibernate_check: failed to stop the timer\n");
	assert_release (false);
      }

    assert (m_context.empty ());
    assert (m_exhausted.empty ());

    /* reset counters so resumed workers don't report stale totals */
    m_stats.reset ();

    return true;
  }

  /*
   * ha_close_all_connections - periodic timer callback: while the server is
   * transitioning to standby, close every idle connection (not in a
   * transaction and with no active transaction workers).
   * Always returns true so the check keeps running.
   */
  bool worker::ha_close_all_connections ()
  {
    /* HA must continue to check */
    if (css_ha_server_state () != HA_SERVER_STATE_TO_BE_STANDBY)
      {
    return true;
      }

    /* alive context */
    for (auto &ctx : m_context)
      {
    if (!ctx->m_conn->in_transaction
        && css_count_transaction_worker_threads (m_entry, ctx->m_conn->get_tran_index (), ctx->m_conn->client_id) == 0)
      {
        er_log_conn (__FILE__, __LINE__, "connection::worker->ha_close_all_connections: close fd = %d, conn = %p\n",
             ctx->m_conn->fd, ctx->m_conn);
        this->handle_connection_close (ctx);
      }
      }

    /* the actual release of the context is handled last */
    this->purge_stale_contexts ();

    return true;
  }

  /*
   * eventfd_register - wrap an eventfd/timerfd in a dummy context and add it
   * to the epoll set so the main loop can dispatch on it.
   *
   * fd (in): the eventfd or timerfd to watch (edge-triggered, read side)
   *
   * NOTE: the "connection" attached to these pseudo contexts is really a
   * heap-allocated int holding the fd, disguised as a css_conn_entry *.
   */
  bool worker::eventfd_register (int fd)
  {
    context *ctx;
    int *fd_holder;

    ctx = new context ();
    fd_holder = new int { fd };
    if (!ctx || !fd_holder)
      {
    er_log_conn (__FILE__, __LINE__, "connection::worker->eventfd_register: failed to allocate memory\n");
    delete ctx;
    delete fd_holder;
    return false;
      }
    ctx->m_conn = reinterpret_cast<css_conn_entry *> (fd_holder);

    if (!m_events.add_descriptor (fd, EPOLLET | EPOLLIN, ctx))
      {
    er_log_conn (__FILE__, __LINE__, "connection::worker->eventfd_register: add_descriptor failed\n");

    delete ctx;
    /* delete through the real (int *) type; the old code deleted through the
     * disguised css_conn_entry *, which is undefined behavior */
    delete fd_holder;
    return false;
      }

    return true;
  }

  /*
   * eventfd_clear - drain the counter of an eventfd/timerfd so the next
   * edge-triggered epoll wait can fire again.
   *
   * Returns true after a successful 8-byte read or when the counter was
   * already empty (EAGAIN); false on any real read failure.
   */
  bool worker::eventfd_clear (int fd)
  {
    uint64_t counter;

    for (;;)
      {
	const ssize_t got = ::read (fd, &counter, sizeof (counter));

	if (got == sizeof (counter))
	  {
	    /* drained */
	    return true;
	  }

	if (got >= 0)
	  {
	    /* zero or short read: should not happen on these descriptors */
	    return false;
	  }

	if (errno == EINTR)
	  {
	    /* interrupted - retry */
	    continue;
	  }

	/* EAGAIN means the counter was already zero; anything else is fatal */
	return errno == EAGAIN;
      }
  }

  /*
   * eventfd_settimer - (re)arm a timerfd with the given period; sec == 0 and
   * nsec == 0 disarms it.  The initial expiration and the interval share the
   * same period, i.e. the timer ticks repeatedly.
   */
  bool worker::eventfd_settimer (int fd, uint64_t sec, uint64_t nsec)
  {
    struct itimerspec spec = { };

    spec.it_value.tv_sec = sec;
    spec.it_value.tv_nsec = nsec;
    spec.it_interval.tv_sec = sec;
    spec.it_interval.tv_nsec = nsec;

    if (timerfd_settime (fd, 0, &spec, NULL) < 0)
      {
	er_log_conn (__FILE__, __LINE__, "connection::worker->eventfd_settimer: %s\n", strerror (errno));
	return false;
      }

    return true;
  }

  /*
   * eventfd_settimer - arm the timerfd from a timer_latency value, whose
   * underlying integer is the period in nanoseconds.
   */
  bool worker::eventfd_settimer (int fd, timer_latency latency)
  {
    constexpr uint64_t NSEC_PER_SEC = 1000000000ULL;
    const uint64_t period_ns = static_cast<uint64_t> (latency);

    /* split the nanosecond period into the (sec, nsec) pair timerfd expects */
    return eventfd_settimer (fd, period_ns / NSEC_PER_SEC, period_ns % NSEC_PER_SEC);
  }

  bool worker::eventfd_starttimer ()
  {
    timer_latency min;
    std::size_t i;

    min = timer_latency::NA;
    for (i = 0; i < static_cast<std::size_t> (timer_type::TYPE_COUNT); i++)
      {
    if (m_timer_handler[i].valid)
      {
        if (min == timer_latency::NA || static_cast<uint64_t> (min) > static_cast<uint64_t> (m_timer_handler[i].latency))
          {
        min = m_timer_handler[i].latency;
          }
      }
      }

    if (min != timer_latency::NA)
      {
    return this->eventfd_settimer (m_timerfd, min);
      }
    return this->eventfd_stoptimer ();
  }

  /* Disarm the periodic timerfd (a zero period stops a timerfd). */
  bool worker::eventfd_stoptimer ()
  {
    return eventfd_settimer (m_timerfd, 0, 0);
  }

  /*
   * eventfd_addtimer - register a periodic handler and re-arm the timerfd
   * with the shortest latency among all valid handlers.
   *
   * type (in):    handler slot (one slot per timer_type)
   * latency (in): desired period for this handler
   * handle (in):  callback; when it returns false the worker loop stops
   *
   * Registering an already-valid slot is a no-op that returns true.
   */
  bool worker::eventfd_addtimer (timer_type type, timer_latency latency, std::function<bool ()> handle)
  {
    auto &slot = m_timer_handler[static_cast<std::size_t> (type)];

    if (slot.valid)
      {
	/* already armed - nothing to do */
	return true;
      }

    slot.valid = true;
    slot.latency = latency;
    slot.function = handle;
    slot.last_time = this->m_timens;

    /* find the shortest latency among all valid handlers */
    timer_latency shortest = timer_latency::NA;
    for (std::size_t i = 0; i < static_cast<std::size_t> (timer_type::TYPE_COUNT); i++)
      {
	if (!m_timer_handler[i].valid)
	  {
	    continue;
	  }
	if (shortest == timer_latency::NA
	    || static_cast<uint64_t> (m_timer_handler[i].latency) < static_cast<uint64_t> (shortest))
	  {
	    shortest = m_timer_handler[i].latency;
	  }
      }

    /* we just marked one slot valid, so a minimum must exist */
    assert (shortest != timer_latency::NA);

    return this->eventfd_settimer (m_timerfd, shortest);
  }

  /*
   * eventfd_removetimer - deactivate one handler slot and re-arm the timerfd
   * with the shortest remaining latency, or stop it when none remain.
   */
  bool worker::eventfd_removetimer (timer_type type)
  {
    auto &slot = m_timer_handler[static_cast<std::size_t> (type)];

    /* avoid resetting the timerfd unnecessarily */
    if (!slot.valid)
      {
	return true;
      }
    slot.valid = false;

    /* find the shortest latency among the remaining valid handlers */
    timer_latency shortest = timer_latency::NA;
    for (std::size_t i = 0; i < static_cast<std::size_t> (timer_type::TYPE_COUNT); i++)
      {
	if (!m_timer_handler[i].valid)
	  {
	    continue;
	  }
	if (shortest == timer_latency::NA
	    || static_cast<uint64_t> (m_timer_handler[i].latency) < static_cast<uint64_t> (shortest))
	  {
	    shortest = m_timer_handler[i].latency;
	  }
      }

    if (shortest == timer_latency::NA)
      {
	/* nothing left to run periodically */
	return this->eventfd_stoptimer ();
      }

    return this->eventfd_settimer (m_timerfd, shortest);
  }

  /*
   * eventfd_handler - dispatch wakeups from the eventfd and the timerfd.
   *
   * eventfds (in/out): two flags set by the epoll loop; [0] = eventfd
   *                    (message-queue wakeup), [1] = timerfd (periodic
   *                    handlers).  Each flag is cleared once handled.
   *
   * Returns false when any handler or drain step fails (stops the worker).
   */
  bool worker::eventfd_handler (bool *eventfds)
  {
    std::size_t i;

    /* handle_connection_close sets this when it re-queues a close; it decides
     * below whether the QUEUE retry timer may be removed */
    m_has_retry = false;

    /* event fd */
    if (eventfds[0])
      {
    eventfds[0] = false;

    if (!this->eventfd_clear (m_eventfd))
      {
        er_log_conn (__FILE__, __LINE__, "connection::worker->eventfd_handler: eventfd_clear failed\n");
        return false;
      }

    if (!this->handle_message_queue ())
      {
        return false;
      }

    /* the queue was just drained; push the QUEUE timer's deadline forward */
    m_timer_handler[static_cast<std::size_t> (timer_type::QUEUE)].last_time = m_timens;
      }

    /* timer fd */
    if (eventfds[1])
      {
    eventfds[1] = false;

    /* run every valid handler whose own latency has elapsed since its last run */
    for (i = 0; i < static_cast<std::size_t> (timer_type::TYPE_COUNT); i++)
      {
        if (!m_timer_handler[i].valid)
          {
        continue;
          }

        if (m_timens - m_timer_handler[i].last_time > static_cast<uint64_t> (m_timer_handler[i].latency))
          {
        if (!m_timer_handler[i].function ())
          {
            return false;
          }

        m_timer_handler[i].last_time = m_timens;
          }
      }

    if (!this->eventfd_clear (m_timerfd))
      {
        er_log_conn (__FILE__, __LINE__, "connection::worker->eventfd_handler: eventfd_clear failed\n");
        return false;
      }

    /* no pending close retries: the QUEUE timer may be dropped again */
    if (!m_has_retry)
      {
        return this->eventfd_removetimer (timer_type::QUEUE);
      }
      }

    return true;
  }

  /*
   * handle_message_queue_send_packet - process a SEND_PACKET message: queue
   * the payload on the connection's transmitter and try to send immediately.
   *
   * item (in): message carrying conn, the packet spans, an optional deleter,
   *            and an optional waiter handle
   *
   * The conn's cmutex is held while the context is touched so it cannot be
   * torn down underneath us.  Outcomes:
   *   - context already gone: run the deleter, wake the waiter, done;
   *   - peer reset / error:   mark the context IGNORE_ALL so the main loop
   *                           removes it; wake the waiter;
   *   - fully sent:           clear the transmitter, wake the waiter;
   *   - partial (Pending):    register EPOLLOUT and park the waiter on the
   *                           context until the remainder drains.
   *
   * Returns false only when modifying the epoll registration fails.
   */
  bool worker::handle_message_queue_send_packet (message &item)
  {
    context *ctx;
    result status;
    int r;

    assert (item.conn);

    r = rmutex_lock (m_entry, &item.conn->cmutex);
    assert (r == NO_ERROR);

    ctx = reinterpret_cast<context *> (item.conn->context);
    if (ctx == nullptr)
      {
    r = rmutex_unlock (m_entry, &item.conn->cmutex);
    assert (r == NO_ERROR);

    /* the connection is already torn down: free the payload ourselves */
    if (item.deleter)
      {
        item.deleter ();
      }

#if !defined (NDEBUG)
    er_log_conn (__FILE__, __LINE__,
             "connection::worker->handle_message_queue_send_packet: message_id = %lld, context is already cleared for conn = %p\n",
             item.message_id, static_cast<void *> (item.conn));
#endif
    this->wakeup_blocked_worker (item.waiter_handle);

    return true;
      }

#if !defined (NDEBUG)
    er_log_conn (__FILE__, __LINE__, "new packet to send. message_id = %lld, fd = %d in the worker = %d\n", item.message_id,
         item.conn->fd, m_index);
#endif

    for (auto &packet : item.packet)
      {
    ctx->m_send.m_transmitter.push_for_send ({ packet.data (), packet.size () });
      }
    ctx->m_send.m_transmitter.stamp ();
    ctx->m_send.m_transmitter.push_for_deleter (std::move (item.deleter));

    /* first, try to send the packets */
    status = ctx->m_send.m_transmitter.fill (ctx->m_conn->fd);
    if (status == result::PeerReset || status == result::Error)
      {
    r = rmutex_unlock (m_entry, &item.conn->cmutex);
    assert (r == NO_ERROR);

    this->wakeup_blocked_worker (item.waiter_handle);

    /* ctx will be forcibly removed */
    ctx->m_ignore = ignore_level::IGNORE_ALL;

    /* this connection will be removed by main loop */
    return true;
      }

    assert (status == result::Ok || status == result::Pending);

    if (status == result::Ok)
      {
    ctx->m_send.m_transmitter.clear ();
#if !defined (NDEBUG)
    er_log_conn (__FILE__, __LINE__, "fully sent. message_id = %lld, fd = %d in the worker = %d\n", item.message_id,
             ctx->m_conn->fd, m_index);
#else
    er_log_conn (__FILE__, __LINE__, "fully sent. fd = %d in the worker = %d\n", ctx->m_conn->fd, m_index);
#endif

    r = rmutex_unlock (m_entry, &item.conn->cmutex);
    assert (r == NO_ERROR);

    this->wakeup_blocked_worker (item.waiter_handle);

    return true;
      }

    /* if buffer is full, register the fd to epoll loop and wait to send the others */
    if (!m_events.modify_descriptor (ctx->m_conn->fd, EPOLLET | EPOLLIN | EPOLLOUT | EPOLLRDHUP, ctx))
      {
    r = rmutex_unlock (m_entry, &item.conn->cmutex);
    assert (r == NO_ERROR);

    this->wakeup_blocked_worker (item.waiter_handle);

    er_log_conn (__FILE__, __LINE__, "connection::worker->handle_message_queue_send_packet: modify_descriptor failed\n");
    return false;
      }

    r = rmutex_unlock (m_entry, &item.conn->cmutex);
    assert (r == NO_ERROR);

    /* the waiter stays blocked until the remaining bytes are flushed */
    ctx->m_send.m_blocker = std::move (item.waiter_handle);

    return true;
  }

  /*
   * handle_message_queue_release_packet - hand receive buffers carried by a
   * RELEASE_PACKET message back to the connection's receiver.
   *
   * The conn's cmutex is held while touching the context so it cannot be
   * torn down underneath us; a vanished context simply means there is
   * nothing left to release.  Always returns true.
   */
  bool worker::handle_message_queue_release_packet (message &item)
  {
    assert (item.conn);
    assert (item.packet.size () > 0);

    int rc = rmutex_lock (m_entry, &item.conn->cmutex);
    assert (rc == NO_ERROR);

    context *ctx = reinterpret_cast<context *> (item.conn->context);
    if (ctx == nullptr)
      {
	rc = rmutex_unlock (m_entry, &item.conn->cmutex);
	assert (rc == NO_ERROR);

	er_log_conn (__FILE__, __LINE__,
		     "connection::worker->handle_message_queue_release_packet: context is already cleared for conn = %p\n",
		     static_cast<void *> (item.conn));
	return true;
      }

    /* return every buffer to the receiver's pool */
    for (cubbase::span<std::byte> &packet : item.packet)
      {
	ctx->m_recv.m_receiver.release (packet.data ());
	er_log_conn (__FILE__, __LINE__,
		     "connection::worker->handle_message_queue_release_packet: release packet pointer = %p\n", packet.data ());
      }

    rc = rmutex_unlock (m_entry, &item.conn->cmutex);
    assert (rc == NO_ERROR);

    return true;
  }

  /*
   * handle_message_queue_new_client - attach a freshly accepted connection to
   * this worker: link conn <-> context, register the fd with epoll, and add
   * the context to the live set.
   *
   * item (in): message carrying the new conn and its pre-allocated context
   *
   * On any failure the conn/context links are rolled back and the context is
   * queued for return to the pool.  Returns false on failure.
   */
  bool worker::handle_message_queue_new_client (message &item)
  {
    context *ctx;

    assert (item.conn && item.conn->fd != -1);

    ctx = item.ctx;
    ctx->m_conn = item.conn;

    /* link conn and context under cmutex so senders never see a half-link */
    rmutex_lock (NULL, &ctx->m_conn->cmutex);
    ctx->m_conn->worker = this;
    ctx->m_conn->context = ctx;

    if (!m_events.add_descriptor (ctx->m_conn->fd, EPOLLET | EPOLLIN | EPOLLRDHUP, ctx))
      {
    /* roll back the link and queue the context for return to the pool */
    ctx->m_conn->worker = nullptr;
    ctx->m_conn->context = nullptr;
    rmutex_unlock (NULL, &ctx->m_conn->cmutex);

    m_removed_context.push_back (ctx);
    er_log_conn (__FILE__, __LINE__, "connection::worker->handle_message_queue_new_client: add_descriptor failed\n");
    return false;
      }
    if (!m_context.insert (ctx).second)
      {
    /* duplicate context: undo the link and the epoll registration */
    ctx->m_conn->worker = nullptr;
    ctx->m_conn->context = nullptr;
    rmutex_unlock (NULL, &ctx->m_conn->cmutex);

    m_events.remove_descriptor (ctx->m_conn->fd);

    m_removed_context.push_back (ctx);
    er_log_conn (__FILE__, __LINE__,
             "connection::worker->handle_message_queue_new_client: context can not be duplicated\n");
    return false;
      }

    rmutex_unlock (NULL, &ctx->m_conn->cmutex);

    /* initialize the per-context lifetime statistics */
    ctx->m_stats.set (statistics::context::OPEND_NS, m_timens);
    ctx->m_stats.set (statistics::context::LAST_ACTIVE_NS, m_timens);
    ctx->m_stats.set (statistics::context::LAST_MOVED_NS, 0);
    ctx->m_stats.set (statistics::context::MOVE_COUNT, 0);

    er_log_conn (__FILE__, __LINE__, "add new client that has fd = %d in the worker = %d\n", item.conn->fd, m_index);

    m_stats.add (statistics::worker::CLIENT_NUM, 1);

    /* The condition below theoretically cannot be true, so there is no need to
     * start the timer.  But if the concurrency queue gets out of sync and the
     * MQ processing order is disrupted, we must start the timer. */
    if (m_status == status::HIBERNATING)
      {
    if (!this->eventfd_starttimer ())
      {
        er_log_conn (__FILE__, __LINE__, "connection::worker->handle_message_queue_new_client: failed to start the timer\n");
        assert_release (false);
      }
      }

    return true;
  }

  /*
   * handle_message_queue_handoff_client - on coordinator request, migrate one
   * of this worker's contexts (identified by item.id) to another worker.
   *
   * item (in): carries the target context id plus the destination worker's
   *            pointer and index
   *
   * The context is unregistered from this worker's epoll set and context
   * list, then delivered to the destination via a TAKEOVER_CLIENT message.
   * A context that is already gone, not CONN_OPEN, or marked removed is not
   * transferred.  Either way a HANDOFF_REPLY is sent to the coordinator.
   */
  bool worker::handle_message_queue_handoff_client (message &item)
  {
    coordinator::message response;
    message request;
    context *ctx;
    uint64_t id;
    bool transferred = true;

    id = item.id;
    auto iterator = std::find_if (m_context.begin (), m_context.end (), [id] (context *ptr)
    {
      return ptr->m_id == id;
    });

    if (iterator == m_context.end ())
      {
    /* the connection has already been cleaned up */
    transferred = false;
    goto respond;
      }
    ctx = *iterator;

    rmutex_lock (NULL, &ctx->m_conn->cmutex);

    /* handle the entries in the message queue first: they may touch ctx */
    this->handle_message_queue_by_index (queue_type::IMMEDIATE);
    this->handle_message_queue_by_index (queue_type::LAZY);

    if (ctx->m_conn->status != CONN_OPEN || ctx->m_removed)
      {
    /* this connection will be terminated soon */
    rmutex_unlock (NULL, &ctx->m_conn->cmutex);

    transferred = false;
    goto respond;
      }

    ctx->m_conn->worker = item.worker_ptr;
    ctx->m_worker = item.worker_index;

    if (!m_events.remove_descriptor (ctx->m_conn->fd))
      {
    /* fixed log text: the failing call here is remove_descriptor */
    er_log_conn (__FILE__, __LINE__, "connection::worker->handle_message_queue_handoff_client: remove_descriptor failed\n");
    assert_release (false);
      }
    if (m_context.erase (ctx) == 0)
      {
    er_log_conn (__FILE__, __LINE__, "connection::worker->handle_message_queue_handoff_client: context not found\n");
    assert_release (false);
      }
    m_exhausted.erase (ctx->m_id);

    /* hand off the context */
    request.type = message_type::TAKEOVER_CLIENT;
    request.ctx = ctx;
    item.worker_ptr->enqueue (cubconn::connection::worker::queue_type::IMMEDIATE, std::move (request));
    if (!item.worker_ptr->notify ())
      {
    assert_release (false);
      }

    rmutex_unlock (NULL, &ctx->m_conn->cmutex);

    ctx->m_stats.add (statistics::context::MOVE_COUNT, 1);
    m_stats.sub (statistics::worker::CLIENT_NUM, 1);

respond:
    /* respond to the coordinator */
    response.type = coordinator::message_type::HANDOFF_REPLY;
    response.transferred = transferred;
    response.from = m_index;
    response.to = item.worker_index;
    response.id = id;
    m_coordinator->enqueue (std::move (response));
    if (!m_coordinator->notify ())
      {
    assert_release (false);
      }
    return true;
  }

  /*
   * handle_message_queue_takeover_client - adopt a context handed off by another worker
   *
   * item (in): TAKEOVER_CLIENT message; item.ctx is the context being transferred
   * returns  : true on success; false when the context could not be registered
   *            (the context is then detached and parked in m_removed_context)
   */
  bool worker::handle_message_queue_takeover_client (message &item)
  {
    context *ctx;
    int flags;

    ctx = item.ctx;

    rmutex_lock (NULL, &ctx->m_conn->cmutex);

    /* the handing-off worker already re-pointed the connection at us */
    assert (ctx->m_conn->worker == this);
    assert (static_cast<std::size_t> (ctx->m_worker) == m_index);

    flags = EPOLLET | EPOLLIN | EPOLLRDHUP;
    if (!ctx->m_send.m_transmitter.empty ())
      {
	/* outgoing data is pending; also watch for writability */
	flags |= EPOLLOUT;
      }

    if (!m_events.add_descriptor (ctx->m_conn->fd, flags, ctx))
      {
	ctx->m_conn->worker = nullptr;
	ctx->m_conn->context = nullptr;
	rmutex_unlock (NULL, &ctx->m_conn->cmutex);

	m_removed_context.push_back (ctx);
	er_log_conn (__FILE__, __LINE__, "connection::worker->handle_message_queue_takeover_client: add_descriptor failed\n");
	return false;
      }
    if (!m_context.insert (ctx).second)
      {
	ctx->m_conn->worker = nullptr;
	ctx->m_conn->context = nullptr;
	rmutex_unlock (NULL, &ctx->m_conn->cmutex);

	m_events.remove_descriptor (ctx->m_conn->fd);

	m_removed_context.push_back (ctx);
	er_log_conn (__FILE__, __LINE__,
		     "connection::worker->handle_message_queue_takeover_client: context can not be duplicated\n");
	return false;
      }

    rmutex_unlock (NULL, &ctx->m_conn->cmutex);

    ctx->m_stats.set (statistics::context::LAST_MOVED_NS, m_timens);

    er_log_conn (__FILE__, __LINE__, "take over the client that has fd = %d in the worker = %d\n", ctx->m_conn->fd,
		 m_index);

    m_stats.add (statistics::worker::CLIENT_NUM, 1);

    /* The condition below theoretically cannot be true, so there is no need to start the timer. */
    /* But if the concurrency queue gets out of sync and the MQ processing order is disrupted,   */
    /* we must start the timer.                                                                  */
    if (m_status == status::HIBERNATING)
      {
	if (!this->eventfd_starttimer ())
	  {
	    er_log_conn (__FILE__, __LINE__,
			 "connection::worker->handle_message_queue_takeover_client: failed to start the timer\n");
	    assert_release (false);
	  }
      }

    return true;
  }

  /*
   * handle_message_queue_shutdown_client - start closing a client connection
   *
   * item (in): SHUTDOWN_CLIENT message; item.conn identifies the connection and
   *            item.ignore / item.retry / item.waiter_handle parametrize the close
   * returns  : true (never aborts the worker loop)
   */
  bool worker::handle_message_queue_shutdown_client (message &item)
  {
    assert (item.conn);

    int rc = rmutex_lock (m_entry, &item.conn->cmutex);
    assert (rc == NO_ERROR);

    context *target = reinterpret_cast<context *> (item.conn->context);
    if (target == nullptr)
      {
	rc = rmutex_unlock (m_entry, &item.conn->cmutex);
	assert (rc == NO_ERROR);

	er_log_conn (__FILE__, __LINE__,
		     "connection::worker->handle_message_queue_shutdown_client: context is already cleared for conn = %p\n",
		     static_cast<void *> (item.conn));
	return true;
      }

    /* record how aggressively pending work may be discarded during the close */
    target->m_ignore = item.ignore;

    rc = rmutex_unlock (m_entry, &item.conn->cmutex);
    assert (rc == NO_ERROR);

    this->handle_connection_close (target, item.retry, item.waiter_handle);

    return true;
  }

  /*
   * handle_message_queue_start - transition the worker into the RUNNING state
   *
   * item (in): START message; item.waiter_handle may identify a blocked requester
   * returns  : true (never aborts the worker loop)
   */
  bool worker::handle_message_queue_start (message &item)
  {
    /* START may only arrive while READY, or redundantly while already RUNNING */
    assert (m_status == status::READY || m_status == status::RUNNING);

    m_status = status::RUNNING;

    /* release whoever requested the start, if it is waiting */
    this->wakeup_blocked_worker (item.waiter_handle);

    return true;
  }

  /*
   * handle_message_queue_hibernate - mark the worker as HIBERNATING
   *
   * item (in): unused for this message type
   * returns  : true (never aborts the worker loop)
   *
   * Handlers that add work while the worker hibernates (e.g. takeover,
   * awaken) restart the eventfd timer when they observe this status.
   */
  bool worker::handle_message_queue_hibernate (message &item)
  {
    m_status = status::HIBERNATING;

    return true;
  }

  /*
   * handle_message_queue_awaken - leave hibernation and resume RUNNING
   *
   * item (in): unused for this message type
   * returns  : true (never aborts the worker loop)
   *
   * Re-arms the periodic timer before flipping the status back to RUNNING.
   */
  bool worker::handle_message_queue_awaken (message &item)
  {
    const bool timer_armed = this->eventfd_starttimer ();

    if (!timer_armed)
      {
	er_log_conn (__FILE__, __LINE__, "connection::worker->handle_message_queue_awaken: failed to start the timer\n");
	assert_release (false);
      }

    m_status = status::RUNNING;

    return true;
  }

  /*
   * handle_message_queue_shutdown - request worker termination
   *
   * item (in): unused for this message type
   * returns  : true; the main loop in run () observes m_stop and exits
   *            after the current iteration
   */
  bool worker::handle_message_queue_shutdown (message &item)
  {
    m_status = status::TERMINATING;
    m_stop = true;

    return true;
  }

  /*
   * handle_message_queue_by_index - drain one message queue and dispatch every entry
   *
   * type (in): which queue (IMMEDIATE / LAZY) to drain
   * returns  : false when a handler asks the worker loop to stop, true otherwise
   *
   * Only as many entries as were published through m_queue_size are consumed;
   * messages enqueued concurrently remain for the next round.
   */
  bool worker::handle_message_queue_by_index (queue_type type)
  {
    /* dispatch table indexed by message_type; the second member names the
     * per-type statistics counter to bump (NA = none) */
    static constexpr std::array<
    std::pair<bool (worker::*) (message &), statistics::worker>, static_cast<std::size_t> (message_type::TYPE_COUNT)
    > handler =
    {
      {
	/* START           */ { &worker::handle_message_queue_start,		statistics::worker::NA },
	/* HIBERNATE       */ { &worker::handle_message_queue_hibernate,	statistics::worker::NA },
	/* AWAKEN          */ { &worker::handle_message_queue_awaken,		statistics::worker::NA },
	/* SHUTDOWN        */ { &worker::handle_message_queue_shutdown,		statistics::worker::NA },
	/* NEW_CLIENT      */ { &worker::handle_message_queue_new_client,	statistics::worker::MQ_NEW_CLIENT },
	/* HANDOFF_CLIENT  */ { &worker::handle_message_queue_handoff_client,	statistics::worker::MQ_HANDOFF_CLIENT },
	/* TAKEOVER_CLIENT */ { &worker::handle_message_queue_takeover_client,	statistics::worker::MQ_TAKEOVER_CLIENT },
	/* SHUTDOWN_CLIENT */ { &worker::handle_message_queue_shutdown_client,	statistics::worker::MQ_SHUTDOWN_CLIENT },
	/* SEND_PACKET     */ { &worker::handle_message_queue_send_packet,	statistics::worker::MQ_SEND_PACKET },
	/* RELEASE_PACKET  */ { &worker::handle_message_queue_release_packet,	statistics::worker::MQ_RELEASE_PACKET }
      }
    };
    message request;
    uint64_t size, i;

    static_assert (static_cast<int> (message_type::START) == 0, "message_type must start at 0");
    static_assert (static_cast<int> (message_type::TYPE_COUNT) == handler.size (), "handler table size must match");
    static_assert (static_cast<int> (message_type::TYPE_COUNT) == 10, "this must be modified");

    i = 0;
    /* snapshot the published count; acquire pairs with the producers' release */
    size = m_queue_size[static_cast<std::size_t> (type)].exchange (0, std::memory_order_acquire);
    m_stats.add (statistics::worker::MQ_REQUESTED, size);

    while (i++ < size && m_queue[static_cast<std::size_t> (type)].try_pop (request))
      {
#if !defined (NDEBUG)
	er_log_conn (__FILE__, __LINE__,
		     "received message_id = %lld, request_type = %d from message queue in the worker = %d\n", request.message_id,
		     request.type, m_index);
#endif
	/* reject out-of-range types before indexing the dispatch table */
	if (! (message_type::START <= request.type && message_type::TYPE_COUNT > request.type))
	  {
	    er_log_conn (__FILE__, __LINE__,
			 "connection::worker->handle_message_queue_by_index: received unknown message type in the worker = %d\n", m_index);
	    assert_release (false);
	    continue;
	  }

	if (! (this->*handler[static_cast <std::size_t> (request.type)].first) (request))
	  {
	    return false;
	  }
	if (handler[static_cast <std::size_t> (request.type)].second != statistics::worker::NA)
	  {
	    m_stats.add (handler[static_cast <std::size_t> (request.type)].second, 1);
	  }
	m_stats.add (statistics::worker::MQ_COMPLETED, 1);
      }

    return true;
  }

  /*
   * handle_message_queue - drain every message queue in queue_type order
   *
   * returns: false as soon as one queue handler fails, true otherwise
   */
  bool worker::handle_message_queue ()
  {
    for (std::size_t qi = 0; qi < static_cast<std::size_t> (queue_type::TYPE_COUNT); ++qi)
      {
	if (!this->handle_message_queue_by_index (static_cast<queue_type> (qi)))
	  {
	    er_log_conn (__FILE__, __LINE__, "connection::worker->handle_message_queue: handle_message_queue_by_index failed\n");
	    return false;
	  }
      }

    /* the actual release of the context is handled last */
    this->purge_stale_contexts ();

    return true;
  }

  /*
   * handle_hangup_or_error - react to EPOLLHUP/EPOLLRDHUP/EPOLLERR on a client fd
   *
   * ctx (in): connection context whose descriptor raised the event
   * err (in): true for EPOLLERR, false for a peer hangup
   *
   * Logs the cause, marks the context so pending work is discarded, and
   * initiates the connection close.
   */
  void worker::handle_hangup_or_error (context *ctx, bool err)
  {
    if (!err)
      {
	er_log_conn (__FILE__, __LINE__,
		     "connection::worker->handle_hangup_or_error: connection closed by peer (HUP/RDHUP) on fd %d.",
		     ctx->m_conn->fd);
      }
    else
      {
	/* fetch the pending socket error for the log message */
	int so_error = 0;
	socklen_t optlen = sizeof (so_error);

	if (getsockopt (ctx->m_conn->fd, SOL_SOCKET, SO_ERROR, &so_error, &optlen) == 0)
	  {
	    er_log_conn (__FILE__, __LINE__, "connection::worker->handle_hangup_or_error: socket error (EPOLLERR) on fd %d: %s",
			 ctx->m_conn->fd,
			 strerror (so_error));
	  }
	else
	  {
	    er_log_conn (__FILE__, __LINE__,
			 "connection::worker->handle_hangup_or_error: socket error (EPOLLERR) on fd %d, but getsockopt failed.",
			 ctx->m_conn->fd);
	  }
      }

    /* ctx will be forcibly removed */
    ctx->m_ignore = ignore_level::IGNORE_ALL;

    this->handle_connection_close (ctx);
  }

  /*
   * handle_error_packet - consume an ERROR_TYPE payload for the current request
   *
   * ctx    (in): connection context; ctx->m_recv.m_header must hold the NET_HEADER
   * packet (in): payload buffer; ownership passes to conn->error_queue on
   *              success, otherwise it is released back to the receiver
   * returns    : Ok on success, Skewed when the payload size disagrees with the
   *              header, Error when the queue entry could not be added
   *
   * Ends by resetting the receive state machine to HEADER.
   */
  result worker::handle_error_packet (context *ctx, cubbase::span<std::byte> &packet)
  {
    css_conn_entry *conn;
    css_error_code error;
    NET_HEADER *header;
    int size;

    assert (ctx->m_recv.m_header.size () == sizeof (NET_HEADER));

    conn = ctx->m_conn;
    header = reinterpret_cast<NET_HEADER *> (ctx->m_recv.m_header.data ());

    /* the payload may be padded up to the next 8-byte boundary */
    size = ntohl (header->buffer_size);
    if (packet.size () != static_cast<std::size_t> (size) && packet.size () != ((static_cast<std::size_t> (size) + 7) & ~7))
      {
    er_log_conn (__FILE__, __LINE__,
             "connection::worker->handle_error_packet: the expected size by header and packet size is different\n");
    return result::Skewed;
      }

    if (!css_is_request_aborted (conn, ctx->m_recv.m_request_id))
      {
    error = css_add_queue_entry (conn, &conn->error_queue, ctx->m_recv.m_request_id,
                     reinterpret_cast<char *> (packet.data ()),
                     packet.size (), NO_ERRORS, conn->get_tran_index (), conn->invalidate_snapshot, conn->db_error);
    if (error != NO_ERRORS)
      {
        /* queueing failed: the buffer stays ours, give it back to the receiver */
        ctx->m_recv.m_receiver.release (packet.data ());
        return result::Error;
      }
      }
    else
      {
    /* aborted request: nobody will consume the payload, release it */
    ctx->m_recv.m_receiver.release (packet.data ());
      }
    ctx->m_recv.m_command = false;
    NEXT_STATE (ctx, m_recv, HEADER);
    return result::Ok;
  }

  /*
   * handle_data_packet - consume a DATA_TYPE payload for the current request
   *
   * ctx    (in): connection context; ctx->m_recv.m_header must hold the NET_HEADER
   * packet (in): payload buffer; ownership passes to the waiting thread or to
   *              conn->data_queue, otherwise it is released back to the receiver
   * returns    : Ok on success, Skewed when the payload size disagrees with the
   *              header, Error when the queue entry could not be added
   *
   * If a thread is blocked waiting for this request's data, it is handed the
   * buffer directly and woken; otherwise the buffer is queued.  Ends by
   * resetting the receive state machine to HEADER.
   */
  result worker::handle_data_packet (context *ctx, cubbase::span<std::byte> &packet)
  {
    THREAD_ENTRY *waiter_thread;
    css_wait_queue_entry *waiter;
    css_conn_entry *conn;
    css_error_code error;
    NET_HEADER *header;
    int size;

    assert (ctx->m_recv.m_header.size () == sizeof (NET_HEADER));

    conn = ctx->m_conn;
    header = reinterpret_cast<NET_HEADER *> (ctx->m_recv.m_header.data ());

    /* the payload may be padded up to the next 8-byte boundary */
    size = ntohl (header->buffer_size);
    if (packet.size () != static_cast<std::size_t> (size) && packet.size () != ((static_cast<std::size_t> (size) + 7) & ~7))
      {
    er_log_conn (__FILE__, __LINE__,
             "connection::worker->handle_data_packet: the expected size by header and packet size is different\n");
    return result::Skewed;
      }

    /* check if there is thread waiting for data */
    waiter_thread = NULL;
    waiter = css_find_and_remove_wait_queue_entry (&conn->data_wait_queue, ctx->m_recv.m_request_id);
    if (waiter != NULL)
      {
    waiter_thread = waiter->thrd_entry;
    waiter_thread->next_wait_thrd = NULL;
      }

    if (!css_is_request_aborted (conn, ctx->m_recv.m_request_id))
      {
    if (waiter)
      {
        /* hand the buffer straight to the waiting thread */
        *waiter->buffer = reinterpret_cast<char *> (packet.data ());
        *waiter->size = packet.size ();
        *waiter->rc = NO_ERRORS;
        waiter->thrd_entry = NULL;
        css_free_wait_queue_entry (conn, waiter);
      }
    else
      {
        /* if waiter not exists, add to data queue */
        error = css_add_queue_entry (conn, &conn->data_queue, ctx->m_recv.m_request_id,
                     reinterpret_cast<char *> (packet.data ()),
                     packet.size (), NO_ERRORS, conn->get_tran_index (), conn->invalidate_snapshot, conn->db_error);
        if (error != NO_ERRORS)
          {
        ctx->m_recv.m_receiver.release (packet.data ());
        return result::Error;
          }
      }
      }
    else
      {
    /* aborted request: report SERVER_ABORTED to the waiter, if any.
     * NOTE(review): on this path the payload buffer is neither handed off nor
     * released back to the receiver -- unlike handle_error_packet; confirm the
     * buffer is reclaimed elsewhere and this is not a leak. */
    if (waiter)
      {
        *waiter->buffer = NULL;
        *waiter->size = 0;
        *waiter->rc = SERVER_ABORTED;
      }
      }

    if (waiter_thread)
      {
    thread_lock_entry (waiter_thread);

    assert (waiter_thread->resume_status == THREAD_CSS_QUEUE_SUSPENDED
        || waiter_thread->resume_status == THREAD_CSECT_WRITER_SUSPENDED);
    assert (waiter_thread->next_wait_thrd == NULL);

    /* When the resume_status is THREAD_CSS_QUEUE_SUSPENDED, it means the data waiting thread is still waiting on the
     * data queue. Otherwise, in case of THREAD_CSECT_WRITER_SUSPENDED, it means that the thread was timed out, is
     * trying to clear its queue buffer (see clear_wait_queue_entry_and_free_buffer function), and waiting for its
     * conn->csect. We don't need to wakeup the thread for this case. We may send useless signal for it, but it may
     * bring other anomalies: the thread may sleep on another resources which we don't know at this moment. */
    if (waiter_thread->resume_status == THREAD_CSS_QUEUE_SUSPENDED)
      {
        thread_wakeup_already_had_mutex (waiter_thread, THREAD_CSS_QUEUE_RESUMED);
      }

    thread_unlock_entry (waiter_thread);
      }

    /* the data completes a previously queued command: schedule its processing */
    if (ctx->m_recv.m_command)
      {
    this->push_task_into_worker_pool (ctx);
    ctx->m_recv.m_command = false;
      }

    NEXT_STATE (ctx, m_recv, HEADER);
    return result::Ok;
  }

  /*
   * handle_command_header_packet - queue a COMMAND_TYPE request header
   *
   * ctx (in): connection context; ctx->m_recv.m_header holds the NET_HEADER
   * returns : Ok on success, Aborted when the request was already aborted,
   *           Error when the queue entry could not be added
   *
   * On success the header buffer itself is handed to conn->request_queue (it is
   * NOT released here); on abort/failure it is released back to the receiver.
   * When the header announces a payload, the state machine expects a following
   * DATA packet before the command task is scheduled.
   */
  result worker::handle_command_header_packet (context *ctx)
  {
    css_conn_entry *conn;
    NET_HEADER *header;
    css_error_code error;

    if (css_is_request_aborted (ctx->m_conn, ctx->m_recv.m_request_id))
      {
    ctx->m_recv.m_receiver.release (ctx->m_recv.m_header.data ());
    return result::Aborted;
      }

    assert (ctx->m_recv.m_header.size () == sizeof (NET_HEADER));

    conn = ctx->m_conn;
    header = reinterpret_cast<NET_HEADER *> (ctx->m_recv.m_header.data ());

    error = css_add_queue_entry (conn, &conn->request_queue, ctx->m_recv.m_request_id,
                 reinterpret_cast<char *> (ctx->m_recv.m_header.data ()), ctx->m_recv.m_header.size (), NO_ERRORS,
                 conn->get_tran_index (), conn->invalidate_snapshot, conn->db_error);
    if (error != NO_ERRORS)
      {
    ctx->m_recv.m_receiver.release (ctx->m_recv.m_header.data ());
    return result::Error;
      }

    if (ntohl (header->buffer_size) > 0)
      {
    /* data packet will be received belongs to this command */
    ctx->m_recv.m_command = true;
      }
    else
      {
    /* this request carries no data payload.                       */
    /* e.g. NET_SERVER_LOG_CHECKPOINT, NET_SERVER_TM_SERVER_ABORT. */
    this->push_task_into_worker_pool (ctx);
      }

    return result::Ok;
  }

  /*
   * handle_header_packet - process a NET_HEADER packet and advance the receive state machine
   *
   * ctx    (in): connection context
   * packet (in): buffer expected to contain exactly one NET_HEADER
   * returns    : Ok, Skewed (bad size or unknown type), ClosedConnection, or
   *              whatever handle_command_header_packet returns
   *
   * Ownership: for COMMAND_TYPE the header buffer is kept in ctx->m_recv.m_header
   * and queued by handle_command_header_packet; for every other type it is
   * released back to the receiver here.
   */
  result worker::handle_header_packet (context *ctx, cubbase::span<std::byte> &packet)
  {
    css_conn_entry *conn;
    NET_HEADER *header;
    unsigned short flags;
    result status;

    assert (ctx->m_conn);

    if (packet.size () != sizeof (NET_HEADER))
      {
    /* 1. the state was wrong or                                  */
    /* 2. the incoming packet was wrong                           */
    /* in this case, we must reset the context and drain all data */
    /* from the socket to recover this state machine and handle   */
    /* the next request properly.                                 */
    er_log_conn (__FILE__, __LINE__,
             "connection::worker->handle_header_packet: the expected size, sizeof (NET_HEADER) and packet size is different\n");
    return result::Skewed;
      }

    ctx->m_recv.m_header = packet;

    conn = ctx->m_conn;
    header = reinterpret_cast<NET_HEADER *> (ctx->m_recv.m_header.data ());

    ctx->m_recv.m_request_id = ntohl (header->request_id);

    if (conn->stop_talk)
      {
    return result::ClosedConnection;
      }

    /* propagate per-request transaction information from the header to the connection */
    conn->set_tran_index (ntohl (header->transaction_id));
    conn->db_error = (int) ntohl (header->db_error);
    flags = ntohs (header->flags);
    conn->invalidate_snapshot = flags & NET_HEADER_FLAG_INVALIDATE_SNAPSHOT ? 1 : 0;
    conn->in_method = flags & NET_HEADER_FLAG_METHOD_MODE ? true : false;

    status = result::Ok;
    switch (ntohl (header->type))
      {
      case COMMAND_TYPE:
    /* no more packets are requested */
    status = this->handle_command_header_packet (ctx);
    break;

      case DATA_TYPE:
    /* a payload follows: expect it in state DATA */
    ctx->m_recv.m_receiver.release (packet.data ());
    NEXT_STATE (ctx, m_recv, DATA);
    break;

      case ABORT_TYPE:
    /* no more packets are requested */
    ctx->m_recv.m_receiver.release (packet.data ());
    ctx->m_recv.m_command = false;
    css_process_abort_packet (ctx->m_conn, ctx->m_recv.m_request_id);
    break;

      case CLOSE_TYPE:
    ctx->m_recv.m_receiver.release (packet.data ());
    /* no more packets are requested */
    status = result::ClosedConnection;
    break;

      case ERROR_TYPE:
    /* an error payload follows: expect it in state ERROR */
    ctx->m_recv.m_receiver.release (packet.data ());
    NEXT_STATE (ctx, m_recv, ERROR);
    break;

      default:
    er_log_conn (ARG_FILE_LINE,
             "connection::worker->handle_header_packet: unknown state - will be reset by skew handler\n");
    status = result::Skewed;
    break;
      }

    return status;
  }

  /*
   * handle_packet - dispatch one received packet according to the receive state
   *
   * ctx    (in): connection context (ctx->m_recv.m_state selects the handler)
   * packet (in): the packet buffer to process
   * returns    : the selected handler's result, or Error on an unknown state
   */
  result worker::handle_packet (context *ctx, cubbase::span<std::byte> &packet)
  {
    switch (ctx->m_recv.m_state)
      {
      case state::HEADER:
	return this->handle_header_packet (ctx, packet);

      case state::DATA:
	return this->handle_data_packet (ctx, packet);

      case state::ERROR:
	return this->handle_error_packet (ctx, packet);

      default:
	er_log_conn (ARG_FILE_LINE, "connection::worker->handle_packet: unknown state\n");
	assert_release (false);
	return result::Error;
      }
  }

  /*
   * handle_reception - read available data from a connection and process complete packets
   *
   * ctx          (in): connection context
   * in_exhausted (in): true when called from handle_exhausted; suppresses
   *                    re-registering the context in the exhausted list
   * returns          : the receiver's I/O status (Pending / BudgetExhausted),
   *                    or Error / ClosedConnection / PeerReset
   */
  result worker::handle_reception (context *ctx, bool in_exhausted)
  {
    std::chrono::time_point<std::chrono::steady_clock> start, end;
    std::vector<cubbase::span<std::byte>> *packets;
    result status, io_status;
    int mtx;

    /* bail out early if the connection is no longer usable */
    rmutex_lock (m_entry, &ctx->m_conn->rmutex);
    if (ctx->m_conn->status != CONN_OPEN || ctx->m_conn->stop_talk == true)
      {
	rmutex_unlock (m_entry, &ctx->m_conn->rmutex);
	this->handle_connection_close (ctx);
	return result::ClosedConnection;
      }
    rmutex_unlock (m_entry, &ctx->m_conn->rmutex);

    io_status = ctx->m_recv.m_receiver.drain (ctx->m_conn->fd, m_recv_budget);
    if (io_status == result::PeerReset || io_status == result::Error)
      {
	er_log_conn (__FILE__, __LINE__, "connection::worker->handle_reception: status = %d\n", io_status);
	/* drop any pending outgoing data; the connection is being torn down.
	 * NOTE(review): this previously called empty (), whose boolean result
	 * was discarded -- clear () (as on the normal completion path in
	 * handle_transmission) is the intended cleanup. */
	ctx->m_send.m_transmitter.clear ();
	this->handle_connection_close (ctx);
	return io_status;
      }

    assert (io_status == result::Pending || io_status == result::BudgetExhausted);

    /* budget ran out with data left on the socket: revisit this context soon */
    if (!in_exhausted && io_status == result::BudgetExhausted)
      {
	handle_exhausted_add_context (ctx, EPOLLIN);
      }

    if (ctx->m_recv.m_receiver.get_result ()->empty ())
      {
	return io_status;
      }

    m_stats.add (statistics::worker::PACKET_COUNT, ctx->m_recv.m_receiver.get_result ()->size ());

    start = std::chrono::steady_clock::now ();

    /* hold m_conn */
    mtx = rmutex_lock (m_entry, &ctx->m_conn->rmutex);
    if (mtx != NO_ERROR)
      {
	return result::Error;
      }

    end = std::chrono::steady_clock::now ();
    m_stats.add (statistics::worker::BLOCKED_RMUTEX,
		 std::chrono::duration_cast<std::chrono::microseconds> (end - start).count ());

    /* received at least one packet */
    packets = ctx->m_recv.m_receiver.get_result ();
    for (auto &packet : *packets)
      {
	status = this->handle_packet (ctx, packet);

	if (status == result::Skewed)
	  {
	    /* the packet must be ignored */
	    ctx->m_recv.m_receiver.release (packet.data ());
	  }
	else if (status == result::ClosedConnection)
	  {
	    er_log_conn (__FILE__, __LINE__, "connection::worker->handle_reception: requested to close the connection\n");
	    mtx = rmutex_unlock (m_entry, &ctx->m_conn->rmutex);
	    if (mtx != NO_ERROR)
	      {
		return result::Error;
	      }
	    this->handle_connection_close (ctx);
	    return status;
	  }
      }

    /* release m_conn */
    mtx = rmutex_unlock (m_entry, &ctx->m_conn->rmutex);
    if (mtx != NO_ERROR)
      {
	return result::Error;
      }

    packets->clear ();

    return io_status;
  }

  /*
   * handle_transmission - push pending outgoing data to a connection
   *
   * ctx          (in): connection context whose transmitter holds queued data
   * in_exhausted (in): true when called from handle_exhausted; suppresses
   *                    re-registering the context in the exhausted list
   * returns          : Ok (fully sent), Pending, BudgetExhausted, Error,
   *                    PeerReset, or ClosedConnection
   *
   * On full completion the epoll interest is reduced to read-only and any
   * blocked sender is woken; on failure the context is marked for forced
   * removal and the connection close is initiated.
   */
  result worker::handle_transmission (context *ctx, bool in_exhausted)
  {
    result status;

    status = ctx->m_send.m_transmitter.fill (ctx->m_conn->fd, m_send_budget);
    if (status == result::PeerReset || status == result::Error)
      {
    er_log_conn (__FILE__, __LINE__, "connection::worker->handle_transmission: status = %d\n", status);

    /* ctx will be forcibly removed */
    ctx->m_ignore = ignore_level::IGNORE_ALL;

    this->wakeup_blocked_worker (ctx->m_send.m_blocker);
    this->handle_connection_close (ctx);
    return status;
      }

    assert (status == result::Ok || status == result::Pending || status == result::BudgetExhausted);

    if (status == result::Ok)
      {
    /* everything was flushed: release the sender waiting on this transmission */
    this->wakeup_blocked_worker (ctx->m_send.m_blocker);

    rmutex_lock (m_entry, &ctx->m_conn->rmutex);
    if (ctx->m_conn->status == CONN_CLOSING)
      {
        rmutex_unlock (m_entry, &ctx->m_conn->rmutex);
        /* this transmission is the last handling on this connection */
        this->handle_connection_close (ctx);
        er_log_conn (__FILE__, __LINE__,
             "connection::worker->handle_transmission: this transmission is the last handling on this connection: closed\n");
        return result::ClosedConnection;
      }
    rmutex_unlock (m_entry, &ctx->m_conn->rmutex);

    /* nothing left to send: stop watching for writability */
    if (!m_events.modify_descriptor (ctx->m_conn->fd, EPOLLET | EPOLLIN | EPOLLRDHUP, ctx))
      {
        er_log_conn (__FILE__, __LINE__, "connection::worker->handle_transmission: modify_descriptor failed\n");

        /* ctx will be forcibly removed */
        ctx->m_ignore = ignore_level::IGNORE_ALL;

        this->handle_connection_close (ctx);
        return result::Error;
      }
    ctx->m_send.m_transmitter.clear ();
    er_log_conn (__FILE__, __LINE__, "fully sent. fd = %d in the worker = %d\n", ctx->m_conn->fd, m_index);
      }
    else if (!in_exhausted && status == result::BudgetExhausted)
      {
    /* budget ran out with data still queued: revisit this context soon */
    handle_exhausted_add_context (ctx, EPOLLOUT);
      }
    return status;
  }

  /*
   * handle_exhausted_add_context - register a context whose I/O budget ran out
   *
   * ctx   (in): context to revisit on the next loop iterations
   * event (in): EPOLLIN and/or EPOLLOUT, the direction that was cut short
   *
   * The interest mask accumulates when the context is already registered.
   * (Single map lookup instead of the previous find + repeated operator[].)
   */
  void worker::handle_exhausted_add_context (context *ctx, uint32_t event)
  {
    auto it = m_exhausted.find (ctx->m_id);
    if (it == m_exhausted.end ())
      {
	/* first registration for this context */
	auto &entry = m_exhausted[ctx->m_id];
	entry.prepared = false;
	entry.events = event;
	entry.ctx = ctx;
      }
    else
      {
	/* already tracked: accumulate the interest mask */
	assert (it->second.ctx == ctx);
	it->second.events |= event;
      }
    er_log_conn (__FILE__, __LINE__,
		 "connection::worker->handle_exhausted_add_context: add new context into exhausted list: fd = %d\n", ctx->m_conn->fd);
  }

  /*
   * handle_exhausted - continue I/O for contexts whose budget ran out earlier
   *
   * returns: false when handle_reception/handle_transmission reports Error,
   *          true otherwise
   *
   * Entries are processed in two phases: a freshly added entry only has its
   * 'prepared' flag set on the first pass, so its I/O resumes one loop
   * iteration later.  An entry is removed once both its EPOLLIN and EPOLLOUT
   * interests are satisfied, or when its connection closes/resets.
   */
  bool worker::handle_exhausted ()
  {
    result status;
    context *ctx;

    for (auto it = m_exhausted.begin (); it != m_exhausted.end (); )
      {
    /* phase 1: defer newly added entries by one iteration */
    if (!it->second.prepared)
      {
        it->second.prepared = true;
        it++;
        continue;
      }

    ctx = it->second.ctx;
    ctx->m_stats.set (statistics::context::LAST_ACTIVE_NS, m_timens);

    if (it->second.events & EPOLLIN)
      {
        er_log_conn (__FILE__, __LINE__,
             "connection::worker->handle_exhausted: try to receive from fd = %d\n", ctx->m_conn->fd);

        status = this->handle_reception (ctx, true);
        if (status == result::ClosedConnection || status == result::PeerReset)
          {
        it = m_exhausted.erase (it);
        continue;
          }
        if (status == result::Error)
          {
        er_log_conn (__FILE__, __LINE__, "connection::worker->handle_exhausted: handle_reception failed");
        return false;
          }
        if (status == result::Pending)
          {
        er_log_conn (__FILE__, __LINE__,
                 "connection::worker->handle_exhausted: complete the reception: fd = %d\n", ctx->m_conn->fd);

        assert (it->second.events & EPOLLIN);

        /* read side drained: clear the interest; drop the entry when nothing remains */
        it->second.events &= ~EPOLLIN;
        if (!it->second.events)
          {
            it = m_exhausted.erase (it);
            er_log_conn (__FILE__, __LINE__,
                 "connection::worker->handle_exhausted: remove context from exhausted list: fd = %d\n", ctx->m_conn->fd);
            continue;
          }
          }
      }
    if (it->second.events & EPOLLOUT)
      {
        er_log_conn (__FILE__, __LINE__,
             "connection::worker->handle_exhausted: try to send to fd = %d\n", ctx->m_conn->fd);

        status = this->handle_transmission (ctx, true);
        if (status == result::ClosedConnection || status == result::PeerReset)
          {
        it = m_exhausted.erase (it);
        continue;
          }
        if (status == result::Error)
          {
        er_log_conn (__FILE__, __LINE__, "connection::worker->handle_exhausted: handle_transmission failed");
        return false;
          }
        if (status == result::Ok || status == result::Pending)
          {
        er_log_conn (__FILE__, __LINE__,
                 "connection::worker->handle_exhausted: complete the transmission: fd = %d\n", ctx->m_conn->fd);

        assert (it->second.events & EPOLLOUT);

        /* write side flushed: clear the interest; drop the entry when nothing remains */
        it->second.events &= ~EPOLLOUT;
        if (!it->second.events)
          {
            it = m_exhausted.erase (it);
            er_log_conn (__FILE__, __LINE__,
                 "connection::worker->handle_exhausted: remove context from exhausted list: fd = %d\n", ctx->m_conn->fd);
            continue;
          }
          }
      }
    it++;
      }

    return true;
  }

  /*
   * initialize - per-thread setup, run once when the worker thread starts
   *
   * Registers with the shared watcher, names and pins the thread, claims and
   * configures a cubthread entry, and resets the context bookkeeping.  Must be
   * paired with finalize ().
   */
  void worker::initialize ()
  {
    /* register with the shared watcher so shutdown can account for active workers */
    m_watcher->mtx.lock ();
    m_watcher->active++;
    m_watcher->mtx.unlock ();

    /* set the OS-visible thread name */
    pthread_setname_np (pthread_self (), "connections");

    /* pin this thread to its assigned core */
    os::resources::cpu::setaffinity (m_core);

    /* claim and configure a thread entry for server-side request handling */
    m_entry = cubthread::get_manager ()->claim_entry ();
    if (m_entry == nullptr)
      {
    er_log_conn (__FILE__, __LINE__, "connection::worker->initialize: claim_entry failed\n");
    assert_release (false);
      }
    m_entry->register_id ();
    m_entry->type = TT_SERVER;
    m_entry->tran_index = -1;
    m_entry->m_status = cubthread::entry::status::TS_RUN;
    m_entry->shutdown = false;

    m_entry->get_error_context ().register_thread_local ();

    /* start with a clean context bookkeeping */
    m_context.clear ();
    m_removed_context.clear ();
  }

  /*
   * finalize - per-thread teardown, run once when the worker thread exits
   *
   * Forcibly closes every remaining context, waits (polling the LAZY queue and
   * purging stale contexts) until all of them are gone, retires the thread
   * entry, and deregisters from the shared watcher.
   */
  void worker::finalize ()
  {
    /* alive context: force-close each one.
     * NOTE(review): this assumes handle_connection_close () does not erase from
     * m_context directly (removal appears to be deferred to the LAZY queue and
     * purge_stale_contexts below) -- otherwise this range-for would be
     * invalidated; confirm. */
    for (auto &ctx : m_context)
      {
    ctx->m_ignore = ignore_level::IGNORE_ALL;
    this->handle_connection_close (ctx);
      }

    /* wait until the deferred removals have drained every context */
    while (!m_context.empty ())
      {
    this->handle_message_queue_by_index (queue_type::LAZY);

    /* the actual release of the context is handled last */
    this->purge_stale_contexts ();

    /* 1 ms */
    thread_sleep (1);
      }
    m_context.clear ();

    m_entry->unregister_id ();
    cubthread::get_manager ()->retire_entry (*m_entry);

    /* remove the watcher */
    m_watcher->mtx.lock ();
    m_watcher->active--;
    m_watcher->mtx.unlock ();

    m_watcher->cv.notify_one ();
  }

  /*
   * run - the worker's epoll event loop
   *
   * returns: true on a clean, requested stop (m_stop set by SHUTDOWN); false
   *          when reception/transmission/eventfd handling fails fatally
   *
   * Each iteration: wait for events (non-blocking while exhausted contexts are
   * pending), service ready descriptors, resume budget-exhausted contexts, and
   * finally process eventfd/timerfd-driven message queues.
   */
  bool worker::run ()
  {
    std::array<epoll_event, 512> events;
    bool eventfds[2] = { false, false };
    result status;
    context *ctx;
    int nfds, i;

    while (!m_stop)
      {
	/* do not block when exhausted contexts still need servicing */
	nfds = m_events.wait (events.data (), events.size (), m_exhausted.empty () ? TIMEOUT_INFINITE : TIMEOUT_NOWAIT);
	if (nfds < 0)
	  {
	    if (errno == EINTR)
	      {
		continue;
	      }
	    er_log_conn (__FILE__, __LINE__, "connection::worker->run: m_events.wait failed: %s", strerror (errno));
	    assert_release (false);
	    continue;
	  }

	/* criterion time to use during this loop */
	m_timens = this->get_time_ns (CLOCK_MONOTONIC);

	for (i = 0; i < nfds; i++)
	  {
	    assert (events[i].data.ptr);

	    ctx = reinterpret_cast<context *> (events[i].data.ptr);
	    /* hangup/error on a real client fd (never on the internal eventfd/timerfd) */
	    if ((events[i].events & (EPOLLHUP | EPOLLRDHUP | EPOLLERR)) &&
		ctx->m_conn->fd != m_eventfd &&
		ctx->m_conn->fd != m_timerfd)
	      {
		this->handle_hangup_or_error (ctx, events[i].events & EPOLLERR);
		continue;
	      }
	    ctx->m_stats.set (statistics::context::LAST_ACTIVE_NS, m_timens);
	    if (events[i].events & EPOLLIN)
	      {
		/* eventfd/timerfd readiness is only recorded here and serviced
		 * after all client I/O for this batch (see below) */
		if (ctx->m_conn->fd == m_eventfd)
		  {
		    eventfds[0] = true;
		    continue;
		  }
		else if (ctx->m_conn->fd == m_timerfd)
		  {
		    eventfds[1] = true;
		    continue;
		  }
		status = this->handle_reception (ctx, false);
		if (status == result::ClosedConnection || status == result::PeerReset)
		  {
		    continue;
		  }
		if (status == result::Error)
		  {
		    er_log_conn (__FILE__, __LINE__, "connection::worker->run: handle_reception failed");
		    return false;
		  }
	      }
	    if (events[i].events & EPOLLOUT)
	      {
		status = this->handle_transmission (ctx, false);
		if (status == result::ClosedConnection || status == result::PeerReset)
		  {
		    continue;
		  }
		if (status == result::Error)
		  {
		    er_log_conn (__FILE__, __LINE__, "connection::worker->run: handle_transmission failed");
		    return false;
		  }
	      }
	  }

	/* exhausted */
	if (m_exhausted.size () > 0)
	  {
	    if (!this->handle_exhausted ())
	      {
		er_log_conn (__FILE__, __LINE__, "connection::worker->run: handle_exhausted failed");
		return false;
	      }
	  }

	/* lazy handling */
	if (eventfds[0] || eventfds[1])
	  {
	    if (!this->eventfd_handler (eventfds))
	      {
		er_log_conn (__FILE__, __LINE__, "connection::worker->run: eventfd_handler failed");
		return false;
	      }
	  }
      }

    return true;
  }

  /*
   * attach - thread entry point: set up, run the event loop, tear down
   *
   * The boolean result of run () is discarded here; finalize () must execute
   * regardless of how the loop ended.
   */
  void worker::attach ()
  {
    this->initialize ();
    this->run ();
    this->finalize ();
  }
}