File thread_worker_pool_impl.hpp
/*
*
* Copyright 2016 CUBRID Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
/*
* thread_worker_pool_impl.hpp
*/
#ifndef _THREAD_WORKER_POOL_IMPL_HPP_
#define _THREAD_WORKER_POOL_IMPL_HPP_
#if !defined (SERVER_MODE)
#error Wrong module
#endif // not SERVER_MODE
// same module include
#include "thread_task.hpp"
#include "thread_entry.hpp"
#include "thread_worker_pool.hpp"
// cubrid includes
#include "perf.hpp"
#include "resources.hpp"
#include "error_manager.h"
// system includes
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <optional>
#include <algorithm>
#include <vector>
#include <memory>
#include <mutex>
#include <queue>
#include <string>
#include <system_error>
#include <thread>
#include <sstream>
#include <cassert>
#include <cstring>
#include <pthread.h>
// Linux limits thread names to TASK_COMM_LEN (16) bytes, including the terminating NUL
#ifndef TASK_COMM_LEN
#define TASK_COMM_LEN 16
#endif
namespace cubthread
{
// cubthread::worker_pool_impl<Stats>
//
// description
// a pool of threads to execute tasks in parallel
// for high-loads (more tasks than threads), stores tasks in queues to be executed when a thread is freed.
// for low-loads (fewer tasks than threads), retires threads when no new tasks are available and creates new
// threads when tasks are added again
// in high-loads, the thread context is shared between successive tasks
//
// how to use
//
// // define the task
// class custom_task : public task<custom_context>
// {
// void execute (custom_context &) override { ... }
// void create_context (void) override { ... }
// void retire_context (custom_context &) override { ... }
// };
//
// // create worker pool; the Stats template argument toggles statistics collection
// cubthread::worker_pool_impl<true> thread_pool (...); // with statistics
// cubthread::worker_pool_impl<false> thread_pool (...); // without statistics
//
// // push tasks
// for (std::size_t i = 0; i < task_count; i++)
// {
// thread_pool.execute (new custom_task ()); // tasks are deallocated after execution
//
// // if you push more tasks than the worker pool can handle, an assert is hit in debug mode; in release
// // mode, the task waits in queue until a worker is freed
// }
//
// // on destroy, the worker pool stops execution (jobs in queue are not executed) and joins any running threads
//
// interface
//
// the worker pool can be partitioned into cores - a middle layer above a group of workers. this is an
// optimization for high-contention systems; when contention is not an issue, a single core is enough.
//
// a core manages a number of workers, tracks available resources (free active workers and inactive workers)
// and queues tasks that could not be executed immediately.
//
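// example - routing tasks to cores (a minimal sketch; thread_pool and session_id are illustrative names):
//
// // tasks sharing a key land on the same core, which keeps their per-core queueing order
// thread_pool.execute_on_core (new custom_task (), session_id); // core index = session_id % core count
// // or let the pool pick the next core round-robin
// thread_pool.execute (new custom_task ());
//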
template <bool Stats>
class worker_pool_impl : public worker_pool
{
public:
// manager needs access to the protected constructor
friend class manager;
public:
// forward definitions of nested classes
class core_impl;
class wrapped_task;
class stats_base;
class stats;
virtual ~worker_pool_impl ();
// init
virtual void initialize (std::size_t worker_count, std::size_t core_count) override;
// execute task; execution is guaranteed, even if maximum number of tasks is reached.
void execute (task_type *work_arg) override;
// execute on the given core.
virtual void execute_on_core (task_type *work_arg, std::size_t core_hash, bool is_temp = false) override;
// ensure every available worker has a live thread waiting for tasks.
// workers currently executing a task are skipped; they already have a thread.
void warmup (void) override;
// stop worker pool; stop all running threads; discard any tasks in queue
void stop_execution (void) override;
// returns true until stop_execution () is called
bool is_running (void) const override;
// get maximum number of threads that can run concurrently in this worker pool
std::size_t get_worker_count (void) const override;
// get the number of cores
std::size_t get_core_count (void) const override;
// get worker pool statistics
void get_stats (cubperf::stat_value *stats_out) const override;
// log stats to error log file
void er_log_stats (void) const;
bool get_pool_threads () const
{
return m_pool_threads;
}
// context management
// map functions over all running contexts
//
// function signature is:
// cubthread::worker_pool::context_type & (in/out) : running thread context
// bool & (in/out) : input is usually false, output true to stop mapping
// typename ... args (in/out) : variadic arguments based on needs
//
// WARNING:
// this is a dangerous functionality. please note that context retirement and mapping function is not
// synchronized. mapped context may be retired or in process of retirement.
//
// make sure your case is handled properly
//
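// example - counting running contexts (a minimal sketch; thread_pool is an illustrative name):
//
// std::size_t count = 0;
// thread_pool.map_running_contexts ([] (context_type &ctx, bool &stop, std::size_t &cnt)
// {
// (void) ctx;
// (void) stop; // leave stop == false to visit every running context
// cnt++;
// }, count);
//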
template <typename Func, typename ... Args>
void map_running_contexts (Func &&func, Args &&... args);
// map functions over all cores
//
// function signature is:
// const cubthread::worker_pool::core & (in) : core
// bool & (in/out) : input is usually false, output true to stop mapping
// typename ... args (in/out) : variadic arguments based on needs
//
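// example - finding the core with the most workers (a minimal sketch; thread_pool is an illustrative name):
//
// std::size_t max_workers = 0;
// thread_pool.map_cores ([] (const worker_pool::core &core_ref, bool &stop, std::size_t &maxw)
// {
// (void) stop; // visit all cores
// maxw = std::max (maxw, core_ref.get_worker_count ());
// }, max_workers);
//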
template <typename Func, typename ... Args>
void map_cores (Func &&func, Args &&... args);
protected:
worker_pool_impl (std::size_t pool_size, std::size_t core_count, const char *name, entry_manager &entry_mgr,
bool pool_threads = false, wait_seconds idle_timeout = std::chrono::seconds (5));
// override this if you want to change the core type
virtual std::unique_ptr<core> allocate_core (bool pool_threads);
virtual void allocate_cores (std::size_t core_count);
virtual void assign_workers_to_cores (std::size_t worker_count);
// get next core by policy
virtual std::size_t get_next_core (void);
// get next core by round robin scheduling (default policy)
std::size_t get_round_robin_core_hash (void);
// core variables
std::vector<std::unique_ptr<core>> m_cores;
// maximum number of concurrent workers
std::size_t m_max_workers;
// set to true when stopped
std::atomic<bool> m_stopped;
// [optional] round robin counter used to dispatch tasks on cores
std::atomic<std::size_t> m_round_robin_counter;
};
// worker_pool_impl<Stats>::stats_base
//
// description
// empty if Stats is false
//
template <bool Stats>
class worker_pool_impl<Stats>::stats_base
{
public:
stats_base () = default;
~stats_base () = default;
stats_base (const stats_base &) = delete;
stats_base &operator= (const stats_base &) = delete;
stats_base (stats_base &&) = default;
stats_base &operator= (stats_base &&) = default;
// empty base
};
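// the primary template is intentionally empty, so workers pay no storage cost for statistics when Stats is
// false; e.g. this compile-time check would hold (an illustration, not part of the interface):
//
// static_assert (std::is_empty_v<worker_pool_impl<false>::stats_base>,
// "stats_base adds no storage when Stats is false");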
template <>
class worker_pool_impl<true>::stats_base
{
public:
stats_base ();
~stats_base ();
stats_base (const stats_base &) = delete;
stats_base &operator= (const stats_base &) = delete;
stats_base (stats_base &&other);
stats_base &operator= (stats_base &&other);
// stats
cubperf::statset *statset;
// timing
cubperf::time_point time;
};
// worker_pool_impl<Stats>::core
//
// description
// a worker pool core. manages a sub-group of workers and acts as middleman between the worker pool and
// its workers
//
// task in: execute_task
// task out: execute_task (immediately execute) or get_task_or_become_available (queued task)
//
template <bool Stats>
class worker_pool_impl<Stats>::core_impl : public worker_pool::core
{
friend class worker_pool_impl;
public:
// forward definition of nested class worker
class worker_impl;
virtual ~core_impl (void);
virtual void initialize (std::size_t worker_count) override;
// execute task
void execute_task (task_type *task_p, bool is_temp) override;
// ensure every available worker has a live thread waiting for tasks.
// workers currently executing a task are skipped; they already have a thread.
void warmup (void) override;
// notify workers to stop; if any of core's workers are still running, returns true
bool stop_execution (void) override;
void retire_queued_tasks (void);
// get a task or add worker to free active list (still running, but ready to execute another task)
std::optional<wrapped_task> get_task_or_become_available (worker &worker_arg);
void become_available (worker &worker_arg);
// debug check: assert that the worker is not in the available list
void check_worker_not_available (const worker &worker_arg);
std::size_t get_worker_count (void) const override;
// temp worker
void register_free_temp_list (worker *w);
void free_all_temp_list ();
// stats
void get_stats (cubperf::stat_value *stats_out) const override;
// context management
// map function to all workers (and their contexts)
template <typename Func, typename ... Args>
void map_running_contexts (bool &stop, Func &&func, Args &&... args) const;
protected:
core_impl (bool pool_threads);
// override this if you want to change the worker type
virtual std::unique_ptr<worker> allocate_worker (bool is_temp = false);
virtual void allocate_workers (std::size_t worker_count);
virtual void initialize_workers ();
// execute a method/stored-procedure task issued through a recursive call; the task is not pooled and runs on a temporarily created thread.
virtual void execute_task_as_temp (wrapped_task &&task_ref);
std::vector<std::unique_ptr<worker>> m_workers;
std::vector<worker *> m_available_workers;
std::queue<wrapped_task> m_task_queue;
// mutex to synchronize activity on worker lists
mutable std::mutex m_workers_mutex;
// temporary workers for method/stored-procedure execution
std::vector<std::unique_ptr<worker>> m_temp_workers;
std::vector<std::unique_ptr<worker>> m_free_temp_workers;
// mutex to synchronize temp worker lists
mutable std::mutex m_temp_workers_mutex;
};
// worker_pool_impl<Stats>::core_impl::worker_impl
//
// description
// the worker is a nested class of the worker pool and represents one instance of execution. its purpose is to
// store the context, manage multiple task executions on a single thread and collect statistics.
//
template <bool Stats>
class worker_pool_impl<Stats>::core_impl::worker_impl : public worker_pool::core::worker
{
friend class core_impl;
public:
virtual ~worker_impl (void);
// init
void initialize () override;
// start thread for current worker
void start_thread (void);
bool has_thread (void);
// assign task to worker; wake a running thread or start a new one.
void assign_task (wrapped_task &&task_ref);
// [optional] used only to prestart pooled threads.
void assign_task (void);
// stop execution; if worker has a thread running, returns true
bool stop_execution (void) override;
std::mutex &get_mutex (void)
{
return m_task_mutex;
}
// stats
void get_stats (cubperf::stat_value *stats_out) const override;
// map function to context (if a task is running and if context is available)
//
// note - sometimes a thread has a context assigned, but it is waiting for tasks. if that's the case, the
// function will not be applied, since it is not considered a "running" context.
//
template <typename Func, typename ... Args>
void map_context_if_running (bool &stop, Func &&func, Args &&... args);
protected:
worker_impl (bool is_temp = false);
// run function invoked by spawned thread
void run (void);
// run initialization (creating execution context)
void init_run (void);
// run finalization (retiring execution context; worker becomes inactive)
void finish_run (void);
// execute m_wrapped_task
void execute_current_task (void);
// retire m_wrapped_task
void retire_current_task (void);
// get a new task: 1. from the core task queue or 2. by waiting for incoming tasks
bool get_new_task (void);
context_type *m_context_p; // execution context (same lifetime as spawned thread)
std::optional<wrapped_task> m_wrapped_task; // current task and metadata
// synchronization on task wait
std::condition_variable m_task_cv; // condition variable used to notify when a task is assigned or when
// worker is stopped
std::mutex m_task_mutex; // mutex to protect waiting task condition
bool m_stop; // stop execution (set to true when worker pool is stopped)
bool m_has_thread; // true if worker has a thread running
bool m_is_temp; // true if worker is for temp task
stats_base m_stats; // empty if Stats is false
};
// worker_pool_impl<Stats>::wrapped_task
//
// description
// wraps a task and, when Stats is true, carries timing metadata.
//
template <bool Stats>
class worker_pool_impl<Stats>::wrapped_task
{
private:
struct task_only
{
task_type *task;
};
struct task_with_stats
{
task_type *task;
cubperf::time_point time;
// Other stats might be added here
};
using inner_type = typename std::conditional_t<Stats, task_with_stats, task_only>;
public:
explicit wrapped_task (task_type *task_p);
~wrapped_task ();
wrapped_task (const wrapped_task &) = delete;
wrapped_task &operator= (const wrapped_task &) = delete;
wrapped_task (wrapped_task &&other);
wrapped_task &operator= (wrapped_task &&other) = delete;
cubperf::time_point &get_time (void);
// helper
void execute (context_type &thread_ref);
void retire (void);
private:
inner_type m_inner;
};
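// since inner_type drops the time point when Stats is false, the wrapper then carries only the task pointer;
// e.g. this compile-time check would hold (an illustration, not part of the interface):
//
// static_assert (sizeof (worker_pool_impl<false>::wrapped_task) == sizeof (task_type *),
// "wrapped_task is pointer-sized without stats");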
// statistics
template <bool Stats>
class worker_pool_impl<Stats>::stats
{
public:
enum class id : cubperf::stat_id
{
start_thread = 0,
create_context = 1,
execute_task = 2,
retire_task = 3,
found_in_queue = 4,
wakeup_with_task = 5,
recycle_context = 6,
retire_context = 7,
// must be last
type_count
};
static const cubperf::statset_definition statdef;
static constexpr cubperf::stat_definition make_def (id stat_id, cubperf::stat_definition::type stat_type,
const char *first_name, const char *second_name);
static stats_base create (void);
static void destroy (stats_base &base);
static void time_and_increment (stats_base &base, id stat_id);
static void time_and_increment (stats_base &base, id stat_id, cubperf::duration d);
static void accumulate (const stats_base &base, cubperf::stat_value *where);
static std::size_t get_count (void);
static const char *get_name (std::size_t stat_index);
private:
stats () = delete;
};
// base functions
// system_core_count - returns the system core count, or 1 if it cannot be obtained.
//
// use it as the core count when task execution must be highly tuned.
// never returns 0
std::size_t system_core_count (void);
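// example - sizing a pool from hardware parallelism (a sketch; my_pool and the 4x ratio are assumptions to
// tune per workload):
//
// const std::size_t cores = cubthread::system_core_count (); // never returns 0
// my_pool.initialize (4 * cores, cores); // e.g. four workers per core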
// custom worker pool exception handler
void wp_handle_system_error (const char *message, const std::system_error &e);
template <typename Func>
void wp_call_func_throwing_system_error (const char *message, Func &func);
bool wp_is_thread_always_alive_forced ();
void wp_set_force_thread_always_alive ();
} // namespace cubthread
namespace cubthread
{
// worker_pool_impl<Stats>
template <bool Stats>
worker_pool_impl<Stats>::worker_pool_impl (std::size_t pool_size, std::size_t core_count, const char *name,
entry_manager &entry_mgr, bool pool_threads, wait_seconds idle_timeout)
: worker_pool (name, entry_mgr, pool_threads, idle_timeout)
, m_max_workers (pool_size)
, m_stopped (false)
, m_round_robin_counter (0)
{
assert (core_count > 0 && core_count <= pool_size);
// [optional] keeping threads always alive can be useful when profiling with perf
if (wp_is_thread_always_alive_forced ())
{
// override pooling/wait time options to keep threads always alive
m_pool_threads = true;
m_idle_timeout.set_infinite_wait ();
}
}
template <bool Stats>
worker_pool_impl<Stats>::~worker_pool_impl ()
{
// not safe to destroy running pools
assert (m_stopped);
}
template <bool Stats>
void
worker_pool_impl<Stats>::initialize (std::size_t worker_count, std::size_t core_count)
{
allocate_cores (core_count);
assign_workers_to_cores (worker_count);
}
template <bool Stats>
void
worker_pool_impl<Stats>::execute (task_type *work_arg)
{
execute_on_core (work_arg, get_next_core ());
}
template <bool Stats>
void
worker_pool_impl<Stats>::execute_on_core (task_type *work_arg, std::size_t core_hash, bool is_temp)
{
std::size_t core_index;
core_index = core_hash % m_cores.size ();
m_cores[core_index]->execute_task (work_arg, is_temp);
}
template <bool Stats>
bool
worker_pool_impl<Stats>::is_running (void) const
{
return !m_stopped;
}
template <bool Stats>
void
worker_pool_impl<Stats>::warmup (void)
{
for (auto &it : m_cores)
{
it->warmup ();
}
}
template <bool Stats>
void
worker_pool_impl<Stats>::stop_execution (void)
{
if (m_stopped.exchange (true))
{
// already stopped
return;
}
else
{
// this call is responsible for stopping threads
}
#if defined (NDEBUG)
const std::chrono::seconds time_wait_to_thread_stop (30); // timeout duration = 30 secs on release mode
const std::chrono::milliseconds time_spin_sleep (10); // sleep between spins for 10 milliseconds
#else // DEBUG
const std::chrono::seconds time_wait_to_thread_stop (60); // timeout duration = 60 secs on debug mode
const std::chrono::milliseconds time_spin_sleep (10); // sleep between spins for 10 milliseconds
#endif
auto timeout = std::chrono::system_clock::now () + time_wait_to_thread_stop;
bool has_running_workers;
while (true)
{
// notify all cores to stop
has_running_workers = false;
for (const auto &it : m_cores)
{
// notify all workers to stop; if any worker is still running, stop_execution () returns true
has_running_workers |= it->stop_execution ();
}
if (!has_running_workers)
{
// all stopped
break;
}
if (std::chrono::system_clock::now () > timeout)
{
// timed out
assert (false);
break;
}
// sleep for a while to give running threads a chance to finish
std::this_thread::sleep_for (time_spin_sleep);
}
// retire all tasks that have not been executed; at this point, no new tasks are produced
for (const auto &it : m_cores)
{
assert (dynamic_cast<core_impl *> (it.get ()));
static_cast<core_impl *> (it.get ())->retire_queued_tasks ();
}
}
template <bool Stats>
std::size_t
worker_pool_impl<Stats>::get_worker_count (void) const
{
return m_max_workers;
}
template <bool Stats>
std::size_t
worker_pool_impl<Stats>::get_core_count (void) const
{
return m_cores.size ();
}
template <bool Stats>
void
worker_pool_impl<Stats>::get_stats (cubperf::stat_value *stats_out) const
{
for (const auto &it : m_cores)
{
it->get_stats (stats_out);
}
}
template <bool Stats>
void
worker_pool_impl<Stats>::er_log_stats (void) const
{
if constexpr (Stats)
{
std::vector<cubperf::stat_value> statset (stats::get_count (), 0); // value-initialized to zero
std::stringstream ss;
get_stats (statset.data ());
ss << "Worker pool statistics: " << m_name << std::endl;
for (std::size_t index = 0; index < stats::get_count (); index++)
{
ss << "\t" << stats::get_name (index) << ": ";
ss << statset[index] << std::endl;
}
_er_log_debug (ARG_FILE_LINE, ss.str ().c_str ());
}
}
template <bool Stats>
template <typename Func, typename ... Args>
void
worker_pool_impl<Stats>::map_running_contexts (Func &&func, Args &&... args)
{
bool stop = false;
for (const auto &it : m_cores)
{
assert (dynamic_cast<core_impl *> (it.get ()));
static_cast<core_impl *> (it.get ())->map_running_contexts (stop, func, args...);
if (stop)
{
// mapping is stopped
return;
}
}
}
template <bool Stats>
template <typename Func, typename ... Args>
void
worker_pool_impl<Stats>::map_cores (Func &&func, Args &&... args)
{
bool stop = false;
for (const auto &it : m_cores)
{
func (*static_cast<core_impl *> (it.get ()), stop, args...);
if (stop)
{
// mapping is stopped
return;
}
}
}
template <bool Stats>
std::unique_ptr<typename worker_pool::core>
worker_pool_impl<Stats>::allocate_core (bool pool_threads)
{
return std::unique_ptr<core> (new core_impl (pool_threads));
}
template <bool Stats>
void
worker_pool_impl<Stats>::allocate_cores (std::size_t core_count)
{
std::size_t it;
m_cores.reserve (core_count);
for (it = 0; it < core_count; it++)
{
m_cores.push_back (allocate_core (m_pool_threads));
}
}
template <bool Stats>
void
worker_pool_impl<Stats>::assign_workers_to_cores (std::size_t worker_count)
{
std::size_t quotient, remainder;
std::size_t it;
quotient = worker_count / m_cores.size ();
remainder = worker_count % m_cores.size ();
for (it = 0; it < remainder; it++)
{
m_cores[it]->set_parent_pool (*this);
m_cores[it]->initialize (quotient + 1);
}
for (; it < m_cores.size (); it++)
{
m_cores[it]->set_parent_pool (*this);
m_cores[it]->initialize (quotient);
}
}
template <bool Stats>
std::size_t
worker_pool_impl<Stats>::get_next_core (void)
{
return get_round_robin_core_hash ();
}
template <bool Stats>
std::size_t
worker_pool_impl<Stats>::get_round_robin_core_hash (void)
{
// cores are not necessarily equal, so we try to keep task assignments proportional to their sizes.
// if the worker pool size is 15 and there are four cores, three of them will have four workers and one only three.
// tasks are dispatched in this order:
//
// core 1 | core 2 | core 3 | core 4
// 1 | 2 | 3 | 4
// 5 | 6 | 7 | 8
// 9 | 10 | 11 | 12
// 13 | 14 | 15 // last one is skipped this round to keep proportions
// 16 | 17 | 18 | 19
// ...
//
// get a core index atomically
std::size_t index;
std::size_t next_index;
while (true)
{
index = m_round_robin_counter;
next_index = index + 1;
if (next_index == m_max_workers)
{
next_index = 0;
}
if (m_round_robin_counter.compare_exchange_strong (index, next_index))
{
// my index is found
break;
}
}
return index;
}
// worker_pool_impl<Stats>::stats_base
inline
worker_pool_impl<true>::stats_base::stats_base ()
: statset (nullptr)
, time ()
{
}
inline
worker_pool_impl<true>::stats_base::~stats_base ()
{
}
inline
worker_pool_impl<true>::stats_base::stats_base (stats_base &&other)
: statset (other.statset)
, time (other.time)
{
other.statset = nullptr;
}
inline worker_pool_impl<true>::stats_base &
worker_pool_impl<true>::stats_base::operator= (stats_base &&other)
{
if (this != &other)
{
delete statset;
statset = other.statset;
time = other.time;
other.statset = nullptr;
}
return *this;
}
// worker_pool_impl<Stats>::core_impl
template <bool Stats>
worker_pool_impl<Stats>::core_impl::core_impl (bool pool_threads)
: worker_pool::core (pool_threads)
{
}
template <bool Stats>
worker_pool_impl<Stats>::core_impl::~core_impl ()
{
}
template <bool Stats>
void
worker_pool_impl<Stats>::core_impl::initialize (std::size_t worker_count)
{
assert (worker_count > 0);
// reserve resources
m_available_workers.reserve (worker_count);
// workers
allocate_workers (worker_count);
initialize_workers ();
}
template <bool Stats>
void
worker_pool_impl<Stats>::core_impl::execute_task (task_type *task_p, bool is_temp)
{
// find an available worker
// 1. one already active is preferable
// 2. inactive will do too
// 3. if no workers, enqueue the task
assert (task_p != nullptr);
worker_impl *refp = nullptr;
if (!m_parent_pool->is_running ())
{
// reject task
task_p->retire ();
return;
}
wrapped_task task_ref (task_p);
std::unique_lock<std::mutex> ulock (m_workers_mutex);
if (!m_available_workers.empty ())
{
refp = static_cast<worker_impl *> (m_available_workers.back ());
m_available_workers.pop_back ();
ulock.unlock ();
assert (refp != nullptr);
refp->assign_task (std::move (task_ref));
}
else
{
if (is_temp)
{
// no need to hold the mutex (prevent deadlock)
ulock.unlock ();
execute_task_as_temp (std::move (task_ref));
}
else
{
// save to queue
m_task_queue.push (std::move (task_ref));
}
}
}
template <bool Stats>
void
worker_pool_impl<Stats>::core_impl::warmup (void)
{
std::lock_guard<std::mutex> lock (m_workers_mutex);
worker_impl *w;
for (auto it = m_available_workers.begin (); it != m_available_workers.end (); )
{
assert (dynamic_cast<worker_impl *> (*it));
w = static_cast<worker_impl *> (*it);
if (!w->has_thread ())
{
w->assign_task ();
it = m_available_workers.erase (it);
}
else
{
++it;
}
}
}
template <bool Stats>
bool
worker_pool_impl<Stats>::core_impl::stop_execution (void)
{
bool has_running_workers = false;
// stop all temp workers first
{
std::unique_lock<std::mutex> ulock (m_temp_workers_mutex);
for (const auto &it : m_temp_workers)
{
has_running_workers |= it->stop_execution ();
}
}
// tell all workers to stop
{
std::unique_lock<std::mutex> ulock (m_workers_mutex);
for (const auto &it : m_workers)
{
has_running_workers |= it->stop_execution ();
}
}
return has_running_workers;
}
template <bool Stats>
void
worker_pool_impl<Stats>::core_impl::retire_queued_tasks (void)
{
std::unique_lock<std::mutex> ulock (m_workers_mutex);
while (!m_task_queue.empty ())
{
wrapped_task queued_task = std::move (m_task_queue.front ());
m_task_queue.pop ();
queued_task.retire ();
}
}
template <bool Stats>
std::optional<typename worker_pool_impl<Stats>::wrapped_task>
worker_pool_impl<Stats>::core_impl::get_task_or_become_available (worker &worker_arg)
{
std::unique_lock<std::mutex> ulock (m_workers_mutex);
if (!m_task_queue.empty ())
{
wrapped_task queued_task = std::move (m_task_queue.front ());
m_task_queue.pop ();
return std::optional<wrapped_task> (std::in_place, std::move (queued_task));
}
m_available_workers.push_back (&worker_arg);
assert (m_available_workers.size () <= m_workers.size ());
return std::nullopt;
}
template <bool Stats>
void
worker_pool_impl<Stats>::core_impl::become_available (worker &worker_arg)
{
std::unique_lock<std::mutex> ulock (m_workers_mutex);
m_available_workers.push_back (&worker_arg);
assert (m_available_workers.size () <= m_workers.size ());
}
template <bool Stats>
void
worker_pool_impl<Stats>::core_impl::check_worker_not_available (const worker &worker_arg)
{
#if !defined (NDEBUG)
std::unique_lock<std::mutex> ulock (m_workers_mutex);
for (const auto it : m_available_workers)
{
assert (it != &worker_arg);
}
#endif // DEBUG
}
template <bool Stats>
std::size_t
worker_pool_impl<Stats>::core_impl::get_worker_count (void) const
{
return m_workers.size ();
}
template <bool Stats>
void
worker_pool_impl<Stats>::core_impl::register_free_temp_list (worker *w)
{
std::unique_lock<std::mutex> ulock (m_temp_workers_mutex);
auto it = std::find_if (m_temp_workers.begin (), m_temp_workers.end (), [w] (const std::unique_ptr<worker> &p)
{
return p.get () == w;
});
assert (it != m_temp_workers.end ());
m_free_temp_workers.push_back (std::move (*it));
m_temp_workers.erase (it);
}
template <bool Stats>
void
worker_pool_impl<Stats>::core_impl::free_all_temp_list ()
{
std::unique_lock<std::mutex> ulock (m_temp_workers_mutex);
m_free_temp_workers.clear ();
}
template <bool Stats>
void
worker_pool_impl<Stats>::core_impl::get_stats (cubperf::stat_value *stats_out) const
{
{
std::unique_lock<std::mutex> ulock (m_workers_mutex);
for (const auto &it : m_workers)
{
it->get_stats (stats_out);
}
}
{
std::unique_lock<std::mutex> ulock (m_temp_workers_mutex);
for (const auto &it : m_temp_workers)
{
it->get_stats (stats_out);
}
}
}
template <bool Stats>
template <typename Func, typename ... Args>
void
worker_pool_impl<Stats>::core_impl::map_running_contexts (bool &stop, Func &&func, Args &&... args) const
{
{
std::unique_lock<std::mutex> ulock (m_workers_mutex);
for (const auto &worker : m_workers)
{
assert (dynamic_cast<worker_impl *> (worker.get ()));
static_cast<worker_impl *> (worker.get ())->map_context_if_running (stop, func, args...);
if (stop)
{
// stop mapping
return;
}
}
}
{
std::unique_lock<std::mutex> ulock (m_temp_workers_mutex);
for (const auto &worker : m_temp_workers)
{
assert (dynamic_cast<worker_impl *> (worker.get ()));
static_cast<worker_impl *> (worker.get ())->map_context_if_running (stop, func, args...);
if (stop)
{
// stop mapping
return;
}
}
}
}
template <bool Stats>
std::unique_ptr<typename worker_pool::core::worker>
worker_pool_impl<Stats>::core_impl::allocate_worker (bool is_temp)
{
return std::unique_ptr<worker> (new worker_impl (is_temp));
}
template <bool Stats>
void
worker_pool_impl<Stats>::core_impl::allocate_workers (std::size_t worker_count)
{
std::size_t it;
m_workers.reserve (worker_count);
for (it = 0; it < worker_count; it++)
{
m_workers.push_back (allocate_worker (false));
}
}
template <bool Stats>
void
worker_pool_impl<Stats>::core_impl::initialize_workers ()
{
for (const auto &worker : m_workers)
{
worker->set_parent_core (*this);
if (m_pool_threads)
{
assert (dynamic_cast<worker_impl *> (worker.get ()));
// assign task / start thread
// it will add itself to available workers
static_cast<worker_impl *> (worker.get ())->assign_task ();
}
else
{
// add to available workers
m_available_workers.push_back (worker.get ());
}
}
}
template <bool Stats>
void
worker_pool_impl<Stats>::core_impl::execute_task_as_temp (wrapped_task &&task_ref)
{
auto w = allocate_worker (true);
w->set_parent_core (*this);
std::lock_guard<std::mutex> ulock (m_temp_workers_mutex);
m_temp_workers.push_back (std::move (w));
static_cast<worker_impl *> (m_temp_workers.back ().get ())->assign_task (std::move (task_ref));
}
// worker_pool_impl<Stats>::core_impl::worker_impl
template <bool Stats>
worker_pool_impl<Stats>::core_impl::worker_impl::worker_impl (bool is_temp)
: worker_pool::core::worker ()
, m_context_p (nullptr)
, m_wrapped_task (std::nullopt)
, m_stop (false)
, m_has_thread (false)
, m_is_temp (is_temp)
, m_stats (stats::create ())
{
}
template <bool Stats>
worker_pool_impl<Stats>::core_impl::worker_impl::~worker_impl (void)
{
stats::destroy (m_stats);
}
template <bool Stats>
void
worker_pool_impl<Stats>::core_impl::worker_impl::initialize (void)
{
}
template <bool Stats>
void
worker_pool_impl<Stats>::core_impl::worker_impl::start_thread (void)
{
assert (m_has_thread);
//
// the following code helps surface any system errors that can occur during thread create or detach in debug
// mode
//
// release will basically be reduced to:
// std::thread (&worker::run, this).detach ();
//
std::thread t;
auto lambda_create = [&] (void) -> void { t = std::thread (&worker_impl::run, this); };
auto lambda_detach = [&] (void) -> void { t.detach (); };
wp_call_func_throwing_system_error ("starting thread", lambda_create);
wp_call_func_throwing_system_error ("detaching thread", lambda_detach);
}
template <bool Stats>
bool
worker_pool_impl<Stats>::core_impl::worker_impl::has_thread (void)
{
std::unique_lock<std::mutex> ulock (m_task_mutex);
return m_has_thread;
}
template <bool Stats>
void
worker_pool_impl<Stats>::core_impl::worker_impl::assign_task (wrapped_task &&task_ref)
{
std::unique_lock<std::mutex> ulock (m_task_mutex);
assert (!m_wrapped_task.has_value ());
// save task
m_wrapped_task.emplace (std::move (task_ref));
if (m_is_temp)
{
m_has_thread = true;
assert (m_context_p == nullptr);
start_thread ();
}
if (m_has_thread)
{
// notify waiting thread
ulock.unlock (); // mutex is not needed for notify
m_task_cv.notify_one ();
}
else
{
m_has_thread = true;
ulock.unlock ();
assert (m_context_p == nullptr);
start_thread ();
}
}
template <bool Stats>
void
worker_pool_impl<Stats>::core_impl::worker_impl::assign_task (void)
{
std::unique_lock<std::mutex> ulock (m_task_mutex);
assert (!m_wrapped_task.has_value ());
assert (!m_is_temp);
if (m_has_thread)
{
// worker already has a live thread waiting for tasks; nothing to do
ulock.unlock ();
}
else
{
m_has_thread = true;
ulock.unlock ();
assert (m_context_p == nullptr);
start_thread ();
}
}
template <bool Stats>
bool
worker_pool_impl<Stats>::core_impl::worker_impl::stop_execution (void)
{
context_type *context_p = m_context_p;
bool has_thread = false;
if (context_p != nullptr)
{
// notify context to stop
m_parent_core->get_entry_manager ().stop_execution (*context_p);
}
// make sure thread is not waiting for tasks
std::unique_lock<std::mutex> ulock (m_task_mutex);
if (m_has_thread)
{
// this thread is still running
has_thread = true;
}
// stop worker
m_stop = true;
// mutex is not needed for notify
ulock.unlock ();
// The temp worker doesn't wait on task_cv; it executes the task immediately and terminates.
if (!m_is_temp)
{
m_task_cv.notify_one ();
}
return has_thread;
}
template <bool Stats>
void
worker_pool_impl<Stats>::core_impl::worker_impl::get_stats (cubperf::stat_value *stats_out) const
{
stats::accumulate (m_stats, stats_out);
}
template <bool Stats>
template <typename Func, typename ... Args>
void
worker_pool_impl<Stats>::core_impl::worker_impl::map_context_if_running (bool &stop, Func &&func, Args &&... args)
{
if (!m_wrapped_task.has_value ())
{
// not running
return;
}
context_type *ctxp = m_context_p;
if (ctxp != nullptr)
{
func (*ctxp, stop, args...);
}
}
template <bool Stats>
void
worker_pool_impl<Stats>::core_impl::worker_impl::run (void)
{
// clear the affinity at start
os::resources::cpu::clearaffinity ();
// truncate: Linux rejects thread names longer than TASK_COMM_LEN - 1 characters
pthread_setname_np (pthread_self (), m_parent_core->get_parent_pool ()->get_name ().substr (0, TASK_COMM_LEN - 1).c_str ());
// do stuff at the beginning like creating context
init_run ();
// do task and terminate if this is temp worker
if (m_is_temp)
{
execute_current_task ();
finish_run ();
return;
}
if (!m_wrapped_task.has_value ())
{
// started without task; get one
if (get_new_task ())
{
assert (m_wrapped_task.has_value ());
}
}
if (m_wrapped_task.has_value ())
{
// loop and execute as many tasks as possible
do
{
execute_current_task ();
}
while (get_new_task ());
}
else
{
// never got a task
}
}
template <bool Stats>
void
worker_pool_impl<Stats>::core_impl::worker_impl::init_run (void)
{
// safe-guard - we have a thread
assert (m_has_thread);
#if !defined (NDEBUG)
// safe-guard - worker should [no longer] be in the available list
if (!m_is_temp)
{
assert (dynamic_cast<core_impl *> (m_parent_core));
static_cast<core_impl *> (m_parent_core)->check_worker_not_available (*this);
}
#endif
// stats: start thread
stats::time_and_increment (m_stats, stats::id::start_thread);
// a context is required
m_context_p = &m_parent_core->get_entry_manager ().create_context ();
// stats: context create
stats::time_and_increment (m_stats, stats::id::create_context);
}
template <bool Stats>
void
worker_pool_impl<Stats>::core_impl::worker_impl::finish_run (void)
{
assert (!m_wrapped_task.has_value ());
assert (m_context_p != nullptr);
// retire context
m_parent_core->get_entry_manager ().retire_context (*m_context_p);
m_context_p = nullptr;
// stats: context retire
stats::time_and_increment (m_stats, stats::id::retire_context);
if (m_is_temp)
{
assert (dynamic_cast<core_impl *> (m_parent_core));
static_cast<core_impl *> (m_parent_core)->register_free_temp_list (this);
}
}
template <bool Stats>
void
worker_pool_impl<Stats>::core_impl::worker_impl::execute_current_task (void)
{
assert (m_wrapped_task.has_value ());
// execute task
m_wrapped_task->execute (*m_context_p);
// stats: execute task
stats::time_and_increment (m_stats, stats::id::execute_task);
// and retire task
retire_current_task ();
// and recycle context before getting another task
m_parent_core->get_entry_manager ().recycle_context (*m_context_p);
// stats: context recycle
stats::time_and_increment (m_stats, stats::id::recycle_context);
// non-temp workers opportunistically release workers retired to the free temp list
if (!m_is_temp)
{
assert (dynamic_cast<core_impl *> (m_parent_core));
static_cast<core_impl *> (m_parent_core)->free_all_temp_list ();
}
}
template <bool Stats>
void
worker_pool_impl<Stats>::core_impl::worker_impl::retire_current_task (void)
{
assert (m_wrapped_task.has_value ());
// retire task
m_wrapped_task->retire ();
m_wrapped_task = std::nullopt;
// stats: retire task
stats::time_and_increment (m_stats, stats::id::retire_task);
}
template <bool Stats>
bool
worker_pool_impl<Stats>::core_impl::worker_impl::get_new_task (void)
{
assert (!m_wrapped_task.has_value ());
assert (dynamic_cast<core_impl *> (m_parent_core));
std::unique_lock<std::mutex> ulock (m_task_mutex, std::defer_lock);
// check stop condition
if (!m_stop)
{
// get a queued task or wait for one to come
// either get a queued task or add to free active list
// note: the returned task cannot be saved directly to m_wrapped_task. if the worker is added to the wait
// queue and std::nullopt is returned, the current thread may be preempted; the worker may then be claimed
// from the free active list and assigned a task. that changes expected behavior and can have unwanted consequences.
std::optional<wrapped_task> queued_task =
static_cast<core_impl *> (m_parent_core)->get_task_or_become_available (*this);
if (queued_task.has_value ())
{
// stats: found in queue
stats::time_and_increment (m_stats, stats::id::found_in_queue);
// it is safe to set here
m_wrapped_task.emplace (std::move (*queued_task));
return true;
}
// wait for task
ulock.lock ();
if (!m_wrapped_task.has_value () && !m_stop)
{
// wait until a task is received or stopped ...
// ... or time out
condvar_wait (m_task_cv, ulock, m_parent_core->get_parent_pool ()->get_idle_timeout (),
[this] () -> bool { return m_wrapped_task.has_value () || m_stop; });
}
else
{
// no need to wait
}
}
else
{
// we need to add to available list
static_cast<core_impl *> (m_parent_core)->become_available (*this);
ulock.lock ();
}
// did I get a task?
if (!m_wrapped_task.has_value ())
{
// no; this thread will stop. from this point forward, if a new task is assigned, a new thread must be spawned
m_has_thread = false;
// we need to retire context before another thread uses this worker
finish_run ();
return false;
}
else
{
// unlock mutex
ulock.unlock ();
// safe-guard - worker should no longer be in the available list
static_cast<core_impl *> (m_parent_core)->check_worker_not_available (*this);
// stats: wake up with task
stats::time_and_increment (m_stats, stats::id::wakeup_with_task);
// found task
return true;
}
}
// worker_pool_impl<Stats>::wrapped_task
template <bool Stats>
worker_pool_impl<Stats>::wrapped_task::wrapped_task (task_type *task_p)
{
assert (task_p != nullptr);
m_inner.task = task_p;
if constexpr (Stats)
{
m_inner.time = cubperf::clock::now ();
}
}
template <bool Stats>
worker_pool_impl<Stats>::wrapped_task::~wrapped_task (void)
{
assert (m_inner.task == nullptr);
}
template <bool Stats>
worker_pool_impl<Stats>::wrapped_task::wrapped_task (wrapped_task &&other)
{
m_inner.task = other.m_inner.task;
other.m_inner.task = nullptr;
if constexpr (Stats)
{
m_inner.time = other.m_inner.time;
}
}
template <bool Stats>
cubperf::time_point &
worker_pool_impl<Stats>::wrapped_task::get_time (void)
{
static_assert (Stats, "get_time() requires Stats == true");
return m_inner.time;
}
template <bool Stats>
void
worker_pool_impl<Stats>::wrapped_task::execute (context_type &thread_ref)
{
assert (m_inner.task != nullptr);
m_inner.task->execute (thread_ref);
}
template <bool Stats>
void
worker_pool_impl<Stats>::wrapped_task::retire (void)
{
assert (m_inner.task != nullptr);
m_inner.task->retire ();
m_inner.task = nullptr;
}
// statistics
template <bool Stats>
constexpr cubperf::stat_definition worker_pool_impl<Stats>::stats::make_def (id stat_id,
cubperf::stat_definition::type stat_type, const char *first_name, const char *second_name)
{
return cubperf::stat_definition (static_cast<cubperf::stat_id> (stat_id), stat_type, first_name, second_name);
}
template <bool Stats>
inline const cubperf::statset_definition worker_pool_impl<Stats>::stats::statdef =
{
make_def (id::start_thread, cubperf::stat_definition::COUNTER_AND_TIMER,
"Counter_start_thread", "Timer_start_thread"),
make_def (id::create_context, cubperf::stat_definition::COUNTER_AND_TIMER,
"Counter_create_context", "Timer_create_context"),
make_def (id::execute_task, cubperf::stat_definition::COUNTER_AND_TIMER,
"Counter_execute_task", "Timer_execute_task"),
make_def (id::retire_task, cubperf::stat_definition::COUNTER_AND_TIMER,
"Counter_retire_task", "Timer_retire_task"),
make_def (id::found_in_queue, cubperf::stat_definition::COUNTER_AND_TIMER,
"Counter_found_task_in_queue", "Timer_found_task_in_queue"),
make_def (id::wakeup_with_task, cubperf::stat_definition::COUNTER_AND_TIMER,
"Counter_wakeup_with_task", "Timer_wakeup_with_task"),
make_def (id::recycle_context, cubperf::stat_definition::COUNTER_AND_TIMER,
"Counter_recycle_context", "Timer_recycle_context"),
make_def (id::retire_context, cubperf::stat_definition::COUNTER_AND_TIMER,
"Counter_retire_context", "Timer_retire_context")
};
template <bool Stats>
typename worker_pool_impl<Stats>::stats_base
worker_pool_impl<Stats>::stats::create (void)
{
stats_base base;
if constexpr (Stats)
{
base.statset = statdef.create_statset ();
base.time = cubperf::clock::now ();
}
return base;
}
template <bool Stats>
void
worker_pool_impl<Stats>::stats::destroy (stats_base &base)
{
if constexpr (Stats)
{
delete base.statset;
base.statset = nullptr;
}
}
template <bool Stats>
void
worker_pool_impl<Stats>::stats::time_and_increment (stats_base &base, id stat_id)
{
if constexpr (Stats)
{
base.statset->m_timept = base.time;
statdef.time_and_increment (*base.statset, static_cast<cubperf::stat_id> (stat_id));
}
}
template <bool Stats>
void
worker_pool_impl<Stats>::stats::time_and_increment (stats_base &base, id stat_id, cubperf::duration d)
{
if constexpr (Stats)
{
base.statset->m_timept = base.time;
statdef.time_and_increment (*base.statset, static_cast<cubperf::stat_id> (stat_id), d);
}
}
template <bool Stats>
void
worker_pool_impl<Stats>::stats::accumulate (const stats_base &base, cubperf::stat_value *where)
{
if constexpr (Stats)
{
statdef.add_stat_values_with_converted_timers<std::chrono::microseconds> (*base.statset, where);
}
}
template <bool Stats>
std::size_t
worker_pool_impl<Stats>::stats::get_count (void)
{
if constexpr (Stats)
{
return statdef.get_value_count ();
}
else
{
return 0;
}
}
template <bool Stats>
const char *
worker_pool_impl<Stats>::stats::get_name (std::size_t stat_index)
{
if constexpr (Stats)
{
return statdef.get_value_name (stat_index);
}
else
{
return nullptr;
}
}
// base functions
template <typename Func>
void
wp_call_func_throwing_system_error (const char *message, Func &func)
{
#if !defined (NDEBUG)
try
{
#endif
func (); // no exception catching on release
#if !defined (NDEBUG)
}
catch (const std::system_error &e)
{
wp_handle_system_error (message, e);
}
#endif
}
} // namespace cubthread
#endif // _THREAD_WORKER_POOL_IMPL_HPP_