CUBRID Engine  latest
thread_worker_pool_taskcap.hpp
Go to the documentation of this file.
1 /*
2  * Copyright 2008 Search Solution Corporation
3  * Copyright 2016 CUBRID Corporation
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 //
20 // cap the number of tasks that are pushed into a worker pool and block incoming pushes when the cap is reached
21 //
22 
23 #ifndef _THREAD_WORKER_POOL_TASKCAP_HPP_
24 #define _THREAD_WORKER_POOL_TASKCAP_HPP_
25 
26 #include "thread_worker_pool.hpp"
27 
28 namespace cubthread
29 {
30  template <typename Context>
32  {
33  private:
34  using context_type = Context;
37 
38  public:
40  ~worker_pool_task_capper () = default;
41 
42  bool try_task (task<Context> *task);
45 
46  private:
47  // forward declaration
48  class capped_task;
49 
50  void end_task ();
51 
52  void execute (task<Context> *task); // function does not acquire m_mutex lock
53 
56  size_t m_max_tasks;
58  std::condition_variable m_cond_var;
59  };
60 
61  template <typename Context>
63  {
64  public:
65  capped_task () = delete;
67  ~capped_task ();
68 
69  void execute (context_type &ctx) override;
70 
71  private:
74  };
75 } // namespace cubthread
76 
77 namespace cubthread
78 {
80  // worker_pool_task_capper template implementation
82  template <typename Context>
84  : m_worker_pool (worker_pool)
85  , m_tasks_available (0)
86  , m_max_tasks (0)
87  , m_mutex ()
88  , m_cond_var ()
89  {
90  m_tasks_available = m_max_tasks = worker_pool->get_max_count ();
91  }
92 
93  template <typename Context>
94  bool
96  {
97  std::unique_lock<std::mutex> ulock (m_mutex);
98 
99  if (m_tasks_available == 0)
100  {
101  // is full
102  return false;
103  }
104 
105  execute (task);
106  return true;
107  }
108 
109  template <typename Context>
110  void
112  {
113  std::unique_lock<std::mutex> ulock (m_mutex);
114 
115  auto pred = [&] () -> bool { return (m_tasks_available > 0); };
116  m_cond_var.wait (ulock, pred);
117 
118  // Make sure we have the lock.
119  assert (ulock.owns_lock ());
120 
121  execute (task);
122  }
123 
124  template <typename Context>
125  void
127  {
128  // Safeguard.
130 
132  m_worker_pool->execute (new capped_task (*this, task));
133  }
134 
135  template <typename Context>
137  {
138  std::unique_lock<std::mutex> ulock (m_mutex);
140 
141  // Safeguard
142  assert (m_tasks_available <= m_max_tasks && m_tasks_available > 0);
143 
144  ulock.unlock ();
145  m_cond_var.notify_all ();
146  }
147 
148  template <typename Context>
150  {
151  return m_worker_pool;
152  }
153 
154  template <typename Context>
156  : m_capper (capper)
157  , m_nested_task (task)
158  {
159  }
160 
161  template <typename Context>
163  {
164  m_nested_task->retire ();
165  }
166 
167  template <typename Context>
168  void
170  {
171  m_nested_task->execute (ctx);
172  m_capper.end_task ();
173  }
174 }
175 
176 #endif // !_THREAD_WORKER_POOL_TASKCAP_HPP_
worker_pool_task_capper(worker_pool< Context > *worker_pool)
static API_MUTEX mutex
Definition: api_util.c:72
virtual void retire(void)
Definition: thread_task.hpp:75
virtual void execute(context_type &)=0
cubthread::worker_pool< Context > * m_worker_pool
#define assert(x)
Context context_type
Definition: thread_task.hpp:64
std::size_t get_max_count(void) const
cubthread::worker_pool< Context > * get_worker_pool() const