CUBRID Engine  latest
thread_manager.hpp
Go to the documentation of this file.
1 /*
2  * Copyright 2008 Search Solution Corporation
3  * Copyright 2016 CUBRID Corporation
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 /*
20  * thread_manager.hpp - interface of tracker for all thread resources
21  */
22 
23 #ifndef _THREAD_MANAGER_HPP_
24 #define _THREAD_MANAGER_HPP_
25 
26 #if !defined (SERVER_MODE) && !defined (SA_MODE)
27 #error Wrong module
28 #endif // not SERVER_MODE and not SA_MODE
29 
30 // same module includes
31 #include "thread_entry.hpp"
32 #include "thread_entry_task.hpp"
33 #include "thread_task.hpp"
34 #include "thread_waiter.hpp"
35 
36 // other module includes
37 #include "base_flag.hpp"
38 
39 #include <mutex>
40 #include <vector>
41 
42 // forward definitions
43 template <typename T>
45 
46 namespace lockfree
47 {
48  namespace tran
49  {
50  class system;
51  }
52 }
53 
54 namespace cubthread
55 {
56 
57  // forward definition
58  template <typename Context>
59  class worker_pool;
60  class looper;
61  class daemon;
63 
64  // alias for worker_pool<entry>
66 
67  // cubthread::manager
68  //
69  // description:
70  // thread and thread context (entry) manager
71  // CUBRID interface for using daemons and worker pools with thread entries
72  //
73  // daemons -
74  // creates, destroys and tracks all daemons
75  // provides thread entries to daemons
76  // available in SERVER_MODE only
77  // see more details in thread_daemon.hpp
78  //
79  // worker pools -
80  // creates, destroys and tracks all worker pools
81  // provides thread entries to worker pool threads
82  // available in both SERVER_MODE and SA_MODE; SA_MODE however does not actually create worker pools, but instead
83  // execute required tasks immediately (on current thread)
84  // see more details in thread_worker_pool.hpp
85  //
86  // entries -
87  // creates a pool of entries; pool cannot be extended
88  // reserves entries for daemons and worker pools; if entry resources are depleted, it will refuse to create
89  // additional daemons and worker pools
90  // dispatches entries when worker/daemon threads start execution and manages entry retirement/reuse
91  // note -
92  // thread entries should be seen as thread local variables. however, they are bulky structures that may take
93  // a long time to initialize/finalize, so they are pooled by manager; expensive initialize/finalize are
94  // replaced by claim from pool and retire to pool. note that claim also saves the entry to thread local
95  // pointer to thread_entry (see claim_entry/retire_entry)
96  //
97  // how to use:
98  // 1. daemon -
99  // daemon *my_daemon = cubthread::get_manager ()->create_daemon (daemon_looper, daemon_task_p);
100  // // daemon loops and execute task on each iteration
101  // cubthread::get_manager ()->destroy_daemon (my_daemon);
102  //
103  // 2. entry_workpool -
104  // entry_workpool *my_workpool = cubthread::get_manager ()->create_worker_pool (MAX_THREADS, MAX_JOBS);
105  // cubthread::get_manager ()->push_task (my_workpool, entry_task_p);
106  // cubthread::get_manager ()->destroy_worker_pool (my_workpool);
107  //
108  class manager
109  {
110  public:
111  manager ();
112  ~manager ();
113 
115  // entry manager
117 
118  void alloc_entries (void);
119  void init_entries (bool with_lock_free = false);
120  void init_lockfree_system ();
121 
123  // worker pool management
125 
126  // create an entry_workpool with pool_size number of threads
127  // notes: if fewer than pool_size entries are available, the worker pool is not created and NULL is returned
128  // signature emulates worker_pool constructor signature
129  entry_workpool *create_worker_pool (std::size_t pool_size, std::size_t task_max_count, const char *name,
130  entry_manager *context_manager, std::size_t core_count,
131  bool debug_logging, bool pool_threads = false,
132  wait_seconds wait_for_task_time = std::chrono::seconds (5));
133 
134  // destroy worker pool
135  void destroy_worker_pool (entry_workpool *&worker_pool_arg);
136 
137  // push task to worker pool created with this manager
138  // if worker_pool_arg is NULL, the task is executed immediately
139  void push_task (entry_workpool *worker_pool_arg, entry_task *exec_p);
140  // push task on the given core of entry worker pool.
141  // read cubthread::worker_pool::execute_on_core for details.
142  void push_task_on_core (entry_workpool *worker_pool_arg, entry_task *exec_p, std::size_t core_hash);
143 
144  // try to execute task if there is an available thread in the worker pool
145  // if worker_pool_arg is NULL, the task is executed immediately
146  bool try_task (entry &thread_p, entry_workpool *worker_pool_arg, entry_task *exec_p);
147 
148  // return if pool is full
149  // for SERVER_MODE see worker_pool::is_full
150  // for SA_MODE it is always false
151  bool is_pool_full (entry_workpool *worker_pool_arg);
152 
154  // daemon management
156 
157  // there are two types of daemons:
158  //
159  // 1. daemons based on thread_entry context
160  // 2. daemons without context
161  //
162  // daemons of the first type must also reserve a thread entry. there can be an unlimited number of second-type daemons
163  //
164  // create_daemon/destroy_daemon and create_daemon_without_entry/destroy_daemon_without_entry are not
165  // interchangeable. expect safe-guard failures if not used appropriately.
166  //
167 
168  // create daemon thread
169  //
170  // note: signature should match context-based daemon constructor. only exception is context manager which is
171  // moved at the end to allow a default value
172  //
173  // todo: remove default daemon name
174  daemon *create_daemon (const looper &looper_arg, entry_task *exec_p, const char *daemon_name = "",
175  entry_manager *context_manager = NULL);
176  // destroy daemon thread
177  void destroy_daemon (daemon *&daemon_arg);
178 
179  // create & destroy daemon thread without thread entry
180  //
181  // note: create signature should match context-less daemon constructor
182  daemon *create_daemon_without_entry (const looper &looper_arg, task_without_context *exec_p,
183  const char *daemon_name);
184  void destroy_daemon_without_entry (daemon *&daemon_arg);
185 
187  // other member functions
189 
190  // get the maximum thread count
191  std::size_t get_max_thread_count (void) const;
192 
193  // verify all threads (workers and daemons) are killed
194  void check_all_killed (void);
195 
196  // get entry array; required for thread.c/h backward compatibility
197  // todo: remove me
199  {
200  return m_all_entries;
201  }
202 
204  {
205  return *m_lf_tran_sys;
206  }
207 
208  void set_max_thread_count_from_config ();
209  void set_max_thread_count (std::size_t count);
210 
212  entry *find_by_tid (thread_id_t tid);
213 
214  // mappers
215 
216  // map all entries
217  // function is called as func (entry, bool & stop_mapper, args...):
218  // bool & stop_mapper - output true to stop mapping over threads
219  template <typename Func, typename ... Args>
220  void map_entries (Func &&func, Args &&... args);
221 
222  private:
223 
224  // define friend classes/functions to access claim_entry/retire_entry functions
225  friend class entry_manager;
226  friend void initialize (entry *&my_entry);
227  friend void finalize (void);
228 
229  // private type aliases
231 
232  // claim/retire entries
233  entry *claim_entry (void);
234  void retire_entry (entry &entry_p);
235 
236  // generic implementation to create and destroy resources (specialize through daemon and entry_workpool)
237  template <typename Res, typename ... CtArgs>
238  Res *create_and_track_resource (std::vector<Res *> &tracker, size_t entries_count, CtArgs &&... args);
239  template <typename Res>
240  void destroy_and_untrack_resource (std::vector<Res *> &tracker, Res *&res, std::size_t entries_count);
241  template <typename Res>
242  void destroy_and_untrack_all_resources (std::vector<Res *> &tracker);
243 
244  // private members
245 
246  // max thread count
247  std::size_t m_max_threads;
248 
249  // guard for thread resources
251  // worker pools
252  std::vector<entry_workpool *> m_worker_pools;
253  // daemons
254  std::vector<daemon *> m_daemons;
255  // daemons without entries
256  std::vector<daemon *> m_daemons_without_entries;
257 
258  // entries
260  // entry pool
262  // available entries count
266 
267  // lock-free transaction system
269  };
270 
272  // thread logging flags
273  //
274  // TODO: complete thread logging for all modules
275  //
276  // How to use:
277  //
278  // do_log = is_logging_configured (LOG_MANAGER);
279  // if (do_log)
280  // _er_log_debug (ARG_FILE_LINE, "something happens\n");
281  //
282  // Flags explained:
283  //
284  // There are three types of flags to be used: manager, worker pool and daemons. For now, only worker pools are
285  // actually logged, others are just declared for future extensions.
286  //
287  // To activate a logging flag, set the thread_logging_flag system parameter value to include that flag.
288  // For instance, to log connections, the bit for LOG_WORKER_POOL_CONNECTIONS should be set.
289  //
// system parameter flags for thread logging
//
// each category owns its own bit range; the *_ALL constant of a category is the mask of that whole range

// manager flags
constexpr int LOG_MANAGER = 1 << 0;
constexpr int LOG_MANAGER_ALL = 0xFF;	// reserved for thread manager

// worker pool flags
constexpr int LOG_WORKER_POOL_VACUUM = 1 << 8;
constexpr int LOG_WORKER_POOL_CONNECTIONS = 1 << 9;
constexpr int LOG_WORKER_POOL_TRAN_WORKERS = 1 << 10;
constexpr int LOG_WORKER_POOL_INDEX_BUILDER = 1 << 11;
constexpr int LOG_WORKER_POOL_ALL = 0xFF00;	// reserved for thread worker pools

// daemons flags
constexpr int LOG_DAEMON_VACUUM = 1 << 16;
// the literal 0xFFFF0000 does not fit a 32-bit int, so it is written as unsigned and converted explicitly
constexpr int LOG_DAEMON_ALL = static_cast<int> (0xFFFF0000u);	// reserved for thread daemons
306 
307  bool is_logging_configured (const int logging_flag);
308 
310  // thread global functions
312 
313  // initialize thread manager; note this creates a singleton cubthread::manager instance
314  void initialize (entry *&my_entry);
315 
316  // finalize thread manager
317  void finalize (void);
318 
319  // backward compatibility initialization
320  int initialize_thread_entries (bool with_lock_free = true);
321  entry *get_main_entry (void);
322 
323  // get thread manager
324  manager *get_manager (void);
325 
326  // quick fix for unit test mock-ups
327  void set_manager (manager *manager);
328 
329  // get maximum thread count
330  std::size_t get_max_thread_count (void);
331 
332  // is_single_thread context; e.g. SA_MODE
333  // todo: sometimes SERVER_MODE can be single-thread; e.g. during boot
334  bool is_single_thread (void);
335  // safe-guard for multi-thread features not being used in single-thread context
336  void check_not_single_thread (void);
337 
338  // get current thread's entry
339  entry &get_entry (void);
340  void set_thread_local_entry (entry &tl_entry); // for unit test easy mock-ups
341  void clear_thread_local_entry (void); // for unit test easy mock-ups
342 
344 
346  // template / inline functions
348 
349  template <typename Func, typename ... Args>
350  void
351  manager::map_entries (Func &&func, Args &&... args)
352  {
353  bool stop = false;
354  for (std::size_t i = 0; i < m_max_threads; i++)
355  {
356  func (m_all_entries[i], stop, std::forward<Args> (args)...);
357  if (stop)
358  {
359  break;
360  }
361  }
362  }
363 
364 } // namespace cubthread
365 
367 // alias functions to be used in C legacy code
368 //
369 // use inline functions instead of definitions
371 
372 inline cubthread::manager *
374 {
375  return cubthread::get_manager ();
376 }
377 
378 inline std::size_t
380 {
382 }
383 
384 inline cubthread::entry *
386 {
388  return &te;
389 }
390 
391 inline int
393 {
394  if (thread_p == NULL)
395  {
396  thread_p = thread_get_thread_entry_info ();
397  }
398 
399  return thread_p->index;
400 }
401 
402 inline int
404 {
406 }
407 
408 inline void
410 {
412 }
413 
414 // todo - we really need to do some refactoring for lock-free structures
415 inline lf_tran_entry *
416 thread_get_tran_entry (cubthread::entry *thread_p, int entry_idx)
417 {
418  if (thread_p == NULL)
419  {
420  thread_p = thread_get_thread_entry_info ();
421  }
422  if (entry_idx >= 0 && entry_idx < THREAD_TS_LAST)
423  {
424  return thread_p->tran_entries[entry_idx];
425  }
426  else
427  {
428  assert (false);
429  return NULL;
430  }
431 }
432 
// Suspend the calling thread for the duration d.
//
// d - duration value handed unchanged to std::this_thread::sleep_for
template <typename Duration>
inline void
thread_sleep_for (Duration d)
{
  namespace current_thread = std::this_thread;
  current_thread::sleep_for (d);
}
439 
440 inline void
441 thread_sleep (double millisec)
442 {
443  // try to avoid this and use thread_sleep_for instead
444  std::chrono::duration<double, std::milli> duration_millis (millisec);
445  thread_sleep_for (duration_millis);
446 }
447 
448 #endif // _THREAD_MANAGER_HPP_
entry_dispatcher * m_entry_dispatcher
cubthread::entry * thread_get_thread_entry_info(void)
void thread_return_lock_free_transaction_entries(void)
lf_tran_entry * tran_entries[THREAD_TS_COUNT]
const int LOG_WORKER_POOL_ALL
std::vector< daemon * > m_daemons
const int LOG_WORKER_POOL_VACUUM
static API_MUTEX mutex
Definition: api_util.c:72
entry_manager * m_entry_manager
void thread_sleep(double millisec)
unsigned long int thread_id_t
Definition: lock_free.h:120
void clear_thread_local_entry(void)
int thread_get_current_entry_index(void)
cubthread::manager * thread_get_manager(void)
void set_thread_local_entry(entry &tl_entry)
const int LOG_WORKER_POOL_CONNECTIONS
const int LOG_WORKER_POOL_TRAN_WORKERS
const int LOG_WORKER_POOL_INDEX_BUILDER
const int LOG_MANAGER_ALL
const int LOG_DAEMON_VACUUM
int thread_get_entry_index(cubthread::entry *thread_p)
bool is_single_thread(void)
manager * get_manager(void)
std::vector< entry_workpool * > m_worker_pools
const int LOG_DAEMON_ALL
#define assert(x)
lockfree::tran::system * m_lf_tran_sys
void thread_sleep_for(Duration d)
std::size_t m_max_threads
std::vector< daemon * > m_daemons_without_entries
void check_not_single_thread(void)
lf_tran_entry * thread_get_tran_entry(cubthread::entry *thread_p, int entry_idx)
int initialize_thread_entries(bool with_lock_free)
#define NULL
Definition: freelistheap.h:34
lockfree::tran::system & get_lockfree_transys()
void return_lock_free_transaction_entries(void)
std::size_t get_max_thread_count(void)
int count(int &result, const cub_regex_object &reg, const std::string &src, const int position, const INTL_CODESET codeset)
std::mutex m_entries_mutex
void set_manager(manager *manager)
entry * get_main_entry(void)
std::size_t m_available_entries_count
std::size_t thread_num_total_threads(void)
int i
Definition: dynamic_load.c:954
daemon_entry_manager * m_daemon_entry_manager
entry * get_all_entries(void)
entry & get_entry(void)
const int LOG_MANAGER
bool is_logging_configured(const int logging_flag)