Skip to content

File coordinator.cpp

File List > connection > coordinator.cpp

Go to the documentation of this file

/*
 *
 * Copyright 2016 CUBRID Corporation
 *
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 *
 */

/*
 * coordinator.cpp
 */

#include <random>
#include <algorithm>
#include <unistd.h>
#include <sys/eventfd.h>
#include <sys/timerfd.h>
#include <sys/epoll.h>

#include "resources.hpp"
#include "system_parameter.h"
#include "thread_manager.hpp"
#include "connection_pool.hpp"
#include "coordinator.hpp"
#include "connection_sr.h"
#include "server_support.h"

// XXX: SHOULD BE THE LAST INCLUDE HEADER
#include "memory_wrapper.hpp"

#define EWMA_ALPHA 0.06

#define VAL_TO_SCORE(w, m, s) ((w) * static_cast<double> (s) / (m))
#define EVAL_WORKER(mq, rmutex) (VAL_TO_SCORE (25, 3.5, (mq)) + VAL_TO_SCORE (500, 1, (rmutex)))
#define EVAL_CONTEXT(bytes, budget) (VAL_TO_SCORE (50, 1000, (bytes)) + VAL_TO_SCORE (10, 1, (budget)))

#if 0
#define er_log_conn(...) er_log_debug (__VA_ARGS__)
#else
#define er_log_conn(...)
#endif

namespace cubconn::connection
{
  REGISTER_CONNECTION (coordinator, 1);

  coordinator::coordinator (pool *pool, std::shared_ptr<thread_watcher> watcher, std::size_t core,
                std::uint32_t max_worker, std::uint32_t min_worker) :
    m_parent (pool),
    m_watcher (watcher),
    m_core (core),
    m_status (status::PREPARING),
    m_stop (false),
    m_max_worker (max_worker),
    m_min_worker (min_worker),
    m_current_worker (max_worker),
    m_statistics (max_worker)
  {
    std::size_t i;

    /* external controller */
    if (!m_controller.open ("/tmp/cub_server_" + std::to_string (getpid ()) + "_coordinator.sock",
                SOCK_NONBLOCK | SOCK_CLOEXEC))
      {
    er_log_conn (__FILE__, __LINE__, "connection::coordinator: failed to attach controller: %s\n", strerror (errno));
    assert_release (false);
      }
    m_ctrlfd = m_controller.get_fd ();
    /* notifier */
    m_eventfd = eventfd (0, EFD_NONBLOCK | EFD_CLOEXEC);
    m_timerfd = timerfd_create (CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
    if (m_eventfd < 0 || m_timerfd < 0)
      {
    er_log_conn (__FILE__, __LINE__, "connection::coordinator: failed to create fd. %s\n", strerror (errno));
    assert_release (false);
      }

    if (!this->eventfd_register (m_eventfd) ||
    !this->eventfd_register (m_timerfd) ||
    !this->eventfd_register (m_ctrlfd))
      {
    er_log_conn (__FILE__, __LINE__, "connection::coordinator: failed to register fd\n");
    assert_release (false);
      }

    /* timer */
    for (i = 0; i < static_cast<std::size_t> (timer_type::TYPE_COUNT); i++)
      {
    m_timer_handler[i].valid = false;
    m_timer_handler[i].latency = timer_latency::NA;
    m_timer_handler[i].function = nullptr;
    m_timer_handler[i].last_time = 0;
      }
    if (!this->eventfd_addtimer (timer_type::STATISTICS, timer_latency::LOW_LATENCY,
                 std::bind (&coordinator::statistics_update, this)))
      {
    er_log_conn (__FILE__, __LINE__, "connection::coordinator: failed to add timer\n");
    assert_release (false);
      }
    if (!this->eventfd_addtimer (timer_type::REBALANCING, timer_latency::MEDIUM_LATENCY,
                 std::bind (&coordinator::statistics_rebalancing, this)))
      {
    er_log_conn (__FILE__, __LINE__, "connection::coordinator: failed to add timer\n");
    assert_release (false);
      }
    if (!this->eventfd_addtimer (timer_type::SCALING, timer_latency::HIGH_LATENCY,
                 std::bind (&coordinator::statistics_scaling, this)))
      {
    er_log_conn (__FILE__, __LINE__, "connection::coordinator: failed to add timer\n");
    assert_release (false);
      }

    /* request queue */
    m_queue_size.store (0, std::memory_order_relaxed);

    /* scaling */
    m_scaling.last_drain_ns = 0;
    m_scaling.last_expand_ns = 0;
    m_scaling.draining_worker = -1;

    /* auto scaling */
    m_scaling_statistics.status = scaling_status::STABLE;
    m_scaling_statistics.window_size =
        static_cast<std::size_t> (prm_get_integer_value (PRM_ID_CSS_AUTO_SCALING_WINDOW_SIZE));
    m_scaling_statistics.history.reserve (m_scaling_statistics.window_size + 1);
    m_scaling_statistics.previous_direction = scaling_direction::UP;
    m_scaling_statistics.previous_scale = m_max_worker;

    /* task statistics */
    m_task_statistics.workers = static_cast<std::size_t> (prm_get_integer_value (PRM_ID_TASK_WORKER));
    m_task_statistics.time_ns = get_monotonic_ns ();
    m_task_statistics.requested = { 0, 0 };
    m_task_statistics.started = { 0, 0 };
    m_task_statistics.completed = { 0, 0 };
    m_task_statistics.depth = { 0, 0 };

    /* connection statistics */
    for (i = 0; i < max_worker; i++)
      {
    m_statistics[i].m_score = 0;

    m_statistics[i].m_core = 0;
    m_statistics[i].m_last_cpu_time = 0;

    m_statistics[i].m_client_num = 0;
    m_statistics[i].m_last_updated = 0;

    /* this doesn't use much memory */
    m_statistics[i].m_contexts.reserve (256);
      }

    m_thread = std::thread (&coordinator::attach, this);
  }

  coordinator::~coordinator ()
  {
    if (m_thread.joinable ())
      {
    m_thread.join ();
      }
    ::close (m_eventfd);
    ::close (m_timerfd);
  }

  void coordinator::enqueue (message &&item)
  {
    m_queue.push (std::move (item));
    m_queue_size.fetch_add (1, std::memory_order_release);
  }

  bool coordinator::notify ()
  {
    std::uint64_t u;
    ssize_t bytes;

    u = 1;
    while (true)
      {
    bytes = ::write (m_eventfd, &u, sizeof (u));
    if (bytes == sizeof (u))
      {
        break;
      }

    if (bytes == 0 || (bytes > 0 && static_cast<unsigned long> (bytes) < sizeof (u)))
      {
        return false;
      }

    assert (bytes < 0);

    if (errno == EINTR)
      {
        continue;
      }
    if (errno == EAGAIN)
      {
        break;
      }
    return false;
      }

    return true;
  }

  uint64_t coordinator::get_monotonic_ns ()
  {
    struct timespec ts;

    if (clock_gettime (CLOCK_MONOTONIC, &ts) == -1)
      {
    er_log_conn (__FILE__, __LINE__, "clock_gettime (CLOCK_MONOTONIC) failed: %s\n", strerror (errno));
    return 0;
      }

    return (uint64_t) ts.tv_sec * 1000000000ULL + ts.tv_nsec;
  }

  bool coordinator::random_bit ()
  {
    static std::mt19937 gen (std::random_device { } ());
    static std::bernoulli_distribution d (0.5);

    return d (gen);
  }

  bool coordinator::transfer_connection (uint64_t id, int from, int to)
  {
    std::vector<std::unique_ptr<worker>> &workers = m_parent->get_workers ();
    connection::worker::message request;

    assert (static_cast<std::size_t> (from) < m_max_worker);
    assert (static_cast<std::size_t> (to) < m_max_worker);
    assert (from != to);
    assert (id > 0);

    if (m_migrating.find (id) != m_migrating.end ())
      {
    /* already in flight */
    return false;
      }
    m_migrating.insert (id);

    assert (m_statistics[from].m_contexts.find (id) != m_statistics[from].m_contexts.end ());
    assert (m_statistics[to].m_contexts.find (id) == m_statistics[to].m_contexts.end ());

    auto stats = m_statistics[from].m_contexts.find (id);
    m_statistics[to].m_contexts.emplace (
        stats->first,
        std::pair<
        statistics::metrics<statistics::context, double>,
        statistics::metrics<statistics::context>
        > (stats->second.first, stats->second.second)
    );
    /* the stats in worker[from] are removed when the worker responds. */

    request.type = connection::worker::message_type::HANDOFF_CLIENT;
    request.id = stats->first;
    request.worker_ptr = workers[to].get ();
    request.worker_index = to;

    workers[from]->enqueue (cubconn::connection::worker::queue_type::LAZY, std::move (request));
    if (!workers[from]->notify ())
      {
    assert_release (false);
      }

    return true;
  }

  bool coordinator::scale_up ()
  {
    std::vector<std::unique_ptr<worker>> &workers = m_parent->get_workers ();
    connection::worker::message request;

    if (m_current_worker >= m_max_worker ||
    m_status != status::STABLE)
      {
    /* there is no extra worker */
    return false;
      }

    assert (m_current_worker < m_max_worker);

    m_scaling.draining_worker = -1;

    m_status = status::EXPANDING;

    /* new clients must be entered in this worker */
    m_statistics[m_current_worker].m_score = 0;

    request.type = connection::worker::message_type::AWAKEN;
    workers[m_current_worker]->enqueue (cubconn::connection::worker::queue_type::LAZY, std::move (request));
    if (!workers[m_current_worker]->notify ())
      {
    assert_release (false);
      }
    m_current_worker++;

    m_scaling.last_expand_ns = get_monotonic_ns ();

    m_status = status::STABLE;

    return true;
  }

  bool coordinator::scale_down_finish ()
  {
    std::vector<std::unique_ptr<worker>> &workers = m_parent->get_workers ();
    connection::worker::message request;

    request.type = connection::worker::message_type::HIBERNATE;
    workers[m_scaling.draining_worker]->enqueue (cubconn::connection::worker::queue_type::LAZY, std::move (request));
    if (!workers[m_scaling.draining_worker]->notify ())
      {
    assert_release (false);
      }

    /* clear the statistics */
    m_statistics[m_scaling.draining_worker].m_score = 0;
    m_statistics[m_scaling.draining_worker].m_core = 0;
    m_statistics[m_scaling.draining_worker].m_last_cpu_time = 0;
    m_statistics[m_scaling.draining_worker].m_client_num = 0;
    m_statistics[m_scaling.draining_worker].m_last_updated = 0;
    m_statistics[m_scaling.draining_worker].m_sum.reset ();
    m_statistics[m_scaling.draining_worker].m_worker.first.reset ();
    m_statistics[m_scaling.draining_worker].m_worker.second.reset ();
    m_statistics[m_scaling.draining_worker].m_contexts.clear ();

    m_scaling.last_drain_ns = get_monotonic_ns ();
    m_scaling.draining_worker = -1;

    m_status = status::STABLE;

    return true;
  }

  bool coordinator::scale_down ()
  {
    std::size_t newhome;

    if (m_current_worker <= m_min_worker ||
    m_status != status::STABLE)
      {
    /* the number of workers cannot be further reduced */
    return false;
      }

    m_current_worker--;

    /* TODO: we need more graceful migration method using statistics */
    for (auto &stats : m_statistics[m_current_worker].m_contexts)
      {
    std::tie (newhome, std::ignore) = this->statistics_find_score_extremes ();
    transfer_connection (stats.first, m_current_worker, newhome);

    /* TODO: m_statistics[newhome].m_score += CLIENT_SCORE */
      }

    /* register the target worker index */
    m_scaling.draining_worker = m_current_worker;

    m_status = status::DRAINING;

    return true;
  }

  void coordinator::scale_trial ()
  {
    m_scaling_statistics.history.clear ();

    if (m_scaling_statistics.previous_scale == m_current_worker)
      {
    m_scaling_statistics.direction = m_scaling_statistics.previous_direction == scaling_direction::DOWN ?
                     scaling_direction::UP : scaling_direction::DOWN;
      }
    else
      {
    m_scaling_statistics.direction = m_scaling_statistics.previous_direction;
      }

    if (m_scaling_statistics.direction == scaling_direction::DOWN)
      {
    m_scaling_statistics.count = std::min (m_current_worker - m_min_worker,
                           static_cast<std::uint32_t> (m_scaling_statistics.window_size));
      }
    else
      {
    m_scaling_statistics.count = std::min (m_max_worker - m_current_worker,
                           static_cast<std::uint32_t> (m_scaling_statistics.window_size));
      }

    if (m_scaling_statistics.count == 0)
      {
    m_scaling_statistics.previous_direction = (m_scaling_statistics.previous_direction == scaling_direction::DOWN) ?
        scaling_direction::UP : scaling_direction::DOWN;
    m_scaling_statistics.status = scaling_status::STABLE;
      }
    else
      {
    m_scaling_statistics.status = scaling_status::TRIAL;
      }
  }

  std::size_t coordinator::scale_selection ()
  {
    static std::mt19937 gen (std::random_device { } ());
    std::vector<std::size_t> candidates;
    double max_score;

    max_score = 0;
    for (auto &stats : m_scaling_statistics.history)
      {
    if (max_score < stats.score)
      {
        max_score = stats.score;
      }
      }

    for (auto &stats : m_scaling_statistics.history)
      {
    if (max_score * 0.95 < stats.score)
      {
        candidates.push_back (stats.scale);
      }
      }

    if (candidates.size () != 0)
      {
    std::uniform_int_distribution<size_t> dis (0, candidates.size () - 1);

    return candidates[dis (gen)];
      }
    return m_current_worker;
  }

  void coordinator::statistics_EWMA (double alpha, uint64_t time_delta, double &acc, uint64_t &prev, uint64_t current)
  {
    double diff;

    diff = 0;
    if (current > prev)
      {
    diff = static_cast<double> (current - prev);
      }
    acc = acc * (1 - alpha) + diff * (alpha / (time_delta * 1e-6));
    prev = current;
  }

  std::pair<std::size_t, std::size_t> coordinator::statistics_find_score_extremes ()
  {
    double max, min;
    std::size_t i;

    max = 0;
    min = 0;
    for (i = 1; i < m_current_worker; i++)
      {
    if (m_statistics[i].m_score < m_statistics[min].m_score)
      {
        min = i;
      }
    else if (m_statistics[i].m_score >= m_statistics[max].m_score)
      {
        max = i;
      }
      }

    return { min, max };
  }

  void coordinator::statistics_update_score (std::size_t worker)
  {
#define EWMA_CONTEXT(key) c_ewma.get (statistics::context::key)
#define EWMA_WORKER(key) w_ewma.get (statistics::worker::key)

    statistics::metrics<statistics::context, double> &c_ewma = m_statistics[worker].m_sum;
    statistics::metrics<statistics::worker, double> &w_ewma = m_statistics[worker].m_worker.first;

    m_statistics[worker].m_score =
        1 * static_cast<double> (m_statistics[worker].m_client_num) / 1 +

        EVAL_WORKER (EWMA_WORKER (MQ_COMPLETED), EWMA_WORKER (BLOCKED_RMUTEX)) +

        EVAL_CONTEXT (EWMA_CONTEXT (BYTES_IN_TOTAL) + EWMA_CONTEXT (BYTES_OUT_TOTAL),
              EWMA_CONTEXT (RECV_BUDGET_HIT) + EWMA_CONTEXT (SEND_BUDGET_HIT));

#undef EWMA_CONTEXT
#undef EWMA_WORKER
  }

  void coordinator::statistics_update_connection (uint64_t delta,
      std::pair<std::size_t, statistics::metrics<statistics::worker>> &worker,
      std::vector<std::pair<uint64_t, statistics::metrics<statistics::context>>> &contexts)
  {
    std::size_t index;

    index = worker.first;

    if (m_statistics[index].m_last_updated)
      {
    m_statistics[index].m_sum.reset ();

    /* update EWMA */
    this->statistics_EWMA (EWMA_ALPHA, delta, m_statistics[index].m_worker.first, m_statistics[index].m_worker.second,
                   worker.second);
    for (auto &stats : contexts)
      {
        assert (m_statistics[index].m_contexts.find (stats.first) != m_statistics[index].m_contexts.end ());

        this->statistics_EWMA (EWMA_ALPHA, delta, m_statistics[index].m_contexts[stats.first].first,
                   m_statistics[index].m_contexts[stats.first].second, stats.second);
      }
      }
    else
      {
    /* there is no previous */
    m_statistics[index].m_worker.first = worker.second;
    m_statistics[index].m_worker.second = worker.second;

    for (auto &stats : contexts)
      {
        m_statistics[index].m_contexts[stats.first].first = stats.second;
        m_statistics[index].m_contexts[stats.first].second = stats.second;
      }
      }

    /* calculate the summation */
    for (auto &stats : m_statistics[index].m_contexts)
      {
    m_statistics[index].m_sum += stats.second.first;
      }
  }

  void coordinator::statistics_update_task ()
  {
    uint64_t stats[3] = { 0, 0, 0 };
    uint64_t depth;
    uint64_t delta, time_ns;

    time_ns = get_monotonic_ns ();
    delta = time_ns - m_task_statistics.time_ns;

    css_get_task_stats (stats);
    /* queue depth is a gauge. smooth the absolute value to avoid negative delta. */
    depth = stats[0] > stats[1] ? stats[0] - stats[1] : 0;
    if (m_task_statistics.requested.second)
      {
    this->statistics_EWMA (EWMA_ALPHA, delta, m_task_statistics.requested.first, m_task_statistics.requested.second,
                   stats[0]);
    this->statistics_EWMA (EWMA_ALPHA, delta, m_task_statistics.started.first, m_task_statistics.started.second, stats[1]);
    this->statistics_EWMA (EWMA_ALPHA, delta, m_task_statistics.completed.first, m_task_statistics.completed.second,
                   stats[2]);
    this->statistics_EWMA (EWMA_ALPHA, delta, m_task_statistics.depth.first, m_task_statistics.depth.second, depth);
      }
    else
      {
    /* there is no previous */
    m_task_statistics.requested.second = stats[0];
    m_task_statistics.started.second = stats[1];
    m_task_statistics.completed.second = stats[2];
    m_task_statistics.depth.second = depth;
      }

    m_task_statistics.time_ns = time_ns;
  }

  bool coordinator::statistics_update ()
  {
    this->handle_message_queue ();
    this->statistics_update_task ();

    return true;
  }

  bool coordinator::statistics_rebalancing ()
  {
#define EWMA_CONTEXT(key) c_ewma.get (statistics::context::key)

    constexpr double threshold = 0.2;
    std::size_t min, max;
    double diff, score, target;
    uint64_t id;

    std::tie (min, max) = statistics_find_score_extremes ();
    diff = m_statistics[max].m_score - m_statistics[min].m_score;
    if (diff <= m_statistics[max].m_score * threshold)
      {
    /* no need to rebalance */
    return true;
      }

    score = -1;
    id = 0;
    for (auto &stats : m_statistics[max].m_contexts)
      {
    auto &c_ewma = stats.second.first;

    target = EVAL_CONTEXT (EWMA_CONTEXT (BYTES_IN_TOTAL) + EWMA_CONTEXT (BYTES_OUT_TOTAL),
                   EWMA_CONTEXT (RECV_BUDGET_HIT) + EWMA_CONTEXT (SEND_BUDGET_HIT));

    if (target <= diff / 2 && score < target)
      {
        score = target;
        id = stats.first;
      }
      }

    if (id != 0)
      {
    this->transfer_connection (id, max, min);
      }

    return true;

#undef EWMA_CONTEXT
  }

  bool coordinator::statistics_scaling ()
  {
    double bytes_inout;
    std::size_t selected;
    std::size_t i;

    if (m_scaling_statistics.status == scaling_status::STABLE)
      {
    this->scale_trial ();
    return true;
      }

    assert (m_scaling_statistics.status == scaling_status::TRIAL);

    /* record at this point */
    bytes_inout = 0;
    for (i = 0; i < m_max_worker; i++)
      {
    bytes_inout += m_statistics[i].m_sum.get (statistics::context::BYTES_IN_TOTAL);
    bytes_inout += m_statistics[i].m_sum.get (statistics::context::BYTES_OUT_TOTAL);
      }
    m_scaling_statistics.history.push_back (
    {
      m_current_worker,
      VAL_TO_SCORE (50, 1000, bytes_inout) + m_task_statistics.completed.first * 2
    });
    m_scaling_statistics.count--;

    if (m_scaling_statistics.count == 0)
      {
    m_scaling_statistics.previous_scale = m_current_worker;

    selected = this->scale_selection ();
    if (selected < m_current_worker)
      {
        m_scaling_statistics.previous_direction = scaling_direction::DOWN;
        this->scale_down ();
        this->scale_trial ();
      }
    else if (selected > m_current_worker)
      {
        m_scaling_statistics.previous_direction = scaling_direction::UP;
        for (i = 0; i < selected - m_current_worker; i++)
          {
        this->scale_up ();
          }
        this->scale_trial ();
      }
    else
      {
        m_scaling_statistics.status = scaling_status::STABLE;
      }
      }
    else
      {
    if (m_scaling_statistics.direction == scaling_direction::DOWN)
      {
        this->scale_down ();
      }
    else
      {
        this->scale_up ();
      }
      }

    return true;
  }

  void coordinator::statistics_print ()
  {
    double bytes_in, bytes_out;
    double mq_completed;
    double core;
    std::size_t i;

    bytes_in = 0;
    bytes_out = 0;
    mq_completed = 0;
    core = 0;

    printf ("\033[2J\033[H");
    for (i = 0; i < m_max_worker; i++)
      {
    if (m_statistics[i].m_contexts.size () == 0)
      {
        continue;
      }

    printf ("------ worker %d (%d) ------\n", static_cast<int> (i), static_cast<int> (m_statistics[i].m_contexts.size ()));

    core += m_statistics[i].m_core;

    printf ("SCORE: %lf\n", m_statistics[i].m_score);
    /*
    printf ("LAST UPDATED: %d\n", static_cast<int> (static_cast<double> (m_statistics[i].m_last_updated) / 1e9));
    printf ("CLIENT NUM: %d (EWMA): %lf)\n", static_cast<int> (m_statistics[i].m_client_num),
        m_statistics[i].m_worker.first.get (statistics::worker::CLIENT_NUM));
    printf ("CORE USAGE: %0.4lf\n", m_statistics[i].m_core);
    printf ("MQ COMPLETED: %lf\n",
        m_statistics[i].m_worker.first.get (statistics::worker::MQ_COMPLETED));
    printf ("BLOCKED RMUTEX: %lf\n",
        m_statistics[i].m_worker.first.get (statistics::worker::BLOCKED_RMUTEX));
    printf ("RECV: %lf\n", m_statistics[i].m_sum.get (statistics::context::BYTES_IN_TOTAL));
    printf ("SEND: %lf\n", m_statistics[i].m_sum.get (statistics::context::BYTES_OUT_TOTAL));
    */

    mq_completed += m_statistics[i].m_worker.first.get (statistics::worker::MQ_COMPLETED);
    bytes_in += m_statistics[i].m_sum.get (statistics::context::BYTES_IN_TOTAL);
    bytes_out += m_statistics[i].m_sum.get (statistics::context::BYTES_OUT_TOTAL);
      }

    printf ("------ summary ------\n");
    printf ("STATUS               : %s (draining worker: %d)\n",
        m_status == status::STABLE ? "STABLE" : (m_status == status::DRAINING ? "DRAINING" : "EXPANDING"),
        m_scaling.draining_worker);
    printf ("WORKER COUNT         : %d (min: %d, max: %d)\n", m_current_worker, m_min_worker, m_max_worker);
    printf ("CORE USAGE           : %0.4lf / %d\n", core, m_max_worker);
    printf ("CORE USAGE PER WORKER: %0.4lf\n", core / m_max_worker);
    printf ("BYTES IN             : %lf\n", bytes_in);
    printf ("BYTES OUT            : %lf\n\n", bytes_out);

    printf ("MQ COMPLETED         : %0.4lf\n\n", mq_completed);

    printf ("TASK REQUESTED       : %lf\n", m_task_statistics.requested.first);
    printf ("TASK STARTED         : %lf\n", m_task_statistics.started.first);
    printf ("TASK COMPLETED       : %lf\n", m_task_statistics.completed.first);
    printf ("TASK QUEUE DEPTH     : %lf\n\n", m_task_statistics.depth.first / m_task_statistics.workers * 100);
  }

  bool coordinator::eventfd_register (int fd)
  {
    if (!m_events.add_descriptor (fd, EPOLLET | EPOLLIN))
      {
    er_log_conn (__FILE__, __LINE__, "connection::coordinator->eventfd_register: add_descriptor failed\n");

    return false;
      }

    return true;
  }

  bool coordinator::eventfd_clear (int fd)
  {
    ssize_t bytes;
    uint64_t u;

    /* read counter */
    while (true)
      {
    bytes = ::read (fd, &u, sizeof (u));
    if (bytes == sizeof (u))
      {
        break;
      }

    if (bytes == 0 || (bytes > 0 && static_cast<unsigned long> (bytes) < sizeof (u)))
      {
        return false;
      }

    assert (bytes < 0);

    if (errno == EINTR)
      {
        continue;
      }
    if (errno == EAGAIN)
      {
        break;
      }
    return false;
      }
    return true;
  }

  bool coordinator::eventfd_settimer (int fd, uint64_t sec, uint64_t nsec)
  {
    struct itimerspec its;

    memset (&its, 0, sizeof (its));
    its.it_value.tv_sec = sec;
    its.it_value.tv_nsec = nsec;
    its.it_interval.tv_sec = sec;
    its.it_interval.tv_nsec = nsec;

    if (timerfd_settime (fd, 0, &its, NULL) < 0)
      {
    er_log_conn (__FILE__, __LINE__, "connection::coordinator->eventfd_settimer: %s\n", strerror (errno));
    return false;
      }

    return true;
  }

  bool coordinator::eventfd_settimer (int fd, timer_latency latency)
  {
    uint64_t sec, nsec;

    sec = static_cast<uint64_t> (latency) / static_cast<uint64_t> (1e9);
    nsec = static_cast<uint64_t> (latency) % static_cast<uint64_t> (1e9);
    if (eventfd_settimer (fd, sec, nsec))
      {
    return true;
      }
    return false;
  }

  bool coordinator::eventfd_starttimer ()
  {
    timer_latency min;
    std::size_t i;

    min = timer_latency::NA;
    for (i = 0; i < static_cast<std::size_t> (timer_type::TYPE_COUNT); i++)
      {
    if (m_timer_handler[i].valid)
      {
        if (min == timer_latency::NA || static_cast<uint64_t> (min) > static_cast<uint64_t> (m_timer_handler[i].latency))
          {
        min = m_timer_handler[i].latency;
          }
      }
      }

    if (min != timer_latency::NA)
      {
    return this->eventfd_settimer (m_timerfd, min);
      }
    return this->eventfd_stoptimer ();
  }

  bool coordinator::eventfd_stoptimer ()
  {
    return eventfd_settimer (m_timerfd, 0, 0);
  }

  bool coordinator::eventfd_addtimer (timer_type type, timer_latency latency, std::function<bool ()> handle)
  {
    timer_latency min;
    std::size_t i;

    if (m_timer_handler[static_cast<std::size_t> (type)].valid)
      {
    return true;
      }

    m_timer_handler[static_cast<std::size_t> (type)].valid = true;
    m_timer_handler[static_cast<std::size_t> (type)].latency = latency;
    m_timer_handler[static_cast<std::size_t> (type)].function = handle;
    m_timer_handler[static_cast<std::size_t> (type)].last_time = this->m_timens;

    min = timer_latency::NA;
    for (i = 0; i < static_cast<std::size_t> (timer_type::TYPE_COUNT); i++)
      {
    if (m_timer_handler[i].valid)
      {
        if (min == timer_latency::NA || static_cast<uint64_t> (min) > static_cast<uint64_t> (m_timer_handler[i].latency))
          {
        min = m_timer_handler[i].latency;
          }
      }
      }

    assert (min != timer_latency::NA);

    return this->eventfd_settimer (m_timerfd, min);
  }

  bool coordinator::eventfd_removetimer (timer_type type)
  {
    timer_latency min;
    std::size_t i;

    /* avoid resetting the timerfd unnecessarily */
    if (!m_timer_handler[static_cast<std::size_t> (type)].valid)
      {
    return true;
      }

    m_timer_handler[static_cast<std::size_t> (type)].valid = false;

    min = timer_latency::NA;
    for (i = 0; i < static_cast<std::size_t> (timer_type::TYPE_COUNT); i++)
      {
    if (m_timer_handler[i].valid)
      {
        if (min == timer_latency::NA || static_cast<uint64_t> (min) > static_cast<uint64_t> (m_timer_handler[i].latency))
          {
        min = m_timer_handler[i].latency;
          }
      }
      }

    if (min != timer_latency::NA)
      {
    return this->eventfd_settimer (m_timerfd, min);
      }
    return this->eventfd_stoptimer ();
  }

  bool coordinator::handle_message_queue_start (message &item)
  {
    return true;
  }

  bool coordinator::handle_message_queue_new_client (message &item)
  {
    static uint64_t id = 1;

    std::vector<std::unique_ptr<worker>> &workers = m_parent->get_workers ();
    connection::worker::message request;
    std::size_t worker;

    std::tie (worker, std::ignore) = statistics_find_score_extremes ();

    assert (m_statistics[worker].m_contexts.find (id) == m_statistics[worker].m_contexts.end ());

    m_statistics[worker].m_contexts.emplace (
        id,
        std::pair<statistics::metrics<statistics::context, double>, statistics::metrics<statistics::context>> { }
    );
    m_statistics[worker].m_client_num++;

    request.type = connection::worker::message_type::NEW_CLIENT;
    request.ctx = m_parent->claim_context ();
    request.ctx->m_worker = worker;
    request.ctx->m_id = id++;
    request.conn = item.conn;

    workers[worker]->enqueue (cubconn::connection::worker::queue_type::IMMEDIATE, std::move (request));
    if (!workers[worker]->notify ())
      {
    assert_release (false);
      }

    /* update score */
    this->statistics_update_score (worker);

    return true;
  }

  bool coordinator::handle_message_queue_return_to_pool (message &item)
  {
    std::size_t i;

    for (context *ctx : item.resource)
      {
    m_statistics[ctx->m_worker].m_client_num--;

    /* remove all stats with id as m_id */
    for (i = 0; i < m_max_worker; i++)
      {
        m_statistics[i].m_contexts.erase (ctx->m_id);
      }

    /* release the conneciton */
    css_free_conn (ctx->m_conn);
    m_parent->retire_context (ctx);
      }

    return true;
  }

  bool coordinator::handle_message_queue_handoff_reply (message &item)
  {
    assert (static_cast<std::size_t> (item.from) < m_max_worker);
    assert (static_cast<std::size_t> (item.to) < m_max_worker);
    assert (item.id > 0);
    assert (m_migrating.find (item.id) != m_migrating.end ());

    /* remove from in flight list */
    m_migrating.erase (item.id);

    if (m_statistics[item.from].m_contexts.find (item.id) == m_statistics[item.from].m_contexts.end () &&
    m_statistics[item.to].m_contexts.find (item.id) == m_statistics[item.to].m_contexts.end ())
      {
    /* this stats has already been cleard in return_to_pool routine */

    return true;
      }

    if (!item.transferred)
      {
    goto not_transferred;
      }

    assert (m_statistics[item.from].m_contexts.find (item.id) != m_statistics[item.from].m_contexts.end ());

    m_statistics[item.from].m_contexts.erase (item.id);
    m_statistics[item.from].m_client_num--;
    m_statistics[item.to].m_client_num++;

    return true;

not_transferred:
    assert (m_statistics[item.to].m_contexts.find (item.id) != m_statistics[item.to].m_contexts.end ());

    /* revert */
    m_statistics[item.to].m_contexts.erase (item.id);

    return true;
  }

  bool coordinator::handle_message_queue_statistics (message &item)
  {
    std::size_t index;
    uint64_t delta;

    index = item.statistics.worker.first;
    delta = item.statistics.time_ns - m_statistics[index].m_last_updated;

    /* update stats */
    m_statistics[index].m_core = static_cast <double> (item.statistics.cpu_time_ns - m_statistics[index].m_last_cpu_time) /
                 delta;

    this->statistics_update_connection (delta, item.statistics.worker,
                    item.statistics.contexts);

    m_statistics[index].m_last_cpu_time = item.statistics.cpu_time_ns;
    m_statistics[index].m_last_updated = item.statistics.time_ns;

    /* update score */
    this->statistics_update_score (index);

    /* hibernate */
    if ((m_status == status::DRAINING && static_cast<int> (index) == m_scaling.draining_worker) &&
    item.statistics.contexts.empty ())
      {
    this->scale_down_finish ();
      }

    return true;
  }

  bool coordinator::handle_message_queue_shutdown (message &item)
  {
    m_stop = true;

    return true;
  }

  bool coordinator::handle_message_queue ()
  {
    static constexpr std::array<
    bool (coordinator::*) (message &), static_cast<std::size_t> (message_type::TYPE_COUNT)
    > handler =
    {
      /* START      */ &coordinator::handle_message_queue_start,
      /* NEW_CLIENT */ &coordinator::handle_message_queue_new_client,
      /* RETURN_TO_POOL */ &coordinator::handle_message_queue_return_to_pool,
      /* HANDOFF_REPLY  */ &coordinator::handle_message_queue_handoff_reply,
      /* STATISTICS */ &coordinator::handle_message_queue_statistics,
      /* SHUTDOWN   */ &coordinator::handle_message_queue_shutdown
    };
    message request;
    uint64_t size, i;

    static_assert (static_cast<int> (message_type::START) == 0, "message_type must start at 0");
    static_assert (static_cast<int> (message_type::TYPE_COUNT) == handler.size (), "handler table size must match");
    static_assert (static_cast<int> (message_type::TYPE_COUNT) == 6, "this must be modified");

    i = 0;
    size = m_queue_size.exchange (0, std::memory_order_acquire);
    while (i++ < size && m_queue.try_pop (request))
      {
    if (! (message_type::START <= request.type && message_type::TYPE_COUNT > request.type))
      {
        er_log_conn (__FILE__, __LINE__,
             "connection::coordinator->handle_message_queue: received unknown event from eventfd\n");
        assert_release (false);
        continue;
      }
    if (! (this->*handler[static_cast <std::size_t> (request.type)]) (request))
      {
        return false;
      }
      }

    return true;
  }

  bool coordinator::handle_controller_request (control_recv &rx, control_send &tx)
  {
    const char *name_table[] =
    {
      "SHOW STATS",
      "SCALE UP",
      "SCALE DOWN",
      "CLIENT MOVE",
      "OK",
      "NOK"
    };

    static_assert (static_cast<int> (control_type::TYPE_COUNT) == sizeof (name_table) / sizeof (name_table[0]));

    printf ("\033[2J\033[H");
    printf ("controller\n");
    printf ("  type: %s\n", name_table[static_cast<std::size_t> (rx.type)]);
    printf ("  from: %d\n", rx.from);
    printf ("  to: %d\n", rx.to);
    printf ("  id: %d\n\n", rx.id);

    switch (rx.type)
      {
      case control_type::SHOW_STATS:
    this->statistics_print ();
    tx.type = control_type::OK;
    break;

      case control_type::CLIENT_MOVE:
    this->transfer_connection (rx.id, rx.from, rx.to);
    tx.type = control_type::OK;
    break;

      case control_type::SCALE_UP:
    tx.type = this->scale_up () ? control_type::OK : control_type::NOK;
    break;

      case control_type::SCALE_DOWN:
    tx.type = this->scale_down () ? control_type::OK : control_type::NOK;
    break;

      default:
    tx.type = control_type::NOK;
    break;
      }

    return true;
  }

  bool coordinator::handle_controller ()
  {
    sockaddr_un peer;
    socklen_t peerlen;
    control_recv rx;
    control_send tx;
    result status;

    while (true)
      {
    status = m_controller.recv (rx, peer, peerlen);
    if (status == result::Pending)
      {
        break;
      }
    if (status == result::Error)
      {
        return false;
      }

    assert (status == result::Ok);

    if (!this->handle_controller_request (rx, tx))
      {
        return false;
      }

    m_controller.send (tx, peer, peerlen);
      }

    return true;
  }

  void coordinator::initialize ()
  {
    /* watch me */
    m_watcher->mtx.lock ();
    m_watcher->active++;
    m_watcher->mtx.unlock ();

    /* set name */
    pthread_setname_np (pthread_self (), "coordinator");

    /* pin myself */
    os::resources::cpu::setaffinity (m_core);

    /* entry */
    m_entry = cubthread::get_manager ()->claim_entry ();
    if (m_entry == nullptr)
      {
    er_log_conn (__FILE__, __LINE__, "connection::coordinator->initialize: claim_entry failed\n");
    assert_release (false);
      }
    m_entry->register_id ();
    m_entry->type = TT_SERVER;
    m_entry->tran_index = -1;
    m_entry->m_status = cubthread::entry::status::TS_RUN;
    m_entry->shutdown = false;

    m_entry->get_error_context ().register_thread_local ();

    m_status = status::STABLE;

    m_parent->lock_resource ();
  }

  void coordinator::finalize ()
  {
    m_parent->release_resource ();

    m_entry->unregister_id ();
    cubthread::get_manager ()->retire_entry (*m_entry);

    /* remove the watcher */
    m_watcher->mtx.lock ();
    m_watcher->active--;
    m_watcher->mtx.unlock ();

    m_watcher->cv.notify_one ();
  }

  bool coordinator::run ()
  {
    std::array<epoll_event, 4> events;
    int nfds, i, j;

    while (!m_stop)
      {
    nfds = m_events.wait (events.data (), events.size (), TIMEOUT_INFINITE);
    if (nfds < 0)
      {
        if (errno == EINTR)
          {
        continue;
          }
        er_log_conn (__FILE__, __LINE__, "connection::coordinator->run: m_events->wait failed: %s", strerror (errno));
        assert_release (false);
        continue;
      }

    /* criterion time to use during this loop */
    m_timens = this->get_monotonic_ns ();

    for (i = 0; i < nfds; i++)
      {
        assert (events[i].data.fd > 0);

        if (events[i].events & EPOLLIN)
          {
        if (events[i].data.fd == m_eventfd)
          {
            if (!this->eventfd_clear (m_eventfd))
              {
            er_log_conn (__FILE__, __LINE__, "connection::coordinator->run: eventfd_clear failed\n");
            return false;
              }
            this->handle_message_queue ();
          }
        else if (events[i].data.fd == m_timerfd)
          {
            for (j = 0; j < static_cast<int> (timer_type::TYPE_COUNT); j++)
              {
            if (!m_timer_handler[j].valid)
              {
                continue;
              }

            if (m_timens - m_timer_handler[j].last_time > static_cast<uint64_t> (m_timer_handler[j].latency))
              {
                if (!m_timer_handler[j].function ())
                  {
                return false;
                  }

                m_timer_handler[j].last_time = m_timens;
              }
              }

            if (!this->eventfd_clear (m_timerfd))
              {
            er_log_conn (__FILE__, __LINE__, "connection::coordinator->eventfd_handler: eventfd_clear failed\n");
            return false;
              }
          }
        else if (events[i].data.fd == m_ctrlfd)
          {
            this->handle_controller ();
          }
          }
      }
      }

    return true;
  }

  void coordinator::attach ()
  {
    this->initialize ();
    this->run ();
    this->finalize ();
  }
}