/*
 * Copyright 2008 Search Solution Corporation
 * Copyright 2016 CUBRID Corporation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

/*
 * thread_worker_pool.hpp
 */

#ifndef _THREAD_WORKER_POOL_HPP_
#define _THREAD_WORKER_POOL_HPP_

// same module include
#include "thread_task.hpp"
#include "thread_waiter.hpp"

// cubrid includes
#include "perf_def.hpp"
#include "extensible_array.hpp"

// system includes
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <forward_list>
#include <list>
#include <memory>
#include <mutex>
#include <queue>
#include <string>
#include <system_error>
#include <thread>

#include <cassert>
#include <cstring>

namespace cubthread
{
  // cubthread::worker_pool<Context>
  //
  //  templates
  //    Context - thread context; a class to cache helpful information for task execution
  //
  //  description
  //    a pool of threads to execute tasks in parallel
  //    for high loads (more tasks than threads), tasks are stored in queues and executed when a thread is freed.
  //    for low loads (fewer tasks than threads), threads are retired when no new tasks are available and new
  //    threads are created when tasks are added again
  //    in high loads, thread context is shared between tasks
  //
  //  how to use
  //     // note that worker_pool must be specialized with a thread context
  //
  //     // define the thread context; for CUBRID, that is usually cubthread::entry
  //     class custom_context { ... };
  //
  //     // then define the task
  //     class custom_task : public task<custom_context>
  //     {
  //       void execute (context_type &context) override { ... }
  //       void retire (void) override { ... }
  //     };
  //
  //     // create worker pool; context_mgr must be a context_manager<custom_context> implementation
  //     cubthread::worker_pool<custom_context> thread_pool (THREAD_COUNT, MAX_TASKS, context_mgr, "pool_name");
  //
  //     // push tasks
  //     for (std::size_t i = 0; i < task_count; i++)
  //       {
  //         thread_pool.execute (new custom_task ());   // tasks are deallocated after execution
  //
  //         // if you push more than the worker pool can handle, an assert is hit; on release, the task waits in
  //         // queue until it can be executed
  //       }
  //
  //     // on destroy, the worker pool stops execution (jobs in queue are not executed) and joins any running
  //     // threads
  //
  //
  //  implementation
  //
  //    the worker pool can be partitioned into cores - a middle layer above a group of workers. this is an
  //    optimization for high-contention systems; if contention is not a concern, a single core can be used.
  //
  //    a core manages a number of workers, tracks available resources (free active workers and inactive workers)
  //    and queues tasks that could not be executed immediately.
  //
  //    a worker starts by being inactive (does not have a thread running). it spawns a thread on its first task,
  //    becoming active, and stays active as long as it finds more tasks to execute.
  //
  //    this is how a new task is processed:
  //
  //    1. the worker pool assigns the task to a core via round-robin scheduling*. the core can then accept the
  //       task (if an available worker is found) or reject it. if the task is rejected, it is stored in a queue
  //       to be processed later.
  //       * round-robin scheduling may be overridden by using execute_on_core instead of execute. it is
  //         recommended to understand how cores and workers work before trying it.
  //         sometimes, however, when current tasks may block until other incoming tasks are finished, more
  //         careful core management is required.
  //
  //    2. the core checks if a worker is available
  //       - first it checks the free active list
  //       - if no free active worker is found, it checks the inactive list
  //       - if a worker is found, it is assigned the task
  //       - if no worker is found, the task is saved in the queue
  //
  //    3. the task is executed by a worker in one of three ways:
  //       3.1. the worker was inactive and starts a new thread to execute the task
  //            after it finishes its first task, it tries to find new ones:
  //       3.2. it gets a queued task from its parent core
  //       3.3. if there is no queued task, it notifies the core of its status (free and active) and waits for a
  //            new task.
  //       note: 3.2. and 3.3. together form an atomic operation (protected by mutex)
  //       the worker stops if waiting for a new task times out (and becomes inactive).
  //
  //    NOTE: the core class is private nested to worker pool and cannot be instantiated outside it.
  //          the worker class is private nested to the core class.
  //
  //  todo:
  //    [Optional] Define a way to stop the worker pool, but to finish executing everything it has in queue.
  //
  template <typename Context>
  class worker_pool
  {
    public:
      using context_type = Context;
      using task_type = task<Context>;
      using context_manager_type = context_manager<Context>;

      // forward definition
      class core;

      worker_pool (std::size_t pool_size, std::size_t task_max_count, context_manager_type &context_mgr,
		   const char *name, std::size_t core_count = 1, bool debug_logging = false,
		   bool pool_threads = false, wait_seconds wait_for_task_time = std::chrono::seconds (5));
      ~worker_pool ();

      // try to execute task; executes only if the maximum number of tasks is not reached.
      // returns true when the task is executed, false otherwise
      bool try_execute (task_type *work_arg);
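
      // example (an illustrative sketch only; custom_task and ctx are assumptions taken from the class comment
      // above, not part of this API):
      //
      //    custom_task *task_p = new custom_task ();
      //    if (!thread_pool.try_execute (task_p))
      //      {
      //        // pool is full; the caller picks a fallback, e.g. run the task on the current thread
      //        task_p->execute (ctx);
      //        task_p->retire ();
      //      }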

      // execute task; execution is guaranteed, even if the maximum number of tasks is reached.
      // read the implementation notes in the class comment for details
      void execute (task_type *work_arg);
      // execute on a given core. the real core index is core_hash modulo the core count.
      // note: regular execute chooses a core through round-robin scheduling. this may not be a good fit for all
      //       execution patterns.
      //       execute_on_core provides control over core scheduling.
      void execute_on_core (task_type *work_arg, std::size_t core_hash);
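
      // example (an illustrative sketch only; session_id is an assumed per-task attribute):
      //
      //    // all tasks carrying the same session_id are dispatched to the same core
      //    // (the core index is session_id % get_core_count ())
      //    thread_pool.execute_on_core (new custom_task (), session_id);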

      // stop worker pool; stop all running threads and discard any tasks in queue
      void stop_execution (void);

      // start all worker threads to be ready for future tasks
      void start_all_workers (void);

      // is_running = is not stopped; when created, a worker pool starts running.
      // the worker pool is stopped after stop_execution () is called
      bool is_running (void) const;

      // is_full = the maximum number of tasks is reached
      bool is_full (void) const;

      // get the maximum number of threads that can run concurrently in this worker pool
      std::size_t get_max_count (void) const;
      // get the number of cores
      std::size_t get_core_count (void) const;

      // get worker pool statistics
      // note: the statistics are collected from all cores and all their workers, adding up all local statistics
      void get_stats (cubperf::stat_value *stats_out) const;

      // log stats to error log file
      void er_log_stats (void) const;

      inline bool is_pooling_threads () const
      {
	return m_pool_threads;
      }
      inline const wait_seconds &get_wait_for_task_time () const
      {
	return m_wait_for_task_time;
      }

      //////////////////////////////////////////////////////////////////////////
      // context management
      //////////////////////////////////////////////////////////////////////////

      // map functions over all running contexts
      //
      // function signature is:
      //    cubthread::worker_pool::context_type & (in/out) : running thread context
      //    bool & (in/out)                                 : input is usually false, output true to stop mapping
      //    typename ... args (in/out)                      : variadic arguments based on needs
      //
      // WARNING:
      //    this is dangerous functionality. please note that context retirement and the mapping function are not
      //    synchronized; a mapped context may be retired or in the process of retirement.
      //
      //    make sure your case is handled properly
      //
      template <typename Func, typename ... Args>
      void map_running_contexts (Func &&func, Args &&... args);
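
      // example (an illustrative sketch only; assumes custom_context exposes a hypothetical interrupt flag that
      // its tasks observe):
      //
      //    thread_pool.map_running_contexts ([] (custom_context &ctx, bool &stop)
      //      {
      //        ctx.interrupt = true;   // ask every running task to finish early
      //      });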

      // map functions over all cores
      //
      // function signature is:
      //    const cubthread::worker_pool::core & (in) : core
      //    bool & (in/out)                           : input is usually false, output true to stop mapping
      //    typename ... args (in/out)                : variadic arguments based on needs
      //
      template <typename Func, typename ... Args>
      void map_cores (Func &&func, Args &&... args);
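
      // example (an illustrative sketch only; sums the maximum worker count over all cores):
      //
      //    std::size_t total = 0;
      //    thread_pool.map_cores ([] (const cubthread::worker_pool<custom_context>::core &c, bool &,
      //                               std::size_t &sum)
      //      {
      //        sum += c.get_max_worker_count ();
      //      }, total);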

    private:
      using atomic_context_ptr = std::atomic<context_type *>;

      // forward definition for nested core class; he's a friend
      friend class core;

      // get next core by round robin scheduling
      std::size_t get_round_robin_core_hash (void);

      // maximum number of concurrent workers
      std::size_t m_max_workers;

      // work queue to store tasks that cannot be immediately executed
      std::size_t m_task_max_count;
      std::atomic<std::size_t> m_task_count;

      // thread context manager
      context_manager_type &m_context_manager;

      // core variables
      core *m_core_array;                               // all cores
      std::size_t m_core_count;                         // core count
      std::atomic<std::size_t> m_round_robin_counter;   // round robin counter used to dispatch tasks on cores

      // set to true when stopped
      std::atomic<bool> m_stopped;

      // true to do debug logging
      bool m_log;

      // true to start threads at init
      bool m_pool_threads;

      // transition time period between active and inactive
      wait_seconds m_wait_for_task_time;

      std::string m_name;
  };

  // worker_pool<Context>::core
  //
  // description
  //    a worker pool core. manages a sub-group of workers.
  //    acts as middleman between the worker pool and its workers
  //
  template <typename Context>
  class worker_pool<Context>::core
  {
    public:
      using context_type = Context;
      using task_type = task<Context>;
      using worker_pool_type = worker_pool<Context>;

      // forward definition of nested class worker
      class worker;

      // init function
      void init_pool_and_workers (worker_pool<Context> &parent, std::size_t worker_count);

      // interface for worker pool
      // task management
      // execute task; if no worker is available, the task is queued to be executed later
      void execute_task (task_type *task_p);
      // context management
      // map function to all workers (and their contexts)
      template <typename Func, typename ... Args>
      void map_running_contexts (bool &stop, Func &&func, Args &&... args) const;
      // worker management
      // notify workers to stop; if any of core's workers are still running, outputs is_not_stopped = true
      void notify_stop (bool &is_not_stopped);
      void retire_queued_tasks (void);

      void start_all_workers (void);

      // statistics
      void get_stats (cubperf::stat_value *sum_inout) const;

      // interface for workers
      // task management
      void finished_task_notification (void);
      // worker management
      // get a task or add worker to free active list (still running, but ready to execute another task)
      task_type *get_task_or_become_available (worker &worker_arg);
      void become_available (worker &worker_arg);
      // debug check that the worker is not in the available list
      void check_worker_not_available (const worker &worker_arg);
      // context management
      context_manager<context_type> &get_context_manager (void);

      // getters
      std::size_t get_max_worker_count (void) const;
      inline worker_pool_type *get_parent_pool (void) const
      {
	return m_parent_pool;
      }

    private:

      friend worker_pool;

      // ctor/dtor
      core ();
      ~core (void);

      worker_pool_type *m_parent_pool;        // pointer to parent pool
      std::size_t m_max_workers;              // maximum number of workers running at once
      worker *m_worker_array;                 // all core workers
      worker **m_available_workers;           // stack of available workers
      std::size_t m_available_count;          // count of available workers
      std::queue<task_type *> m_task_queue;   // list of tasks pushed while all workers were occupied
      std::mutex m_workers_mutex;             // mutex to synchronize activity on worker lists
  };

  // worker_pool<Context>::worker
  //
  // description
  //    the worker is a worker pool nested class and represents one instance of execution. its purpose is to store
  //    the context, manage multiple task executions on a single thread and collect statistics.
  //
  // how it works
  //    the worker is assigned a task and a new thread is started. when the task is finished, the worker tries to
  //    execute more tasks, either by consuming one from the task queue or by waiting for one. if it waits too long
  //    and is given no task, the thread stops
  //
  //    there are two types of workers with regard to the thread status:
  //
  //    1. inactive worker (initial state); thread is not running and must be started before executing a task
  //    2. active worker; either executing a task or waiting for a new task.
  //
  //    there are three ways a task is executed:
  //
  //    1. by an inactive worker; it goes through the following phases:
  //       - claiming from inactive list of workers
  //       - starting thread
  //       - claiming context
  //       - executing task
  //
  //    2. by an active worker (thread is running); it goes through the following phases:
  //       - claiming from active list of workers
  //       - notifying and waking thread
  //       - executing task
  //
  //    3. by being claimed from the task queue; if no worker (active or inactive) is available, the task is queued
  //       on the core to be executed when a worker finishes its current task; it goes through the following phases:
  //       - adding task to queue
  //       - claiming task from queue
  //       - executing task
  //
  //    a more sensitive part of task execution is pushing a task to a running thread, due to limit cases. there
  //    are up to three threads involved in this process:
  //
  //    1. task pusher
  //       - whenever a worker is claimed from the free active list, the task is directly assigned (m_task_p is
  //         set)! (as a consequence, the waiting thread cannot reject the task)
  //
  //    2. worker pool stopper
  //       - a thread requests the worker pool to stop. the signal is passed to all cores and workers, including
  //         workers that are waiting for tasks; it is signaled using the m_stop field
  //
  //    3. task waiter
  //       - the thread "lingers", waiting for newly arriving tasks. the worker first adds itself to the core's
  //         free active list and then waits until one of the following happens:
  //         - a task is assigned (m_task_p is not NULL). it executes the task (regardless of m_stop).
  //         - the thread is stopped (m_stop is true).
  //         - the wait times out
  //         after waking up, if no task was assigned up to this point, the thread will attempt to remove its
  //         worker from the free active list. a task may yet be assigned until the worker is removed from this
  //         list; if the worker is not found to be removed, it means it was claimed and a task was pushed or is
  //         being pushed; the thread is then forced to wait for the assigned task.
  //         if the worker is removed successfully from the free active list, its thread will stop and the worker
  //         will be added to the inactive list.
  //         note that after wake up, m_stop does not affect the course of action in any way, only m_task_p. in
  //         most cases, m_stop being true will be equivalent to m_task_p being NULL. in the very limited case
  //         when m_stop is true and a task is also assigned, we let the worker execute the task.
  //
  // note
  //    this class is nested to the worker pool and cannot be used outside it
  //
  template <typename Context>
  class worker_pool<Context>::core::worker
  {
    public:
      using core_type = typename worker_pool<Context>::core;

      worker (void);
      ~worker (void);

      // init
      void init_core (core_type &parent);

      // assign task (can be NULL) to running thread or start a thread
      void assign_task (task<Context> *work_p, cubperf::time_point push_time);
      // start thread for current worker
      void start_thread (void);
      // run task on current thread (push_time is provided by core)
      void push_task_on_running_thread (task<Context> *work_p, cubperf::time_point push_time);
      // stop execution; if worker has a thread running, it outputs is_not_stopped = true
      void stop_execution (bool &is_not_stopped);

      // map function to context (if a task is running and if context is available)
      //
      // note - sometimes a thread has a context assigned, but it is waiting for tasks. if that's the case, the
      //        function will not be applied, since it is not considered a "running" context.
      //
      template <typename Func, typename ... Args>
      void map_context_if_running (bool &stop, Func &&func, Args &&... args);

      // add own stats to given argument
      void get_stats (cubperf::stat_value *sum_inout) const;

      std::mutex &get_mutex (void)
      {
	return m_task_mutex;
      }
      bool has_thread (void)
      {
	return m_has_thread;
      }
      void set_has_thread (void)
      {
	m_has_thread = true;
      }
      void set_push_time_now (void)
      {
	m_push_time = cubperf::clock::now ();
      }

    private:

      // run function invoked by spawned thread
      void run (void);
      // run initialization (creating execution context)
      void init_run (void);
      // run finalization (retiring execution context; worker becomes inactive)
      void finish_run (void);
      // execute m_task_p
      void execute_current_task (void);
      // retire m_task_p
      void retire_current_task (void);
      // get a new task, either 1. from the core's task queue or 2. by waiting for an incoming task
      bool get_new_task (void);

      core_type *m_parent_core;            // parent core
      Context *m_context_p;                // execution context (same lifetime as spawned thread)

      task_type *m_task_p;                 // current task

      // synchronization on task wait
      std::condition_variable m_task_cv;   // condition variable used to notify when a task is assigned or when
					   // worker is stopped
      std::mutex m_task_mutex;             // mutex to protect waiting task condition
      bool m_stop;                         // stop execution (set to true when worker pool is stopped)
      bool m_has_thread;                   // true if worker has a thread running

      // statistics
      cubperf::statset &m_statistics;      // statistics collector
      cubperf::time_point m_push_time;     // push time point (provided by core)
  };

  //////////////////////////////////////////////////////////////////////////
  // statistics
  //////////////////////////////////////////////////////////////////////////

  // collected worker statistics
  // note: the values below are assumed to follow the order in which the stats are collected during a worker's
  //       life-cycle
  static const cubperf::stat_id Wpstat_start_thread = 0;
  static const cubperf::stat_id Wpstat_create_context = 1;
  static const cubperf::stat_id Wpstat_execute_task = 2;
  static const cubperf::stat_id Wpstat_retire_task = 3;
  static const cubperf::stat_id Wpstat_found_in_queue = 4;
  static const cubperf::stat_id Wpstat_wakeup_with_task = 5;
  static const cubperf::stat_id Wpstat_recycle_context = 6;
  static const cubperf::stat_id Wpstat_retire_context = 7;

  cubperf::statset &wp_worker_statset_create (void);
  void wp_worker_statset_time_and_increment (cubperf::statset &stats, cubperf::stat_id id);
  void wp_worker_statset_accumulate (const cubperf::statset &what, cubperf::stat_value *where);
  void wp_worker_statset_destroy (cubperf::statset &stats);

  std::size_t wp_worker_statset_get_count (void);
  const char *wp_worker_statset_get_name (std::size_t stat_index);

  //////////////////////////////////////////////////////////////////////////
  // other functions
  //////////////////////////////////////////////////////////////////////////

  // system_core_count - returns the system core count, or 1 if the system core count cannot be obtained.
  //
  // use it as core count if task execution must be highly tuned.
  // never returns 0
  std::size_t system_core_count (void);
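
  // example (an illustrative sketch only; ctx_mgr is an assumed context_manager implementation): using one core
  // per hardware thread reduces contention on the per-core mutexes
  //
  //    cubthread::worker_pool<custom_context> pool (64, 1024, ctx_mgr, "my_pool",
  //                                                 cubthread::system_core_count ());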

  // custom worker pool exception handler
  void wp_handle_system_error (const char *message, const std::system_error &e);
  template <typename Func>
  void wp_call_func_throwing_system_error (const char *message, Func &func);

  // dump worker pool statistics to error log
  void wp_er_log_stats (const char *header, cubperf::stat_value *statsp);

  // force pooled threads to stay alive (see the constructor below)
  void wp_set_force_thread_always_alive ();
  bool wp_is_thread_always_alive_forced ();

  /************************************************************************/
  /* Template/inline implementation                                       */
  /************************************************************************/

  //////////////////////////////////////////////////////////////////////////
  // worker_pool implementation
  //////////////////////////////////////////////////////////////////////////

  template <typename Context>
  worker_pool<Context>::worker_pool (std::size_t pool_size, std::size_t task_max_count,
				     context_manager_type &context_mgr, const char *name, std::size_t core_count,
				     bool debug_log, bool pool_threads, wait_seconds wait_for_task_time)
    : m_max_workers (pool_size)
    , m_task_max_count (task_max_count)
    , m_task_count (0)
    , m_context_manager (context_mgr)
    , m_core_array (NULL)
    , m_core_count (core_count)
    , m_round_robin_counter (0)
    , m_stopped (false)
    , m_log (debug_log)
    , m_pool_threads (pool_threads)
    , m_wait_for_task_time (wait_for_task_time)
    , m_name (name == NULL ? "" : name)
  {
    // initialize cores; we try to distribute the pool evenly over all cores. if pool size does not divide evenly
    // by core count, some cores will have one additional worker

    if (m_core_count == 0)
      {
	assert (false);
	m_core_count = 1;
      }

    if (m_core_count > pool_size)
      {
	m_core_count = pool_size;
      }

    m_core_array = new core[m_core_count];

    std::size_t quotient = m_max_workers / m_core_count;
    std::size_t remainder = m_max_workers % m_core_count;
    std::size_t it = 0;

    for (; it < remainder; it++)
      {
	m_core_array[it].init_pool_and_workers (*this, quotient + 1);
      }
    for (; it < m_core_count; it++)
      {
	m_core_array[it].init_pool_and_workers (*this, quotient);
      }

    if (wp_is_thread_always_alive_forced ())
      {
	// override pooling/wait time options to keep threads always alive
	m_pool_threads = true;
	m_wait_for_task_time = std::chrono::seconds (0);
      }
  }

  template <typename Context>
  worker_pool<Context>::~worker_pool ()
  {
    // not safe to destroy running pools
    assert (m_stopped);

    delete [] m_core_array;
    m_core_array = NULL;
  }

  template <typename Context>
  bool
  worker_pool<Context>::try_execute (task_type *work_arg)
  {
    if (is_full ())
      {
	return false;
      }

    execute (work_arg);
    return true;
  }

  template <typename Context>
  void
  worker_pool<Context>::execute (task_type *work_arg)
  {
    execute_on_core (work_arg, get_round_robin_core_hash ());
  }

  template <typename Context>
  void
  worker_pool<Context>::execute_on_core (task_type *work_arg, std::size_t core_hash)
  {
    // increment task count
    ++m_task_count;

    std::size_t core_index = core_hash % m_core_count;
    m_core_array[core_index].execute_task (work_arg);
  }

  template <typename Context>
  void
  worker_pool<Context>::stop_execution (void)
  {
    if (m_stopped.exchange (true))
      {
	// already stopped
	return;
      }
    else
      {
	// I am responsible for stopping threads
      }

#if defined (NDEBUG)
    const std::chrono::seconds time_wait_to_thread_stop (30);   // timeout duration = 30 secs in release mode
    const std::chrono::milliseconds time_spin_sleep (10);       // sleep between spins for 10 milliseconds
#else // DEBUG
    const std::chrono::seconds time_wait_to_thread_stop (60);   // timeout duration = 60 secs in debug mode
    const std::chrono::milliseconds time_spin_sleep (10);       // sleep between spins for 10 milliseconds
#endif

    // loop until all workers are stopped or until timeout expires
    std::size_t stop_count = 0;
    auto timeout = std::chrono::system_clock::now () + time_wait_to_thread_stop;

    bool is_not_stopped;
    while (true)
      {
	// notify all cores to stop
	is_not_stopped = false;   // assume all are stopped
	for (std::size_t it = 0; it < m_core_count; it++)
	  {
	    // notify all workers to stop. if any worker is still running, is_not_stopped is output as true
	    m_core_array[it].notify_stop (is_not_stopped);
	  }

	if (!is_not_stopped)
	  {
	    // all stopped
	    break;
	  }

	if (std::chrono::system_clock::now () > timeout)
	  {
	    // timed out
	    assert (false);
	    break;
	  }

	// sleep for a while to give running threads a chance to finish
	std::this_thread::sleep_for (time_spin_sleep);
      }

    // retire all tasks that have not been executed; at this point, no new tasks are produced
    for (std::size_t it = 0; it < m_core_count; it++)
      {
	m_core_array[it].retire_queued_tasks ();
      }
  }

  template <typename Context>
  void
  worker_pool<Context>::start_all_workers (void)
  {
    for (std::size_t it = 0; it < m_core_count; it++)
      {
	m_core_array[it].start_all_workers ();
      }
  }

  template <typename Context>
  bool
  worker_pool<Context>::is_running (void) const
  {
    return !m_stopped;
  }

  template<typename Context>
  inline bool
  worker_pool<Context>::is_full (void) const
  {
    return m_task_count >= m_task_max_count;
  }

  template<typename Context>
  std::size_t
  worker_pool<Context>::get_max_count (void) const
  {
    return m_max_workers;
  }

  template<typename Context>
  std::size_t
  worker_pool<Context>::get_core_count (void) const
  {
    return m_core_count;
  }

  template<typename Context>
  void
  worker_pool<Context>::get_stats (cubperf::stat_value *stats_out) const
  {
    for (std::size_t it = 0; it < m_core_count; it++)
      {
	m_core_array[it].get_stats (stats_out);
      }
  }

  template<typename Context>
  void
  worker_pool<Context>::er_log_stats (void) const
  {
    if (!m_log)
      {
	return;
      }

    const std::size_t MAX_SIZE = 32;
    cubperf::stat_value stats[MAX_SIZE];
    std::memset (stats, 0, sizeof (stats));
    get_stats (stats);
    wp_er_log_stats (m_name.c_str (), stats);
  }

  template <typename Context>
  template <typename Func, typename ... Args>
  void
  worker_pool<Context>::map_running_contexts (Func &&func, Args &&... args)
  {
    bool stop = false;
    for (std::size_t it = 0; it < m_core_count && !stop; it++)
      {
	m_core_array[it].map_running_contexts (stop, func, args...);
	if (stop)
	  {
	    // mapping is stopped
	    return;
	  }
      }
  }

  template <typename Context>
  template <typename Func, typename ... Args>
  void
  cubthread::worker_pool<Context>::map_cores (Func &&func, Args &&... args)
  {
    bool stop = false;
    const core *core_p;
    for (std::size_t it = 0; it < m_core_count && !stop; it++)
      {
	core_p = &m_core_array[it];
	func (*core_p, stop, args...);
	if (stop)
	  {
	    // mapping is stopped
	    return;
	  }
      }
  }

  template <typename Context>
  std::size_t
  worker_pool<Context>::get_round_robin_core_hash (void)
  {
    // cores are not necessarily equal, so we try to keep the assignments proportional to their sizes.
    // if the worker pool size is 15 and there are four cores, three of them will have four workers and one only
    // three. tasks are dispatched in this order:
    //
    //    core 1 | core 2 | core 3 | core 4
    //         1 |      2 |      3 |      4
    //         5 |      6 |      7 |      8
    //         9 |     10 |     11 |     12
    //        13 |     14 |     15            // last one is skipped this round to keep proportions
    //        16 |     17 |     18 |     19
    //    ...
    //

    // get a core index atomically
    std::size_t index;
    std::size_t next_index;

    while (true)
      {
	index = m_round_robin_counter;

	next_index = index + 1;
	if (next_index == m_max_workers)
	  {
	    next_index = 0;
	  }

	if (m_round_robin_counter.compare_exchange_strong (index, next_index))
	  {
	    // my index is found
	    break;
	  }
      }

    return index;
  }

  //////////////////////////////////////////////////////////////////////////
  // worker_pool::core
  //////////////////////////////////////////////////////////////////////////

  template <typename Context>
  worker_pool<Context>::core::core ()
    : m_parent_pool (NULL)
    , m_max_workers (0)
    , m_worker_array (NULL)
    , m_available_workers (NULL)
    , m_available_count (0)
    , m_task_queue ()
    , m_workers_mutex ()
  {
    //
  }

  template <typename Context>
  worker_pool<Context>::core::~core (void)
  {
    delete [] m_worker_array;
    m_worker_array = NULL;

    delete [] m_available_workers;
    m_available_workers = NULL;
  }

  template <typename Context>
  void
  worker_pool<Context>::core::init_pool_and_workers (worker_pool<Context> &parent, std::size_t worker_count)
  {
    assert (worker_count > 0);

    m_parent_pool = &parent;
    m_max_workers = worker_count;

    // allocate workers array
    m_worker_array = new worker[m_max_workers];
    m_available_workers = new worker *[m_max_workers];

    for (std::size_t it = 0; it < m_max_workers; it++)
      {
	m_worker_array[it].init_core (*this);
	if (m_parent_pool->m_pool_threads)
	  {
	    // assign task / start thread
	    // it will add itself to available workers
	    m_worker_array[it].assign_task (NULL, cubperf::clock::now ());
	  }
	else
	  {
	    // add to available workers
	    m_available_workers[m_available_count++] = &m_worker_array[it];
	  }
      }
  }

  template <typename Context>
  void
  worker_pool<Context>::core::finished_task_notification (void)
  {
    // decrement task count
    -- (m_parent_pool->m_task_count);
  }

  template <typename Context>
  void
  worker_pool<Context>::core::execute_task (task_type *task_p)
  {
    assert (task_p != NULL);

    // find an available worker
    // 1. one already active is preferable
    // 2. inactive will do too
    // 3. if no worker is available, the task is queued to be executed later

    cubperf::time_point push_time = cubperf::clock::now ();
    worker *refp = NULL;

    std::unique_lock<std::mutex> ulock (m_workers_mutex);

    if (m_parent_pool->m_stopped)
      {
	// reject task
	task_p->retire ();
	return;
      }

    if (m_available_count > 0)
      {
	refp = m_available_workers[--m_available_count];
	ulock.unlock ();

	assert (refp != NULL);
	refp->assign_task (task_p, push_time);
      }
    else
      {
	// save to queue
	m_task_queue.push (task_p);
      }
  }

  template <typename Context>
  typename worker_pool<Context>::task_type *
  worker_pool<Context>::core::get_task_or_become_available (worker &worker_arg)
  {
    std::unique_lock<std::mutex> ulock (m_workers_mutex);

    if (!m_task_queue.empty ())
      {
	task_type *task_p = m_task_queue.front ();
	assert (task_p != NULL);
	m_task_queue.pop ();
	return task_p;
      }

    m_available_workers[m_available_count++] = &worker_arg;
    return NULL;
  }

  template <typename Context>
  void
  worker_pool<Context>::core::become_available (worker &worker_arg)
  {
    std::unique_lock<std::mutex> ulock (m_workers_mutex);
    m_available_workers[m_available_count++] = &worker_arg;
  }

  template <typename Context>
  void
  worker_pool<Context>::core::check_worker_not_available (const worker &worker_arg)
  {
#if !defined (NDEBUG)
    std::unique_lock<std::mutex> ulock (m_workers_mutex);

    for (std::size_t idx = 0; idx < m_available_count; idx++)
      {
	assert (m_available_workers[idx] != &worker_arg);
      }
#endif // DEBUG
  }

  template <typename Context>
  context_manager<Context> &
  worker_pool<Context>::core::get_context_manager (void)
  {
    return m_parent_pool->m_context_manager;
  }

  template <typename Context>
  std::size_t
  worker_pool<Context>::core::get_max_worker_count (void) const
  {
    return m_max_workers;
  }

  template <typename Context>
  template <typename Func, typename ... Args>
  void
  cubthread::worker_pool<Context>::core::map_running_contexts (bool &stop, Func &&func, Args &&... args) const
  {
    for (std::size_t it = 0; it < m_max_workers && !stop; it++)
      {
	m_worker_array[it].map_context_if_running (stop, func, args...);
	if (stop)
	  {
	    // stop mapping
	    return;
	  }
      }
  }

  template <typename Context>
  void
  worker_pool<Context>::core::notify_stop (bool &is_not_stopped)
  {
    // tell all workers to stop
    for (std::size_t it = 0; it < m_max_workers; it++)
      {
	m_worker_array[it].stop_execution (is_not_stopped);
      }
  }

  template <typename Context>
  void
  worker_pool<Context>::core::start_all_workers (void)
  {
    worker *refp = NULL;
    const std::size_t AVAILABLE_STACK_DEFAULT_SIZE = 1024;
    cubmem::extensible_array<worker *, AVAILABLE_STACK_DEFAULT_SIZE> available_stack;

    // how this works:
    //
    // we need to start all workers, but we need to consider the fact that some may be already running. what we need
    // to do is process the available workers and start threads for all those that don't have a thread started.
    //
    // workers that already have threads are saved and added back after processing all available workers.
    //
    // NOTE: this function does not guarantee that at the end all workers have threads. workers that stop their
    //       threads while available workers are being processed are not restarted. however, we end up starting all
    //       or almost all threads, which is good enough.
    //

    while (true)
      {
	// processing is done in two steps:
	//
	// 1. retrieve a worker from available workers while holding the workers mutex
	// 2. verify if the worker has a thread started
	//    2.1. if it doesn't have a thread, start one
	//    2.2. if it does have a thread, save it to available_stack.
	//

	std::unique_lock<std::mutex> core_lock (m_workers_mutex);
	if (m_available_count == 0)
	  {
	    break;
	  }
	refp = m_available_workers[--m_available_count];
	core_lock.unlock ();

	if (refp->has_thread ())
	  {
	    // stack to make available at the end
	    available_stack.append (&refp, 1);

	    // note: this worker's thread may stop soon or may have stopped already. this case is accepted.
	  }
	else
	  {
	    // this worker's thread is stopped; we can start a new one
	    refp->set_push_time_now ();
	    refp->set_has_thread ();
	    refp->start_thread ();
	  }
      }

    // copy all workers having threads back to the available array.
    if (available_stack.get_size () > 0)
      {
	std::unique_lock<std::mutex> core_lock (m_workers_mutex);
	if (m_available_count > 0)
	  {
	    // move the currently available workers to make room for the saved ones
	    std::memmove (m_available_workers + available_stack.get_size (), m_available_workers,
			  m_available_count * sizeof (worker *));
	  }

	// copy from stack to the beginning of m_available_workers
	std::memcpy (m_available_workers, available_stack.get_array (), available_stack.get_memsize ());

	// update available count
	m_available_count += available_stack.get_size ();
      }
  }

  template <typename Context>
  void
  worker_pool<Context>::core::retire_queued_tasks (void)
  {
    std::unique_lock<std::mutex> ulock (m_workers_mutex);

    while (!m_task_queue.empty ())
      {
	m_task_queue.front ()->retire ();
	m_task_queue.pop ();
      }
  }

  template <typename Context>
  void
  worker_pool<Context>::core::get_stats (cubperf::stat_value *stats_out) const
  {
    for (std::size_t it = 0; it < m_max_workers; it++)
      {
	m_worker_array[it].get_stats (stats_out);
      }
  }

  //////////////////////////////////////////////////////////////////////////
  // worker_pool<Context>::core::worker
  //////////////////////////////////////////////////////////////////////////

  template <typename Context>
  worker_pool<Context>::core::worker::worker (void)
    : m_parent_core (NULL)
    , m_context_p (NULL)
    , m_task_p (NULL)
    , m_task_cv ()
    , m_task_mutex ()
    , m_stop (false)
    , m_has_thread (false)
    , m_statistics (wp_worker_statset_create ())
    , m_push_time ()
  {
    //
  }

  template <typename Context>
  worker_pool<Context>::core::worker::~worker (void)
  {
    wp_worker_statset_destroy (m_statistics);
  }

  template <typename Context>
  void
  worker_pool<Context>::core::worker::init_core (core_type &parent)
  {
    m_parent_core = &parent;
  }

  template <typename Context>
  void
  worker_pool<Context>::core::worker::assign_task (task<Context> *work_p, cubperf::time_point push_time)
  {
    // save push time
    m_push_time = push_time;

    std::unique_lock<std::mutex> ulock (m_task_mutex);

    // save task
    m_task_p = work_p;

    if (m_has_thread)
      {
	// notify waiting thread
	ulock.unlock ();   // mutex is not needed for notify
	m_task_cv.notify_one ();
      }
    else
      {
	m_has_thread = true;
	ulock.unlock ();

	assert (m_context_p == NULL);

	start_thread ();
      }
  }

  template <typename Context>
  void
  worker_pool<Context>::core::worker::start_thread (void)
  {
    assert (m_has_thread);

    //
    // the next code tries to help visualize any system errors that can occur during create or detach in debug
    // mode
    //
    // in release it basically reduces to:
    //    std::thread (&worker::run, this).detach ();
    //

    std::thread t;

    auto lambda_create = [&] (void) -> void { t = std::thread (&worker::run, this); };
    auto lambda_detach = [&] (void) -> void { t.detach (); };

    wp_call_func_throwing_system_error ("starting thread", lambda_create);
    wp_call_func_throwing_system_error ("detaching thread", lambda_detach);
  }

  template <typename Context>
  void
  worker_pool<Context>::core::worker::push_task_on_running_thread (task<Context> *work_p,
								   cubperf::time_point push_time)
  {
    // run on current thread
    assert (work_p != NULL);

    m_push_time = push_time;

    // must lock task mutex
    std::unique_lock<std::mutex> ulock (m_task_mutex);

    // make sure worker is in a valid state
    assert (m_task_p == NULL);
    assert (m_context_p != NULL);

    // set task
    m_task_p = work_p;

    // notify waiting thread
    ulock.unlock ();   // mutex is not needed for notify
    m_task_cv.notify_one ();
  }

  template <typename Context>
  void
  worker_pool<Context>::core::worker::stop_execution (bool &is_not_stopped)
  {
    context_type *context_p = m_context_p;

    if (context_p != NULL)
      {
	// notify context to stop
	m_parent_core->get_context_manager ().stop_execution (*context_p);
      }

    // make sure thread is not waiting for tasks
    std::unique_lock<std::mutex> ulock (m_task_mutex);

    if (m_has_thread)
      {
	// thread is still running
	is_not_stopped = true;
      }

    m_stop = true;     // stop worker
    ulock.unlock ();   // mutex is not needed for notify

    m_task_cv.notify_one ();
  }

  template <typename Context>
  void
  worker_pool<Context>::core::worker::init_run (void)
  {
    // safe-guard - we have a thread
    assert (m_has_thread);

    // safe-guard - worker should [no longer] be available
    m_parent_core->check_worker_not_available (*this);

    // thread was started
    m_statistics.m_timept = m_push_time;
    wp_worker_statset_time_and_increment (m_statistics, Wpstat_start_thread);

    // a context is required
    m_context_p = &m_parent_core->get_context_manager ().create_context ();
    wp_worker_statset_time_and_increment (m_statistics, Wpstat_create_context);
  }

  template <typename Context>
  void
  worker_pool<Context>::core::worker::finish_run (void)
  {
    assert (m_task_p == NULL);
    assert (m_context_p != NULL);

    // retire context
    m_parent_core->get_context_manager ().retire_context (*m_context_p);
    m_context_p = NULL;
    wp_worker_statset_time_and_increment (m_statistics, Wpstat_retire_context);
  }

  template <typename Context>
  void
  worker_pool<Context>::core::worker::retire_current_task (void)
  {
    assert (m_task_p != NULL);

    // retire task
    m_task_p->retire ();
    m_task_p = NULL;
    wp_worker_statset_time_and_increment (m_statistics, Wpstat_retire_task);
  }

  template <typename Context>
  void
  worker_pool<Context>::core::worker::execute_current_task (void)
  {
    assert (m_task_p != NULL);

    // execute task
    m_task_p->execute (*m_context_p);
    wp_worker_statset_time_and_increment (m_statistics, Wpstat_execute_task);

    // and retire task
    retire_current_task ();

    // and recycle context before getting another task
    m_parent_core->get_context_manager ().recycle_context (*m_context_p);
    wp_worker_statset_time_and_increment (m_statistics, Wpstat_recycle_context);

    // notify core that one task was finished
    m_parent_core->finished_task_notification ();
  }

  template <typename Context>
  bool
  worker_pool<Context>::core::worker::get_new_task (void)
  {
    assert (m_task_p == NULL);

    std::unique_lock<std::mutex> ulock (m_task_mutex, std::defer_lock);

    // check stop condition
    if (!m_stop)
      {
	// get a queued task or wait for one to come

	// either get a queued task or add to free active list
	// note: the returned task cannot be saved directly to m_task_p. if the worker is added to the wait queue
	//       and NULL is returned, the current thread may be preempted. the worker is then claimed from the
	//       free active list and assigned a task. this changes expected behavior and can have unwanted
	//       consequences.
	task_type *task_p = m_parent_core->get_task_or_become_available (*this);
	if (task_p != NULL)
	  {
	    wp_worker_statset_time_and_increment (m_statistics, Wpstat_found_in_queue);

	    // it is safe to set here
	    m_task_p = task_p;
	    return true;
	  }

	// wait for task
	ulock.lock ();
	if (m_task_p == NULL && !m_stop)
	  {
	    // wait until a task is received or stopped ...
	    // ... or time out
	    condvar_wait (m_task_cv, ulock, m_parent_core->get_parent_pool ()->get_wait_for_task_time (),
			  [this] () -> bool { return m_task_p != NULL || m_stop; });
	  }
	else
	  {
	    // no need to wait
	  }
      }
    else
      {
	// we need to add to available list
	m_parent_core->become_available (*this);

	ulock.lock ();
      }

    // did I get a task?
    if (m_task_p == NULL)
      {
	// no; this thread will stop. from this point forward, if a new task is assigned, a new thread must be
	// spawned
	m_has_thread = false;

	// finish_run; we need to retire context before another thread uses this worker
	m_statistics.m_timept = cubperf::clock::now ();
	finish_run ();

	return false;
      }
    else
      {
	// unlock mutex
	ulock.unlock ();

	// safe-guard - worker should no longer be available
	m_parent_core->check_worker_not_available (*this);

	// found task
	m_statistics.m_timept = m_push_time;
	wp_worker_statset_time_and_increment (m_statistics, Wpstat_wakeup_with_task);
	return true;
      }
  }

  template <typename Context>
  void
  worker_pool<Context>::core::worker::run (void)
  {
    task_type *task_p = NULL;

    init_run ();   // do stuff at the beginning like creating context

    if (m_task_p == NULL)
      {
	// started without task; get one
	if (get_new_task ())
	  {
	    assert (m_task_p != NULL);
	  }
      }

    if (m_task_p != NULL)
      {
	// loop and execute as many tasks as possible
	do
	  {
	    execute_current_task ();
	  }
	while (get_new_task ());
      }
    else
      {
	// never got a task
      }

    // finish_run ();   // do stuff on end like retiring context
  }

  template <typename Context>
  void
  worker_pool<Context>::core::worker::get_stats (cubperf::stat_value *sum_inout) const
  {
    wp_worker_statset_accumulate (m_statistics, sum_inout);
  }

  template <typename Context>
  template <typename Func, typename ... Args>
  void
  worker_pool<Context>::core::worker::map_context_if_running (bool &stop, Func &&func, Args &&... args)
  {
    if (m_task_p == NULL)
      {
	// not running
	return;
      }

    Context *ctxp = m_context_p;

    if (ctxp != NULL)
      {
	func (*ctxp, stop, args...);
      }
  }

  //////////////////////////////////////////////////////////////////////////
  // other functions
  //////////////////////////////////////////////////////////////////////////

  template <typename Func>
  void
  wp_call_func_throwing_system_error (const char *message, Func &func)
  {
#if !defined (NDEBUG)
    try
      {
#endif // DEBUG

	func ();   // no exception catching on release

#if !defined (NDEBUG)
      }
    catch (const std::system_error &e)
      {
	wp_handle_system_error (message, e);
      }
#endif // DEBUG
  }

} // namespace cubthread

#endif // _THREAD_WORKER_POOL_HPP_