Skip to content

File pl_connection.cpp

File List > cubrid > src > sp > pl_connection.cpp

Go to the documentation of this file

/*
 *
 * Copyright 2016 CUBRID Corporation
 *
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 *
 */

#include "pl_connection.hpp"

#include <algorithm> /* std::count_if */
#include <memory> /* std::unique_ptr */
#include <cmath>

#include "boot_sr.h"
#include "error_manager.h"
#include "system_parameter.h"
#include "pl_sr.h" /* pl_server_port(), pl_connect_server() */
#include "pl_comm.h" /* pl_disconnect_server (), pl_ping () */
#include "object_representation.h" /* OR_ */

// XXX: SHOULD BE THE LAST INCLUDE HEADER
#include "memory_wrapper.hpp"

namespace cubpl
{
  // Definitions

  /*********************************************************************
   * connection_pool - definition
   *********************************************************************/

  constexpr int SYSTEM_REVISION = -1;
  constexpr int INITIAL_REVISION = 0;
  constexpr float INCREMENT_FACTOR = 1.5;

  connection_pool::connection_pool (int pool_size)
    : m_pool (pool_size, nullptr)
    , m_epoch (INITIAL_REVISION)
    , m_queue ()
    , m_mutex ()
  {
    assert (pool_size > 0);

    initialize_pool ();
  }

  connection_pool::connection_pool (int pool_size, const std::string &db_name, int pl_port, bool is_for_sys)
    : connection_pool (pool_size)
  {
    m_db_name = db_name;
    m_db_port = pl_port;
    if (is_for_sys)
      {
    m_epoch = SYSTEM_REVISION;
      }
  }

  connection_pool::~connection_pool ()
  {
    cleanup_pool ();
  }

  connection_view
  connection_pool::claim ()
  {
    if (!is_system_pool ())
      {
    if (pl_server_wait_for_ready () != NO_ERROR)
      {
        return nullptr;
      }
      }
    if (m_db_port == PL_PORT_DISABLED)
      {
    m_db_port = pl_server_port_from_info ();
      }

    std::lock_guard<std::mutex> lock (m_mutex);

    // Check if a connection is available in the queue
    auto get_connection_from_queue = [this]() -> connection_view
    {
      if (!m_queue.empty())
    {
      int index = m_queue.front();
      m_queue.pop();

      if (m_pool[index] == nullptr)
        {
          create_new_connection (index);
        }

      return get_connection_view (index);
    }
      return nullptr;
    };

    connection_view conn_view = get_connection_from_queue();
    if (conn_view != nullptr)
      {
    return conn_view;
      }

    // If no slots are available, increase the pool size
    size_t currentSize = m_pool.size();
    size_t newSize = static_cast<size_t> (std::ceil (currentSize * 1.5));
    m_pool.resize (newSize, nullptr);
    er_log_debug (ARG_FILE_LINE, "pl_connection_pool extended: %lld to %lld\n", currentSize, newSize);

    // Create the new connection
    for (size_t i = currentSize; i < newSize; ++i)
      {
    m_queue.push (i);
      }

    return get_connection_from_queue();
  }

  void
  connection_pool::increment_epoch ()
  {
    if (!is_system_pool ())
      {
    m_epoch++;
      }
  }

  int
  connection_pool::get_pool_size () const
  {
    return m_pool.size ();
  }

  int
  connection_pool::get_epoch () const
  {
    return m_epoch;
  }

  const char *
  connection_pool::get_db_name () const
  {
    return m_db_name.c_str();
  }

  int
  connection_pool::get_db_port () const
  {
    return m_db_port;
  }

  void
  connection_pool::set_db_port (int port)
  {
    m_db_port = port;
  }

  bool
  connection_pool::is_system_pool () const
  {
    return (m_epoch.load (std::memory_order::memory_order_relaxed) == SYSTEM_REVISION);
  }

  // private
  void
  connection_pool::retire (connection *conn)
  {
    std::lock_guard<std::mutex> lock (m_mutex);

    m_queue.push (conn->get_index ());

    er_log_debug (ARG_FILE_LINE, "pl_connection_pool: connection retire (%d)\n", conn->get_index ());
  }

  void
  connection_pool::initialize_pool()
  {
    for (int i = 0; i < (int) m_pool.size (); ++i)
      {
    m_pool[i] = nullptr;
    m_queue.push (i); // Pre-fill the queue with indices
      }
  }

  void
  connection_pool::cleanup_pool()
  {
    std::lock_guard<std::mutex> lock (m_mutex);

    for (int i = 0; i < (int) m_pool.size (); ++i)
      {
    if (m_pool[i])
      {
        delete m_pool[i];
        m_pool[i] = nullptr;
      }
      }

    m_pool.clear();
    while (!m_queue.empty())
      {
    m_queue.pop();
      }
  }

  void
  connection_pool::create_new_connection (int index)
  {
    connection *conn = new connection (this, index);
    m_pool[index] = conn;
  }

  connection_view
  connection_pool::get_connection_view (int index)
  {
    connection *conn = m_pool[index];
    return connection_view (conn, [this] (connection *c)
    {
      if (c)
    {
      this->retire (c); // Automatically return connection to the pool
    }
    });
  }

  /*********************************************************************
   * connection - definition
   *********************************************************************/
  connection::connection (connection_pool *pool, int index)
    : m_pool (pool)
    , m_index (index)
    , m_socket (INVALID_SOCKET)
    , m_epoch (pool->get_epoch ())
    , m_error (NO_ERROR)
  {
    //
    do_reconnect ();
  }

  connection::~connection ()
  {
    if (m_socket != INVALID_SOCKET)
      {
    pl_disconnect_server (m_socket);
      }
  }

  bool
  connection::is_connected () const
  {
    return m_socket != INVALID_SOCKET;
  }

  bool
  connection::is_valid () const
  {
    return (m_socket != INVALID_SOCKET) && (m_pool->get_epoch () == m_epoch || m_pool->get_epoch () == SYSTEM_REVISION);
  }

  int
  connection::get_index () const
  {
    return m_index;
  }

  int
  connection::get_epoch () const
  {
    return m_epoch;
  }

  SOCKET
  connection::get_socket () const
  {
    return m_socket;
  }

  int
  connection::get_last_error () const
  {
    return m_error;
  }

  int
  connection::send_buffer (const cubmem::block &blk)
  {
    m_error = NO_ERROR;

    if (!is_valid () || m_pool->is_system_pool ())
      {
    do_reconnect ();
      }

    OR_ALIGNED_BUF (OR_INT_SIZE) a_request;
    char *request = OR_ALIGNED_BUF_START (a_request);

    int request_size = static_cast<int> (blk.dim);
    or_pack_int (request, request_size);

    int nbytes = pl_writen (m_socket, request, OR_INT_SIZE);
    if (nbytes != OR_INT_SIZE)
      {
    return do_handle_network_error (ARG_FILE_LINE, nbytes);
      }

    nbytes = pl_writen (m_socket, blk.ptr, blk.dim);
    if (nbytes != static_cast<int> (blk.dim))
      {
    return do_handle_network_error (ARG_FILE_LINE, nbytes);
      }

    return NO_ERROR;
  }

  int
  connection::receive_buffer (cubmem::block &b)
  {
    return receive_buffer (b, nullptr, -1);
  }

  int
  connection::receive_buffer (cubmem::block &b, const pl_callback_func *interrupt_func, int timeout_ms)
  {
    m_error = NO_ERROR;

    if (!is_valid ())
      {
    return do_handle_network_error (ARG_FILE_LINE, -1);
      }

    int res_size = 0;
    int nbytes;
    int elapsed = 0;

    /* read data length */
    while (true)
      {
    nbytes = pl_readn_with_timeout (m_socket, (char *)&res_size, OR_INT_SIZE, timeout_ms);
    if (nbytes < 0)
      {
        if (errno == ETIMEDOUT)
          {
        if (interrupt_func && (*interrupt_func)() != NO_ERROR)
          {
            m_error = er_errid ();
            return m_error;
          }
        continue;
          }
        return do_handle_network_error (ARG_FILE_LINE, -1);
      }
    if (nbytes != sizeof (int))
      {
        return do_handle_network_error (ARG_FILE_LINE, nbytes);
      }
    else
      {
        break;
      }
      }

    res_size = ntohl (res_size);

    // To avoid invalid res_size is returned by PL server, and to prevent memory exhaustion of cub_server
    constexpr int MAX_BUFFER_SIZE = 10 * 1024 * 1024; // 10MB max size
    if (res_size > MAX_BUFFER_SIZE || res_size < 0)
      {
    return do_handle_network_error (ARG_FILE_LINE, res_size);
      }

    if (res_size == 0)
      {
    // No payload to read
    return NO_ERROR;
      }

    // Step 3: Extend block to fit the incoming data
    cubmem::extensible_block ext_blk;
    ext_blk.extend_to (res_size);

    // Step 4: Read the actual data with optional interrupt handling
    int total_read = 0;
    while (total_read < res_size)
      {
    nbytes = pl_readn_with_timeout (m_socket, ext_blk.get_ptr() + total_read, res_size - total_read, timeout_ms);
    if (errno == ETIMEDOUT && interrupt_func && (*interrupt_func)() != NO_ERROR)
      {
        return er_errid ();
      }
    if (nbytes < 0)
      {
        return do_handle_network_error (ARG_FILE_LINE, nbytes);
      }

    total_read += nbytes;
      }

    // Step 5: Move the data into the block
    cubmem::block blk (res_size, ext_blk.release_ptr());
    b = std::move (blk);

    return NO_ERROR; // Successfully received the buffer
  }

  void
  connection::invalidate ()
  {
    pl_disconnect_server (m_socket);
  }

  // private
  void
  connection::do_reconnect ()
  {
    if (m_socket != INVALID_SOCKET)
      {
    pl_disconnect_server (m_socket);
      }

    int error = pl_connect_server (m_pool->get_db_name (), m_pool->get_db_port (), m_socket);
    if (error != NO_ERROR && !m_pool->is_system_pool ())
      {
    er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_SP_CANNOT_CONNECT_PL_SERVER, 1, "connect()");
      }

    if (m_socket != INVALID_SOCKET)
      {
    m_epoch = m_pool->get_epoch ();
      }
  }

  int
  connection::do_handle_network_error (const char *file_name, const int line_no, int nbytes)
  {
    (void) invalidate ();

    if (m_pool->is_system_pool ())
      {
    // Do not set error message for system pool
    // To avoid noise in the error log
    m_error = ER_SP_NETWORK_ERROR;
      }
    else
      {
    er_set (ER_ERROR_SEVERITY, file_name, line_no, ER_SP_NETWORK_ERROR, 1, nbytes);
    m_error = er_errid ();
      }

    return m_error;
  }

} // namespace cubpl