CUBRID Engine  latest
thread_manager.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2008 Search Solution Corporation
3  * Copyright 2016 CUBRID Corporation
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 /*
20  * thread_manager.cpp - implementation for tracker for all thread resources
21  */
22 
23 #include "thread_manager.hpp"
24 
25 // same module includes
26 #if defined (SERVER_MODE)
27 #include "thread_daemon.hpp"
28 #endif // SERVER_MODE
29 #include "thread_entry.hpp"
30 #include "thread_entry_task.hpp"
31 #if defined (SERVER_MODE)
32 #include "thread_worker_pool.hpp"
33 #endif // SERVER_MODE
34 
35 // project includes
36 #include "error_manager.h"
37 #include "log_impl.h"
38 #include "lock_free.h"
40 #include "resource_shared_pool.hpp"
41 #include "system_parameter.h"
42 
43 #include <cassert>
44 
45 namespace cubthread
46 {
47  thread_local entry *tl_Entry_p = NULL;
48 
49  manager::manager (void)
50  : m_max_threads (0)
51  , m_entries_mutex ()
52  , m_worker_pools ()
53  , m_daemons ()
54  , m_all_entries (NULL)
55  , m_entry_dispatcher (NULL)
56  , m_available_entries_count (0)
57  , m_entry_manager (NULL)
58  , m_daemon_entry_manager (NULL)
59  , m_lf_tran_sys (NULL)
60  {
61  m_entry_manager = new entry_manager ();
62  m_daemon_entry_manager = new daemon_entry_manager ();
63  }
64 
66  {
67  // pool container should be empty by now
69 
70  // make sure that we stop and free all
72 
73  delete m_entry_dispatcher;
74  delete [] m_all_entries;
75  delete m_entry_manager;
77  delete m_lf_tran_sys;
78  }
79 
80  void
82  {
83 #if defined (SA_MODE)
84  assert (false);
85  return;
86 #else // not SA_MODE = SERVER_MODE
87 
89 
92 #endif // not SA_MODE = SERVER_MODE
93  }
94 
95  void
96  manager::init_entries (bool with_lock_free)
97  {
98  // initialize thread indexes and lock-free resources
99  for (std::size_t it = 0; it < m_max_threads; it++)
100  {
101  m_all_entries[it].index = (int) (it + 1);
102  if (with_lock_free)
103  {
106  }
107  }
108  }
109 
110  void
112  {
113 #if defined (SERVER_MODE)
114  // threads + main
116 #else // !SERVER_MODE = SA_MODE
117  m_lf_tran_sys = new lockfree::tran::system (1); // a single thread = main
118 #endif // !SERVER_MODE = SA_MODE
119  }
120 
121  template<typename Res>
122  void manager::destroy_and_untrack_all_resources (std::vector<Res *> &tracker)
123  {
124  assert (tracker.empty ());
125 
126 #if defined (SERVER_MODE)
127  for (; !tracker.empty ();)
128  {
129  const auto iter = tracker.begin ();
130  (*iter)->stop_execution ();
131  delete *iter;
132  tracker.erase (iter);
133  }
134 #endif // SERVER_MODE
135  }
136 
137  template<typename Res, typename ... CtArgs>
138  inline Res *manager::create_and_track_resource (std::vector<Res *> &tracker, size_t entries_count, CtArgs &&... args)
139  {
141 
142  std::unique_lock<std::mutex> lock (m_entries_mutex); // safe-guard
143 
144  if (m_available_entries_count < entries_count)
145  {
146  return NULL;
147  }
148  m_available_entries_count -= entries_count;
149 
150  Res *new_res = new Res (std::forward<CtArgs> (args)...);
151 
152  tracker.push_back (new_res);
153 
154  return new_res;
155  }
156 
158  manager::create_worker_pool (size_t pool_size, size_t task_max_count, const char *name,
159  entry_manager *context_manager, std::size_t core_count, bool debug_logging,
160  bool pool_threads, wait_seconds wait_for_task_time)
161  {
162 #if defined (SERVER_MODE)
163  if (is_single_thread ())
164  {
165  return NULL;
166  }
167  else
168  {
169  if (context_manager == NULL)
170  {
171  context_manager = m_entry_manager;
172  }
173  // reserve pool_size entries and add to m_worker_pools
174  return create_and_track_resource (m_worker_pools, pool_size, pool_size, task_max_count, *context_manager,
175  name, core_count, debug_logging, pool_threads, wait_for_task_time);
176  }
177 #else // not SERVER_MODE = SA_MODE
178  return NULL;
179 #endif // not SERVER_MODE = SA_MODE
180  }
181 
182  daemon *
183  manager::create_daemon (const looper &looper_arg, entry_task *exec_p, const char *daemon_name /* = "" */,
184  entry_manager *context_manager /* = NULL */)
185  {
186 #if defined (SERVER_MODE)
187  if (is_single_thread ())
188  {
189  assert (false);
190  return NULL;
191  }
192  else
193  {
194  if (context_manager == NULL)
195  {
196  context_manager = m_daemon_entry_manager;
197  }
198  // reserve 1 entry and add to m_daemons
199  return create_and_track_resource (m_daemons, 1, looper_arg, context_manager, exec_p, daemon_name);
200  }
201 #else // not SERVER_MODE = SA_MODE
202  assert (false);
203  return NULL;
204 #endif // not SERVER_MODE = SA_MODE
205  }
206 
207  daemon *
208  manager::create_daemon_without_entry (const looper &looper_arg, task_without_context *exec_p, const char *daemon_name)
209  {
210 #if defined (SERVER_MODE)
211  if (is_single_thread ())
212  {
213  assert (false);
214  return NULL;
215  }
216  else
217  {
218  // reserve no entry and add to m_daemons_without_entries
219  return create_and_track_resource (m_daemons_without_entries, 0, looper_arg, exec_p, daemon_name);
220  }
221 #else // not SERVER_MODE = SA_MODE
222  assert (false);
223  return NULL;
224 #endif // not SERVER_MODE = SA_MODE
225  }
226 
227  template<typename Res>
228  inline void
229  manager::destroy_and_untrack_resource (std::vector<Res *> &tracker, Res *&res, std::size_t entries_count)
230  {
231  std::unique_lock<std::mutex> lock (m_entries_mutex); // safe-guard
233 
234  for (auto iter = tracker.begin (); iter != tracker.end (); ++iter)
235  {
236  if (res == *iter)
237  {
238  // remove resource from tracker
239  (void) tracker.erase (iter);
240 
241  // stop resource and delete
242  res->stop_execution ();
243  delete res;
244  res = NULL;
245 
246  // update available entries
247  m_available_entries_count += entries_count;
248 
249  return;
250  }
251  }
252  // resource not found
253  assert (false);
254  }
255 
256  void
258  {
259 #if defined (SERVER_MODE)
260  if (worker_pool_arg == NULL)
261  {
262  return;
263  }
264  // remove from m_worker_pools and free worker_pool_arg->get_max_count thread entries
265  return destroy_and_untrack_resource (m_worker_pools, worker_pool_arg, worker_pool_arg->get_max_count ());
266 #else // not SERVER_MODE = SA_MODE
267  assert (worker_pool_arg == NULL);
268 #endif // not SERVER_MODE = SA_MODE
269  }
270 
271  void
272  manager::push_task (entry_workpool *worker_pool_arg, entry_task *exec_p)
273  {
274  if (worker_pool_arg == NULL)
275  {
276  // execute on this thread
277  exec_p->execute (get_entry ());
278  exec_p->retire ();
279  }
280  else
281  {
282 #if defined (SERVER_MODE)
284  worker_pool_arg->execute (exec_p);
285 #else // not SERVER_MODE = SA_MODE
286  assert (false);
287  // execute on this thread
288  exec_p->execute (get_entry ());
289  exec_p->retire ();
290 #endif // not SERVER_MODE = SA_MODE
291  }
292  }
293 
294  void
295  manager::push_task_on_core (entry_workpool *worker_pool_arg, entry_task *exec_p, std::size_t core_hash)
296  {
297  if (worker_pool_arg == NULL)
298  {
299  // execute on this thread
300  exec_p->execute (get_entry ());
301  exec_p->retire ();
302  }
303  else
304  {
305 #if defined (SERVER_MODE)
307  worker_pool_arg->execute_on_core (exec_p, core_hash);
308 #else // not SERVER_MODE = SA_MODE
309  assert (false);
310  // execute on this thread
311  exec_p->execute (get_entry ());
312  exec_p->retire ();
313 #endif // not SERVER_MODE = SA_MODE
314  }
315  }
316 
317  bool
318  manager::try_task (entry &thread_p, entry_workpool *worker_pool_arg, entry_task *exec_p)
319  {
320  if (worker_pool_arg == NULL)
321  {
322  // execute on this thread
323  exec_p->execute (thread_p);
324  exec_p->retire ();
325  return true;
326  }
327  else
328  {
329 #if defined (SERVER_MODE)
331  return worker_pool_arg->try_execute (exec_p);
332 #else // not SERVER_MODE = SA_MODE
333  assert (false);
334  return false;
335 #endif // not SERVER_MODE = SA_MODE
336  }
337  }
338 
339  bool
341  {
342 #if defined (SERVER_MODE)
343  return worker_pool_arg == NULL || worker_pool_arg->is_full ();
344 #else // not SERVER_MODE = SA_MODE
345  // on SA_MODE can always push more tasks
346  return false;
347 #endif // not SERVER_MODE = SA_MODE
348  }
349 
350  void
352  {
353 #if defined (SERVER_MODE)
354  if (daemon_arg == NULL)
355  {
356  return;
357  }
358  // remove from m_daemons and free one thread entry
359  return destroy_and_untrack_resource (m_daemons, daemon_arg, 1);
360 #else // not SERVER_MODE = SA_MODE
361  assert (daemon_arg == NULL);
362 #endif // not SERVER_MODE = SA_MODE
363  }
364 
365  void
367  {
368 #if defined (SERVER_MODE)
369  if (daemon_arg == NULL)
370  {
371  return;
372  }
373  // remove from m_daemons_without_entries; no thread entries have been reserved
375 #else // not SERVER_MODE = SA_MODE
376  assert (daemon_arg == NULL);
377 #endif // not SERVER_MODE = SA_MODE
378  }
379 
380  entry *
382  {
383  tl_Entry_p = m_entry_dispatcher->claim ();
384 
385  return tl_Entry_p;
386  }
387 
388  void
390  {
391  assert (tl_Entry_p == &entry_p);
392 
393  tl_Entry_p = NULL;
394  m_entry_dispatcher->retire (entry_p);
395  }
396 
397  std::size_t
399  {
400  return m_max_threads;
401  }
402 
403  void
405  {
406  // check all thread resources are killed and freed
410  }
411 
412  void
414  {
415  // todo: is there a better way to decide on the maximum number of thread entries?
416  std::size_t max_active_workers = NUM_NON_SYSTEM_TRANS; // one per each connection
417  std::size_t max_conn_workers = NUM_NON_SYSTEM_TRANS; // one per each connection
418  std::size_t max_vacuum_workers = prm_get_integer_value (PRM_ID_VACUUM_WORKER_COUNT);
419  std::size_t max_daemons = 128; // magic number to cover predictable requirements; not cool
420 
421  // note: thread entry initialization is slow, that is why we keep a static pool initialized from the beginning to
422  // quickly claim entries. in my opinion, it would be better to have thread contexts that can be quickly
423  // generated at "runtime" (after thread starts its task). however, with current thread entry design, that is
424  // rather unlikely.
425 
426  m_max_threads = max_active_workers + max_conn_workers + max_vacuum_workers + max_daemons;
427  }
428 
429  void
431  {
433  }
434 
435  void
437  {
438  for (std::size_t index = 0; index < m_max_threads; index++)
439  {
441  m_lf_tran_sys->free_index (m_all_entries[index].pull_lf_tran_index ());
442  }
443  }
444 
445  entry *
447  {
448  for (std::size_t index = 0; index < m_max_threads; index++)
449  {
450  if (m_all_entries[index].get_id () == tid)
451  {
452  return &m_all_entries[index];
453  }
454  }
455  return NULL;
456  }
457 
459  // Global thread interface
461 
462 #if defined (SERVER_MODE)
463  const bool Is_single_thread = false;
464 #else // not SERVER_MODE = SA_MODE
465  const bool Is_single_thread = true;
466 #endif // not SERVER_MODE = SA_MODE
467 
468  static manager *Manager = NULL;
470 
471  void
472  initialize (entry *&my_entry)
473  {
474  // note - currently it is designed to be called only once. if we want repeatable calls, code must be updated.
475 
476  assert (my_entry == NULL);
477 
478  assert (Manager == NULL);
479  if (Manager == NULL)
480  {
481  Manager = new manager ();
482  }
483 
484  // init main entry
485  assert (Main_entry_p == NULL);
486  Main_entry_p = new entry ();
487  Main_entry_p->index = 0;
488  Main_entry_p->register_id ();
489  Main_entry_p->m_status = entry::status::TS_RUN;
490  Main_entry_p->resume_status = THREAD_RESUME_NONE;
491  Main_entry_p->tran_index = 0; /* system transaction */
492 #if defined (SERVER_MODE)
493  // SA_MODE uses singleton context
494  Main_entry_p->get_error_context ().register_thread_local ();
495 #endif // SERVER_MODE
496 
497  assert (tl_Entry_p == NULL);
498  tl_Entry_p = Main_entry_p;
499 
500  my_entry = Main_entry_p;
501 
502  assert (my_entry == thread_get_thread_entry_info ());
503 
504 #if defined (SERVER_MODE)
506  {
507  // perf tool needs threads to be always alive to work
509  }
510 #endif // SERVER_MODE
511  }
512 
513  void
514  finalize (void)
515  {
516 #if defined (SERVER_MODE)
517  if (Main_entry_p != NULL)
518  {
519  Main_entry_p->get_error_context ().deregister_thread_local ();
520  }
521 #endif // SERVER_MODE
522 
523  delete Main_entry_p;
524  Main_entry_p = NULL;
525  tl_Entry_p = NULL;
526 
527  delete Manager;
528  Manager = NULL;
529  }
530 
531  int
532  initialize_thread_entries (bool with_lock_free /* = true*/)
533  {
534  assert (Main_entry_p != NULL);
535 
536  int error_code = NO_ERROR;
537 #if defined (SERVER_MODE)
538  size_t old_manager_thread_count = 0;
539 
540  assert (Manager != NULL);
541 
543  Manager->alloc_entries ();
544 #endif // SERVER_MODE
545 
546  // note: even though SA_MODE does not really need to synchronize access on lock-free structures, it is better to
547  // simulate using lock-free transaction in order to avoid managing separate code
548 
550  if (error_code != NO_ERROR)
551  {
552  ASSERT_ERROR ();
553  return error_code;
554  }
555  Manager->init_lockfree_system ();
556 
557  if (with_lock_free)
558  {
559  Main_entry_p->request_lock_free_transactions ();
560  Main_entry_p->assign_lf_tran_index (Manager->get_lockfree_transys ().assign_index ());
561  }
562 
563  Manager->init_entries (with_lock_free);
564 
565  return NO_ERROR;
566  }
567 
568  entry *
570  {
571  assert (Main_entry_p != NULL);
572 
573  return Main_entry_p;
574  }
575 
576  manager *
577  get_manager (void)
578  {
579  assert (Manager != NULL);
580 
581  return Manager;
582  }
583 
585  {
586  assert (Manager == NULL);
587 
588  Manager = manager;
589  }
590 
591  std::size_t
593  {
594  // system thread + managed threads
595  return 1 + (Manager != NULL ? Manager->get_max_thread_count () : 0);
596  }
597 
598  entry &
599  get_entry (void)
600  {
601  // shouldn't be called
602  // todo: add thread_p to error manager; or something
603  // er_print_callstack (ARG_FILE_LINE, "warning: manager::get_entry is called");
604  // todo
605  assert (tl_Entry_p != NULL);
606  return *tl_Entry_p;
607  }
608 
609  void
611  {
612  assert (tl_Entry_p == NULL);
613  tl_Entry_p = &tl_entry;
614  }
615 
616  void
618  {
619  assert (tl_Entry_p != NULL);
620  tl_Entry_p = NULL;
621  }
622 
623  bool
625  {
626  return Is_single_thread;
627  }
628 
629  void
631  {
632  assert (!Is_single_thread);
633  }
634 
635  void
637  {
638  if (Main_entry_p != NULL)
639  {
640  Main_entry_p->return_lock_free_transaction_entries ();
641  }
642  if (Manager != NULL)
643  {
645  }
646  }
647 
648 
649  bool
650  is_logging_configured (const int logging_flag)
651  {
653  }
654 
655 } // namespace cubthread
static entry * Main_entry_p
entry_dispatcher * m_entry_dispatcher
cubthread::entry * thread_get_thread_entry_info(void)
#define NO_ERROR
Definition: error_code.h:46
void return_lock_free_transaction_entries(void)
void assign_lf_tran_index(lockfree::tran::index idx)
#define ASSERT_ERROR()
std::vector< daemon * > m_daemons
void init_entries(bool with_lock_free=false)
const bool Is_single_thread
Res * create_and_track_resource(std::vector< Res * > &tracker, size_t entries_count, CtArgs &&...args)
entry_manager * m_entry_manager
bool try_execute(task_type *work_arg)
void execute(task_type *work_arg)
unsigned long int thread_id_t
void alloc_entries(void)
std::size_t get_max_thread_count(void) const
void clear_thread_local_entry(void)
void wp_set_force_thread_always_alive()
int lf_initialize_transaction_systems(int max_threads)
Definition: lock_free.c:448
void set_thread_local_entry(entry &tl_entry)
void push_task_on_core(entry_workpool *worker_pool_arg, entry_task *exec_p, std::size_t core_hash)
entry_workpool * create_worker_pool(std::size_t pool_size, std::size_t task_max_count, const char *name, entry_manager *context_manager, std::size_t core_count, bool debug_logging, bool pool_threads=false, wait_seconds wait_for_task_time=std::chrono::seconds(5))
thread_resume_suspend_status resume_status
virtual void retire(void)
Definition: thread_task.hpp:75
thread_local entry * tl_Entry_p
void register_thread_local(void)
resource_shared_pool< entry > entry_dispatcher
#define NUM_NON_SYSTEM_TRANS
void destroy_daemon_without_entry(daemon *&daemon_arg)
virtual void execute(context_type &)=0
bool is_single_thread(void)
manager * get_manager(void)
void request_lock_free_transactions(void)
std::vector< entry_workpool * > m_worker_pools
#define assert(x)
static manager * Manager
lockfree::tran::system * m_lf_tran_sys
std::size_t m_max_threads
int prm_get_integer_value(PARAM_ID prm_id)
std::vector< daemon * > m_daemons_without_entries
friend void initialize(entry *&my_entry)
cuberr::context & get_error_context(void)
void check_not_single_thread(void)
int initialize_thread_entries(bool with_lock_free)
void set_max_thread_count_from_config()
#define NULL
Definition: freelistheap.h:34
lockfree::tran::system & get_lockfree_transys()
int count(int &result, const cub_regex_object &reg, const std::string &src, const int position, const INTL_CODESET codeset)
void check_all_killed(void)
std::mutex m_entries_mutex
void set_manager(manager *manager)
void destroy_worker_pool(entry_workpool *&worker_pool_arg)
entry * get_main_entry(void)
void deregister_thread_local(void)
void destroy_and_untrack_all_resources(std::vector< Res * > &tracker)
std::size_t m_available_entries_count
void set_max_thread_count(std::size_t count)
void destroy_daemon(daemon *&daemon_arg)
static bool is_flag_set(const T &where_to_check, const T &what_to_check)
Definition: base_flag.hpp:149
void destroy_and_untrack_resource(std::vector< Res * > &tracker, Res *&res, std::size_t entries_count)
std::size_t get_max_count(void) const
bool prm_get_bool_value(PARAM_ID prm_id)
bool try_task(entry &thread_p, entry_workpool *worker_pool_arg, entry_task *exec_p)
entry * find_by_tid(thread_id_t tid)
daemon * create_daemon_without_entry(const looper &looper_arg, task_without_context *exec_p, const char *daemon_name)
entry * claim_entry(void)
void retire_entry(entry &entry_p)
void return_lock_free_transaction_entries(void)
daemon_entry_manager * m_daemon_entry_manager
bool is_pool_full(entry_workpool *worker_pool_arg)
entry & get_entry(void)
void push_task(entry_workpool *worker_pool_arg, entry_task *exec_p)
daemon * create_daemon(const looper &looper_arg, entry_task *exec_p, const char *daemon_name="", entry_manager *context_manager=NULL)
void execute_on_core(task_type *work_arg, std::size_t core_hash)
bool is_logging_configured(const int logging_flag)