23 #ifndef _THREAD_WORKER_POOL_HPP_ 24 #define _THREAD_WORKER_POOL_HPP_ 37 #include <condition_variable> 38 #include <forward_list> 44 #include <system_error> 133 template <
typename Context>
145 const char *name, std::size_t core_count = 1,
bool debug_logging =
false,
bool pool_threads =
false,
146 wait_seconds wait_for_task_time = std::chrono::seconds (5));
213 template <
typename Func,
typename ... Args>
223 template <
typename Func,
typename ... Args>
224 void map_cores (Func &&func, Args &&... args);
271 template <
typename Context>
291 template <
typename Func,
typename ... Args>
295 void notify_stop (
bool &is_not_stopped);
296 void retire_queued_tasks (
void);
305 void finished_task_notification (
void);
309 void become_available (
worker &worker_arg);
311 void check_worker_not_available (
const worker &worker_arg);
316 std::size_t get_max_worker_count (
void)
const;
319 return m_parent_pool;
404 template <
typename Context>
419 void start_thread (
void);
430 template <
typename Func,
typename ... Args>
431 void map_context_if_running (
bool &stop, Func &&func, Args &&... args);
450 m_push_time = cubperf::clock::now ();
458 void init_run (
void);
460 void finish_run (
void);
462 void execute_current_task (
void);
464 void retire_current_task (
void);
466 bool get_new_task (
void);
518 template <
typename Func>
535 template <
typename Context>
538 bool debug_log,
bool pool_threads,
wait_seconds wait_for_task_time)
572 for (; it < remainder; it++)
589 template <
typename Context>
599 template <
typename Context>
612 template <
typename Context>
619 template <
typename Context>
630 template <
typename Context>
645 const std::chrono::seconds time_wait_to_thread_stop (30);
646 const std::chrono::milliseconds time_spin_sleep (10);
648 const std::chrono::seconds time_wait_to_thread_stop (60);
649 const std::chrono::milliseconds time_spin_sleep (10);
653 std::size_t stop_count = 0;
654 auto timeout = std::chrono::system_clock::now () + time_wait_to_thread_stop;
660 is_not_stopped =
false;
673 if (std::chrono::system_clock::now () > timeout)
681 std::this_thread::sleep_for (time_spin_sleep);
691 template <
typename Context>
701 template <
typename Context>
708 template<
typename Context>
715 template<
typename Context>
722 template<
typename Context>
729 template<
typename Context>
739 template<
typename Context>
748 const std::size_t MAX_SIZE = 32;
750 std::memset (stats, 0,
sizeof (stats));
755 template <
typename Context>
756 template <
typename Func,
typename ... Args>
761 for (std::size_t it = 0; it <
m_core_count && !stop; it++)
772 template <
typename Context>
773 template <
typename Func,
typename ... Args>
779 for (std::size_t it = 0; it <
m_core_count && !stop; it++)
782 func (*core_p, stop, args...);
791 template <
typename Context>
810 std::size_t next_index;
816 next_index = index + 1;
836 template <
typename Context>
838 : m_parent_pool (
NULL)
840 , m_worker_array (
NULL)
841 , m_available_workers (
NULL)
842 , m_available_count (0)
849 template <
typename Context>
859 template <
typename Context>
863 assert (worker_count > 0);
889 template <
typename Context>
897 template <
typename Context>
935 template <
typename Context>
954 template <
typename Context>
963 template <
typename Context>
967 #if !defined (NDEBUG) 977 template <
typename Context>
984 template <
typename Context>
991 template <
typename Context>
992 template <
typename Func,
typename ... Args>
1007 template <
typename Context>
1018 template <
typename Context>
1023 const std::size_t AVAILABLE_STACK_DEFAULT_SIZE = 1024;
1054 core_lock.unlock ();
1059 available_stack.
append (&refp, 1);
1073 if (available_stack.
get_size () > 0)
1091 template <
typename Context>
1104 template <
typename Context>
1118 template <
typename Context>
1120 : m_parent_core (
NULL)
1121 , m_context_p (
NULL)
1126 , m_has_thread (false)
1133 template <
typename Context>
1139 template <
typename Context>
1146 template <
typename Context>
1175 template <
typename Context>
1191 auto lambda_create = [&] (void) ->
void { t = std::thread (&
worker::run,
this); };
1192 auto lambda_detach = [&] (void) ->
void { t.detach (); };
1198 template <
typename Context>
1223 template <
typename Context>
1229 if (context_p !=
NULL)
1232 m_parent_core->get_context_manager ().stop_execution (*context_p);
1241 is_not_stopped =
true;
1250 template <
typename Context>
1269 template <
typename Context>
1282 template <
typename Context>
1294 template <
typename Context>
1315 template <
typename Context>
1321 std::unique_lock<std::mutex> ulock (
m_task_mutex, std::defer_lock);
1391 template <
typename Context>
1425 template <
typename Context>
1432 template <
typename Context>
1433 template <
typename Func,
typename ... Args>
1447 func (*ctxp, stop, args...);
1455 template <
typename Func>
1459 #if !defined (NDEBUG) 1466 #if !defined (NDEBUG) 1468 catch (
const std::system_error &e)
1480 #endif // _THREAD_WORKER_POOL_HPP_ void start_all_workers(void)
bool is_pooling_threads() const
static const cubperf::stat_id Wpstat_recycle_context
void check_worker_not_available(const worker &worker_arg)
static const cubperf::stat_id Wpstat_start_thread
std::atomic< bool > m_stopped
void finished_task_notification(void)
worker_pool_type * m_parent_pool
cubperf::time_point m_push_time
void set_has_thread(void)
void retire_queued_tasks(void)
std::size_t wp_worker_statset_get_count(void)
std::size_t m_max_workers
void execute_current_task(void)
bool is_running(void) const
worker_pool(std::size_t pool_size, std::size_t task_max_count, context_manager_type &context_mgr, const char *name, std::size_t core_count=1, bool debug_logging=false, bool pool_threads=false, wait_seconds wait_for_task_time=std::chrono::seconds(5))
void become_available(worker &worker_arg)
cubperf::statset & m_statistics
void wp_er_log_stats(const char *header, cubperf::stat_value *statsp)
void wp_worker_statset_destroy(cubperf::statset &stats)
void set_push_time_now(void)
generic_value< false > stat_value
bool try_execute(task_type *work_arg)
wait_seconds m_wait_for_task_time
void execute(task_type *work_arg)
void wp_call_func_throwing_system_error(const char *message, Func &func)
void wp_set_force_thread_always_alive()
void notify_stop(bool &is_not_stopped)
std::mutex & get_mutex(void)
void map_running_contexts(bool &stop, Func &&func, Args &&...args) const
void wp_worker_statset_time_and_increment(cubperf::statset &stats, cubperf::stat_id id)
static const cubperf::stat_id Wpstat_create_context
virtual void retire(void)
size_t get_memsize() const
std::size_t get_round_robin_core_hash(void)
static const cubperf::stat_id Wpstat_wakeup_with_task
std::atomic< context_type * > atomic_context_ptr
static const cubperf::stat_id Wpstat_found_in_queue
virtual void execute(context_type &)=0
core_type * m_parent_core
size_t get_size(void) const
worker ** m_available_workers
clock::time_point time_point
std::atomic< std::size_t > m_round_robin_counter
std::size_t get_core_count(void) const
static const cubperf::stat_id Wpstat_retire_context
static const cubperf::stat_id Wpstat_retire_task
task_type * get_task_or_become_available(worker &worker_arg)
void stop_execution(void)
void map_cores(Func &&func, Args &&...args)
void condvar_wait(std::condition_variable &condvar, std::unique_lock< std::mutex > &lock, const wait_duration< D > &duration)
typename worker_pool< Context >::core core_type
void er_log_stats(void) const
const char * wp_worker_statset_get_name(std::size_t stat_index)
void get_stats(cubperf::stat_value *sum_inout) const
void get_stats(cubperf::stat_value *stats_out) const
bool wp_is_thread_always_alive_forced()
const wait_seconds & get_wait_for_task_time() const
std::size_t system_core_count(void)
std::atomic< std::size_t > m_task_count
std::queue< task_type * > m_task_queue
void map_running_contexts(Func &&func, Args &&...args)
void push_task_on_running_thread(task< Context > *work_p, cubperf::time_point push_time)
context_manager< context_type > & get_context_manager(void)
std::mutex m_workers_mutex
void init_core(core_type &parent)
std::size_t m_available_count
void wp_handle_system_error(const char *message, const std::system_error &e)
void execute_task(task_type *task_p)
void stop_execution(bool &is_not_stopped)
std::size_t get_max_count(void) const
void start_all_workers(void)
std::size_t m_task_max_count
context_manager_type & m_context_manager
void append(const T &source)
void retire_current_task(void)
std::size_t m_max_workers
std::condition_variable m_task_cv
std::size_t get_max_worker_count(void) const
void get_stats(cubperf::stat_value *sum_inout) const
void execute_on_core(task_type *work_arg, std::size_t core_hash)
cubperf::statset & wp_worker_statset_create(void)
void map_context_if_running(bool &stop, Func &&func, Args &&...args)
const T * get_array(void) const
void assign_task(task< Context > *work_p, cubperf::time_point push_time)
worker_pool_type * get_parent_pool(void) const
static const cubperf::stat_id Wpstat_execute_task
void init_pool_and_workers(worker_pool< Context > &parent, std::size_t worker_count)
void wp_worker_statset_accumulate(const cubperf::statset &what, cubperf::stat_value *where)