File connection_pool.cpp¶
File List > connection > connection_pool.cpp
Go to the documentation of this file
/*
*
* Copyright 2016 CUBRID Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
/*
* connection_pool.cpp
*/
#include <numeric>
#include <cmath>
#include <chrono>
#include <csignal>
#include <cstddef>
#include <cstdint>
#include <unistd.h>
#include <stdint.h>
#include <fcntl.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <utility>
#include "resources.hpp"
#include "thread_manager.hpp"
#include "connection_pool.hpp"
#include "connection_worker.hpp"
#include "server_support.h"
#include "system_parameter.h"
#include "error_manager.h"
// XXX: SHOULD BE THE LAST INCLUDE HEADER
#include "memory_wrapper.hpp"
namespace cubconn::connection
{
pool::pool () :
m_max_connections (-1),
m_max_connection_workers (-1),
m_min_connection_workers (-1)
{
m_watcher = std::make_shared<thread_watcher> ();
m_watcher->active = 0;
}
pool::~pool ()
{
}
void pool::initialize (std::uint32_t max_connections, int max_connection_workers, int min_connection_workers)
{
(void) os_set_signal_handler (SIGPIPE, SIG_IGN);
(void) os_set_signal_handler (SIGFPE, SIG_IGN);
max_connection_workers = this->initialize_topology (max_connection_workers);
if (min_connection_workers > max_connection_workers)
{
min_connection_workers = max_connection_workers;
}
this->lock_resource ();
this->initialize_freelist (max_connections);
this->initialize_coordinator (max_connection_workers, min_connection_workers);
this->initialize_workers (max_connection_workers, min_connection_workers);
this->release_resource ();
/* request to start coordinating */
this->start_coordinator ();
m_max_connections = max_connections;
m_max_connection_workers = max_connection_workers;
m_min_connection_workers = min_connection_workers;
}
void pool::finalize ()
{
this->finalize_workers ();
this->finalize_coordinator ();
/* acquire the lock or kill itself */
this->try_to_lock_resource ();
m_workers.clear ();
this->finalize_freelist ();
this->release_resource ();
this->finalize_topology ();
m_max_connections = -1;
m_max_connection_workers = -1;
m_min_connection_workers = -1;
}
void pool::dispatch (css_conn_entry *conn)
{
coordinator::message request;
request.type = coordinator::message_type::NEW_CLIENT;
request.conn = conn;
m_coordinator->enqueue (std::move (request));
if (!m_coordinator->notify ())
{
assert_release (false);
}
}
void pool::lock_resource ()
{
m_mutex.lock ();
#if !defined (NDEBUG)
m_mutex_holder = std::this_thread::get_id ();
#endif
}
void pool::release_resource ()
{
#if !defined (NDEBUG)
m_mutex_holder = std::thread::id ();
#endif
m_mutex.unlock ();
}
context *pool::claim_context ()
{
freelist *head;
assert (m_mutex_holder == std::this_thread::get_id ());
head = m_freelist.m_head;
if (head)
{
m_freelist.m_head = m_freelist.m_head->m_next;
}
else
{
head = new freelist (32 * 1024);
}
m_freelist.m_claim++;
return &head->m_context;
}
void pool::retire_context (context *ctx)
{
freelist *head;
assert (m_mutex_holder == std::this_thread::get_id ());
head = reinterpret_cast<freelist *> (ctx);
head->m_context.reset ();
if (m_freelist.m_claim > m_freelist.m_max)
{
delete head;
}
else
{
head->m_next = m_freelist.m_head;
m_freelist.m_head = head;
}
m_freelist.m_claim--;
}
std::vector<std::unique_ptr<worker>> &pool::get_workers ()
{
assert (m_mutex_holder == std::this_thread::get_id ());
return m_workers;
}
void pool::try_to_lock_resource ()
{
int i;
for (i = 0; i < 1000; i++)
{
if (m_mutex.try_lock ())
{
break;
}
thread_sleep (10); /* 10 ms */
}
/* timeout */
if (i == 1000)
{
er_log_debug (ARG_FILE_LINE, "could not stop coordinator");
_exit (0);
}
#if !defined (NDEBUG)
m_mutex_holder = std::this_thread::get_id ();
#endif
}
void pool::initialize_freelist (std::uint32_t max_connections)
{
freelist *head;
std::size_t i;
assert (m_mutex_holder == std::this_thread::get_id ());
m_freelist.m_head = nullptr;
m_freelist.m_claim = 0;
m_freelist.m_max = static_cast<std::size_t> (static_cast<float> (max_connections) * /* margin */ 1.1);
for (i = 0; i < m_freelist.m_max; i++)
{
head = m_freelist.m_head;
m_freelist.m_head = new freelist (32 * 1024);
m_freelist.m_head->m_next = head;
}
}
void pool::finalize_freelist ()
{
freelist *head;
assert (m_mutex_holder == std::this_thread::get_id ());
assert (m_freelist.m_claim == 0);
while (m_freelist.m_head)
{
head = m_freelist.m_head;
m_freelist.m_head = m_freelist.m_head->m_next;
delete head;
}
m_freelist.m_max = 0;
m_freelist.m_claim = 0;
}
std::uint32_t pool::initialize_topology (std::uint32_t max_connection_workers)
{
const auto &ctx = os::resources::cpu::effective ();
if (ctx.adjusted_effective && !ctx.adjusted_effective->empty ())
{
std::vector<std::size_t> cores (
ctx.adjusted_effective->begin (),
std::next (ctx.adjusted_effective->begin (),
std::min (ctx.adjusted_effective->size (), static_cast<std::size_t> (max_connection_workers)))
);
os::resources::net::map_nic_to_index (cores);
}
return std::min (ctx.adjusted_max, static_cast<std::size_t> (max_connection_workers));
}
void pool::finalize_topology ()
{
}
void pool::initialize_workers (std::uint32_t max_connection_workers, std::uint32_t min_connection_workers)
{
std::vector<std::size_t> cores;
std::uint32_t i;
const auto &ctx = os::resources::cpu::effective ();
assert (m_mutex_holder == std::this_thread::get_id ());
m_workers.reserve (max_connection_workers);
if (ctx.adjusted_effective)
{
cores = *ctx.adjusted_effective;
}
else
{
std::vector<std::size_t> vec (ctx.adjusted_max);
std::iota (vec.begin (), vec.end (), 0);
cores = vec;
}
assert (cores.size () >= max_connection_workers);
for (i = 0; i < max_connection_workers; i++)
{
m_workers.emplace_back (std::make_unique<worker> (this, m_coordinator, m_watcher, cores[i], i));
}
/* pre-warm the connection worker and its queue to avoid a race condition. */
for (std::unique_ptr<worker> &worker : m_workers)
{
for (i = 0; i < static_cast<std::size_t> (worker::queue_type::TYPE_COUNT); i++)
{
worker::message request;
request.type = worker::message_type::START;
if (!worker->enqueue_and_notify (static_cast<worker::queue_type> (i), std::move (request), nullptr,
-1 /* infinite */))
{
assert_release (false);
}
}
}
}
void pool::finalize_workers ()
{
std::chrono::system_clock::time_point deadline, now;
std::chrono::microseconds wait_for (0);
struct timeval *timeout;
bool compelete;
for (auto &worker : m_workers)
{
worker::message request;
request.type = worker::message_type::SHUTDOWN;
worker->enqueue (worker::queue_type::IMMEDIATE, std::move (request));
if (!worker->notify ())
{
assert_release (false);
}
}
/* shutdown timeout */
timeout = css_get_shutdown_timeout ();
deadline = std::chrono::system_clock::time_point (
std::chrono::seconds (timeout->tv_sec) +
std::chrono::microseconds (timeout->tv_usec));
now = std::chrono::system_clock::now ();
if (deadline > now)
{
wait_for = std::chrono::duration_cast<std::chrono::microseconds> (deadline - now);
}
std::unique_lock<std::mutex> lock (m_watcher->mtx);
compelete = m_watcher->cv.wait_for (lock, wait_for, [this] { return m_watcher->active == 1; /* coordinator */ });
lock.unlock ();
if (!compelete)
{
er_log_debug (ARG_FILE_LINE, "could not stop all active connection workers");
_exit (0);
}
}
void pool::initialize_coordinator (std::uint32_t max_connection_workers, std::uint32_t min_connection_workers)
{
std::size_t core;
const auto &ctx = os::resources::cpu::effective ();
if (ctx.adjusted_effective)
{
core = (*ctx.adjusted_effective)[0];
}
else
{
core = 0;
}
m_coordinator = std::make_shared<coordinator> (
this,
m_watcher,
core,
max_connection_workers,
min_connection_workers
);
}
void pool::start_coordinator ()
{
coordinator::message request;
request.type = coordinator::message_type::START;
m_coordinator->enqueue (std::move (request));
if (!m_coordinator->notify ())
{
assert_release (false);
}
}
void pool::finalize_coordinator ()
{
std::chrono::system_clock::time_point deadline, now;
std::chrono::microseconds wait_for (0);
coordinator::message request;
struct timeval *timeout;
bool compelete;
request.type = coordinator::message_type::SHUTDOWN;
m_coordinator->enqueue (std::move (request));
if (!m_coordinator->notify ())
{
assert_release (false);
}
/* shutdown timeout */
timeout = css_get_shutdown_timeout ();
deadline = std::chrono::system_clock::time_point (
std::chrono::seconds (timeout->tv_sec) +
std::chrono::microseconds (timeout->tv_usec));
now = std::chrono::system_clock::now ();
if (deadline > now)
{
wait_for = std::chrono::duration_cast<std::chrono::microseconds> (deadline - now);
}
std::unique_lock<std::mutex> lock (m_watcher->mtx);
compelete = m_watcher->cv.wait_for (lock, wait_for, [this] { return m_watcher->active == 0; });
lock.unlock ();
if (!compelete)
{
er_log_debug (ARG_FILE_LINE, "could not stop coordinator");
_exit (0);
}
}
}