CUBRID Engine  latest
lockfree_circular_queue.hpp
Go to the documentation of this file.
1 /*
2  * Copyright 2008 Search Solution Corporation
3  * Copyright 2016 CUBRID Corporation
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 /*
20  * lockfree_circular_queue.hpp : Lock-free structures interface.
21  */
22 
23 #ifndef _LOCKFREE_CIRCULAR_QUEUE_HPP_
24 #define _LOCKFREE_CIRCULAR_QUEUE_HPP_
25 
26 #include "base_flag.hpp"
27 
28 #include <atomic>
29 #include <cassert>
30 #include <climits>
31 #include <cstddef>
32 #include <cstdint>
33 #include <thread>
34 #include <type_traits>
35 
36 // uncomment the DEBUG_LFCQ define below if you need to debug the low-level execution of the lockfree circular queue.
37 // note that if you want to track the execution history of the low-level operations of a certain queue, you also
38 // need to replace calls to produce/consume with produce_debug/consume_debug.
39 //
40 // #define DEBUG_LFCQ
41 
42 namespace lockfree
43 {
44 
45  template <class T>
47  {
48  public:
49  using cursor_type = std::uint64_t;
50  using atomic_cursor_type = std::atomic<cursor_type>;
51  using data_type = T;
53 
54  circular_queue (std::size_t size);
55  ~circular_queue ();
56 
57  inline bool is_empty () const; // is query empty? use it for early outs but don't rely on the answer
58  inline bool is_full () const; // is query full? use it for early outs but don't rely on the answer
59  inline bool is_half_full ();
60  inline std::size_t size ();
61 
62  inline bool consume (T &element); // consume one element from queue; returns false on fail
63  // IMPORTANT!
64  // Element argument may change even if consume fails.
65  // Using its value after failed consumption is not safe.
66  inline bool produce (const T &element); // produce an element to queue; returns false on fail
67  inline void force_produce (const T &element); // force produce (loop until successful)
68 
69  inline std::uint64_t get_consumer_cursor (); // get consume cursor
70 
71  // note:
72  //
73  // above functions are cloned by debug counterparts which track a history of executed low-level operations.
74  // they were not cloned initially, but the code became unreadable with all the #ifdef DEBUG_LFCQ/#endif
75  // preprocessor directives.
76  // therefore, if you update the code for any of these functions, please make sure the debug counterparts are
77  // updated too.
78 
79  private:
80  static const cursor_type BLOCK_FLAG;
81 
82  // block default and copy constructors
83  circular_queue ();
85 
86  inline std::size_t next_pow2 (std::size_t size) const;
87 
88  inline bool test_empty_cursors (cursor_type produce_cursor, cursor_type consume_cursor) const;
89  inline bool test_full_cursors (cursor_type produce_cursor, cursor_type consume_cursor) const;
90 
93 
94  inline T load_data (cursor_type consume_cursor) const;
95  inline void store_data (cursor_type index, const T &data);
96  inline std::size_t get_cursor_index (cursor_type cursor) const;
97 
98  inline bool is_blocked (cursor_type cursor) const;
99  inline bool block (cursor_type cursor);
100  inline void unblock (cursor_type cursor);
101  inline void init_blocked_cursors (void);
102 
103  data_type *m_data; // data storage. access is atomic
104  atomic_flag_type *m_blocked_cursors; // is_blocked flag; when producing new data, there is a time window between
105  // cursor increment and until data is copied. block flag will tell when
106  // produce is completed.
107  atomic_cursor_type m_produce_cursor; // cursor for produce position
108  atomic_cursor_type m_consume_cursor; // cursor for consume position
109  std::size_t m_capacity; // queue capacity
110  std::size_t m_index_mask; // mask used to compute a cursor's index in queue
111 
112 #if defined (DEBUG_LFCQ)
113  private:
114  enum class local_action;
115  struct local_event;
116 
117  public:
118  class local_history
119  {
120  public:
121  local_history ()
122  : m_cursor (0)
123  {
124  }
125 
126  inline void register_event (local_action action)
127  {
128  m_cursor = (m_cursor + 1) % LOCAL_HISTORY_SIZE;
129  m_events[m_cursor].action = action;
130  }
131 
132  inline void register_event (local_action action, cursor_type cursor)
133  {
134  register_event (action);
135  m_events[m_cursor].m_consequence.cursor_value = cursor;
136  }
137 
138  inline void register_event (local_action action, T data)
139  {
140  register_event (action);
141  m_events[m_cursor].m_consequence.data_value = data;
142  }
143  private:
144  static const std::size_t LOCAL_HISTORY_SIZE = 64;
145  local_event m_events[LOCAL_HISTORY_SIZE];
146  std::size_t m_cursor;
147  };
148 
149  // clones of produce/consume with additional debug code. caller should keep its history in thread context to be
150  // inspected on demand
151  inline bool consume_debug (T &element, local_history &my_history);
152  inline bool produce_debug (const T &element, local_history &my_history);
153  inline bool force_produce_debug (const T &element, local_history &my_history);
154 
155  private:
156 
157  enum class local_action
158  {
159  NO_ACTION,
160  LOOP_PRODUCE,
161  LOOP_CONSUME,
162  LOAD_PRODUCE_CURSOR,
163  LOAD_CONSUME_CURSOR,
164  INCREMENT_PRODUCE_CURSOR,
165  INCREMENT_CONSUME_CURSOR,
166  NOT_INCREMENT_PRODUCE_CURSOR,
167  NOT_INCREMENT_CONSUME_CURSOR,
168  BLOCKED_CURSOR,
169  NOT_BLOCKED_CURSOR,
170  UNBLOCKED_CURSOR,
171  LOADED_DATA,
172  STORED_DATA,
173  QUEUE_FULL,
174  QUEUE_EMPTY
175  };
176  struct local_event
177  {
178  local_action action;
179  union consequence
180  {
181  consequence () : cursor_value (0) {}
182 
183  cursor_type cursor_value;
184  T data_value;
185  } m_consequence;
186 
187  local_event ()
188  : action (local_action::NO_ACTION)
189  , m_consequence ()
190  {
191  }
192  };
193  static local_history m_shared_dummy_history;
194 #endif // DEBUG_LFCQ
195  };
196 
197 } // namespace lockfree
198 
199 namespace lockfree
200 {
201 
202  template<class T>
204  ((cursor_type) 1) << ((sizeof (cursor_type) * CHAR_BIT) - 1); // 0x8000...
205 
206  template<class T>
207  inline
209  m_produce_cursor (0),
210  m_consume_cursor (0)
211  {
212  m_capacity = next_pow2 (size);
213  m_index_mask = m_capacity - 1;
214  assert ((m_capacity & m_index_mask) == 0);
215 
216  m_data = new data_type[m_capacity];
218  }
219 
220  template<class T>
221  inline
223  {
224  delete [] m_data;
225  delete [] m_blocked_cursors;
226  }
227 
228  template<class T>
229  inline bool
230  circular_queue<T>::produce (const T &element)
231  {
232  cursor_type cc;
233  cursor_type pc;
234  bool did_block;
235 
236  // how this works:
237  //
238  // in systems where concurrent producing is possible, producers must be kept from overwriting each other without
239  // using locks. in this lock-free circular queue, that translates to each thread saving its data in a unique slot of the data array.
240  //
241  // synchronization is currently done with a block flag for each slot in the array. when a producer wants
242  // to push its data, it must first successfully block the slot so that it does not race others writing to that slot.
243  //
244  // the produce algorithm is a loop where:
245  // 1. early out if queue is full.
246  // 2. get current produce cursor, then try block its slot, then try to increment cursor (compare & swap);
247  // incrementing is executed regardless of blocking success. that's because if block fails, then someone else
248  // blocked this slot. everyone who failed should advance to next, and incrementing cursor as fast as possible
249  // helps.
250  // 3. if block was successful, then store data to the blocked slot and unlock for next generation.
251  //
252  // the loop can be broken in two ways:
253  // 1. the queue is full and push fails
254  // 2. the producer successfully blocks a cursor
255  //
256  // NOTE: I have an implementation issue I don't know how to fix without a significant overhead. When the queue is
257  // very stressed (many threads produce/consume very often), the system may preempt a thread while it has a slot
258  // blocked and may keep it preempted for a very long time (long enough to produce and consume an entire
259  // generation). I'd like to avoid blocking of any kind.
260  // One possible direction is to separate data storage (and slot index) from cursor. When one wants to produce
261  // an element, it first reserves a slot in storage, adds its data, and then saves slot index/cursor in what is
262  // now m_blocked_cursors using CAS operation. if this succeeds, produced data is immediately available for
263  // consumption. any preemption would not block the queue (just delay when the produce is happening).
264  //
265  // however, finding a way to dispatch slots to producers safely and without a noticeable overhead is not quite
266  // straightforward.
267  //
268 
269  while (true)
270  {
273 
274  if (test_full_cursors (pc, cc))
275  {
276  /* cannot produce */
277  return false;
278  }
279 
280  // first block position
281  did_block = block (pc);
282 
283  // make sure cursor is incremented whether I blocked it or not
285  {
286  // do nothing
287  }
288  if (did_block)
289  {
290  /* I blocked it, it is mine. I can write my data. */
291  store_data (pc, element);
292 
293  unblock (pc);
294  return true;
295  }
296  }
297  }
298 
299  template<class T>
300  inline void
302  {
303  while (!produce (element))
304  {
305  std::this_thread::yield ();
306  }
307  }
308 
309  template<class T>
310  inline std::uint64_t
312  {
313  return m_consume_cursor;
314  }
315 
316  template<class T>
317  inline bool
319  {
320  cursor_type cc;
321  cursor_type pc;
322 
323  // how consume works:
324  //
325  // condition: every produced entry must be consumed once and only once.
326  //
327  // to make sure this condition is met and consume is possible concurrently and without locks, a consume cursor is
328  // used. the entry at a certain cursor is consumed only by the thread who successfully increments the cursor by one
329  // (using compare and swap, not atomic increment). the "consumed" entry is read before the cursor update, therefore
330  // the slot is freed for further produce operations immediately after the update
331  //
332  // unlike producers, a consumer cannot block the queue if it is preempted during execution
333  //
334 
335  while (true)
336  {
339 
340  if (test_empty_cursors (pc, cc))
341  {
342  /* empty */
343  return false;
344  }
345 
346  if (is_blocked (cc))
347  {
348  // first element is not yet produced. this means we can consider the queue still empty
349  return false;
350  }
351 
352  // copy element first. however, the consume is not actually happening until cursor is successfully incremented.
353  element = load_data (cc);
354 
356  {
357  // consume is complete
358 
359  /* break loop */
360  break;
361  }
362  else
363  {
364  // consume unsuccessful
365  }
366  }
367 
368  // consume successful
369  return true;
370  }
371 
372  template<class T>
373  inline bool
375  {
377  }
378 
379  template<class T>
380  inline bool
382  {
384  }
385 
386  template<class T>
387  inline bool
389  {
390  return size () >= (m_capacity / 2);
391  }
392 
393  template<class T>
394  inline std::size_t
396  {
399 
400  if (pc <= cc)
401  {
402  return 0;
403  }
404 
405  return pc - cc;
406  }
407 
408  template<class T>
409  inline std::size_t circular_queue<T>::next_pow2 (std::size_t size) const
410  {
411  std::size_t next_pow = 1;
412  for (--size; size != 0; size /= 2)
413  {
414  next_pow *= 2;
415  }
416  return next_pow;
417  }
418 
419  template<class T>
420  inline bool circular_queue<T>::test_empty_cursors (cursor_type produce_cursor, cursor_type consume_cursor) const
421  {
422  return produce_cursor <= consume_cursor;
423  }
424 
425  template<class T>
426  inline bool circular_queue<T>::test_full_cursors (cursor_type produce_cursor, cursor_type consume_cursor) const
427  {
428  return consume_cursor + m_capacity <= produce_cursor;
429  }
430 
431  template<class T>
432  inline typename circular_queue<T>::cursor_type
434  {
435  return cursor.load ();
436  }
437 
438  template<class T>
439  inline bool
441  {
442  // can weak be used here? I tested, no performance difference from using one or the other
443  return cursor.compare_exchange_strong (crt_value, crt_value + 1);
444  }
445 
446  template<class T>
447  inline std::size_t
449  {
450  return cursor & m_index_mask;
451  }
452 
453  template<class T>
454  inline void
456  {
457  m_data[get_cursor_index (cursor)] = data;
458  }
459 
460  template<class T>
461  inline T
463  {
464  return m_data[get_cursor_index (consume_cursor)];
465  }
466 
467  template<class T>
468  inline bool
470  {
471  cursor_type block_val = m_blocked_cursors[get_cursor_index (cursor)].load ();
472  return flag<cursor_type>::is_flag_set (block_val, BLOCK_FLAG);
473  }
474 
475  template<class T>
476  inline bool
478  {
479  cursor_type block_val = flag<cursor_type> (cursor).set (BLOCK_FLAG).get_flags ();
480  // can weak be used here?
481  return m_blocked_cursors[get_cursor_index (cursor)].compare_exchange_strong (cursor, block_val);
482  }
483 
484  template<class T>
485  inline void
487  {
488  atomic_flag_type &ref_blocked_cursor = m_blocked_cursors[get_cursor_index (cursor)];
489  flag<cursor_type> blocked_cursor_value = ref_blocked_cursor.load ();
490 
491  assert (blocked_cursor_value.is_set (BLOCK_FLAG));
492  cursor_type nextgen_cursor = blocked_cursor_value.clear (BLOCK_FLAG).get_flags () + m_capacity;
493 
494  ref_blocked_cursor.store (nextgen_cursor);
495  }
496 
497  template<class T>
498  inline void
500  {
502  for (cursor_type cursor = 0; cursor < m_capacity; cursor++)
503  {
504  // set expected cursor values in first generation (matches index)
506  }
507  }
508 
509 #if defined (DEBUG_LFCQ)
510  template<class T>
511  inline bool
512  circular_queue<T>::produce_debug (const T &element, local_history &my_history)
513  {
514  cursor_type cc;
515  cursor_type pc;
516  bool did_block;
517 
518  while (true)
519  {
520  my_history.register_event (local_action::LOOP_PRODUCE);
521 
523  my_history.register_event (local_action::LOAD_PRODUCE_CURSOR, pc);
524 
526  my_history.register_event (local_action::LOAD_CONSUME_CURSOR, cc);
527 
528  if (test_full_cursors (pc, cc))
529  {
530  /* cannot produce */
531  my_history.register_event (local_action::QUEUE_FULL);
532  return false;
533  }
534 
535  // first block position
536  did_block = block (pc);
537  my_history.register_event (did_block ? local_action::BLOCKED_CURSOR : local_action::NOT_BLOCKED_CURSOR, pc);
538 
539  // make sure cursor is incremented whether I blocked it or not
541  {
542  // do nothing
543  my_history.register_event (local_action::INCREMENT_PRODUCE_CURSOR, pc);
544  }
545  else
546  {
547  my_history.register_event (local_action::NOT_INCREMENT_PRODUCE_CURSOR, pc);
548  }
549  if (did_block)
550  {
551  /* I blocked it, it is mine. I can write my data. */
552  store_data (pc, element);
553  my_history.register_event (local_action::STORED_DATA, element);
554 
555  unblock (pc);
556  my_history.register_event (local_action::UNBLOCKED_CURSOR, pc);
557  return true;
558  }
559  }
560  }
561 
562  template<class T>
563  inline bool circular_queue<T>::force_produce_debug (const T &element, local_history &my_history)
564  {
565  while (!produce_debug (element, my_history))
566  {
567  std::this_thread::yield ();
568  }
569  }
570 
571  template<class T>
572  inline bool circular_queue<T>::consume_debug (T &element, local_history &my_history)
573  {
574  cursor_type cc;
575  cursor_type pc;
576 
577  while (true)
578  {
579  my_history.register_event (local_action::LOOP_CONSUME);
580 
582  my_history.register_event (local_action::LOAD_CONSUME_CURSOR, cc);
583 
585  my_history.register_event (local_action::LOAD_PRODUCE_CURSOR, pc);
586 
587  if (pc <= cc)
588  {
589  /* empty */
590  my_history.register_event (local_action::QUEUE_EMPTY);
591  return false;
592  }
593 
594  if (is_blocked (cc))
595  {
596  // first element is not yet produced. this means we can consider the queue still empty
597  my_history.register_event (local_action::QUEUE_EMPTY);
598  return false;
599  }
600 
601  // copy element first. however, the consume is not actually happening until cursor is successfully incremented.
602  element = load_data (cc);
603  my_history.register_event (local_action::LOADED_DATA, element);
604 
606  {
607  // consume is complete
608  my_history.register_event (local_action::INCREMENT_CONSUME_CURSOR, cc);
609 
610  /* break loop */
611  break;
612  }
613  else
614  {
615  // consume unsuccessful
616  my_history.register_event (local_action::NOT_INCREMENT_CONSUME_CURSOR, cc);
617  }
618  }
619 
620  // consume successful
621  return true;
622  }
623 #endif // DEBUG_LFCQ
624 
625 } // namespace lockfree
626 
627 #endif // _LOCKFREE_CIRCULAR_QUEUE_HPP_
std::atomic< cursor_type > atomic_cursor_type
void force_produce(const T &element)
bool test_empty_cursors(cursor_type produce_cursor, cursor_type consume_cursor) const
bool test_and_increment_cursor(atomic_cursor_type &cursor, cursor_type crt_value)
void store_data(cursor_type index, const T &data)
#define assert(x)
flag & clear(const T &flags)
Definition: base_flag.hpp:100
bool is_blocked(cursor_type cursor) const
void unblock(cursor_type cursor)
cursor_type load_cursor(atomic_cursor_type &cursor)
bool test_full_cursors(cursor_type produce_cursor, cursor_type consume_cursor) const
bool block(cursor_type cursor)
static const cursor_type BLOCK_FLAG
std::size_t get_cursor_index(cursor_type cursor) const
static bool is_flag_set(const T &where_to_check, const T &what_to_check)
Definition: base_flag.hpp:149
T load_data(cursor_type consume_cursor) const
std::size_t next_pow2(std::size_t size) const
bool is_set(const T &flags)
Definition: base_flag.hpp:108
T get_flags(void)
Definition: base_flag.hpp:128