File thread_manager.cpp
File List > cubrid > src > thread > thread_manager.cpp
Go to the documentation of this file
/*
* Copyright 2008 Search Solution Corporation
* Copyright 2016 CUBRID Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
/*
* thread_manager.cpp - implementation for tracker for all thread resources
*/
#if !defined (SERVER_MODE) && !defined (SA_MODE)
#error Belongs to server module
#endif // not SERVER_MODE and not SA_MODE
#include "thread_manager.hpp"
// same module includes
#if defined (SERVER_MODE)
#include "thread_daemon.hpp"
#endif // SERVER_MODE
#include "thread_entry.hpp"
#include "thread_entry_task.hpp"
#if defined (SERVER_MODE)
#include "thread_worker_pool.hpp"
#endif // SERVER_MODE
// project includes
#include "error_manager.h"
#include "log_impl.h"
#include "lock_free.h"
#include "lockfree_transaction_system.hpp"
#include "resource_shared_pool.hpp"
#include "system_parameter.h"
#include "resources.hpp"
#include <cassert>
// XXX: SHOULD BE THE LAST INCLUDE HEADER
#include "memory_wrapper.hpp"
namespace cubthread
{
// Thread entry bound to the calling thread; null until one is claimed or set on this thread.
thread_local entry *tl_Entry_p = nullptr;
// Builds an empty manager: zero thread capacity, and no entry pool, dispatcher or lock-free transaction
// system allocated yet.
manager::manager ()
  : m_max_threads (0)
  , m_entries_mutex ()
  , m_worker_pools ()
  , m_daemons ()
  , m_all_entries (nullptr)
  , m_entry_dispatcher (nullptr)
  , m_available_entries_count (0)
  , m_lf_tran_sys (nullptr)
{
}
// Tears the manager down: destroys any still-tracked thread resources, then releases the dispatcher, the entry
// pool and the lock-free transaction system.
manager::~manager ()
{
  // all entries are expected to have been handed back already
  assert (m_available_entries_count == m_max_threads);

  // stop and free whatever is still tracked
  check_all_killed ();

  delete m_entry_dispatcher;
  delete [] m_all_entries;
  delete m_lf_tran_sys;
}
// Allocates the array of thread entries and the dispatcher built on top of it, sized by m_max_threads.
// Not available in SA_MODE builds, where the call only asserts.
void
manager::alloc_entries ()
{
#if !defined (SA_MODE)
  // SERVER_MODE: nothing may have been allocated before
  assert (m_all_entries == nullptr);
  assert (m_entry_dispatcher == nullptr);

  // the available count starts at full capacity
  m_available_entries_count = m_max_threads;
  m_all_entries = new entry[m_max_threads];
  m_entry_dispatcher = new entry_dispatcher (m_all_entries, m_max_threads);
#else // SA_MODE
  assert (false);
#endif // SA_MODE
}
void
manager::init_entries (bool with_lock_free)
{
// initialize thread indexes and lock-free resources
for (std::size_t it = 0; it < m_max_threads; it++)
{
m_all_entries[it].index = (int) (it + 1);
if (with_lock_free)
{
m_all_entries[it].request_lock_free_transactions ();
m_all_entries[it].assign_lf_tran_index (m_lf_tran_sys->assign_index ());
}
}
}
// Creates the lock-free transaction system, sized for every thread that may use it.
void
manager::init_lockfree_system ()
{
#if !defined (SERVER_MODE)
  // SA_MODE: the main thread is the only one
  m_lf_tran_sys = new lockfree::tran::system (1);
#else // SERVER_MODE
  // one slot per managed thread, plus one for the main thread
  m_lf_tran_sys = new lockfree::tran::system (m_max_threads + 1);
#endif // SERVER_MODE
}
// Creates a daemon that reserves one thread entry and tracks it in m_daemons.
// When no entry manager is supplied, the manager's own daemon entry manager is used.
// SA_MODE builds assert and return a null pointer.
daemon *
manager::create_daemon (const looper &looper_arg, entry_task *exec_p,
                        const char *daemon_name /* = "" */, entry_manager *entry_mgr /* = NULL */)
{
#if !defined (SERVER_MODE)
  assert (false);
  return nullptr;
#else // SERVER_MODE
  assert (m_daemons.size () <= daemon_registry_t::count ());

  if (entry_mgr == nullptr)
    {
      // fall back to the default daemon entry manager
      entry_mgr = &m_daemon_entry_manager;
    }

  // one entry is reserved for the daemon's thread
  return create_and_track_resource<daemon> (m_daemons, 1, looper_arg, entry_mgr, exec_p, daemon_name);
#endif // SERVER_MODE
}
// Creates a daemon that runs a context-free task; no thread entry is reserved for it and it is tracked in
// m_daemons_without_entries. SA_MODE builds assert and return a null pointer.
daemon *
manager::create_daemon_without_entry (const looper &looper_arg, task_without_context *exec_p, const char *daemon_name)
{
#if !defined (SERVER_MODE)
  assert (false);
  return nullptr;
#else // SERVER_MODE
  // zero entries reserved
  return create_and_track_resource<daemon> (m_daemons_without_entries, 0, looper_arg, exec_p, daemon_name);
#endif // SERVER_MODE
}
// Runs a task: handed to the given worker pool when one is supplied (SERVER_MODE), otherwise executed and
// retired immediately on the calling thread.
void
manager::push_task (worker_pool *worker_pool_arg, entry_task *exec_p)
{
  if (worker_pool_arg != nullptr)
    {
#if defined (SERVER_MODE)
      check_not_single_thread ();
      worker_pool_arg->execute (exec_p);
      return;
#else // not SERVER_MODE = SA_MODE
      // a pool is not expected here; after the assert the task still runs inline below
      assert (false);
#endif // not SERVER_MODE = SA_MODE
    }

  // execute on this thread
  exec_p->execute (get_entry ());
  exec_p->retire ();
}
// Same as push_task, but asks the worker pool to run the task on the core selected by core_hash; method_mode
// is forwarded to the pool unchanged. Without a pool the task is executed and retired on the calling thread.
// NOTE(review): the default value of method_mode is given on this definition - confirm the declaration in the
// header does not repeat it.
void
manager::push_task_on_core (worker_pool *worker_pool_arg, entry_task *exec_p, std::size_t core_hash,
                            bool method_mode = false)
{
  if (worker_pool_arg != nullptr)
    {
#if defined (SERVER_MODE)
      check_not_single_thread ();
      worker_pool_arg->execute_on_core (exec_p, core_hash, method_mode);
      return;
#else // not SERVER_MODE = SA_MODE
      // a pool is not expected here; after the assert the task still runs inline below
      assert (false);
#endif // not SERVER_MODE = SA_MODE
    }

  // execute on this thread
  exec_p->execute (get_entry ());
  exec_p->retire ();
}
// Destroys a daemon created by create_daemon: removes it from m_daemons and gives back its one thread entry.
// A null daemon is ignored; SA_MODE builds only assert that no daemon was passed.
void
manager::destroy_daemon (daemon *&daemon_arg)
{
#if !defined (SERVER_MODE)
  assert (daemon_arg == nullptr);
#else // SERVER_MODE
  if (daemon_arg != nullptr)
    {
      destroy_and_untrack_resource (m_daemons, daemon_arg, 1);
    }
#endif // SERVER_MODE
}
// Destroys a daemon created by create_daemon_without_entry: removes it from m_daemons_without_entries; there is
// no thread entry to give back. A null daemon is ignored; SA_MODE builds only assert that no daemon was passed.
void
manager::destroy_daemon_without_entry (daemon *&daemon_arg)
{
#if !defined (SERVER_MODE)
  assert (daemon_arg == nullptr);
#else // SERVER_MODE
  if (daemon_arg != nullptr)
    {
      destroy_and_untrack_resource (m_daemons_without_entries, daemon_arg, 0);
    }
#endif // SERVER_MODE
}
// Claims an entry from the dispatcher, binds it to the calling thread and returns it.
entry *
manager::claim_entry ()
{
  entry *const claimed = m_entry_dispatcher->claim ();
  tl_Entry_p = claimed;
  return claimed;
}
// Unbinds the calling thread's entry and hands it back to the dispatcher.
// The entry passed in must be the one currently bound to this thread.
void
manager::retire_entry (entry &entry_p)
{
  assert (&entry_p == tl_Entry_p);

  // clear the thread-local binding before the entry goes back to the dispatcher
  tl_Entry_p = nullptr;
  m_entry_dispatcher->retire (entry_p);
}
// Returns the configured capacity of managed threads (m_max_threads).
std::size_t
manager::get_max_thread_count () const
{
  return m_max_threads;
}
// Destroys and untracks every remaining thread resource: worker pools first, then daemons with entries, then
// daemons without entries.
void
manager::check_all_killed ()
{
  destroy_and_untrack_all_resources (m_worker_pools);
  destroy_and_untrack_all_resources (m_daemons);
  destroy_and_untrack_all_resources (m_daemons_without_entries);
}
void
manager::set_max_thread_count_from_config (void)
{
m_max_threads = cubbase::count_registry<connection>::total () + cubbase::count_registry<worker_pool>::total () +
cubbase::count_registry<daemon>::total () + 1 /* PAD */;
}
void
manager::set_max_thread_count (std::size_t count)
{
m_max_threads = count;
}
void
manager::return_lock_free_transaction_entries (void)
{
for (std::size_t index = 0; index < m_max_threads; index++)
{
m_all_entries[index].return_lock_free_transaction_entries ();
m_lf_tran_sys->free_index (m_all_entries[index].pull_lf_tran_index ());
}
}
// Scans the entry pool in order and returns the first entry whose id equals tid, or a null pointer when no
// entry matches.
entry *
manager::find_by_tid (thread_id_t tid)
{
  entry *match = nullptr;

  for (std::size_t pos = 0; match == nullptr && pos < m_max_threads; ++pos)
    {
      if (m_all_entries[pos].get_id () == tid)
        {
          match = &m_all_entries[pos];
        }
    }

  return match;
}
// Global thread interface

// Build-mode flag: false in SERVER_MODE builds, true otherwise (SA_MODE).
#if !defined (SERVER_MODE)
const bool Is_single_thread = true;
#else // SERVER_MODE
const bool Is_single_thread = false;
#endif // SERVER_MODE

// The manager instance; created in initialize () and deleted in finalize ().
static manager *Manager = nullptr;
// Entry of the thread that called initialize (); deleted in finalize ().
static entry *Main_entry_p = nullptr;
// Sets up the thread module: initializes OS resources, creates the manager and the main thread entry, and
// binds that entry to the calling thread.
//
// my_entry (in/out): must be null on input; on return it points to the main entry.
//
// note - designed to be called only once; repeatable calls would need code changes.
void
initialize (entry *&my_entry)
{
  os::resources::initialize ();

  assert (my_entry == nullptr);
  assert (Manager == nullptr);
  if (Manager == nullptr)
    {
      Manager = new manager ();
    }

  // build the main entry: index 0, running, on the system transaction
  assert (Main_entry_p == nullptr);
  Main_entry_p = new entry ();
  entry *const main_entry = Main_entry_p;
  main_entry->type = TT_MASTER;
  main_entry->index = 0;
  main_entry->register_id ();
  main_entry->m_status = entry::status::TS_RUN;
  main_entry->resume_status = THREAD_RESUME_NONE;
  main_entry->tran_index = 0; /* system transaction */

#if defined (SERVER_MODE)
  // SA_MODE uses singleton context
  main_entry->get_error_context ().register_thread_local ();
#endif // SERVER_MODE

  // bind the main entry to this thread and hand it to the caller
  assert (tl_Entry_p == nullptr);
  tl_Entry_p = Main_entry_p;
  my_entry = Main_entry_p;

  assert (my_entry == thread_get_thread_entry_info ());

#if defined (SERVER_MODE)
  if (prm_get_bool_value (PRM_ID_PERF_TEST_MODE))
    {
      // perf tool needs threads to be always alive to work
      wp_set_force_thread_always_alive ();
    }
#endif // SERVER_MODE
}
// Releases what initialize () created: the main entry (after deregistering its error context in SERVER_MODE),
// the calling thread's entry binding and the manager.
void
finalize ()
{
#if defined (SERVER_MODE)
  if (Main_entry_p != nullptr)
    {
      Main_entry_p->get_error_context ().deregister_thread_local ();
    }
#endif // SERVER_MODE

  // main entry first, then the thread-local binding that pointed at it
  delete Main_entry_p;
  Main_entry_p = nullptr;
  tl_Entry_p = nullptr;

  delete Manager;
  Manager = nullptr;
}
// Prepares all thread entries for use: in SERVER_MODE sizes and allocates the entry pool, then initializes the
// lock-free transaction systems and initializes every entry.
//
// with_lock_free (in): when true, the main entry and every pooled entry request lock-free transactions and are
//                      assigned a lock-free transaction index.
// return: NO_ERROR, or the error code reported by lf_initialize_transaction_systems.
//
// Both the manager and the main entry must already exist, i.e. initialize () must have been called.
int
initialize_thread_entries (bool with_lock_free /* = true*/)
{
  assert (Main_entry_p != NULL);
  // checked in every build mode: the manager is dereferenced below regardless of SERVER_MODE
  assert (Manager != NULL);

#if defined (SERVER_MODE)
  Manager->set_max_thread_count_from_config ();
  Manager->alloc_entries ();
#endif // SERVER_MODE

  // note: even though SA_MODE does not really need to synchronize access on lock-free structures, it is better to
  //       simulate using lock-free transaction in order to avoid managing separate code
  const int error_code = lf_initialize_transaction_systems ((int) get_max_thread_count ());
  if (error_code != NO_ERROR)
    {
      ASSERT_ERROR ();
      return error_code;
    }

  Manager->init_lockfree_system ();

  if (with_lock_free)
    {
      // the main entry is not part of the pool, so it is set up separately
      Main_entry_p->request_lock_free_transactions ();
      Main_entry_p->assign_lf_tran_index (Manager->get_lockfree_transys ().assign_index ());
    }
  Manager->init_entries (with_lock_free);

  return NO_ERROR;
}
// Returns the manager instance; it is expected to exist already.
manager *
get_manager ()
{
  manager *const instance = Manager;
  assert (instance != nullptr);
  return instance;
}
// Returns the total thread count: the manager's capacity (zero while no manager exists) plus one for the
// system thread.
std::size_t
get_max_thread_count ()
{
  std::size_t managed_threads = 0;
  if (Manager != nullptr)
    {
      managed_threads = Manager->get_max_thread_count ();
    }

  // system thread + managed threads
  return 1 + managed_threads;
}
// Returns the entry bound to the calling thread; one must be bound.
entry &
get_entry ()
{
  // shouldn't be called
  // todo: add thread_p to error manager, or something similar
  // er_print_callstack (ARG_FILE_LINE, "warning: manager::get_entry is called");
  entry *const current = tl_Entry_p;
  assert (current != nullptr);
  return *current;
}
// Binds tl_entry to the calling thread; the thread must not have an entry bound already.
void
set_thread_local_entry (entry &tl_entry)
{
  assert (tl_Entry_p == nullptr);
  tl_Entry_p = &tl_entry;
}
// Removes the calling thread's entry binding; an entry must currently be bound.
void
clear_thread_local_entry ()
{
  assert (tl_Entry_p != nullptr);
  tl_Entry_p = nullptr;
}
// Reports the value of the Is_single_thread build-mode flag.
bool
is_single_thread ()
{
  return Is_single_thread;
}
// Debug check for code paths that must not run when Is_single_thread is set.
void
check_not_single_thread ()
{
  assert (!Is_single_thread);
}
// Returns the lock-free transaction entries of the main entry first, then of every entry held by the manager.
// Either step is skipped when the corresponding object does not exist.
void
return_lock_free_transaction_entries ()
{
  if (Main_entry_p != nullptr)
    {
      Main_entry_p->return_lock_free_transaction_entries ();
    }

  if (Manager != nullptr)
    {
      Manager->return_lock_free_transaction_entries ();
    }
}
bool
is_logging_configured (const int logging_flag)
{
return flag<int>::is_flag_set (prm_get_integer_value (PRM_ID_THREAD_LOGGING_FLAG), logging_flag);
}
} // namespace cubthread