Skip to content

File master_connector.cpp

File List > connection > master_connector.cpp

Go to the documentation of this file

/*
 *
 * Copyright 2016 CUBRID Corporation
 *
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 *
 */

/*
 * master_connector.cpp
 */

#include "connection_globals.h"
#include "system_parameter.h"
#include "object_representation.h"
#include "log_common_impl.h"
#include "log_lsa.hpp"
#include "log_manager.h"
#include "heartbeat.h"
#include "error_manager.h"
#include "master_connector.hpp"
#include "server_support.h"
#include "filesys_temp.hpp"
#include "connection_sr.h"
#include "tcp.h"
#include "buffer.hpp"
#include "packet_buffer.hpp"
#include "epoll.hpp"
#include "span.hpp"
#include "porting.h"

#include <tuple>
#include <cstdint>
#include <sys/eventfd.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <netinet/tcp.h>
#include <fcntl.h>
#include <string>
#include <type_traits>

// XXX: SHOULD BE THE LAST INCLUDE HEADER
#include "memory_wrapper.hpp"

/*
 * er_log_conn (...) - trace logging for this connector.
 *
 * The `#if 0` branch would forward the arguments to er_log_debug. As written that branch is disabled,
 * so every er_log_conn (...) call expands to nothing and its arguments are never evaluated.
 */
#if 0
#define er_log_conn(...) er_log_debug (__VA_ARGS__)
#else
#define er_log_conn(...)
#endif

/*
 * NEXT_STATE (c, x) - set c->m_state to state::x, tracing the transition through er_log_conn.
 *
 *   c: pointer to an object that has m_conn and m_state members
 *   x: enumerator of `state`, written without the `state::` prefix
 *
 * NOTE: `c` is not parenthesized in the expansion and appears more than once in the macro text,
 *       so pass a plain pointer variable rather than an expression.
 */
#define NEXT_STATE(c, x) do { \
    er_log_conn (__FILE__, __LINE__, "fd = %d, set state = %d\n", c->m_conn ? c->m_conn->fd : -1, state::x); \
    (c->m_state = state::x); \
} while (0)

namespace cubconn::master
{
  /* Master connector uses Main_entry_p (TT_MASTER) instead of claiming a separate entry. */
  /* It is still registered here for consistency with other connection components.    */
  REGISTER_CONNECTION (master_connector, 0);

  /*
   * connector () - build an idle master connector.
   *
   * Creates a non-blocking, close-on-exec eventfd and registers it in m_events for EPOLLIN; stop ()
   * writes to that eventfd. The context registered with the eventfd does not carry a real connection
   * entry: its m_conn slot holds a heap-allocated int that stores the eventfd, cast to css_conn_entry *.
   * Finally the master context (m_context) is reset.
   */
  connector::connector () :
    m_stop (false),
    m_entry (nullptr),
    m_master_state (master_state::CLOSED),
    m_connection_pool (nullptr)
  {
    context *ctx;
    int *eventfd_holder;

    m_eventfd = eventfd (0, EFD_NONBLOCK | EFD_CLOEXEC);
    if (m_eventfd == -1)
      {
        er_log_conn (__FILE__, __LINE__, "master::connector: failed to create eventfd\n");
        assert_release (false);
      }
    ctx = new context ();
    if (!ctx)
      {
        er_log_conn (__FILE__, __LINE__, "master::connector: failed to allocate memory\n");
        assert_release (false);
      }
    /* keep the typed pointer around so that the holder can be released with the type it was created with */
    eventfd_holder = new int { m_eventfd };
    ctx->m_conn = reinterpret_cast<css_conn_entry *> (eventfd_holder);
    if (!ctx->m_conn)
      {
        er_log_conn (__FILE__, __LINE__, "master::connector: failed to allocate memory\n");
        assert_release (false);
      }
    if (!m_events.add_descriptor (m_eventfd, EPOLLIN, ctx))
      {
        er_log_conn (__FILE__, __LINE__, "master::connector: add_descriptor failed\n");
        /* deleting the int through css_conn_entry * would be undefined behavior; use the int pointer. */
        /* the context was not registered, so nobody else refers to it and it can be released too. */
        delete eventfd_holder;
        ctx->m_conn = nullptr;
        delete ctx;
        assert_release (false);
      }
    m_context.reset ();
  }

  /*
   * ~connector () - destructor; the body is empty.
   *
   * NOTE(review): m_eventfd and the wakeup context created by the constructor are not released here.
   *               Confirm that they are cleaned up elsewhere (not visible in this part of the file).
   */
  connector::~connector ()
  {
  }

  /*
   * stop () - request termination and wake up whoever is waiting on m_eventfd.
   *
   * Sets m_stop and then adds 1 to the eventfd counter. The write is retried only on EINTR.
   * Every other outcome leaves the function, so an unrecoverable write error can no longer spin
   * forever in builds where assert_release () does not abort.
   */
  void connector::stop () noexcept
  {
    const std::uint64_t one = 1;
    ssize_t bytes;

    /* stop */
    m_stop = true;

    /* and wakeup */
    while (true)
      {
        bytes = ::write (m_eventfd, &one, sizeof (one));
        if (bytes == static_cast<ssize_t> (sizeof (one)))
          {
            return;
          }

        if (bytes >= 0)
          {
            /* an eventfd write transfers exactly 8 bytes or fails; a short write is unexpected */
            assert_release (false);
            return;
          }

        if (errno == EINTR)
          {
            continue;
          }
        if (errno == EAGAIN)
          {
            /* the counter cannot take another increment, so a wakeup is already pending */
            return;
          }

        /* unrecoverable error: report it and give up instead of retrying endlessly */
        assert_release (false);
        return;
      }
  }

  /*
   * attach () - remember the thread entry this connector works with.
   *
   *   return: always true
   *   entry(in): thread entry; only its address is stored in m_entry
   */
  bool connector::attach (cubthread::entry &entry) noexcept
  {
    cubthread::entry *const entry_p = &entry;

    this->m_entry = entry_p;

    return true;
  }

  /*
   * attach () - remember the connection pool that receives accepted client connections.
   *
   *   return: always true
   *   pool(in): connection pool; only its address is stored in m_connection_pool
   */
  bool connector::attach (connection::pool &pool) noexcept
  {
    connection::pool *const pool_p = &pool;

    this->m_connection_pool = pool_p;

    return true;
  }

  /*
   * run () - connect to the master, queue the handshake and run the event loop.
   *
   *   return: true when connect (), prepare_handshake () and execute () all succeed
   *   port(in): master port; stored in m_master_port
   *   server_name(in): server name; stored in m_server_name
   *
   * The three stages run in order and the first failing stage ends the call.
   */
  bool connector::run (int port, std::string &server_name) noexcept
  {
    bool succeeded = false;

    m_master_port = port;
    m_server_name = server_name;

    if (!this->connect (port))
      {
        er_log_conn (__FILE__, __LINE__, "master::connector->run: connect failed");
      }
    else if (!this->prepare_handshake (server_name))
      {
        er_log_conn (__FILE__, __LINE__, "master::connector->run: prepare_handshake failed");
      }
    else if (!this->execute ())
      {
        er_log_conn (__FILE__, __LINE__, "master::connector->run: execute failed");
      }
    else
      {
        succeeded = true;
      }

    return succeeded;
  }

  /*
   * make_nonblocking () - add O_NONBLOCK to the flags of a descriptor.
   *
   *   return: false when reading or writing the flags through m_events reports -1, true otherwise
   *   fd(in): descriptor to update
   */
  inline bool connector::make_nonblocking (int fd) noexcept
  {
    const int current_flags = m_events.get_flags (fd);

    /* both failures are treated as the unlikely path */
    if (__builtin_expect (current_flags == -1, 0))
      {
        return false;
      }
    if (__builtin_expect (m_events.set_flags (fd, current_flags | O_NONBLOCK) == -1, 0))
      {
        return false;
      }

    return true;
  }

  /*
   * opt_socket () - apply the keepalive system parameters to a socket.
   *
   *   return: always true; setsockopt results are deliberately ignored
   *   fd(in): socket to configure
   *
   * setsockopt with IPPROTO_TCP can fail if the fd is not a TCP socket (e.g. unix domain), which is
   * why failures are not reported.
   */
  inline bool connector::opt_socket (int fd) noexcept
  {
    /* sets one int-valued option, ignoring the result */
    const auto apply_option = [fd] (int level, int option_name, int option_value)
    {
      (void) setsockopt (fd, level, option_name, &option_value, sizeof (option_value));
    };

    apply_option (SOL_SOCKET, SO_KEEPALIVE, static_cast<int> (prm_get_bool_value (PRM_ID_TCP_KEEPALIVE)));
    apply_option (IPPROTO_TCP, TCP_KEEPIDLE, prm_get_integer_value (PRM_ID_TCP_KEEPALIVE_IDLE));
    apply_option (IPPROTO_TCP, TCP_KEEPINTVL, prm_get_integer_value (PRM_ID_TCP_KEEPALIVE_INTERVAL));
    apply_option (IPPROTO_TCP, TCP_KEEPCNT, prm_get_integer_value (PRM_ID_TCP_KEEPALIVE_COUNT));

    return true;
  }

  /*
   * dispose_connection () - unregister a context from m_events and release it with its connection.
   *
   *   return: false when the descriptor cannot be removed (nothing is released in that case)
   *   ctx(in): context to release; deleted on success
   *
   * A context flagged with m_has_error owns a temporarily allocated connection entry, which is
   * shut down and deleted here. DO NOT PASS such an entry to css_free_conn (which expects a pool entry).
   * Any other context returns its entry through css_prepare_shutdown_conn () and css_free_conn ().
   */
  inline bool connector::dispose_connection (context *ctx)
  {
    css_conn_entry *const conn = ctx->m_conn;

    /* remove the fd which is reset by peer */
    if (!m_events.remove_descriptor (conn->fd))
      {
        er_log_conn (__FILE__, __LINE__, "master::connector->dispose_connection: m_events->remove_descriptor failed: %s",
                     strerror (errno));
        return false;
      }

    if (!ctx->m_has_error)
      {
        /* regular entry: hand it back */
        css_prepare_shutdown_conn (conn);
        css_free_conn (conn);
      }
    else
      {
        /* refuse context: the entry was allocated with new */
        if (!IS_INVALID_SOCKET (conn->fd))
          {
            css_shutdown_socket (conn->fd);
            conn->fd = INVALID_SOCKET;
          }
        delete conn;
      }

    delete ctx;
    return true;
  }

  /*
   * update_epoll_events () - refresh the event mask registered for a context.
   *
   *   return: false when m_events.modify_descriptor () fails
   *   ctx(in): context whose descriptor is updated
   *
   * EPOLLIN | EPOLLRDHUP is always requested; EPOLLOUT is added only while the context has data to send.
   */
  inline bool connector::update_epoll_events (context *ctx)
  {
    std::uint32_t wanted = EPOLLIN | EPOLLRDHUP;

    if (ctx->has_data_to_send ())
      {
        wanted = wanted | EPOLLOUT;
      }

    const bool modified = m_events.modify_descriptor (ctx->m_conn->fd, wanted, ctx);
    if (!modified)
      {
        er_log_conn (__FILE__, __LINE__, "master::connector->update_epoll_events: m_events->modify_descriptor failed: %s",
                     strerror (errno));
        return false;
      }

    return true;
  }

  /*
   * make_context () - allocate a context and reset it.
   *
   *   return: the new context; ownership passes to the caller
   */
  inline context *connector::make_context ()
  {
    context *const fresh = new context;

    if (fresh == nullptr)
      {
        er_log_conn (__FILE__, __LINE__, "memory allocation failed: %s", strerror (errno));
        assert_release (false);
      }
    fresh->reset ();

    return fresh;
  }

  inline int connector::connect_to_master (int port) noexcept
  {
    char hostname[CUB_MAXHOSTNAMELEN];
    int fd;

    if (GETHOSTNAME (hostname, CUB_MAXHOSTNAMELEN) != 0)
      {
    er_set_with_oserror (ER_ERROR_SEVERITY, ARG_FILE_LINE, ERR_CSS_TCP_HOST_NAME_ERROR, 0);
    return -1;
      }

    /* connect to cub_master */
    fd = css_tcp_client_open ((char *) hostname, port);
    if (IS_INVALID_SOCKET (fd))
      {
    /* error has already been set. */
    er_log_conn (__FILE__, __LINE__, "master::connector->connect_to_master: failed to connect - error: %s",
             strerror (errno));
    return -1;
      }

    return fd;
  }

  bool connector::connect (int port) noexcept
  {
    css_conn_entry *conn;
    SOCKET fd;

    fd = this->connect_to_master (port);
    if (IS_INVALID_SOCKET (fd))
      {
    er_log_conn (__FILE__, __LINE__, "master::connector->connect: failed to connect - error: %s", strerror (errno));
    return false;
      }

    assert (!this->m_events.is_nonblocking (fd));

    if (!this->make_nonblocking (fd))
      {
    er_log_conn (__FILE__, __LINE__, "master::connector->connect: make_nonblocking failed - error: %s", strerror (errno));
    ::close (fd);
    return false;
      }

    /* make new connection */
    conn = css_make_conn (fd);
    if (!conn)
      {
    er_log_conn (__FILE__, __LINE__, "master::connector->connect: css_make_conn failed: can't recover this");
    ::close (fd);
    return false;
      }
    m_context.m_conn = conn;
    m_master_state = master_state::CONNECTED;

    return true;
  }

  /*
   * set_registrant () - fill the registration record sent to cub_master.
   *
   *   proc_register(out): record to fill (server name and length, pid, executable path, arguments)
   *   server_name(in): name of this server
   *
   * The arguments returned by css_get_argv () are written to proc_register->args, each followed by one
   * space. When the area is exhausted the text is truncated (snprintf keeps it NUL-terminated) and the
   * remaining arguments are dropped.
   *
   * NOTE(review): the server name is copied without checking the capacity of proc_register->server_name;
   *               confirm that callers bound its length.
   */
  inline void connector::set_registrant (css_server_proc_register *proc_register,
                                         std::string &server_name) noexcept
  {
    char *cursor, *end;
    char **argv;

    memcpy (proc_register->server_name, server_name.c_str (), server_name.length () + 1);
    proc_register->server_name_length = server_name.length ();
    proc_register->pid = getpid ();
    strncpy_bufsize (proc_register->exec_path, css_get_exec_path ());

    cursor = (char *) proc_register->args;
    end = cursor + proc_register->CSS_SERVER_MAX_SZ_PROC_ARGS;
    for (argv = css_get_argv (); *argv != nullptr && cursor < end; argv++)
      {
        const std::size_t room = static_cast<std::size_t> (end - cursor);
        const int written = snprintf (cursor, room, "%s ", *argv);

        /* snprintf returns the length it wanted to write (or a negative value on error). */
        /* advancing by that amount after a truncation would move the cursor outside the area. */
        if (written < 0 || static_cast<std::size_t> (written) >= room)
          {
            break;
          }
        cursor += written;
      }
  }

  inline bool connector::prepare_handshake (std::string &server_name) noexcept
  {
    NET_HEADER *header[3];
    /* header[0]: magic number packet */
    /* header[1]: command header packet */
    /* header[2]: data header for registrant packet */
    CSS_SERVER_PROC_REGISTER *registrant;
    unsigned short request_id;
    css_conn_entry *conn;

    conn = m_context.m_conn;
    /* clear the packet buffer */
    m_context.m_sendbuf.clear ();
    header[0] = m_context.allocate<NET_HEADER> ();
    header[1] = m_context.allocate<NET_HEADER> ();
    header[2] = m_context.allocate<NET_HEADER> ();
    registrant = m_context.allocate<CSS_SERVER_PROC_REGISTER> ();
    /* cub_server magic number to be delivered to cub_master */
    std::memcpy ((char *) header[0], css_Net_magic, sizeof (css_Net_magic));
    /* make the name pakcet to register this server to cub_master */
    this->set_registrant (registrant, server_name);
    /* headers */
    request_id = css_get_request_id (conn);
    css_set_net_header (header[1], COMMAND_TYPE, SERVER_REQUEST_FROM_SERVER, request_id, sizeof (CSS_SERVER_PROC_REGISTER),
            conn->get_tran_index (), conn->invalidate_snapshot, conn->db_error);
    css_set_net_header (header[2], DATA_TYPE, 0, request_id, sizeof (CSS_SERVER_PROC_REGISTER), conn->get_tran_index (),
            conn->invalidate_snapshot, conn->db_error);
    /* register the packets */
    m_context.push_for_send ({ reinterpret_cast<std::byte *> (header[0]), sizeof (NET_HEADER) });
    m_context.push_for_send ({ reinterpret_cast<std::byte *> (header[1]), sizeof (NET_HEADER) });
    m_context.push_for_send ({ reinterpret_cast<std::byte *> (header[2]), sizeof (NET_HEADER) });
    m_context.push_for_send ({ reinterpret_cast<std::byte *> (registrant), sizeof (CSS_SERVER_PROC_REGISTER) });
    /* make the packets to msghdr */
    m_context.m_sendbuf.stamp_msghdr ();

    if (!m_events.add_descriptor (conn->fd, EPOLLIN | EPOLLOUT | EPOLLRDHUP, &m_context))
      {
    er_log_conn (__FILE__, __LINE__, "master::connector->prepare_handshake: m_events->add_descriptor failed: %s",
             strerror (errno));
    return false;
      }
    m_master_state = master_state::WAIT_RESPONSE;

    return true;
  }

  /*
   * prepare_switch_to_unix_socket () - open the unix-domain listening socket and queue its path for master.
   *
   *   return: false when the datagram socket cannot be set up or the epoll mask cannot be updated
   *   ctx(in/out): master context; its send buffer is replaced by a data header followed by the path
   *
   * The path is "<temp directory>/cubrid_tcp_setup_server<pid>" and is kept in m_unixpath; the queued
   * payload points into m_unixpath (terminating NUL included). The socket is stored in m_unixsocket.
   */
  inline bool connector::prepare_switch_to_unix_socket (context *ctx) noexcept
  {
    NET_HEADER *header;
    css_conn_entry *conn;

    conn = ctx->m_conn;
    /* send the pathname for the datagram */
    /* be sure to open the datagram first.  */
    m_unixpath = filesys::temp_directory_path ();
    m_unixpath += "/cubrid_tcp_setup_server" + std::to_string (getpid ());
    (void) ::unlink (m_unixpath.c_str ());

    /* setup unix domain socket and get the path */
    if (!css_tcp_setup_server_datagram (m_unixpath.c_str (), &m_unixsocket))
      {
        /* one argument is announced to the error manager, so one must actually be passed */
        er_set_with_oserror (ER_ERROR_SEVERITY, ARG_FILE_LINE, ERR_CSS_ERROR_DURING_SERVER_CONNECT, 1,
                             m_server_name.c_str ());
        return false;
      }

    /* clear the packet buffer */
    ctx->m_sendbuf.clear ();
    header = ctx->allocate<NET_HEADER> ();
    /* unix path to open new unix connection to master */
    css_set_net_header (header, DATA_TYPE, 0, conn->request_id, m_unixpath.length () + 1, conn->get_tran_index (),
                        conn->invalidate_snapshot, conn->db_error);
    ctx->push_for_send ({ reinterpret_cast<std::byte *> (header), sizeof (NET_HEADER) });
    ctx->push_for_send ({ reinterpret_cast<std::byte *> (const_cast<char *> (m_unixpath.c_str ())), m_unixpath.length () + 1 });
    /* make the packets to msghdr */
    ctx->m_sendbuf.stamp_msghdr ();

    /* update the events */
    if (!this->update_epoll_events (ctx))
      {
        er_log_conn (__FILE__, __LINE__,
                     "master::connector->prepare_switch_to_unix_socket: update_epoll_events failed: %s",
                     strerror (errno));
        return false;
      }
    return true;
  }

  /*
   * prepare_reply () - queue a data header followed by one int reason code on a context.
   *
   *   return: always true
   *   ctx(in/out): context whose send buffer is rebuilt
   *   reason(in): code to send; converted with htonl
   */
  inline bool connector::prepare_reply (context *ctx, int reason) noexcept
  {
    css_conn_entry *const conn = ctx->m_conn;

    /* clear the packet buffer */
    ctx->m_sendbuf.clear ();

    NET_HEADER *const reply_header = ctx->allocate<NET_HEADER> ();
    int *const reason_slot = ctx->allocate<int> ();

    css_set_net_header (reply_header, DATA_TYPE, 0, conn->request_id, sizeof (int), conn->get_tran_index (),
                        conn->invalidate_snapshot, conn->db_error);
    *reason_slot = htonl (reason);

    ctx->push_for_send ({ reinterpret_cast<std::byte *> (reply_header), sizeof (NET_HEADER) });
    ctx->push_for_send ({ reinterpret_cast<std::byte *> (reason_slot), sizeof (int) });

    /* make the packets to msghdr */
    ctx->m_sendbuf.stamp_msghdr ();

    return true;
  }

  /*
   * prepare_reply_refuse_connection () - queue a refusal (reason + error area) and register the context.
   *
   *   return: false when the descriptor cannot be added to m_events
   *   ctx(in/out): context of the refused client; m_has_error is set
   *   reason(in): refusal code; converted with htonl
   *
   * Queued in order: data header, reason, error header, error area (at most 1024 bytes, filled by
   * er_get_area_error). conn->db_error is set from er_errid () after the data header has been built,
   * so only the error header carries it. er_clear () is called on the success path only.
   */
  inline bool connector::prepare_reply_refuse_connection (context *ctx, int reason) noexcept
  {
    using error_area = std::aligned_storage_t<1024, 8>;

    css_conn_entry *const conn = ctx->m_conn;
    int error_length = 1024;

    /* clear the packet buffer */
    ctx->m_sendbuf.clear ();

    NET_HEADER *const reason_header = ctx->allocate<NET_HEADER> ();
    NET_HEADER *const error_header = ctx->allocate<NET_HEADER> ();
    int *const reason_slot = ctx->allocate<int> ();
    error_area *const error_slot = ctx->allocate<error_area> ();

    /* set reason */
    css_set_net_header (reason_header, DATA_TYPE, 0, conn->request_id, sizeof (int), conn->get_tran_index (),
                        conn->invalidate_snapshot, conn->db_error);
    *reason_slot = htonl (reason);

    /* set error */
    conn->db_error = er_errid ();
    er_get_area_error (reinterpret_cast<char *> (error_slot), &error_length);
    css_set_net_header (error_header, ERROR_TYPE, 0, conn->request_id, error_length, conn->get_tran_index (),
                        conn->invalidate_snapshot, conn->db_error);

    ctx->push_for_send ({ reinterpret_cast<std::byte *> (reason_header), sizeof (NET_HEADER) });
    ctx->push_for_send ({ reinterpret_cast<std::byte *> (reason_slot), sizeof (int) });
    ctx->push_for_send ({ reinterpret_cast<std::byte *> (error_header), sizeof (NET_HEADER) });
    ctx->push_for_send ({ reinterpret_cast<std::byte *> (error_slot), static_cast<std::size_t> (error_length) });

    /* make the packets to msghdr */
    ctx->m_sendbuf.stamp_msghdr ();
    ctx->m_has_error = true;

    const bool registered = m_events.add_descriptor (ctx->m_conn->fd, EPOLLIN | EPOLLOUT | EPOLLRDHUP, ctx);
    if (!registered)
      {
        er_log_conn (__FILE__, __LINE__,
                     "master::connector->prepare_reply_refuse_connection: m_events->add_descriptor failed: %s", strerror (errno));
        return false;
      }

    er_clear ();
    return true;
  }

  /*
   * prepare_heartbeat_send_request () - queue a single int command and refresh the epoll mask.
   *
   *   return: false when update_epoll_events () fails
   *   ctx(in/out): context whose send buffer is rebuilt
   *   command(in): request code; converted with htonl
   */
  inline bool connector::prepare_heartbeat_send_request (context *ctx, CSS_SERVER_REQUEST command) noexcept
  {
    /* clear the packet buffer */
    ctx->m_sendbuf.clear ();

    int *const command_slot = ctx->allocate<int> ();
    *command_slot = htonl (command);
    ctx->push ({ reinterpret_cast<std::byte *> (command_slot), sizeof (int) });

    /* make the packets to msghdr */
    ctx->m_sendbuf.stamp_msghdr ();

    /* update the events */
    const bool updated = this->update_epoll_events (ctx);
    if (!updated)
      {
        er_log_conn (__FILE__, __LINE__, "master::connector->prepare_heartbeat_send_request: update_epoll_events failed: %s",
                     strerror (errno));
        return false;
      }
    return true;
  }

  /*
   * prepare_heartbeat_send_request_with_data () - queue an int command followed by a payload.
   *
   *   return: false when the payload does not fit or update_epoll_events () fails
   *   ctx(in/out): context whose send buffer is rebuilt
   *   command(in): request code; converted with htonl
   *   data(in): payload bytes
   *   size(in): payload size; must not exceed 2048 bytes
   *
   * The payload is copied into a 2048-byte area allocated from the context. An oversized payload is
   * rejected before the send buffer is touched, so the copy can never overrun the area even in builds
   * where assert_release () does not abort.
   */
  inline bool connector::prepare_heartbeat_send_request_with_data (context *ctx, CSS_SERVER_REQUEST command,
      std::byte *data, std::size_t size) noexcept
  {
    /* this must be fixed if you wanna store the data bigger than body_capacity bytes */
    /* you can generalize size using template */
    constexpr std::size_t body_capacity = 2048;
    using body_storage = std::aligned_storage_t<body_capacity, 8>;

    body_storage *body;
    int *response;

    if (size > body_capacity)
      {
        assert_release (size <= body_capacity);
        return false;
      }

    /* clear the packet buffer */
    ctx->m_sendbuf.clear ();

    response = ctx->allocate<int> ();
    *response = htonl (command);

    body = ctx->allocate<body_storage> ();
    if (size > 0)
      {
        std::memcpy (body, data, size);
      }

    ctx->push ({ reinterpret_cast<std::byte *> (response), sizeof (int) });
    ctx->push ({ reinterpret_cast<std::byte *> (body), size });

    /* make the packets to msghdr */
    ctx->m_sendbuf.stamp_msghdr ();

    /* update the events */
    if (!this->update_epoll_events (ctx))
      {
        er_log_conn (__FILE__, __LINE__,
                     "master::connector->prepare_heartbeat_send_request_with_data: update_epoll_events failed: %s", strerror (errno));
        return false;
      }
    return true;
  }

  /*
   * prepare_heartbeat_register () - queue SERVER_REGISTER_HA_PROCESS with the heartbeat registration record.
   *
   *   return: false when the record cannot be built or the request cannot be queued
   *   ctx(in/out): context that sends the request
   *
   * The record made by hb_make_set_hbp_register (HB_PTYPE_SERVER) is freed here in every case once the
   * request has been prepared.
   */
  inline bool connector::prepare_heartbeat_register (context *ctx) noexcept
  {
    hbp_proc_register *hbp_register = hb_make_set_hbp_register (HB_PTYPE_SERVER);

    if (hbp_register == NULL)
      {
        er_log_conn (ARG_FILE_LINE, "master::connector->hb_make_set_hbp_register: hbp_register failed. \n");
        return false;
      }

    const bool prepared =
      this->prepare_heartbeat_send_request_with_data (ctx, SERVER_REGISTER_HA_PROCESS,
                                                      reinterpret_cast<std::byte *> (hbp_register),
                                                      sizeof (*hbp_register));

    /* the record is no longer needed whatever the outcome */
    free_and_init (hbp_register);

    return prepared;
  }

  /*
   * prepare_heartbeat_ha_mode () - queue the current HA server state as one int.
   *
   *   return: false when update_epoll_events () fails
   *   ctx(in/out): context whose send buffer is rebuilt
   *
   * HA_SERVER_STATE_NA is reported when HA is disabled, css_ha_server_state () otherwise.
   */
  inline bool connector::prepare_heartbeat_ha_mode (context *ctx) noexcept
  {
    /* clear the packet buffer */
    ctx->m_sendbuf.clear ();

    int *const state_slot = ctx->allocate<int> ();
    if (!HA_DISABLED ())
      {
        *state_slot = htonl (css_ha_server_state ());
      }
    else
      {
        *state_slot = htonl (HA_SERVER_STATE_NA);
      }
    ctx->push ({ reinterpret_cast<std::byte *> (state_slot), sizeof (int) });

    /* make the packets to msghdr */
    ctx->m_sendbuf.stamp_msghdr ();

    /* update the events */
    const bool updated = this->update_epoll_events (ctx);
    if (!updated)
      {
        er_log_conn (__FILE__, __LINE__, "master::connector->prepare_heartbeat_ha_mode: update_epoll_events failed: %s",
                     strerror (errno));
        return false;
      }
    return true;
  }

  /*
   * prepare_heartbeat_log_eof () - queue SERVER_GET_EOF with the packed end-of-log LSA.
   *
   *   return: false when the request cannot be queued
   *   ctx(in/out): context that sends the reply
   *
   * The LSA is copied while the log critical section is held in read mode. The packed reply and the
   * comparison against the previously reported value both use that copy, so they always agree.
   * The previous value lives in a function-local static; a debug message is written when the LSA
   * has not changed since the last call.
   */
  inline bool connector::prepare_heartbeat_log_eof (context *ctx) noexcept
  {
    static LOG_LSA prev_eof_lsa = LSA_INITIALIZER;
    LOG_LSA eof_lsa = LSA_INITIALIZER;
    alignas (8) std::byte reply[OR_LOG_LSA_ALIGNED_SIZE];

    assert (m_entry != nullptr);
    LOG_CS_ENTER_READ_MODE (m_entry);

    /* take a private copy; the pointer returned by log_get_eof_lsa () must not be read after LOG_CS_EXIT */
    LSA_COPY (&eof_lsa, log_get_eof_lsa ());
    (void) or_pack_log_lsa (reinterpret_cast<char *> (reply), &eof_lsa);

    LOG_CS_EXIT (m_entry);

    if (LSA_EQ (&prev_eof_lsa, &eof_lsa))
      {
        er_log_debug (ARG_FILE_LINE, "Disk failure has been occurred: prev_eof_lsa(%lld, %d), eof_lsa(%lld, %d)\n",
                      LSA_AS_ARGS (&prev_eof_lsa), LSA_AS_ARGS (&eof_lsa));
      }
    else
      {
        LSA_COPY (&prev_eof_lsa, &eof_lsa);
      }

    if (!this->prepare_heartbeat_send_request_with_data (ctx, SERVER_GET_EOF, reply, OR_LOG_LSA_ALIGNED_SIZE))
      {
        return false;
      }
    return true;
  }

  /*
   * switch_to_unix_socket () - replace the TCP connection to master with the unix-domain one.
   *
   *   return: false on any failure
   *   ctx(in/out): master context; its m_conn is replaced by an entry made for the new descriptor
   *
   * Steps, in order: wait for master on m_unixsocket, remove the old descriptor from m_events, unlink
   * the socket path, release the old connection entry, close the listening socket, make an entry for
   * the accepted descriptor, make it non-blocking and register it for EPOLLIN | EPOLLRDHUP.
   * On success m_master_state becomes master_state::ESTABLISHED.
   *
   * NOTE(review): the result of css_make_conn () is not checked before ctx->m_conn->fd is used below,
   *               although connect () handles a null result; confirm it cannot fail here.
   */
  inline bool connector::switch_to_unix_socket (context *ctx) noexcept
  {
    int datagram_fd;

    /* wait to be requested to connect from master */
    if (!css_tcp_listen_server_datagram (m_unixsocket, &datagram_fd))
      {
        er_log_conn (__FILE__, __LINE__, "master::connector->switch_to_unix_socket: css_tcp_listen_server_datagram failed: %s",
                     strerror (errno));

        (void) ::unlink (m_unixpath.c_str ());
        ::close (m_unixsocket);
        /* one argument is announced to the error manager, so one must actually be passed */
        er_set_with_oserror (ER_ERROR_SEVERITY, ARG_FILE_LINE, ERR_CSS_ERROR_DURING_SERVER_CONNECT, 1,
                             m_server_name.c_str ());
        return false;
      }

    /* remove original */
    if (!m_events.remove_descriptor (ctx->m_conn->fd))
      {
        er_log_conn (__FILE__, __LINE__, "master::connector->switch_to_unix_socket: m_events->remove_descriptor failed: %s",
                     strerror (errno));
        return false;
      }

    /* only connected file descriptor is needed */
    (void) ::unlink (m_unixpath.c_str ());
    css_prepare_shutdown_conn (ctx->m_conn);
    css_free_conn (ctx->m_conn);
    ::close (m_unixsocket);

    /* new connection */
    ctx->m_conn = css_make_conn (datagram_fd);

    /* make new socket non-blocking */
    assert (!this->m_events.is_nonblocking (datagram_fd));
    if (!this->make_nonblocking (datagram_fd))
      {
        er_log_conn (__FILE__, __LINE__, "master::connector->switch_to_unix_socket: m_events->make_nonblocking failed: %s",
                     strerror (errno));
        return false;
      }

    if (!m_events.add_descriptor (ctx->m_conn->fd, EPOLLIN | EPOLLRDHUP, ctx))
      {
        er_log_conn (__FILE__, __LINE__, "master::connector->switch_to_unix_socket: m_events->add_descriptor failed: %s",
                     strerror (errno));
        return false;
      }

    er_log_debug (__FILE__, __LINE__, "successfully switched to unix domain socket\n");
    m_master_state = master_state::ESTABLISHED;

    return true;
  }

  /*
   * handshake_from_master () - read and interpret the master's answer to the registration request.
   *
   *   return: the read status when it is not result::Ok; result::Error when the server is rejected or
   *           the switch to the unix socket cannot be prepared; result::Ok otherwise
   *   ctx(in/out): master context
   *
   * SERVER_REQUEST_ACCEPTED leads to prepare_switch_to_unix_socket (). SERVER_ALREADY_EXISTS and any
   * other answer set an error that names this server (m_server_name).
   */
  inline result connector::handshake_from_master (context *ctx) noexcept
  {
    const int *buf;
    result status;
    int response;

    std::tie (status, buf) = buffered_socket::read_fixed_size<int> (ctx->m_conn->fd, ctx->m_recvbuf);
    if (status != result::Ok)
      {
        er_log_conn (__FILE__, __LINE__, "master::connector->handshake_from_master: read_fixed_size returned %d", status);
        return status;
      }

    response = ntohl (*buf);
    ctx->m_recvbuf.mark_consumed ();

    er_log_debug (__FILE__, __LINE__, "cub_server received %d as response from master\n", response);

    switch (response)
      {
      case SERVER_ALREADY_EXISTS:
        /* report the real server name instead of a placeholder text */
        er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ERR_CSS_SERVER_ALREADY_EXISTS, 1, m_server_name.c_str ());
        return result::Error;

      case SERVER_REQUEST_ACCEPTED:
        er_log_debug (__FILE__, __LINE__, "successfully connected to master\n");
        if (!this->prepare_switch_to_unix_socket (ctx))
          {
            return result::Error;
          }
        break;

      default:
        er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ERR_CSS_ERROR_DURING_SERVER_CONNECT, 1, m_server_name.c_str ());
        return result::Error;
      }

    return result::Ok;
  }

  /*
   * request_new_client () - receive a client socket from master and answer the client.
   *
   *   return: result::Reset when no socket could be received; result::RefuseConnection when the client
   *           is not accepted (the value describes the new client, the master connection stays usable);
   *           result::Ok when the client is accepted or its reply is still being sent
   *   ctx(in/out): master context; always moved back to state RecvRequestType
   *
   * An accepted client gets SERVER_CONNECTED. If the reply is sent at once the connection is passed on
   * through sent_reply_to_client (); if the send is pending the client context is registered in m_events
   * in state SendReplyToClient. A client refused by access control or because no connection entry is
   * available gets a refusal through a temporarily allocated connection entry. If that refusal cannot
   * be queued, the client socket is closed before the temporary entry is deleted so that the
   * descriptor does not leak.
   */
  inline result connector::request_new_client (context *ctx) noexcept
  {
    context *new_ctx;
    CSS_CONN_ENTRY *conn;
    unsigned short request_id;
    SOCKET new_fd;
    result status;

    /* master context goes back to waiting for next request regardless of send path */
    NEXT_STATE (ctx, RecvRequestType);

    /* receive new socket descriptor from the master */
    new_fd = css_open_new_socket_from_master (ctx->m_conn->fd, &request_id);
    er_log_conn (__FILE__, __LINE__, "master::connector->request_new_client: unpack new socket: %d\n", new_fd);
    if (IS_INVALID_SOCKET (new_fd))
      {
        er_log_debug (__FILE__, __LINE__,
              "master::connector->request_new_client: failed to receive client socket from master. \
              this usually indicates the master process cannot accept new connections. \
              check master process logs and system fd limits (ulimit -n, /proc/sys/fs/file-max)");
        return result::Reset;
      }

    if (!this->opt_socket (new_fd) || !this->make_nonblocking (new_fd))
      {
        er_log_conn (__FILE__, __LINE__, "master::connector->request_new_client: %s", strerror (errno));
        ::close (new_fd);

        /* this return value indicates the status of the master connection, so */
        /* return RefuseConnection and close new connections here. */
        return result::RefuseConnection;
      }

    /* make new context and conn */
    new_ctx = make_context ();

    /* common path of both refusals: answer through a temporarily allocated connection entry */
    const auto refuse_client = [&] (int reason) -> result
    {
      new_ctx->m_conn = new css_conn_entry;
      css_initialize_conn (new_ctx->m_conn, new_fd);
      new_ctx->m_conn->request_id = request_id;

      if (!this->prepare_reply_refuse_connection (new_ctx, reason))
        {
          /* the reply cannot be delivered; close the client socket before dropping the entry */
          css_shutdown_socket (new_fd);
          new_ctx->m_conn->fd = INVALID_SOCKET;
          delete new_ctx->m_conn;
          new_ctx->m_conn = nullptr;
          delete new_ctx;

          return result::RefuseConnection;
        }

      NEXT_STATE (new_ctx, SendReplyToClient);
      return result::RefuseConnection;
    };

    /* check */
    if (prm_get_bool_value (PRM_ID_ACCESS_IP_CONTROL) == true && css_check_accessibility (new_fd) != NO_ERROR)
      {
        return refuse_client (SERVER_INACCESSIBLE_IP);
      }

    conn = css_make_conn (new_fd);
    if (conn == NULL)
      {
        er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_CSS_CLIENTS_EXCEEDED, 1, NUM_NORMAL_TRANS);

        return refuse_client (SERVER_CLIENTS_EXCEEDED);
      }

    new_ctx->m_conn = conn;
    new_ctx->m_conn->request_id = request_id;
    if (!this->prepare_reply (new_ctx, SERVER_CONNECTED))
      {
        css_prepare_shutdown_conn (new_ctx->m_conn);
        css_free_conn (new_ctx->m_conn);
        delete new_ctx;

        return result::RefuseConnection;
      }

    /* try to send and register the fd to epoll if fails. */
    status = buffered_socket::send_partial (new_ctx->m_conn->fd, new_ctx->m_sendbuf);
    if (status == result::Ok)
      {
        this->sent_reply_to_client (new_ctx);

        return result::Ok;
      }
    else if (status == result::PeerReset || status == result::Error)
      {
        css_prepare_shutdown_conn (new_ctx->m_conn);
        css_free_conn (new_ctx->m_conn);
        delete new_ctx;

        return result::RefuseConnection;
      }

    assert (status == result::Pending);

    if (!m_events.add_descriptor (new_ctx->m_conn->fd, EPOLLIN | EPOLLOUT | EPOLLRDHUP, new_ctx))
      {
        er_log_conn (__FILE__, __LINE__,
                     "master::connector->request_new_client: m_events->add_descriptor failed: %s", strerror (errno));
        css_prepare_shutdown_conn (new_ctx->m_conn);
        css_free_conn (new_ctx->m_conn);
        delete new_ctx;

        return result::RefuseConnection;
      }

    NEXT_STATE (new_ctx, SendReplyToClient);
    return result::Ok;
  }

  /*
   * handle_request () - read one request code from master and select the next state.
   *
   *   return: the read status when it is not result::Ok; result::Error for an unknown request or when
   *           a reply cannot be prepared; result::Ok otherwise
   *   ctx(in/out): master context
   *
   * SERVER_START_NEW_CLIENT moves to RecvNewClient and SERVER_CHANGE_HA_MODE to RecvHAMode.
   * SERVER_GET_HA_MODE and SERVER_GET_EOF prepare a reply and move to SendHBToMaster.
   * SERVER_START_SHUTDOWN sets m_stop. The remaining known requests only return to RecvRequestType.
   */
  inline result connector::handle_request (context *ctx) noexcept
  {
    const int *payload;
    result read_status;

    std::tie (read_status, payload) = buffered_socket::read_fixed_size<int> (ctx->m_conn->fd, ctx->m_recvbuf);
    if (read_status != result::Ok)
      {
        er_log_conn (__FILE__, __LINE__, "master::connector->handle_request: read_fixed_size returned %d", read_status);
        return read_status;
      }

    const int request = ntohl (*payload);
    ctx->m_recvbuf.mark_consumed ();

    er_log_debug (__FILE__, __LINE__, "cub_server received %d as request from master\n", request);

    switch (request)
      {
      case SERVER_START_NEW_CLIENT:
        NEXT_STATE (ctx, RecvNewClient);
        return result::Ok;

      case SERVER_CHANGE_HA_MODE:
        NEXT_STATE (ctx, RecvHAMode);
        return result::Ok;

      case SERVER_GET_HA_MODE:
        if (!this->prepare_heartbeat_ha_mode (ctx))
          {
            return result::Error;
          }
        NEXT_STATE (ctx, SendHBToMaster);
        return result::Ok;

      case SERVER_GET_EOF:
        if (!this->prepare_heartbeat_log_eof (ctx))
          {
            return result::Error;
          }
        NEXT_STATE (ctx, SendHBToMaster);
        return result::Ok;

      case SERVER_START_SHUTDOWN:
        if (!HA_DISABLED ())
          {
            er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_HB_PROCESS_EVENT, 2,
                    "Disconnected with the cub_master and will shut itself down", "");
          }
        m_stop = true;
        NEXT_STATE (ctx, RecvRequestType);
        return result::Ok;

      /* known requests that need no action here */
      case SERVER_STOP_SHUTDOWN:
      case SERVER_SHUTDOWN_IMMEDIATE:
      case SERVER_START_TRACING:
      case SERVER_STOP_TRACING:
      case SERVER_HALT_EXECUTION:
      case SERVER_RESUME_EXECUTION:
      case SERVER_REGISTER_HA_PROCESS:
        NEXT_STATE (ctx, RecvRequestType);
        return result::Ok;

      default:
        break;
      }

    er_log_debug (__FILE__, __LINE__, "cub_server received unexpected request: %d\n", request);
    return result::Error;
  }

  /*
   * change_ha_mode () - read the HA state requested by master, apply it and queue the resulting state.
   *
   *   return: the read status when it is not result::Ok; result::Error when the reply cannot be
   *           queued; result::Ok otherwise
   *   ctx(in/out): master context; moved to SendHBToMaster on success
   *
   * Only HA_SERVER_STATE_ACTIVE and HA_SERVER_STATE_STANDBY are applied; any other value is logged.
   * The reply always carries css_ha_server_state () in network byte order. It is held in a plain
   * 32-bit integer because a byte-swapped value is not a valid HA_SERVER_STATE enumerator.
   */
  inline result connector::change_ha_mode (context *ctx) noexcept
  {
    HA_SERVER_STATE state;
    std::uint32_t reply;
    const int *buf;
    result status;

    std::tie (status, buf) = buffered_socket::read_fixed_size<int> (ctx->m_conn->fd, ctx->m_recvbuf);
    if (status != result::Ok)
      {
        er_log_conn (__FILE__, __LINE__, "master::connector->change_ha_mode: read_fixed_size returned %d", status);
        return status;
      }

    state = (HA_SERVER_STATE) ntohl (*buf);
    ctx->m_recvbuf.mark_consumed ();

    er_log_debug (__FILE__, __LINE__, "cub_server received request to change ha mode = %d\n", state);

    assert (m_entry != nullptr);

    if (state == HA_SERVER_STATE_ACTIVE || state == HA_SERVER_STATE_STANDBY)
      {
        if (css_change_ha_server_state (m_entry, state, false, HA_CHANGE_MODE_IMMEDIATELY, true) != NO_ERROR)
          {
            er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ERR_CSS_ERROR_FROM_SERVER, 1, "Cannot change server HA mode");
          }
      }
    else
      {
        er_log_debug (ARG_FILE_LINE, "ERROR : unexpected state. (state :%d). \n", state);
      }

    /* report the state that is in effect now, whether or not the change succeeded */
    reply = htonl ((int) css_ha_server_state ());

    if (!this->prepare_heartbeat_send_request_with_data (ctx, SERVER_CHANGE_HA_MODE,
                                                         reinterpret_cast<std::byte *> (&reply), sizeof (reply)))
      {
        return result::Error;
      }

    NEXT_STATE (ctx, SendHBToMaster);
    return result::Ok;
  }

  /*
   * handle_master_reception () - run the receive step that matches the state of a context.
   *
   *   return: false when the step reports result::PeerReset or result::Error, true otherwise
   *   ctx(in/out): context to drive
   *
   * result::Reset clears both buffers of the context and returns it to RecvRequestType. The send
   * states are ignored here. In RecvInHandshake the state advances to SwitchToUnixSocket only when
   * the handshake answer has been read completely (result::Ok); with any other status the context
   * keeps waiting for the rest of the answer instead of skipping it.
   */
  inline bool connector::handle_master_reception (context *ctx) noexcept
  {
    result status = result::Ok;

    switch (ctx->m_state)
      {
      case state::RecvInHandshake:
        status = this->handshake_from_master (ctx);
        if (status == result::Ok)
          {
            NEXT_STATE (ctx, SwitchToUnixSocket);
          }
        break;

      case state::RecvRequestType:
        status = this->handle_request (ctx);
        /* next state have already been set in handle_request. */
        break;

      case state::RecvNewClient:
        status = this->request_new_client (ctx);
        /* next state have already been set in request_new_client. */
        break;

      case state::RecvHAMode:
        status = this->change_ha_mode (ctx);
        /* next state have already been set in change_ha_mode. */
        break;

      case state::SendInHandshake:
      case state::SwitchToUnixSocket:
      case state::SendReplyToClient:
      case state::SendHBToMaster:
        /* these will be handled in handle_master_transmission */
        break;

      default:
        er_log_conn (__FILE__, __LINE__, "master::connector->handle_master_connection failed: m_context->state: %d",
                     ctx->m_state);
        assert_release (false);
        break;
      }

    /* Is there an error */
    if (status == result::Reset)
      {
        er_log_conn (__FILE__, __LINE__, "master::connector->handle_master_transmission: protocol is messed up somewhere");
        ctx->m_recvbuf.reset ();
        ctx->m_sendbuf.clear ();
        NEXT_STATE (ctx, RecvRequestType);
      }
    else if (status == result::PeerReset)
      {
        er_log_conn (__FILE__, __LINE__, "master::connector->handle_master_connection: reset by peer");
        return false;
      }
    else if (status == result::Error)
      {
        er_log_conn (__FILE__, __LINE__, "master::connector->handle_master_connection: failed");
        return false;
      }

    return true;
  }

  /*
   * sent_reply_to_client () - finish a client context once its reply has been sent
   *
   *   ctx(in): client context; it is deleted before this function returns
   *
   * Note: Without an error the connection entry is inserted into the active list and dispatched
   *       to the connection pool. With an error the entry's socket is shut down (when valid)
   *       and the entry itself is deleted.
   */
  inline void connector::sent_reply_to_client (context *ctx) noexcept
  {
    auto *conn = ctx->m_conn;

    if (ctx->m_has_error)
      {
        /* In error context, this conn entry has been temporarily allocated */
        if (!IS_INVALID_SOCKET (conn->fd))
          {
            css_shutdown_socket (conn->fd);
            conn->fd = INVALID_SOCKET;
          }
        delete conn;
      }
    else
      {
        css_insert_into_active_conn_list (conn);

        conn->request_id = 0;
        m_connection_pool->dispatch (conn);
      }

    /* the context no longer refers to the entry; release the context itself */
    ctx->m_conn = nullptr;
    delete ctx;
  }

  /*
   * handle_master_transmission () - flush pending output of a context and advance its send state
   *
   *   return: true when there was nothing to send, the send is still pending, or the state
   *           was advanced successfully; false when the caller should disconnect the context
   *   ctx(in/out): context in one of the send states; in SendReplyToClient the context is
   *                deleted by sent_reply_to_client () and must not be used afterwards
   */
  inline bool connector::handle_master_transmission (context *ctx) noexcept
  {
    result status;

    /* validate the pointers first: the state assertion below dereferences ctx */
    assert (ctx && ctx->m_conn);
    assert (ctx->m_state != state::RecvInHandshake &&
            ctx->m_state != state::RecvRequestType &&
            ctx->m_state != state::RecvNewClient &&
            ctx->m_state != state::RecvHAMode);

    if (!ctx->has_data_to_send ())
      {
        /* no data to send */
        return true;
      }

    status = buffered_socket::send_partial (ctx->m_conn->fd, ctx->m_sendbuf);
    if (status == result::PeerReset || status == result::Error)
      {
        return false;
      }
    if (status == result::Pending)
      {
        /* pending: keep the current state and try again on the next writable event */
        return true;
      }

    assert (status == result::Ok);

    /* fully sent */
    er_log_conn (__FILE__, __LINE__, "master::connector->handle_master_transmission: fully sent the data to fd = %d\n",
                 ctx->m_conn->fd);

    /* move to next state */
    switch (ctx->m_state)
      {
      case state::SendInHandshake:
        NEXT_STATE (ctx, RecvInHandshake);
        break;

      case state::SwitchToUnixSocket:
        /* switching to unix domain socket */
        if (!this->switch_to_unix_socket (ctx))
          {
            er_log_conn (__FILE__, __LINE__,
                         "master::connector->handle_master_transmission: master->switch_to_unix_socket failed");
            return false;
          }
        /* register myself to master */
        if (!HA_DISABLED ())
          {
            if (!this->prepare_heartbeat_register (ctx))
              {
                er_log_conn (__FILE__, __LINE__,
                             "master::connector->handle_master_transmission: prepare_heartbeat_register failed");
                return false;
              }
            NEXT_STATE (ctx, SendHBToMaster);
          }
        else
          {
            NEXT_STATE (ctx, RecvRequestType);
          }
        break;

      case state::SendReplyToClient:
        er_log_conn (__FILE__, __LINE__, "master::connector->sent_reply_to_client: remove fd = %d\n", ctx->m_conn->fd);
        if (!m_events.remove_descriptor (ctx->m_conn->fd))
          {
            er_log_conn (__FILE__, __LINE__, "master::connector->sent_reply_to_client: m_events->remove_descriptor failed: %s",
                         strerror (errno));
            return false;
          }
        this->sent_reply_to_client (ctx);
        /* ctx has been deleted by sent_reply_to_client: return here to avoid touching it below */
        return true;

      case state::SendHBToMaster:
        NEXT_STATE (ctx, RecvRequestType);
        break;

      default:
        /* impossible ! */
        assert_release (false);
        break;
      }

    /* refresh the epoll registration once the new state has nothing queued for sending */
    if (!ctx->has_data_to_send () && !this->update_epoll_events (ctx))
      {
        er_log_conn (__FILE__, __LINE__, "master::connector->handle_master_transmission: update_epoll_events failed: %s",
                     strerror (errno));
        return false;
      }
    return true;
  }

  /*
   * dispose_master_connection () - tear down the connection to the master and mark it CLOSED
   *
   *   return: true on success, false when the descriptor could not be removed from m_events
   *           (in that case nothing else is released and the state is left unchanged)
   */
  inline bool connector::dispose_master_connection () noexcept
  {
    const bool is_registered = (m_master_state == master_state::WAIT_RESPONSE
                                || m_master_state == master_state::ESTABLISHED);

    /* remove the fd which is reset by peer */
    if (is_registered && !m_events.remove_descriptor (m_context.m_conn->fd))
      {
        er_log_conn (__FILE__, __LINE__,
                     "master::connector->dispose_master_connection: m_events->remove_descriptor failed: %s",
                     strerror (errno));
        return false;
      }

    /* release the connection entry unless the connection is already closed */
    if (m_context.m_conn != nullptr && m_master_state != master_state::CLOSED)
      {
        css_prepare_shutdown_conn (m_context.m_conn);
        css_free_conn (m_context.m_conn);
        m_context.m_conn = nullptr;
      }

    m_context.reset ();
    m_master_state = master_state::CLOSED;

    return true;
  }

  /*
   * try_to_reestablish_with_master () - drop the current master connection and start a new one
   *
   *   return: true when the old connection was disposed, connect () succeeded and the handshake
   *           was prepared; false when any of these steps failed
   *
   * Note: When preparing the handshake fails, the freshly created connection is disposed again.
   */
  inline bool connector::try_to_reestablish_with_master () noexcept
  {
    fprintf (stderr, "try to re-establish the connection with master...\n");

    if (this->dispose_master_connection ())
      {
        er_log_conn (__FILE__, __LINE__,
                     "master::connector->try_to_reestablish_with_master: reestablish the connection with master");

        if (this->connect (m_master_port))
          {
            if (this->prepare_handshake (m_server_name))
              {
                return true;
              }

            er_log_conn (__FILE__, __LINE__, "master::connector->try_to_reestablish_with_master: prepare_handshake failed");
            /* ensure state goes back to CLOSED and new conn is freed */
            (void) this->dispose_master_connection ();
          }
        else
          {
            er_log_conn (__FILE__, __LINE__, "master::connector->try_to_reestablish_with_master: connect failed");
          }
      }
    else
      {
        er_log_conn (__FILE__, __LINE__, "master::connector->try_to_reestablish_with_master: dispose_master_connection failed");
      }

    return false;
  }

  /*
   * disconnect () - handle the loss of the connection that belongs to a context
   *
   *   return: true when the master connection was handled, otherwise the result of
   *           dispose_connection ()
   *   ctx(in): context whose connection has to be dropped
   *
   * Note: A context is treated as the master connection when its fd equals the fd of m_context.
   *       With HA enabled, losing the master sets an error and stops the connector; otherwise a
   *       re-establishment is attempted.
   */
  inline bool connector::disconnect (context *ctx) noexcept
  {
    /*
     * m_context.m_conn is nullptr after dispose_master_connection () until a new connection has
     * been created, so it must be checked before its fd is compared. Without a master
     * connection the context cannot be the master's.
     */
    const bool is_master = m_context.m_conn != nullptr && ctx->m_conn->fd == m_context.m_conn->fd;

    if (!is_master)
      {
        return this->dispose_connection (ctx);
      }

    if (!HA_DISABLED ())
      {
        er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_HB_PROCESS_EVENT, 2,
                "Disconnected with the cub_master and will shut itself down", "");
        m_stop = true;
        return true;
      }

    /* WAIT RESPONSE or ESTABLISHED */
    /* the result is intentionally ignored: execute () retries while the master state is CLOSED */
    (void) this->try_to_reestablish_with_master ();

    return true;
  }

  inline bool connector::execute () noexcept
  {
    std::array<epoll_event, 512> events;
    context *ctx;
    int nfds, i;

    while (!m_stop)
      {
    nfds = m_events.wait (events.data (), events.size (), 5 * 1000 /* timeout for re-establish */);
    if (nfds < 0)
      {
        if (errno == EINTR)
          {
        continue;
          }
        er_log_conn (__FILE__, __LINE__, "master::connector->execute: m_events->wait failed: %s", strerror (errno));
        assert_release (false);
        continue;
      }

    if (__builtin_expect (m_master_state == master_state::CLOSED, 0))
      {
        /* re-establish the connection with master if it died */
        this->try_to_reestablish_with_master ();
      }

    for (i = 0; i < nfds; i++)
      {
        assert (events[i].data.ptr);

        ctx = reinterpret_cast<context *> (events[i].data.ptr);
        /* handle hangup/error first to avoid writes on dead sockets */
        if (events[i].events & (EPOLLHUP | EPOLLRDHUP | EPOLLERR) && ctx->m_conn->fd != m_eventfd)
          {
        er_log_conn (__FILE__, __LINE__, "master::connector->execute: master connection closed: %s", strerror (errno));
        if (!this->disconnect (ctx))
          {
            return false;
          }
        continue;
          }
        if (events[i].events & EPOLLIN)
          {
        if (ctx->m_conn->fd == m_eventfd)
          {
            /* finalize */
            return true;
          }
        if (ctx->has_data_to_send ())
          {
            /* don't read while there is pending data to send */
          }
        else if (!this->handle_master_reception (ctx))
          {
            er_log_conn (__FILE__, __LINE__, "master::connector->execute: handle_master_reception failed: %d\n", 0);
            if (!this->disconnect (ctx))
              {
            return false;
              }
            continue;
          }
          }
        if (events[i].events & EPOLLOUT)
          {
        if (!this->handle_master_transmission (ctx))
          {
            er_log_conn (__FILE__, __LINE__, "master::connector->execute: handle_master_transmission failed");
            if (!this->disconnect (ctx))
              {
            return false;
              }
            continue;
          }
          }
      }
      }
    return true;
  }
}