Skip to content

File lockfree_circular_queue.hpp

File List > base > lockfree_circular_queue.hpp

Go to the documentation of this file

/*
 * Copyright 2008 Search Solution Corporation
 * Copyright 2016 CUBRID Corporation
 *
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 *
 */

/*
 * lockfree_circular_queue.hpp : Lock-free structures interface.
 */

#ifndef _LOCKFREE_CIRCULAR_QUEUE_HPP_
#define _LOCKFREE_CIRCULAR_QUEUE_HPP_

#include "base_flag.hpp"

#include <atomic>
#include <cassert>
#include <climits>
#include <cstddef>
#include <cstdint>
#include <thread>
#include <type_traits>

// define DEBUG_LFCQ (uncomment the line below) if you need to debug the low-level execution of the lock-free circular
// queue. note that in order to track the execution history of the low-level operations of a certain queue, you also
// need to replace its produce/consume calls with produce_debug/consume_debug
//
// #define DEBUG_LFCQ

namespace lockfree
{

  // circular_queue<T> - bounded circular queue that can be used by concurrent producers and consumers without locks.
  //
  //   Storage is an array of T whose length is the requested size rounded up by next_pow2; a cursor is mapped to an
  //   array slot by masking it with m_index_mask.
  //   Two atomic cursors track the produce and consume positions. Each slot also has an atomic entry in
  //   m_blocked_cursors, used by producers to claim the slot while they write into it (see produce).
  //
  //   T is default-constructed when the storage array is allocated, copy-assigned when an element is stored and copied
  //   out when an element is consumed.
  template <class T>
  class circular_queue
  {
    public:
      using cursor_type = std::uint64_t;                   // type of cursor values
      using atomic_cursor_type = std::atomic<cursor_type>; // atomic wrapper used for the two queue cursors
      using data_type = T;                                 // type of stored elements
      using atomic_flag_type = atomic_cursor_type;         // type of one m_blocked_cursors entry

      // create a queue; the capacity is computed from size using next_pow2
      circular_queue (std::size_t size);
      // free the data array and the blocked cursors array
      ~circular_queue ();

      inline bool is_empty () const;              // is queue empty? use it for early outs but don't rely on the answer
      inline bool is_full () const;               // is queue full? use it for early outs but don't rely on the answer
      inline bool is_half_full ();                // true if size () is at least half of the capacity
      inline std::size_t size ();                 // produce cursor minus consume cursor, or 0 if not greater

      inline bool consume (T &element);               // consume one element from queue; returns false on fail
      // IMPORTANT!
      //   Element argument may change even if consume fails.
      //   Using its value after failed consumption is not safe.
      inline bool produce (const T &element);         // produce an element to queue; returns false on fail
      inline void force_produce (const T &element);   // force produce (loop until successful)

      inline std::uint64_t get_consumer_cursor ();            // get consume cursor

      // note:
      //
      //    above functions are cloned by debug counterparts which track a history of executed low-level operations.
      //    they were not cloned initially, but the code became unreadable with all the #ifdef DEBUG_LFCQ/#endif
      //    preprocessor directives.
      //    therefore, if you update the code for any of these functions, please make sure the debug counterparts are
      //    updated too.

    private:
      // most significant bit of cursor_type; set in a m_blocked_cursors entry while its slot is blocked by a producer
      static const cursor_type BLOCK_FLAG;

      // block default and copy constructors
      circular_queue ();
      circular_queue (const circular_queue &);

      // power of two used as queue capacity for a requested size
      inline std::size_t next_pow2 (std::size_t size) const;

      // empty/full tests on cursor values that were already loaded
      inline bool test_empty_cursors (cursor_type produce_cursor, cursor_type consume_cursor) const;
      inline bool test_full_cursors (cursor_type produce_cursor, cursor_type consume_cursor) const;

      // atomic read of a cursor, and compare & swap of a cursor from crt_value to crt_value + 1
      inline cursor_type load_cursor (atomic_cursor_type &cursor);
      inline bool test_and_increment_cursor (atomic_cursor_type &cursor, cursor_type crt_value);

      // access to the data slot that corresponds to a cursor
      inline T load_data (cursor_type consume_cursor) const;
      inline void store_data (cursor_type index, const T &data);
      inline std::size_t get_cursor_index (cursor_type cursor) const;

      // handling of the per-slot entries of m_blocked_cursors
      inline bool is_blocked (cursor_type cursor) const;
      inline bool block (cursor_type cursor);
      inline void unblock (cursor_type cursor);
      inline void init_blocked_cursors (void);

      data_type *m_data;   // data storage (m_capacity slots); a producer writes a slot only after blocking it
      atomic_flag_type *m_blocked_cursors;  // is_blocked flag; when producing new data, there is a time window between
      // cursor increment and until data is copied. block flag will tell when
      // produce is completed.
      atomic_cursor_type m_produce_cursor;  // cursor for produce position
      atomic_cursor_type m_consume_cursor;  // cursor for consume position
      std::size_t m_capacity;                    // queue capacity
      std::size_t m_index_mask;                  // mask used to compute a cursor's index in queue

#if defined (DEBUG_LFCQ)
    private:
      // forward declarations; both types are defined at the end of the class
      enum class local_action;
      struct local_event;

    public:
      // local_history - fixed-size circular log of the low-level events registered by the *_debug functions
      class local_history
      {
    public:
      local_history ()
        : m_cursor (0)
      {
      }

      // advance to the next entry (wrapping around at LOCAL_HISTORY_SIZE) and record the action in it
      inline void register_event (local_action action)
      {
        m_cursor = (m_cursor + 1) % LOCAL_HISTORY_SIZE;
        m_events[m_cursor].action = action;
      }

      // record the action together with a cursor value
      inline void register_event (local_action action, cursor_type cursor)
      {
        register_event (action);
        m_events[m_cursor].m_consequence.cursor_value = cursor;
      }

      // record the action together with a data value
      inline void register_event (local_action action, T data)
      {
        register_event (action);
        m_events[m_cursor].m_consequence.data_value = data;
      }
    private:
      static const std::size_t LOCAL_HISTORY_SIZE = 64;   // number of events kept
      local_event m_events[LOCAL_HISTORY_SIZE];           // event entries, overwritten circularly
      std::size_t m_cursor;                               // index of the most recently registered event
      };

      // clones of produce/consume with additional debug code. caller should keep its history in thread context to be
      // inspected on demand
      inline bool consume_debug (T &element, local_history &my_history);
      inline bool produce_debug (const T &element, local_history &my_history);
      inline bool force_produce_debug (const T &element, local_history &my_history);

    private:

      // kinds of low-level steps that can be registered in a local_history
      enum class local_action
      {
    NO_ACTION,
    LOOP_PRODUCE,
    LOOP_CONSUME,
    LOAD_PRODUCE_CURSOR,
    LOAD_CONSUME_CURSOR,
    INCREMENT_PRODUCE_CURSOR,
    INCREMENT_CONSUME_CURSOR,
    NOT_INCREMENT_PRODUCE_CURSOR,
    NOT_INCREMENT_CONSUME_CURSOR,
    BLOCKED_CURSOR,
    NOT_BLOCKED_CURSOR,
    UNBLOCKED_CURSOR,
    LOADED_DATA,
    STORED_DATA,
    QUEUE_FULL,
    QUEUE_EMPTY
      };
      // one history entry: the action and, optionally, the cursor or data value it was registered with
      struct local_event
      {
    local_action action;
    union consequence
    {
      consequence () : cursor_value (0) {}

      cursor_type cursor_value;
      T data_value;
    } m_consequence;

    local_event ()
      : action (local_action::NO_ACTION)
      , m_consequence ()
    {
    }
      };
      // NOTE(review): only declared here; no definition or use is visible in this file
      static local_history m_shared_dummy_history;
#endif // DEBUG_LFCQ
  };

} // namespace lockfree

namespace lockfree
{

  // BLOCK_FLAG: cursor_type value having only its most significant bit set (0x8000...)
  template<class T>
  const typename circular_queue<T>::cursor_type circular_queue<T>::BLOCK_FLAG =
      static_cast<cursor_type> (1) << (sizeof (cursor_type) * CHAR_BIT - 1);

  // circular_queue constructor
  //
  //  size (in) : requested number of elements; the actual capacity is next_pow2 (size)
  //
  // both cursors start at 0; the data array and the blocked cursors array are allocated with the computed capacity.
  template<class T>
  inline
  circular_queue<T>::circular_queue (std::size_t size) :
    m_produce_cursor (0),
    m_consume_cursor (0)
  {
    const std::size_t capacity = next_pow2 (size);

    m_capacity = capacity;
    m_index_mask = capacity - 1;
    // capacity and its predecessor must have no bit in common, otherwise masking a cursor would not yield a valid index
    assert ((m_capacity & m_index_mask) == 0);

    m_data = new data_type[capacity];
    // allocates m_blocked_cursors and sets the first generation values
    init_blocked_cursors ();
  }

  // circular_queue destructor: releases the two arrays allocated by the constructor, in reverse order of allocation
  template<class T>
  inline
  circular_queue<T>::~circular_queue ()
  {
    delete [] m_blocked_cursors;
    delete [] m_data;
  }

  // produce - try to add one element to the queue
  //
  //  return       : true if the element was stored, false if the queue was found full
  //  element (in) : value copied into the queue
  template<class T>
  inline bool
  circular_queue<T>::produce (const T &element)
  {
    // how this works:
    //
    // producers may run concurrently, so each of them must end up writing into a slot of the data array that no other
    // producer is using. this is achieved with one block flag per slot: before a producer stores its data, it must
    // successfully block the slot of the produce cursor it has read, which keeps the other producers away from it.
    //
    // each loop iteration does the following:
    //  1. read the cursors and give up if the queue is full.
    //  2. try to block the slot of the produce cursor just read, then try to increment the produce cursor (compare &
    //     swap). the increment is attempted whether the block succeeded or not: if the block failed, someone else
    //     blocked this slot, everyone who failed has to advance to the next one, and moving the cursor forward as
    //     fast as possible helps.
    //  3. if the block succeeded, store the data in the blocked slot and unblock it for the next generation.
    //
    // the loop ends in one of two ways:
    //  1. the queue is full and produce fails
    //  2. the producer successfully blocks a slot and produce succeeds
    //
    // NOTE (known limitation, from the original author): when the queue is very stressed (many threads produce/consume
    //       very often), the system may preempt a thread while it has a slot blocked and may keep it preempted for a
    //       very long time (long enough to produce and consume an entire generation). it would be preferable to avoid
    //       blocking of any kind.
    //       one possible direction is to separate data storage (and slot index) from the cursor. a producer would
    //       first reserve a slot in storage, add its data, and then save the slot index/cursor in what is now
    //       m_blocked_cursors using a CAS operation. if this succeeds, the produced data is immediately available for
    //       consumption, and a preemption would not block the queue (it would only delay the produce).
    //       however, finding a way to dispatch slots to producers safely and without a noticeable overhead is not
    //       straightforward.
    //

    for (;;)
      {
        const cursor_type produce_pos = load_cursor (m_produce_cursor);
        const cursor_type consume_pos = load_cursor (m_consume_cursor);

        if (test_full_cursors (produce_pos, consume_pos))
          {
            // no room left, cannot produce
            return false;
          }

        // claim the slot first...
        const bool slot_is_mine = block (produce_pos);

        // ...then push the cursor forward whether the claim succeeded or not; the outcome of this compare & swap is
        // not needed
        test_and_increment_cursor (m_produce_cursor, produce_pos);

        if (!slot_is_mine)
          {
            // another producer owns this slot; retry with fresh cursors
            continue;
          }

        // the slot is blocked by me, so I am the only one writing to it
        store_data (produce_pos, element);
        unblock (produce_pos);
        return true;
      }
  }

  // force_produce - keep trying to produce the element until it succeeds, yielding the thread after each failure
  //
  //  element (in) : value copied into the queue
  template<class T>
  inline void
  circular_queue<T>::force_produce (const T &element)
  {
    for (;;)
      {
        if (produce (element))
          {
            return;
          }
        // queue was full; let other threads run before the next attempt
        std::this_thread::yield ();
      }
  }

  // get_consumer_cursor - atomically read the current value of the consume cursor
  template<class T>
  inline std::uint64_t
  circular_queue<T>::get_consumer_cursor ()
  {
    return m_consume_cursor.load ();
  }

  // consume - try to remove the oldest element from the queue
  //
  //  return        : true if an element was consumed; false if the queue is empty or the element at the consume cursor
  //                  is still being produced
  //  element (out) : receives the consumed element. it may be overwritten even when consume fails, so its value must
  //                  not be used after a failure
  template<class T>
  inline bool
  circular_queue<T>::consume (T &element)
  {
    // how consume works:
    //
    // requirement: every produced entry must be consumed once and only once.
    //
    // to meet the requirement with concurrent consumers and no locks, a consume cursor is used. the entry at a given
    // cursor belongs to the thread that successfully increments the cursor by one (using compare and swap, not atomic
    // increment). the entry is read before the cursor is updated, therefore the slot becomes available for further
    // produce operations as soon as the update happens.
    //
    // unlike producers, a consumer cannot block the queue if it is preempted during execution.
    //

    for (;;)
      {
        const cursor_type consume_pos = load_cursor (m_consume_cursor);
        const cursor_type produce_pos = load_cursor (m_produce_cursor);

        // nothing to consume if the queue is empty; if the first element is not yet fully produced (its slot is still
        // blocked), the queue is considered to be still empty
        if (test_empty_cursors (produce_pos, consume_pos) || is_blocked (consume_pos))
          {
            return false;
          }

        // copy the element first; the consume only takes effect if the cursor is successfully incremented
        element = load_data (consume_pos);

        if (test_and_increment_cursor (m_consume_cursor, consume_pos))
          {
            // cursor was advanced by me, the element is mine
            return true;
          }

        // somebody else advanced the cursor; try again with fresh cursors
      }
  }

  // is_empty - empty test on the current cursor values
  //
  // the two cursors are read one after the other, not as a single snapshot; use the answer only as a hint
  template<class T>
  inline bool
  circular_queue<T>::is_empty () const
  {
    return test_empty_cursors (m_produce_cursor.load (), m_consume_cursor.load ());
  }

  // is_full - full test on the current cursor values
  //
  // the two cursors are read one after the other, not as a single snapshot; use the answer only as a hint
  template<class T>
  inline bool
  circular_queue<T>::is_full () const
  {
    return test_full_cursors (m_produce_cursor.load (), m_consume_cursor.load ());
  }

  // is_half_full - true if the current size () is at least half of the queue capacity
  template<class T>
  inline bool
  circular_queue<T>::is_half_full ()
  {
    const std::size_t half_capacity = m_capacity / 2;
    return size () >= half_capacity;
  }

  // size - difference between the produce cursor and the consume cursor
  //
  //  return : produce cursor - consume cursor, or 0 if the produce cursor is not greater than the consume cursor
  //
  // the consume cursor is read first, then the produce cursor
  template<class T>
  inline std::size_t
  circular_queue<T>::size ()
  {
    const cursor_type consume_pos = load_cursor (m_consume_cursor);
    const cursor_type produce_pos = load_cursor (m_produce_cursor);

    return (produce_pos > consume_pos) ? static_cast<std::size_t> (produce_pos - consume_pos) : 0;
  }

  // next_pow2 - smallest power of two that is greater than or equal to size
  //
  //  return    : the power of two; 1 for size 1
  //  size (in) : requested size
  //
  // the result is doubled once for every significant bit of (size - 1). when size is 0, or the power of two does not
  // fit in std::size_t, the unsigned arithmetic wraps around and the result is 0.
  template<class T>
  inline std::size_t circular_queue<T>::next_pow2 (std::size_t size) const
  {
    std::size_t remaining_bits = size - 1;
    std::size_t result = 1;

    while (remaining_bits != 0)
      {
        result <<= 1;
        remaining_bits >>= 1;
      }
    return result;
  }

  // test_empty_cursors - the queue is empty when the produce cursor has not advanced past the consume cursor
  template<class T>
  inline bool circular_queue<T>::test_empty_cursors (cursor_type produce_cursor, cursor_type consume_cursor) const
  {
    const bool has_unconsumed_entries = produce_cursor > consume_cursor;
    return !has_unconsumed_entries;
  }

  // test_full_cursors - the queue is full when the produce cursor is at least m_capacity ahead of the consume cursor
  template<class T>
  inline bool circular_queue<T>::test_full_cursors (cursor_type produce_cursor, cursor_type consume_cursor) const
  {
    const cursor_type produce_limit = consume_cursor + m_capacity;
    return produce_cursor >= produce_limit;
  }

  // load_cursor - atomically read the value of a cursor (default memory ordering of std::atomic::load)
  template<class T>
  inline typename circular_queue<T>::cursor_type
  circular_queue<T>::load_cursor (atomic_cursor_type &cursor)
  {
    const cursor_type current_value = cursor.load ();
    return current_value;
  }

  // test_and_increment_cursor - advance a cursor by one, but only if it still has the value the caller has seen
  //
  //  return             : true if the cursor was changed from crt_value to crt_value + 1, false if the cursor had a
  //                       different value
  //  cursor (in/out)    : cursor to advance
  //  crt_value (in)     : value the cursor is expected to have
  template<class T>
  inline bool
  circular_queue<T>::test_and_increment_cursor (atomic_cursor_type &cursor, cursor_type crt_value)
  {
    cursor_type expected_value = crt_value;
    const cursor_type incremented_value = crt_value + 1;

    // the strong variant is used; the original author measured no performance difference from the weak one
    return cursor.compare_exchange_strong (expected_value, incremented_value);
  }

  // get_cursor_index - slot index of a cursor: the cursor bits selected by m_index_mask
  template<class T>
  inline std::size_t
  circular_queue<T>::get_cursor_index (cursor_type cursor) const
  {
    return static_cast<std::size_t> (cursor & m_index_mask);
  }

  // store_data - copy data into the slot that corresponds to the cursor
  template<class T>
  inline void
  circular_queue<T>::store_data (cursor_type cursor, const T &data)
  {
    const std::size_t slot = get_cursor_index (cursor);
    m_data[slot] = data;
  }

  // load_data - return a copy of the element stored in the slot that corresponds to the cursor
  template<class T>
  inline T
  circular_queue<T>::load_data (cursor_type consume_cursor) const
  {
    const std::size_t slot = get_cursor_index (consume_cursor);
    return m_data[slot];
  }

  // is_blocked - check whether BLOCK_FLAG is set in the m_blocked_cursors entry of the cursor's slot
  template<class T>
  inline bool
  circular_queue<T>::is_blocked (cursor_type cursor) const
  {
    const std::size_t slot = get_cursor_index (cursor);
    cursor_type slot_state = m_blocked_cursors[slot].load ();

    return flag<cursor_type>::is_flag_set (slot_state, BLOCK_FLAG);
  }

  // block - try to block the slot of the cursor for the calling producer
  //
  //  return      : true if the slot entry held exactly the cursor value and was replaced by the cursor value with
  //                BLOCK_FLAG set; false if the entry held anything else
  //  cursor (in) : produce cursor whose slot should be blocked
  template<class T>
  inline bool
  circular_queue<T>::block (cursor_type cursor)
  {
    const std::size_t slot = get_cursor_index (cursor);
    cursor_type blocked_state = flag<cursor_type> (cursor).set (BLOCK_FLAG).get_flags ();
    cursor_type expected_state = cursor;

    // the strong variant is used; the original author left open whether the weak one would do
    return m_blocked_cursors[slot].compare_exchange_strong (expected_state, blocked_state);
  }

  // unblock - release the slot of the cursor after the data was stored
  //
  // the slot entry is expected to have BLOCK_FLAG set (asserted). the flag is cleared and m_capacity is added to the
  // remaining value, which is the value stored back for the next generation of this slot.
  template<class T>
  inline void
  circular_queue<T>::unblock (cursor_type cursor)
  {
    atomic_flag_type &slot_entry = m_blocked_cursors[get_cursor_index (cursor)];
    flag<cursor_type> slot_state = slot_entry.load ();

    assert (slot_state.is_set (BLOCK_FLAG));

    cursor_type next_generation_value = slot_state.clear (BLOCK_FLAG).get_flags () + m_capacity;
    slot_entry.store (next_generation_value);
  }

  // init_blocked_cursors - allocate m_blocked_cursors (one entry per slot) and give every entry the cursor value
  //                        expected in the first generation, which is the slot's own index
  template<class T>
  inline void
  circular_queue<T>::init_blocked_cursors (void)
  {
    m_blocked_cursors = new atomic_flag_type [m_capacity];

    for (cursor_type slot = 0; slot < m_capacity; ++slot)
      {
        m_blocked_cursors[slot].store (slot);
      }
  }

#if defined (DEBUG_LFCQ)
  // produce_debug - clone of produce that also registers every low-level step in the caller's history
  //
  //  return              : true if the element was stored, false if the queue was found full
  //  element (in)        : value copied into the queue
  //  my_history (in/out) : history that receives the registered events
  template<class T>
  inline bool
  circular_queue<T>::produce_debug (const T &element, local_history &my_history)
  {
    for (;;)
      {
        my_history.register_event (local_action::LOOP_PRODUCE);

        const cursor_type produce_pos = load_cursor (m_produce_cursor);
        my_history.register_event (local_action::LOAD_PRODUCE_CURSOR, produce_pos);

        const cursor_type consume_pos = load_cursor (m_consume_cursor);
        my_history.register_event (local_action::LOAD_CONSUME_CURSOR, consume_pos);

        if (test_full_cursors (produce_pos, consume_pos))
          {
            // no room left, cannot produce
            my_history.register_event (local_action::QUEUE_FULL);
            return false;
          }

        // claim the slot first
        const bool slot_is_mine = block (produce_pos);
        const local_action block_outcome =
          slot_is_mine ? local_action::BLOCKED_CURSOR : local_action::NOT_BLOCKED_CURSOR;
        my_history.register_event (block_outcome, produce_pos);

        // push the cursor forward whether the claim succeeded or not
        const bool did_increment = test_and_increment_cursor (m_produce_cursor, produce_pos);
        const local_action increment_outcome =
          did_increment ? local_action::INCREMENT_PRODUCE_CURSOR : local_action::NOT_INCREMENT_PRODUCE_CURSOR;
        my_history.register_event (increment_outcome, produce_pos);

        if (!slot_is_mine)
          {
            // another producer owns this slot; retry with fresh cursors
            continue;
          }

        // the slot is blocked by me, so I can write my data
        store_data (produce_pos, element);
        my_history.register_event (local_action::STORED_DATA, element);

        unblock (produce_pos);
        my_history.register_event (local_action::UNBLOCKED_CURSOR, produce_pos);
        return true;
      }
  }

  // force_produce_debug - debug counterpart of force_produce: keep calling produce_debug until it succeeds, yielding
  //                       the thread after each failed attempt
  //
  //  return              : always true; the function only returns after a successful produce_debug
  //  element (in)        : value copied into the queue
  //  my_history (in/out) : history that receives the events registered by produce_debug
  template<class T>
  inline bool circular_queue<T>::force_produce_debug (const T &element, local_history &my_history)
  {
    while (!produce_debug (element, my_history))
      {
        std::this_thread::yield ();
      }

    // the function is declared to return bool, so it must return a value; flowing off the end would be undefined
    // behavior. the loop exits only when the element was produced.
    return true;
  }

  // consume_debug - clone of consume that also registers every low-level step in the caller's history
  //
  //  return              : true if an element was consumed; false if the queue is empty or the element at the consume
  //                        cursor is still being produced
  //  element (out)       : receives the consumed element; it may be overwritten even when the consume fails
  //  my_history (in/out) : history that receives the registered events
  template<class T>
  inline bool circular_queue<T>::consume_debug (T &element, local_history &my_history)
  {
    for (;;)
      {
        my_history.register_event (local_action::LOOP_CONSUME);

        const cursor_type consume_pos = load_cursor (m_consume_cursor);
        my_history.register_event (local_action::LOAD_CONSUME_CURSOR, consume_pos);

        const cursor_type produce_pos = load_cursor (m_produce_cursor);
        my_history.register_event (local_action::LOAD_PRODUCE_CURSOR, produce_pos);

        // nothing to consume if the queue is empty; if the first element is not yet fully produced (its slot is still
        // blocked), the queue is considered to be still empty
        if (test_empty_cursors (produce_pos, consume_pos) || is_blocked (consume_pos))
          {
            my_history.register_event (local_action::QUEUE_EMPTY);
            return false;
          }

        // copy the element first; the consume only takes effect if the cursor is successfully incremented
        element = load_data (consume_pos);
        my_history.register_event (local_action::LOADED_DATA, element);

        if (test_and_increment_cursor (m_consume_cursor, consume_pos))
          {
            // cursor was advanced by me, consume is complete
            my_history.register_event (local_action::INCREMENT_CONSUME_CURSOR, consume_pos);
            return true;
          }

        // somebody else advanced the cursor; try again with fresh cursors
        my_history.register_event (local_action::NOT_INCREMENT_CONSUME_CURSOR, consume_pos);
      }
  }
#endif // DEBUG_LFCQ

}  // namespace lockfree

#endif // _LOCKFREE_CIRCULAR_QUEUE_HPP_