CUBRID Engine  latest
load_worker_manager.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2008 Search Solution Corporation
3  * Copyright 2016 CUBRID Corporation
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 /*
20  * load_worker_manager.cpp - Thread manager of the loaddb session
21  */
22 
23 #include "load_worker_manager.hpp"
24 
25 #include "load_driver.hpp"
26 #include "load_session.hpp"
27 #include "resource_shared_pool.hpp"
28 #include "thread_worker_pool.hpp"
30 #include "xserver_interface.h"
31 
32 #include <condition_variable>
33 #include <mutex>
34 #include <set>
35 
36 namespace cubload
37 {
38  /*
39  * cubload::worker_context_manager
40  * extends cubthread::entry_manager
41  *
42  * description
43  * Thread entry manager for loaddb worker pool. Main functionality of the entry manager is to keep a pool of
44  * cubload::driver instances.
45  * on_create - a driver instance is claimed from the pool and assigned to the thread ref
46  * on_retire - the driver previously stored in the thread ref is retired to the pool
47  */
49  {
50  public:
51  explicit worker_context_manager (unsigned int pool_size);
52  ~worker_context_manager () override = default;
53 
54  void on_create (cubthread::entry &context) override;
55  void on_retire (cubthread::entry &context) override;
56 
57  private:
59  };
60 
62  static std::condition_variable g_wp_condvar;
63  std::set<session *> g_active_sessions;
67 
69  : m_driver_pool (pool_size)
70  {
71  //
72  }
73 
75  {
76  driver *driver = m_driver_pool.claim ();
77 
78  context.m_loaddb_driver = driver;
79  context.type = TT_LOADDB;
80  }
81 
83  {
84  if (context.m_loaddb_driver == NULL)
85  {
86  return;
87  }
88 
89  context.m_loaddb_driver->clear ();
90 
91  m_driver_pool.retire (*context.m_loaddb_driver);
92 
93  context.m_loaddb_driver = NULL;
94  context.conn_entry = NULL;
95  }
96 
97  bool
99  {
100  assert (g_worker_pool != NULL);
101  return g_wp_task_capper->try_task (task);
102  }
103 
104  void
106  {
107  g_wp_mutex.lock ();
108 
109  if (g_active_sessions.empty ())
110  {
111  assert (g_worker_pool == NULL);
112  assert (g_wp_context_manager == NULL);
113 
114  unsigned int pool_size = prm_get_integer_value (PRM_ID_LOADDB_WORKER_COUNT);
115 
116  g_wp_context_manager = new worker_context_manager (pool_size);
117  g_worker_pool = cubthread::get_manager ()->create_worker_pool (pool_size, 2 * pool_size, "loaddb-workers",
118  g_wp_context_manager, 1, false, true);
119 
121  }
122  else
123  {
124  assert (g_worker_pool != NULL);
125  assert (g_wp_context_manager != NULL);
126  }
127 
128  g_active_sessions.insert (&load_session);
129 
130  g_wp_mutex.unlock ();
131  }
132 
133  void
135  {
136  g_wp_mutex.lock ();
137 
138  if (g_active_sessions.erase (&load_session) != 1)
139  {
140  assert (false);
141  }
142 
143  // Check if there are any sessions attached to the wp. We are under lock so we are the only ones doing this.
144  if (g_active_sessions.empty ())
145  {
146  // We are the last session so we can safely destroy the worker pool and the manager.
147  cubthread::get_manager ()->destroy_worker_pool (g_worker_pool);
148  delete g_wp_context_manager;
149 
150  delete g_wp_task_capper;
151 
152  g_worker_pool = NULL;
153  g_wp_context_manager = NULL;
154  g_wp_task_capper = NULL;
155  }
156 
157  g_wp_mutex.unlock ();
158  g_wp_condvar.notify_one ();
159  }
160 
161  void
163  {
164  std::unique_lock<std::mutex> ulock (g_wp_mutex);
165  if (g_active_sessions.empty ())
166  {
167  return;
168  }
169 
170  for (auto &it : g_active_sessions)
171  {
172  it->interrupt ();
173  }
174  auto pred = [] () -> bool
175  {
176  return g_active_sessions.empty ();
177  };
178  g_wp_condvar.wait (ulock, pred);
179  }
180 
181  void
182  worker_manager_get_stats (UINT64 *stats_out)
183  {
184  std::unique_lock<std::mutex> ulock (g_wp_mutex);
185  if (g_worker_pool != NULL)
186  {
187  g_worker_pool->get_stats (stats_out);
188  }
189  }
190 } // namespace cubload
void on_retire(cubthread::entry &context) override
void worker_manager_get_stats(UINT64 *stats_out)
static API_MUTEX mutex
Definition: api_util.c:72
static std::mutex g_wp_mutex
css_conn_entry * conn_entry
static std::condition_variable g_wp_condvar
entry_workpool * create_worker_pool(std::size_t pool_size, std::size_t task_max_count, const char *name, entry_manager *context_manager, std::size_t core_count, bool debug_logging, bool pool_threads=false, wait_seconds wait_for_task_time=std::chrono::seconds(5))
thread_type type
manager * get_manager(void)
#define assert(x)
int prm_get_integer_value(PARAM_ID prm_id)
static worker_context_manager * g_wp_context_manager
cubload::driver * m_loaddb_driver
static cubthread::worker_pool_task_capper< cubthread::entry > * g_wp_task_capper
#define NULL
Definition: freelistheap.h:34
std::set< session * > g_active_sessions
void get_stats(cubperf::stat_value *stats_out) const
~worker_context_manager() override=default
void worker_manager_unregister_session(session &load_session)
worker_context_manager(unsigned int pool_size)
void destroy_worker_pool(entry_workpool *&worker_pool_arg)
void worker_manager_stop_all()
void on_create(cubthread::entry &context) override
static cubthread::entry_workpool * g_worker_pool
void worker_manager_register_session(session &load_session)
bool worker_manager_try_task(cubthread::entry_task *task)
resource_shared_pool< driver > m_driver_pool