// load_worker_manager.cpp — cubrid/src/loaddb
/*
* Copyright 2008 Search Solution Corporation
* Copyright 2016 CUBRID Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
/*
* load_worker_manager.cpp - Thread manager of the loaddb session
*/
#include "load_worker_manager.hpp"
#include "load_driver.hpp"
#include "load_session.hpp"
#include "resource_shared_pool.hpp"
#include "thread_manager.hpp"
#include "thread_worker_pool_taskcap.hpp"
#include "xserver_interface.h"
#include <condition_variable>
#include <mutex>
#include <set>
// XXX: SHOULD BE THE LAST INCLUDE HEADER
#include "memory_wrapper.hpp"
namespace cubload
{
/*
* cubload::worker_entry_manager
* extends cubthread::entry_manager
*
* description
* Thread entry manager for loaddb worker pool. Main functionality of the entry manager is to keep a pool of
* cubload::driver instances.
* on_create - a driver instance is claimed from the pool and assigned on thread ref
* on_retire - previously stored driver in thread ref, is retired to the pool
*/
// Thread-entry customization for the loaddb worker pool: each worker thread
// is bound to one cubload::driver claimed from a fixed-size shared pool.
class worker_entry_manager : public cubthread::entry_manager
{
  public:
    // pool_size - number of driver instances kept in the shared pool; it is
    // created with the same size as the worker pool (see
    // worker_manager_register_session), so every worker can hold a driver.
    explicit worker_entry_manager (unsigned int pool_size);
    ~worker_entry_manager () override = default;

    // Claim a driver from the pool and attach it to the new thread entry.
    void on_create (cubthread::entry &context) override;
    // Clear and return the entry's driver (if any) to the pool.
    void on_retire (cubthread::entry &context) override;

  private:
    resource_shared_pool<driver> m_driver_pool;   // reusable driver instances
};
// File-scope state shared by all loaddb sessions.  Everything below is
// guarded by g_wp_mutex; the pool objects exist only while at least one
// session is registered (see worker_manager_register/unregister_session).
static std::mutex g_wp_mutex;
static std::condition_variable g_wp_condvar;          // signaled on session unregister
// 'static' added for internal linkage, consistent with the other globals in
// this translation unit; nothing outside this file should touch the set.
static std::set<session *> g_active_sessions;
static cubthread::stats_worker_pool_type *g_worker_pool;
static worker_entry_manager *g_wp_entry_manager;
static cubthread::worker_pool_task_capper *g_wp_task_capper;
// Construct the shared driver pool with pool_size entries; nothing else to do.
worker_entry_manager::worker_entry_manager (unsigned int pool_size)
  : m_driver_pool (pool_size)
{
  //
}
// Bind a pooled driver to the newly created worker thread entry and tag the
// thread as a loaddb worker.
void worker_entry_manager::on_create (cubthread::entry &context)
{
  // local renamed: the original name "driver" shadowed the cubload::driver type
  driver *claimed_driver = m_driver_pool.claim ();

  context.m_loaddb_driver = claimed_driver;
  context.type = TT_LOADDB;
}
// When a worker thread retires, clear its driver (if it has one), return the
// driver to the shared pool and reset the loaddb-related entry fields.
void worker_entry_manager::on_retire (cubthread::entry &context)
{
  driver *retiring_driver = context.m_loaddb_driver;

  if (retiring_driver != NULL)
    {
      retiring_driver->clear ();
      m_driver_pool.retire (*retiring_driver);

      context.m_loaddb_driver = NULL;
      context.conn_entry = NULL;
    }
}
// Forward a loaddb task to the worker pool through the task capper.
//
// task      - task to execute on the pool
// returns   - the capper's try_task result
//
// Caller must have registered a session first (the pool and capper only
// exist while g_active_sessions is non-empty).
bool
worker_manager_try_task (cubthread::entry_task *task)
{
  // assert on the pointer actually dereferenced below, not only on the pool;
  // both are created/destroyed together under g_wp_mutex
  assert (g_worker_pool != NULL && g_wp_task_capper != NULL);
  return g_wp_task_capper->try_task (task);
}
// Registers the "loaddb" worker pool, with the lambda supplying the
// configured worker count.  NOTE(review): macro definition is not visible in
// this file — presumably declared by the thread manager headers; confirm.
REGISTER_WORKERPOOL (loaddb, []()
{
  return prm_get_integer_value (PRM_ID_LOADDB_WORKER_COUNT);
});
// Register a loaddb session with the worker manager.
//
// load_session - session to track in g_active_sessions
//
// The first registered session lazily creates the shared worker pool, entry
// manager and task capper; subsequent sessions reuse them.  Teardown is the
// mirror image in worker_manager_unregister_session.
void
worker_manager_register_session (session &load_session)
{
  // RAII lock (was manual lock()/unlock()): exception-safe, and consistent
  // with worker_manager_stop_all / worker_manager_get_stats
  std::unique_lock<std::mutex> ulock (g_wp_mutex);

  if (g_active_sessions.empty ())
    {
      // first session: the shared pool infrastructure must not exist yet
      assert (g_worker_pool == NULL);
      assert (g_wp_entry_manager == NULL);

      unsigned int pool_size = prm_get_integer_value (PRM_ID_LOADDB_WORKER_COUNT);
      g_wp_entry_manager = new worker_entry_manager (pool_size);
      g_worker_pool = thread_create_stats_worker_pool (pool_size, 1, "loaddb", *g_wp_entry_manager, true);
      // m_log = false
      g_wp_task_capper = new cubthread::worker_pool_task_capper (g_worker_pool);
    }
  else
    {
      assert (g_worker_pool != NULL);
      assert (g_wp_entry_manager != NULL);
    }

  g_active_sessions.insert (&load_session);
}
// Unregister a loaddb session from the worker manager.
//
// load_session - session previously passed to worker_manager_register_session
//
// The last session to leave destroys the shared worker pool infrastructure
// and notifies g_wp_condvar, on which worker_manager_stop_all may be waiting.
void
worker_manager_unregister_session (session &load_session)
{
  // RAII lock (was manual lock()/unlock()): exception-safe
  std::unique_lock<std::mutex> ulock (g_wp_mutex);

  if (g_active_sessions.erase (&load_session) != 1)
    {
      assert (false);	// session was never registered, or unregistered twice
    }

  // Check if there are any sessions attached to the wp. We are under lock so we are the only ones doing this.
  if (g_active_sessions.empty ())
    {
      // We are the last session so we can safely destroy the worker pool and the manager.
      cubthread::get_manager ()->destroy_worker_pool (g_worker_pool);
      delete g_wp_entry_manager;
      delete g_wp_task_capper;
      g_worker_pool = NULL;
      g_wp_entry_manager = NULL;
      g_wp_task_capper = NULL;
    }

  // notify after releasing the lock (preserves the original ordering) so the
  // woken waiter does not immediately block on g_wp_mutex
  ulock.unlock ();
  g_wp_condvar.notify_one ();
}
void
worker_manager_stop_all ()
{
std::unique_lock<std::mutex> ulock (g_wp_mutex);
if (g_active_sessions.empty ())
{
return;
}
for (auto &it : g_active_sessions)
{
it->interrupt ();
}
auto pred = [] () -> bool
{
return g_active_sessions.empty ();
};
g_wp_condvar.wait (ulock, pred);
}
// Copy the worker pool statistics into stats_out.  Does nothing when no pool
// is currently alive (i.e. no loaddb session is registered).
void
worker_manager_get_stats (UINT64 *stats_out)
{
  std::unique_lock<std::mutex> ulock (g_wp_mutex);

  if (g_worker_pool == NULL)
    {
      // no active session => no pool; leave stats_out untouched
      return;
    }

  g_worker_pool->get_stats (stats_out);
}
} // namespace cubload