
/*
 * Copyright 2008 Search Solution Corporation
 * Copyright 2016 CUBRID Corporation
 *
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 *
 */


/*
 * lock_free.c : Lock-free structures.
 */

#include "config.h"

#include <assert.h>

#include "porting.h"
#include "lock_free.h"
#include "error_manager.h"
#include "error_code.h"
#include "memory_alloc.h"
// XXX: SHOULD BE THE LAST INCLUDE HEADER
#include "memory_wrapper.hpp"

#if !defined(SERVER_MODE)
/* Single-threaded (non-server) builds need no pthread synchronization, so the
 * mutex primitives are compiled out. The lock/trylock variants expand to 0 so
 * call sites of the form "rv = pthread_mutex_lock (m);" stay well-formed. */
#define pthread_mutex_init(a, b)
#define pthread_mutex_destroy(a)
#define pthread_mutex_lock(a)   0
#define pthread_mutex_trylock(a)   0
#define pthread_mutex_unlock(a)
/* sink for the otherwise-unused return value of the stubbed lock macros */
static int rv;
#endif /* not SERVER_MODE */

/*
 * Global lock free transaction systems
 */
/* One global transaction system per lock-free structure. All of them are set
 * up by lf_initialize_transaction_systems () and torn down by
 * lf_destroy_transaction_systems (). */
LF_TRAN_SYSTEM spage_saving_Ts = LF_TRAN_SYSTEM_INITIALIZER;
LF_TRAN_SYSTEM obj_lock_res_Ts = LF_TRAN_SYSTEM_INITIALIZER;
LF_TRAN_SYSTEM obj_lock_ent_Ts = LF_TRAN_SYSTEM_INITIALIZER;
LF_TRAN_SYSTEM catalog_Ts = LF_TRAN_SYSTEM_INITIALIZER;
LF_TRAN_SYSTEM sessions_Ts = LF_TRAN_SYSTEM_INITIALIZER;
LF_TRAN_SYSTEM free_sort_list_Ts = LF_TRAN_SYSTEM_INITIALIZER;
LF_TRAN_SYSTEM global_unique_stats_Ts = LF_TRAN_SYSTEM_INITIALIZER;
LF_TRAN_SYSTEM hfid_table_Ts = LF_TRAN_SYSTEM_INITIALIZER;
LF_TRAN_SYSTEM xcache_Ts = LF_TRAN_SYSTEM_INITIALIZER;
LF_TRAN_SYSTEM fpcache_Ts = LF_TRAN_SYSTEM_INITIALIZER;
LF_TRAN_SYSTEM dwb_slots_Ts = LF_TRAN_SYSTEM_INITIALIZER;

/* set once all global systems are initialized; guards re-initialization */
static bool tran_systems_initialized = false;

/*
 * Macro definitions
 */
/* Access a field at byte offset o inside entry p: as a volatile pointer
 * reference, as a plain pointer, or dereferenced to the pointer value stored
 * at that offset. Used to reach next-links/keys/mutexes described by
 * LF_ENTRY_DESCRIPTOR offsets. */
#define OF_GET_REF(p,o)     (void * volatile *) (((char *)(p)) + (o))
#define OF_GET_PTR(p,o)     (void *) (((char *)(p)) + (o))
#define OF_GET_PTR_DEREF(p,o)   (*OF_GET_REF (p,o))

/*
 * Address mark macros
 * The lowest bit of a next-pointer is used as a logical deletion mark
 * (presumably safe because entries are at least 2-byte aligned — implied by
 * the masking scheme, not asserted here).
 */
#define ADDR_HAS_MARK(p)    (((long long volatile) (p)) & 0x1)
#define ADDR_WITH_MARK(p)   ((void * volatile) (((long long volatile) (p)) | 0x1))
#define ADDR_STRIP_MARK(p)  ((void * volatile) (((long long volatile) (p)) & (~((long long) 0x1))))

// Suppress -Wignored-qualifiers GCC warnings generated by address mark macros
#if defined (__GNUC__)
#pragma GCC diagnostic ignored "-Wignored-qualifiers"
#endif
#if defined (WINDOWS)
#pragma warning (disable : 4197)
#endif

/* Statistics counters. Only updated in unit-test builds, where
 * LF_UNITTEST_INC expands to an atomic increment (no-op otherwise). */
static INT64 lf_hash_size = 0;

static INT64 lf_inserts = 0;
static INT64 lf_inserts_restart = 0;
static INT64 lf_list_inserts = 0;
static INT64 lf_list_inserts_found = 0;
static INT64 lf_list_inserts_save_temp_1 = 0;
static INT64 lf_list_inserts_save_temp_2 = 0;
static INT64 lf_list_inserts_claim = 0;
static INT64 lf_list_inserts_fail_link = 0;
static INT64 lf_list_inserts_success_link = 0;

static INT64 lf_deletes = 0;
static INT64 lf_deletes_restart = 0;
static INT64 lf_list_deletes = 0;
static INT64 lf_list_deletes_found = 0;
static INT64 lf_list_deletes_fail_mark_next = 0;
static INT64 lf_list_deletes_fail_unlink = 0;
static INT64 lf_list_deletes_success_unlink = 0;
static INT64 lf_list_deletes_not_found = 0;
static INT64 lf_list_deletes_not_match = 0;

static INT64 lf_clears = 0;

static INT64 lf_retires = 0;
static INT64 lf_claims = 0;
static INT64 lf_claims_temp = 0;
static INT64 lf_transports = 0;
static INT64 lf_temps = 0;

#if defined (UNITTEST_LF)
#define LF_UNITTEST_INC(lf_stat, incval) ATOMIC_INC_64 (lf_stat, incval)
#else /* !UNITTEST_LF */
#define LF_UNITTEST_INC(lf_stat, incval)
#endif

#if defined (UNITTEST_LF) || defined (UNITTEST_CQ)
#if defined (NDEBUG)
/* Abort when calling assert even if it is not debug, so unit tests fail
 * loudly in release builds too */
// *INDENT-OFF*
# if defined assert
#   undef assert
# endif
# if defined assert_release
#   undef assert_release
# endif
// *INDENT-ON*
#define assert(cond) if (!(cond)) abort ()
#define assert_release(cond) if (!(cond)) abort ()
#endif /* NDEBUG */
#endif /* UNITTEST_LF || UNITTEST_CQ */

/* forward declarations of static list/hash helpers defined later in the file */
static int lf_list_insert_internal (LF_TRAN_ENTRY * tran, void **list_p, void *key, int *behavior_flags,
                    LF_ENTRY_DESCRIPTOR * edesc, LF_FREELIST * freelist, void **entry, int *inserted);
static int lf_hash_insert_internal (LF_TRAN_ENTRY * tran, LF_HASH_TABLE * table, void *key, int bflags, void **entry,
                    int *inserted);
static int lf_hash_delete_internal (LF_TRAN_ENTRY * tran, LF_HASH_TABLE * table, void *key, void *locked_entry,
                    int bflags, int *success);

/*
 * lf_callback_vpid_hash () - hash a VPID
 *   returns: hash value
 *   vpid(in): VPID to hash
 *   htsize(in): hash table size
 */
unsigned int
lf_callback_vpid_hash (void *vpid, int htsize)
{
  VPID *lvpid = (VPID *) vpid;

  return ((lvpid->pageid | ((unsigned int) lvpid->volid) << 24) % htsize);
}

/*
 * lf_callback_vpid_compare () - compare two vpids
 *   returns: 0 if equal, non-zero otherwise
 *   vpid_1(in): first VPID
 *   vpid_2(in): second vpid
 */
int
lf_callback_vpid_compare (void *vpid_1, void *vpid_2)
{
  VPID *lvpid_1 = (VPID *) vpid_1;
  VPID *lvpid_2 = (VPID *) vpid_2;

  return !((lvpid_1->pageid == lvpid_2->pageid) && (lvpid_1->volid == lvpid_2->volid));
}

/*
 * lf_callback_vpid_copy () - copy a vpid
 *   returns: error code or NO_ERROR
 *   src(in): source VPID
 *   dest(in): destination to copy to
 */
int
lf_callback_vpid_copy (void *src, void *dest)
{
  ((VPID *) dest)->pageid = ((VPID *) src)->pageid;
  ((VPID *) dest)->volid = ((VPID *) src)->volid;

  return NO_ERROR;
}

/*
 * lf_tran_system_init () - initialize a transaction system
 *   returns: error code or NO_ERROR
 *   sys(in): tran system to initialize
 *   max_threads(in): maximum number of threads that will use this system
 */
int
lf_tran_system_init (LF_TRAN_SYSTEM * sys, int max_threads)
{
  int idx;

  assert (sys != NULL);

  /* round the thread count up to the bitmap chunk alignment */
  sys->entry_count = LF_BITMAP_COUNT_ALIGN (max_threads);
  sys->lf_bitmap.init (LF_BITMAP_ONE_CHUNK, sys->entry_count, LF_BITMAP_FULL_USAGE_RATIO);

  /* allocate and zero the entry array */
  sys->entries = (LF_TRAN_ENTRY *) malloc (sizeof (LF_TRAN_ENTRY) * sys->entry_count);
  if (sys->entries == NULL)
    {
      er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_OUT_OF_VIRTUAL_MEMORY, 1, sizeof (LF_TRAN_ENTRY) * sys->entry_count);

      /* roll back the bitmap so the system is left in a clean state */
      sys->entry_count = 0;
      sys->lf_bitmap.destroy ();
      return ER_FAILED;
    }

  memset (sys->entries, 0, sys->entry_count * sizeof (LF_TRAN_ENTRY));

  for (idx = 0; idx < sys->entry_count; idx++)
    {
      LF_TRAN_ENTRY *ent = &sys->entries[idx];

      ent->tran_system = sys;
      ent->entry_idx = idx;
      ent->transaction_id = LF_NULL_TRANSACTION_ID;
      ent->did_incr = false;
      ent->last_cleanup_id = 0;
      ent->retired_list = NULL;
      ent->temp_entry = NULL;

#if defined (UNITTEST_LF)
      ent->locked_mutex = NULL;
      ent->locked_mutex_line = -1;
#endif /* UNITTEST_LF */
    }

  sys->used_entry_count = 0;
  sys->global_transaction_id = 0;
  sys->min_active_transaction_id = 0;
  sys->mati_refresh_interval = 100;

  return NO_ERROR;
}

/*
 * lf_tran_system_destroy () - destroy a tran system
 *   sys(in): tran system
 *
 * NOTE: If (edesc == NULL) then there may be some memory leaks. Make sure the
 * function is called with NULL edesc only at shutdown, or after collecting all
 * retired entries.
 */
void
lf_tran_system_destroy (LF_TRAN_SYSTEM * sys)
{
  assert (sys != NULL);

  /* retired entries can only be reclaimed when an entry descriptor is known */
  if (sys->entry_desc != NULL)
    {
      int idx;

      for (idx = 0; idx < sys->entry_count; idx++)
	{
	  lf_tran_destroy_entry (&sys->entries[idx]);
	}
    }
  sys->entry_count = 0;

  if (sys->entries != NULL)
    {
      free_and_init (sys->entries);
    }

  sys->lf_bitmap.destroy ();
}

/*
 * lf_tran_request_entry () - request a tran "entry"
 *   returns: entry or NULL on error
 *   sys(in): tran system
 */
LF_TRAN_ENTRY *
lf_tran_request_entry (LF_TRAN_SYSTEM * sys)
{
  LF_TRAN_ENTRY *claimed;
  int slot;

  assert (sys != NULL);
  assert (sys->entry_count > 0);
  assert (sys->entries != NULL);

  /* reserve a slot in the usage bitmap */
  slot = sys->lf_bitmap.get_entry ();
  if (slot < 0)
    {
      /* no free slot; should not happen when the system was sized for the
       * actual number of threads */
      assert (false);
      return NULL;
    }

  /* done; clear and serve */
  ATOMIC_INC_32 (&sys->used_entry_count, 1);
  claimed = &sys->entries[slot];

  assert (claimed->transaction_id == LF_NULL_TRANSACTION_ID);

#if defined (UNITTEST_LF)
  claimed->locked_mutex = NULL;
  claimed->locked_mutex_line = -1;
#endif /* UNITTEST_LF */

  return claimed;
}

/*
 * lf_tran_return_entry () - return a previously requested entry
 *   return: void
 *   entry(in): tran entry
 *
 * NOTE: Only entries requested from this system should be returned.
 */
void
lf_tran_return_entry (LF_TRAN_ENTRY * entry)
{
  LF_TRAN_SYSTEM *owner;

  assert (entry != NULL);
  assert (entry->entry_idx >= 0);
  assert (entry->transaction_id == LF_NULL_TRANSACTION_ID);
  assert (entry->tran_system != NULL);

  owner = entry->tran_system;

  /* clear bitfield so slot may be reused */
  owner->lf_bitmap.free_entry (entry->entry_idx);

  /* decrement use counter */
  ATOMIC_INC_32 (&owner->used_entry_count, -1);
}

/*
 * lf_tran_destroy_entry () - destroy a tran entry, reclaiming its local
 *                            retired list
 *   return: void
 *   entry(in): tran entry
 */
void
lf_tran_destroy_entry (LF_TRAN_ENTRY * entry)
{
  LF_ENTRY_DESCRIPTOR *edesc;
  void *node, *save_next;

  assert (entry != NULL);
  assert (entry->tran_system != NULL);

  edesc = entry->tran_system->entry_desc;
  if (edesc == NULL)
    {
      /* without a descriptor the retired entries cannot be reclaimed */
      return;
    }

  /* walk the local retired list, uninitializing and freeing every node */
  for (node = entry->retired_list; node != NULL; node = save_next)
    {
      save_next = (void *) OF_GET_PTR_DEREF (node, edesc->of_local_next);
      if (edesc->f_uninit != NULL)
	{
	  edesc->f_uninit (node);
	}
      edesc->f_free (node);
    }

  entry->retired_list = NULL;
}

/*
 * lf_tran_compute_minimum_transaction_id () - compute the minimum transaction
 *                     id over all used entries of a tran system and publish
 *                     it in sys->min_active_transaction_id
 *   return: void
 *   sys(in): tran system
 *
 * NOTE: readers fetch the published value to decide which retired entries are
 * safe to reuse (see lf_freelist_transport). Entries holding no transaction
 * have transaction_id == LF_NULL_TRANSACTION_ID, the maximum of the domain,
 * so they never lower the minimum.
 */
void
lf_tran_compute_minimum_transaction_id (LF_TRAN_SYSTEM * sys)
{
  UINT64 minvalue = LF_NULL_TRANSACTION_ID;
  int i, j;

  /* determine minimum value of all min_cdi fields in system entries;
   * entry_count is chunk-aligned (see lf_tran_system_init), so the division
   * below covers the whole bitmap */
  for (i = 0; i < sys->entry_count / LF_BITFIELD_WORD_SIZE; i++)
    {
      /* skip bitmap words with no bits set (no entries in use) */
      if (sys->lf_bitmap.bitfield[i])
    {
      for (j = 0; j < LF_BITFIELD_WORD_SIZE; j++)
        {
          int pos = i * LF_BITFIELD_WORD_SIZE + j;
          UINT64 fetch = sys->entries[pos].transaction_id;

          if (minvalue > fetch)
        {
          minvalue = fetch;
        }
        }
    }
    }

  /* store new minimum for later fetching */
  ATOMIC_TAS_64 (&sys->min_active_transaction_id, minvalue);
}

/*
 * lf_tran_start () - start operation in entry
 *   return: void
 *   entry(in): tran entry
 *   incr(in): increment global counter?
 *
 * NOTE: with incr == true the entry receives a fresh, unique transaction id
 * from the system's global counter; otherwise it merely snapshots the current
 * counter value. Callers normally go through the lf_tran_start_with_mb ()
 * wrapper (see usage in lf_freelist_claim / lf_freelist_retire).
 */
void
lf_tran_start (LF_TRAN_ENTRY * entry, bool incr)
{
  LF_TRAN_SYSTEM *sys;

  assert (entry != NULL);
  assert (entry->tran_system != NULL);
  /* either no transaction is active, or a non-incremented transaction is
   * being upgraded to an incremented one */
  assert (entry->transaction_id == LF_NULL_TRANSACTION_ID || (!entry->did_incr && incr));

  sys = entry->tran_system;

  if (incr && !entry->did_incr)
    {
      entry->transaction_id = ATOMIC_INC_64 (&sys->global_transaction_id, 1);
      entry->did_incr = true;

      /* periodically refresh the cached minimum active transaction id */
      if (entry->transaction_id % entry->tran_system->mati_refresh_interval == 0)
    {
      lf_tran_compute_minimum_transaction_id (entry->tran_system);
    }
    }
  else
    {
      entry->transaction_id = VOLATILE_ACCESS (sys->global_transaction_id, UINT64);
    }
}

/*
 * lf_tran_end () - end operation in entry
 *   return: void
 *   entry(in): tran entry
 */
void
lf_tran_end (LF_TRAN_ENTRY * entry)
{
  /* a transaction must currently be active; LF_NULL_TRANSACTION_ID is the
   * maximum value of the domain and marks "no transaction" */
  assert (entry->transaction_id != LF_NULL_TRANSACTION_ID);

  entry->transaction_id = LF_NULL_TRANSACTION_ID;
  entry->did_incr = false;
}

/*
 * lf_initialize_transaction_systems () - initialize global transaction systems
 */
int
lf_initialize_transaction_systems (int max_threads)
{
  if (tran_systems_initialized)
    {
      lf_destroy_transaction_systems ();
      /* reinitialize */
    }

  if (lf_tran_system_init (&spage_saving_Ts, max_threads) != NO_ERROR)
    {
      goto error;
    }
  if (lf_tran_system_init (&obj_lock_res_Ts, max_threads) != NO_ERROR)
    {
      goto error;
    }
  if (lf_tran_system_init (&obj_lock_ent_Ts, max_threads) != NO_ERROR)
    {
      goto error;
    }
  if (lf_tran_system_init (&catalog_Ts, max_threads) != NO_ERROR)
    {
      goto error;
    }
  if (lf_tran_system_init (&sessions_Ts, max_threads) != NO_ERROR)
    {
      goto error;
    }
  if (lf_tran_system_init (&free_sort_list_Ts, max_threads) != NO_ERROR)
    {
      goto error;
    }
  if (lf_tran_system_init (&global_unique_stats_Ts, max_threads) != NO_ERROR)
    {
      goto error;
    }

  if (lf_tran_system_init (&hfid_table_Ts, max_threads) != NO_ERROR)
    {
      goto error;
    }

  if (lf_tran_system_init (&xcache_Ts, max_threads) != NO_ERROR)
    {
      /* TODO: Could we not use an array for tran systems? */
      goto error;
    }

  if (lf_tran_system_init (&fpcache_Ts, max_threads) != NO_ERROR)
    {
      /* TODO: Could we not use an array for tran systems? */
      goto error;
    }

  if (lf_tran_system_init (&dwb_slots_Ts, max_threads) != NO_ERROR)
    {
      /* TODO: Could we not use an array for tran systems? */
      goto error;
    }

  tran_systems_initialized = true;
  return NO_ERROR;

error:
  lf_destroy_transaction_systems ();
  return ER_FAILED;
}

/*
 * lf_destroy_transaction_systems () - destroy global transaction systems
 */
void
lf_destroy_transaction_systems (void)
{
  lf_tran_system_destroy (&spage_saving_Ts);
  lf_tran_system_destroy (&obj_lock_res_Ts);
  lf_tran_system_destroy (&obj_lock_ent_Ts);
  lf_tran_system_destroy (&catalog_Ts);
  lf_tran_system_destroy (&sessions_Ts);
  lf_tran_system_destroy (&free_sort_list_Ts);
  lf_tran_system_destroy (&global_unique_stats_Ts);
  lf_tran_system_destroy (&hfid_table_Ts);
  lf_tran_system_destroy (&xcache_Ts);
  lf_tran_system_destroy (&fpcache_Ts);
  lf_tran_system_destroy (&dwb_slots_Ts);

  tran_systems_initialized = false;
}

/*
 * lf_stack_push () - push an entry on a lock free stack
 *   returns: NO_ERROR
 *   top(in/out): top of stack
 *   entry(in): entry to push
 *   edesc(in): descriptor for entry
 */
int
lf_stack_push (void **top, void *entry, LF_ENTRY_DESCRIPTOR * edesc)
{
  void *old_top;

  assert (top != NULL && entry != NULL && edesc != NULL);

  /* classic lock-free push: link the new entry to the observed top, then
   * swing the top pointer with a CAS; retry on contention */
  while (true)
    {
      old_top = *((void *volatile *) top);
      OF_GET_PTR_DEREF (entry, edesc->of_local_next) = old_top;

      if (ATOMIC_CAS_ADDR (top, old_top, entry))
	{
	  break;
	}
    }

  return NO_ERROR;
}

/*
 * lf_stack_pop () - pop an entry off a lock free stack
 *   returns: entry or NULL on empty stack
 *   top(in/out): top of stack
 *   edesc(in): descriptor for entry
 *
 * NOTE: This function is vulnerable to an ABA problem in the following
 * scenario:
 *   (1) Thread_1 executes POP and is suspended exactly after MARK
 *   (2) Thread_2 executes: t := POP, POP, PUSH (t)
 *   (3) Thread_1 is continued; CAS will succeed but prev is pointing to the
 *       wrong address
 */
void *
lf_stack_pop (void **top, LF_ENTRY_DESCRIPTOR * edesc)
{
  void *rtop = NULL, *prev = NULL;
  assert (top != NULL && edesc != NULL);

  while (true)
    {
      rtop = *top;
      if (rtop == NULL)
    {
      /* at this time the stack is empty */
      return NULL;
    }

      /* read the successor of the observed top; see the ABA note above */
      prev = OF_GET_PTR_DEREF (rtop, edesc->of_local_next); /* MARK */
      if (ATOMIC_CAS_ADDR (top, rtop, prev))
    {
      /* clear link */
      OF_GET_PTR_DEREF (rtop, edesc->of_local_next) = NULL;

      /* return success */
      return rtop;
    }
      /* lost the CAS to a concurrent push/pop; retry */
    }

  /* impossible case */
  assert (false);
  return NULL;
}

/*
 * lf_freelist_alloc_block () - allocate a block of entries in the freelist
 *   returns: error code or NO_ERROR
 *   freelist(in): freelist to allocate to
 *
 * NOTE: several threads may allocate a block concurrently; this is acceptable
 * for a low enough block_size (see the note in lf_freelist_claim).
 */
static int
lf_freelist_alloc_block (LF_FREELIST * freelist)
{
  void *head = NULL, *tail = NULL, *new_entry = NULL, *top = NULL;
  LF_ENTRY_DESCRIPTOR *edesc;
  int i;

  assert (freelist != NULL && freelist->entry_desc != NULL);
  edesc = freelist->entry_desc;

  /* allocate a block; entries are chained through of_local_next, each new
   * entry pointing back at the previous one (tail -> ... -> head -> NULL) */
  for (i = 0; i < freelist->block_size; i++)
    {
      new_entry = edesc->f_alloc ();
      if (new_entry == NULL)
	{
	  /* BUG FIX: free the partially built chain. It was never published
	   * to the freelist, so no other thread can reference it, and leaving
	   * it allocated would leak every entry created so far. */
	  while (tail != NULL)
	    {
	      void *prev = OF_GET_PTR_DEREF (tail, edesc->of_local_next);

	      edesc->f_free (tail);
	      tail = prev;
	    }

	  /* we use a decoy size since we don't know it */
	  er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_OUT_OF_VIRTUAL_MEMORY, 1, (size_t) 1);
	  return ER_OUT_OF_VIRTUAL_MEMORY;
	}

      /* of_prev of new entry points to tail; new entry becomes tail */
      OF_GET_PTR_DEREF (new_entry, edesc->of_local_next) = tail;
      tail = new_entry;

      /* store first entry as head */
      if (head == NULL)
	{
	  head = new_entry;
	}
    }

  /* append block to freelist: head is linked to the current top, tail becomes
   * the new top */
  do
    {
      top = VOLATILE_ACCESS (freelist->available, void *);
      OF_GET_PTR_DEREF (head, edesc->of_local_next) = top;
    }
  while (!ATOMIC_CAS_ADDR (&freelist->available, top, tail));

  /* increment allocated count */
  ATOMIC_INC_32 (&freelist->alloc_cnt, freelist->block_size);
  ATOMIC_INC_32 (&freelist->available_cnt, freelist->block_size);

  /* operation successful, block appended */
  return NO_ERROR;
}

/*
 * lf_freelist_init () - initialize a freelist
 *   returns: error code or NO_ERROR
 *   freelist(in): freelist to initialize
 *   initial_blocks(in): number of blocks to allocate at initialisation
 *   block_size(in): number of entries allocated in a block
 *   edesc(in): entry descriptor
 *   tran_system(in): transaction system backing this freelist
 */
int
lf_freelist_init (LF_FREELIST * freelist, int initial_blocks, int block_size, LF_ENTRY_DESCRIPTOR * edesc,
		  LF_TRAN_SYSTEM * tran_system)
{
  int blk;

  assert (freelist != NULL && edesc != NULL);
  assert (tran_system != NULL);
  assert (initial_blocks >= 1);

  if (freelist->available != NULL)
    {
      /* already initialized */
      return NO_ERROR;
    }

  /* initialize fields */
  freelist->available = NULL;
  freelist->available_cnt = 0;
  freelist->retired_cnt = 0;
  freelist->alloc_cnt = 0;
  freelist->block_size = block_size;
  freelist->entry_desc = edesc;
  freelist->tran_system = tran_system;

  /* the tran system learns the descriptor so it can reclaim retired entries */
  tran_system->entry_desc = edesc;

  /* pre-allocate the requested number of blocks */
  for (blk = 0; blk < initial_blocks; blk++)
    {
      if (lf_freelist_alloc_block (freelist) != NO_ERROR)
	{
	  return ER_FAILED;
	}
    }

  return NO_ERROR;
}

/*
 * lf_freelist_destroy () - destroy the freelist
 *   freelist(in): freelist
 */
void
lf_freelist_destroy (LF_FREELIST * freelist)
{
  LF_ENTRY_DESCRIPTOR *edesc;
  void *node, *save_next;

  assert (freelist != NULL);

  if (freelist->available == NULL)
    {
      /* nothing allocated, or already destroyed */
      return;
    }

  edesc = freelist->entry_desc;

  /* walk the available stack and free every entry */
  for (node = freelist->available; node != NULL; node = save_next)
    {
      /* save next entry before discarding the current one */
      save_next = OF_GET_PTR_DEREF (node, edesc->of_local_next);
      edesc->f_free (node);
    }

  freelist->available = NULL;
}

/*
 * lf_freelist_claim () - claim an entry from the available list
 *   returns: entry or NULL on error
 *   tran_entry(in): lock free transaction entry
 *   freelist(in): freelist to claim from
 *
 * NOTE: the claimed entry is returned initialized (via edesc->f_init, if any)
 * and with its of_next link cleared. If the caller's tran entry holds no
 * active transaction, a local (non-incrementing) one is started and ended
 * around the claim.
 */
void *
lf_freelist_claim (LF_TRAN_ENTRY * tran_entry, LF_FREELIST * freelist)
{
  LF_ENTRY_DESCRIPTOR *edesc;
  void *entry;
  bool local_tran = false;

  assert (tran_entry != NULL);
  assert (freelist != NULL);
  assert (freelist->entry_desc != NULL);

  edesc = freelist->entry_desc;

  LF_UNITTEST_INC (&lf_claims, 1);

  /* first check temporary entry; it can be served without a transaction */
  if (tran_entry->temp_entry != NULL)
    {
      entry = tran_entry->temp_entry;
      tran_entry->temp_entry = NULL;
      OF_GET_PTR_DEREF (entry, edesc->of_next) = NULL;

      LF_UNITTEST_INC (&lf_claims_temp, 1);
      LF_UNITTEST_INC (&lf_temps, -1);
      return entry;
    }

  /* We need a transaction. Careful: we cannot increment transaction ID! */
  if (tran_entry->transaction_id == LF_NULL_TRANSACTION_ID)
    {
      local_tran = true;
      lf_tran_start_with_mb (tran_entry, false);
    }

  /* clean retired list, if possible */
  if (LF_TRAN_CLEANUP_NECESSARY (tran_entry))
    {
      if (lf_freelist_transport (tran_entry, freelist) != NO_ERROR)
    {
      if (local_tran)
        {
          lf_tran_end_with_mb (tran_entry);
        }
      return NULL;
    }
    }

  /* claim an entry */
  while (true)
    {
      /* try to get a new entry from the safe stack */
      entry = lf_stack_pop (&freelist->available, edesc);

      if (entry != NULL)
    {
      /* adjust counter */
      ATOMIC_INC_32 (&freelist->available_cnt, -1);

      if ((edesc->f_init != NULL) && (edesc->f_init (entry) != NO_ERROR))
        {
          /* can't initialize it */
          /* NOTE(review): the popped entry is neither freed nor pushed back
           * to the freelist on this path — confirm whether this is an
           * intended (accounted) loss or a leak */
          if (local_tran)
        {
          lf_tran_end_with_mb (tran_entry);
        }
          return NULL;
        }

      /* initialize next */
      OF_GET_PTR_DEREF (entry, edesc->of_next) = NULL;

      /* done! */
      if (local_tran)
        {
          lf_tran_end_with_mb (tran_entry);
        }
      return entry;
    }
      else
    {
      /* NOTE: as you can see, more than one thread can start allocating a new freelist_entry block at the same
       * time; this behavior is acceptable given that the freelist has a _low_ enough value of block_size; it sure
       * beats synchronizing the operations */
      if (lf_freelist_alloc_block (freelist) != NO_ERROR)
        {
          if (local_tran)
        {
          lf_tran_end_with_mb (tran_entry);
        }
          return NULL;
        }

      /* retry a stack pop */
      continue;
    }
    }

  /* impossible! */
  assert (false);
  if (local_tran)
    {
      lf_tran_end_with_mb (tran_entry);
    }
  return NULL;
}

/*
 * lf_freelist_retire () - retire an entry
 *   returns: error code or NO_ERROR
 *   tran_entry(in): tran entry to store local retired entries
 *   freelist(in): freelist to use
 *   entry(in): entry to retire
 *
 * NOTE: the entry is pushed onto the transaction-local retired list, tagged
 * with the current (incremented, hence unique) transaction id; it is moved
 * back to the freelist by lf_freelist_transport () once no active transaction
 * can still observe it.
 */
int
lf_freelist_retire (LF_TRAN_ENTRY * tran_entry, LF_FREELIST * freelist, void *entry)
{
  LF_ENTRY_DESCRIPTOR *edesc;
  UINT64 *tran_id;
  int ret = NO_ERROR;
  bool local_tran = false;

  assert (entry != NULL && tran_entry != NULL);
  assert (freelist != NULL);
  edesc = freelist->entry_desc;
  assert (edesc != NULL);

  /* see if local transaction is required; retiring needs an incremented
   * transaction id to tag the entry with */
  if (tran_entry->transaction_id == LF_NULL_TRANSACTION_ID)
    {
      local_tran = true;
      lf_tran_start_with_mb (tran_entry, true);
    }
  else if (!tran_entry->did_incr)
    {
      /* upgrade the running transaction to an incremented one */
      lf_tran_start_with_mb (tran_entry, true);
    }

  /* do a retired list cleanup, if possible */
  if (LF_TRAN_CLEANUP_NECESSARY (tran_entry))
    {
      if (lf_freelist_transport (tran_entry, freelist) != NO_ERROR)
	{
	  /* BUG FIX: end the locally started transaction before bailing out;
	   * the previous code returned here without ending it, leaking an
	   * active transaction id (compare the same path in
	   * lf_freelist_claim) */
	  if (local_tran)
	    {
	      lf_tran_end_with_mb (tran_entry);
	    }
	  return ER_FAILED;
	}
    }

  /* set transaction index of entry */
  tran_id = (UINT64 *) OF_GET_PTR (entry, edesc->of_del_tran_id);
  *tran_id = tran_entry->transaction_id;

  /* push to local list */
  OF_GET_PTR_DEREF (entry, edesc->of_local_next) = tran_entry->retired_list;
  tran_entry->retired_list = entry;

  /* for stats purposes */
  ATOMIC_INC_32 (&freelist->retired_cnt, 1);

  LF_UNITTEST_INC (&lf_retires, 1);

  /* end local transaction */
  if (local_tran)
    {
      lf_tran_end_with_mb (tran_entry);
    }

  return ret;
}

/*
 * lf_freelist_transport () - transport local retired entries to freelist
 *   returns: error code or NO_ERROR (currently cannot fail)
 *   tran_entry(in): transaction entry to use
 *   freelist(in): freelist
 *
 * NOTE: entries retired before the system-wide minimum active transaction id
 * can no longer be observed by any reader and are therefore safe to reuse.
 * They are either moved to the freelist's available stack or freed outright
 * once alloc_cnt exceeds edesc->max_alloc_cnt. The local retired list is
 * ordered by descending transaction id (it is a push-front list, see
 * lf_freelist_retire), so a single scan finds the reusable suffix.
 */
int
lf_freelist_transport (LF_TRAN_ENTRY * tran_entry, LF_FREELIST * freelist)
{
  LF_TRAN_SYSTEM *tran_system;
  LF_ENTRY_DESCRIPTOR *edesc;
  UINT64 min_tran_id = 0;
  UINT64 *del_id;
  int transported_count = 0, freed_count = 0;
  void *list = NULL, *list_next = NULL, *list_trailer = NULL;
  void *aval_first = NULL, *aval_last = NULL;
  void *old_head;

  assert (freelist != NULL);
  assert (tran_entry != NULL);
  tran_system = tran_entry->tran_system;
  assert (tran_system != NULL);
  edesc = freelist->entry_desc;
  assert (edesc != NULL);

  /* check if cleanup is actually necessary; if the minimum has not advanced
   * since the last cleanup, nothing new can be reclaimed */
  min_tran_id = tran_system->min_active_transaction_id;
  if (min_tran_id <= tran_entry->last_cleanup_id)
    {
      /* nothing to do */
      return NO_ERROR;
    }

  /* walk private list and unlink old entries */
  list = tran_entry->retired_list;
  while (list != NULL)
    {
      list_next = OF_GET_PTR_DEREF (list, edesc->of_local_next);

      if (aval_first == NULL)
    {
      /* still looking for the first reusable entry */
      del_id = (UINT64 *) OF_GET_PTR (list, edesc->of_del_tran_id);
      assert (del_id != NULL);

      if (*del_id < min_tran_id)
        {
          /* uninit the reusable entry */
          if (edesc->f_uninit != NULL)
        {
          edesc->f_uninit (list);
        }

          /* free entry if alloc_cnt is greater than max_alloc_cnt */
          if (freelist->alloc_cnt > edesc->max_alloc_cnt)
        {
          assert (edesc->f_free);

          edesc->f_free (list);
          freed_count++;

          list = list_next;
          continue;
        }

          /* found first reusable entry - since list is ordered by descending transaction id, entries that follow
           * are also reusable */
          aval_first = list;
          aval_last = list;

          transported_count++;
        }
      else
        {
          /* save trailer */
          list_trailer = list;
        }
    }
      else
    {
      /* uninit the reusable entry */
      if (edesc->f_uninit != NULL)
        {
          edesc->f_uninit (list);
        }

      /* continue until we get to tail */
      aval_last = list;

      transported_count++;
    }

      list = list_next;
    }

  /* head of retired_list: list_trailer is the last still-too-recent entry */
  if (list_trailer != NULL)
    {
      /* unlink found part of list */
      OF_GET_PTR_DEREF (list_trailer, edesc->of_local_next) = NULL;
    }
  else
    {
      /* whole list can be reused */
      tran_entry->retired_list = NULL;
    }

  /* link part of list to available; CAS-push the whole [aval_first, aval_last]
   * segment in one shot */
  if (aval_first != NULL)
    {
      do
    {
      old_head = VOLATILE_ACCESS (freelist->available, void *);
      OF_GET_PTR_DEREF (aval_last, edesc->of_local_next) = old_head;
    }
      while (!ATOMIC_CAS_ADDR (&freelist->available, old_head, aval_first));
    }

  /* update counters */
  if (freed_count > 0 || transported_count > 0)
    {
      ATOMIC_INC_32 (&freelist->retired_cnt, -(transported_count + freed_count));
    }
  if (transported_count > 0)
    {
      ATOMIC_INC_32 (&freelist->available_cnt, transported_count);

      LF_UNITTEST_INC (&lf_transports, transported_count);
    }
  if (freed_count > 0)
    {
      ATOMIC_INC_32 (&freelist->alloc_cnt, -freed_count);
    }

  /* register cleanup */
  tran_entry->last_cleanup_id = min_tran_id;

  /* all ok */
  return NO_ERROR;
}

/*
 * lf_io_list_find () - find in an insert-only list
 *   returns: NO_ERROR (the search itself cannot fail)
 *   list_p(in): address of list head
 *   key(in): key to search for
 *   edesc(in): entry descriptor
 *   entry(out): found entry or NULL
 *
 * NOTE: when edesc->using_mutex is set, the found entry is returned with its
 * mutex held; the caller must unlock it.
 */
int
lf_io_list_find (void **list_p, void *key, LF_ENTRY_DESCRIPTOR * edesc, void **entry)
{
  void **slot;
  void *node;
  int rv;

  assert (list_p != NULL && edesc != NULL);
  assert (key != NULL && entry != NULL);

  /* by default, not found */
  *entry = NULL;

  slot = list_p;
  node = *((void *volatile *) slot);

  /* walk the chain through of_next until the key matches or the list ends */
  while (node != NULL)
    {
      if (edesc->f_key_cmp (key, OF_GET_PTR (node, edesc->of_key)) != 0)
	{
	  /* no match; advance to the next entry */
	  slot = (void **) OF_GET_REF (node, edesc->of_next);
	  node = *((void *volatile *) slot);
	  continue;
	}

      /* found! */
      if (edesc->using_mutex)
	{
	  /* entry has a mutex protecting its members; lock it */
	  pthread_mutex_t *entry_mutex = (pthread_mutex_t *) OF_GET_PTR (node, edesc->of_mutex);

	  rv = pthread_mutex_lock (entry_mutex);
	}

      *entry = node;
      return NO_ERROR;
    }

  /* all ok but not found */
  return NO_ERROR;
}

/*
 * lf_io_list_find_or_insert () - find an entry in an insert only list or insert
 *                new entry if not found
 *   returns: error code or NO_ERROR
 *   list_p(in): address of list head
 *   new_entry(in): new entry to insert if entry with same key does not exist
 *   edesc(in): entry descriptor
 *   entry(out): found entry or "new_entry" if inserted
 *
 * NOTE: key is extracted from new_entry. When edesc->using_mutex is set, the
 * returned entry (found or inserted) has its mutex held; the caller must
 * unlock it.
 */
int
lf_io_list_find_or_insert (void **list_p, void *new_entry, LF_ENTRY_DESCRIPTOR * edesc, void **entry)
{
  pthread_mutex_t *entry_mutex;
  void **curr_p;
  void *curr;
  void *key;
  int rv;

  assert (list_p != NULL && edesc != NULL);
  assert (new_entry != NULL && entry != NULL);

  /* extract key from new entry */
  key = OF_GET_PTR (new_entry, edesc->of_key);

  /* by default, not found */
  (*entry) = NULL;

restart_search:
  curr_p = list_p;
  curr = *((void *volatile *) curr_p);

  /* search; curr_p always points at the link we may CAS a new entry into */
  while (curr_p != NULL)
    {
      /* is this the droid we are looking for? */
      if (curr != NULL)
    {
      if (edesc->f_key_cmp (key, OF_GET_PTR (curr, edesc->of_key)) == 0)
        {
          /* found! */
          if (edesc->using_mutex)
        {
          /* entry has a mutex protecting its members; lock it */
          entry_mutex = (pthread_mutex_t *) OF_GET_PTR (curr, edesc->of_mutex);
          rv = pthread_mutex_lock (entry_mutex);
        }

          (*entry) = curr;
          return NO_ERROR;
        }

      /* advance */
      curr_p = (void **) OF_GET_REF (curr, edesc->of_next);
      curr = *((void *volatile *) curr_p);
    }
      else
    {
      /* end of bucket, we must insert */
      (*entry) = new_entry;
      if (edesc->using_mutex)
        {
          /* entry has a mutex protecting its members; lock it BEFORE
           * publishing so no reader can see it unlocked */
          entry_mutex = (pthread_mutex_t *) OF_GET_PTR ((*entry), edesc->of_mutex);
          rv = pthread_mutex_lock (entry_mutex);
        }

      /* attempt an add; succeeds only if the tail link is still NULL */
      if (!ATOMIC_CAS_ADDR (curr_p, (void *) NULL, (*entry)))
        {
          if (edesc->using_mutex)
        {
          /* link failed, unlock mutex */
          entry_mutex = (pthread_mutex_t *) OF_GET_PTR ((*entry), edesc->of_mutex);
          pthread_mutex_unlock (entry_mutex);
        }

          /* somebody else appended first; rescan from the head */
          goto restart_search;
        }

      /* done! */
      return NO_ERROR;
    }
    }

  /* impossible case */
  assert (false);
  return ER_FAILED;
}

/*
 * lf_list_find () - find an entry in list
 *   returns: error code or NO_ERROR
 *   tran(in): lock free transaction system
 *   list_p(in): address of list head
 *   key(in): key to search for
 *   behavior_flags(in/out): flags that control restart behavior
 *   entry(out): entry (if found) or NULL
 *   edesc(in): entry descriptor
 *
 * NOTE: a (non-incrementing) transaction protects the traversal. When
 * edesc->using_mutex is set, the found entry is returned with its mutex held
 * and the transaction already ended; otherwise the caller remains inside the
 * transaction and must end it itself.
 */
int
lf_list_find (LF_TRAN_ENTRY * tran, void **list_p, void *key, int *behavior_flags, LF_ENTRY_DESCRIPTOR * edesc,
          void **entry)
{
  pthread_mutex_t *entry_mutex;
  void **curr_p;
  void *curr;
  int rv;

  assert (tran != NULL);
  assert (list_p != NULL && edesc != NULL);
  assert (key != NULL && entry != NULL);

  /* by default, not found */
  (*entry) = NULL;

  lf_tran_start_with_mb (tran, false);

restart_search:
  curr_p = list_p;
  /* next-pointers may carry the deletion mark in their low bit; strip it to
   * get the real address */
  curr = ADDR_STRIP_MARK (*((void *volatile *) curr_p));

  /* search */
  while (curr != NULL)
    {
      if (edesc->f_key_cmp (key, OF_GET_PTR (curr, edesc->of_key)) == 0)
    {
      /* found! */
      if (edesc->using_mutex)
        {
          /* entry has a mutex protecting its members; lock it */
          entry_mutex = (pthread_mutex_t *) OF_GET_PTR (curr, edesc->of_mutex);
          rv = pthread_mutex_lock (entry_mutex);

          /* mutex has been locked, no need to keep transaction */
          lf_tran_end_with_mb (tran);

          /* a marked next-pointer means the entry was logically deleted */
          if (ADDR_HAS_MARK (OF_GET_PTR_DEREF (curr, edesc->of_next)))
        {
          /* while waiting for lock, somebody else deleted the entry; restart the search */
          pthread_mutex_unlock (entry_mutex);

          if (behavior_flags && (*behavior_flags & LF_LIST_BF_RETURN_ON_RESTART))
            {
              /* report the restart to the caller instead of looping here */
              *behavior_flags = (*behavior_flags) | LF_LIST_BR_RESTARTED;
              return NO_ERROR;
            }
          else
            {
              goto restart_search;
            }
        }
        }

      (*entry) = curr;
      return NO_ERROR;
    }

      /* advance */
      curr_p = (void **) OF_GET_REF (curr, edesc->of_next);
      curr = ADDR_STRIP_MARK (*((void *volatile *) curr_p));
    }

  /* all ok but not found */
  lf_tran_end_with_mb (tran);
  return NO_ERROR;
}

/*
 * lf_list_insert_internal () - insert an entry into latch-free list.
 *   returns: error code or NO_ERROR
 *   tran(in): lock free transaction system
 *   list_p(in): address of list head
 *   key(in): key to search for or insert
 *   behavior_flags(in/out): flags that control restart behavior
 *   edesc(in): entry descriptor
 *   freelist(in): freelist to fetch new entries from
 *   entry(in/out): found entry or inserted entry
 *   inserted(out): returns 1 if inserted, 0 if found or not inserted.
 *
 * Behavior flags:
 *
 * LF_LIST_BF_RETURN_ON_RESTART - When insert fails because last entry in bucket was deleted, if this flag is set,
 *                then the operation is restarted from here, instead of looping inside
 *                lf_list_insert_internal (as a consequence, hash key is recalculated).
 *                NOTE: Currently, this flag is always used (I must find out why).
 *
 * LF_LIST_BF_INSERT_GIVEN  - If this flag is set, the caller means to force its own entry into hash table.
 *                When the flag is not set, a new entry is claimed from freelist.
 *                NOTE: If an entry with the same key already exists, the entry given as argument is
 *                  automatically retired.
 *
 * LF_LIST_BF_FIND_OR_INSERT    - If this flag is set and an entry for the same key already exists, the existing
 *                key will be output. If the flag is not set and key exists, insert just gives up
 *                and a NULL entry is output.
 *                NOTE: insert will not give up when key exists, if edesc->f_update is provided.
 *                  a new key is generated and insert is restarted.
 */
static int
lf_list_insert_internal (LF_TRAN_ENTRY * tran, void **list_p, void *key, int *behavior_flags,
             LF_ENTRY_DESCRIPTOR * edesc, LF_FREELIST * freelist, void **entry, int *inserted)
{
  /* Macros to avoid repeating the code (and making mistakes) */

  /* Assert used to make sure the current entry is protected by either transaction or mutex. */
#define LF_ASSERT_USE_MUTEX_OR_TRAN_STARTED() \
  assert (is_tran_started == !edesc->using_mutex); /* The transaction is started if and only if we don't use mutex */ \
  assert (!edesc->using_mutex || entry_mutex) /* If we use mutex, we have a mutex locked. */

  /* Start a transaction */
#define LF_START_TRAN() \
  if (!is_tran_started) lf_tran_start_with_mb (tran, false); is_tran_started = true
#define LF_START_TRAN_FORCE() \
  assert (!is_tran_started); lf_tran_start_with_mb (tran, false); is_tran_started = true

  /* End a transaction if started */
#define LF_END_TRAN() \
  if (is_tran_started) lf_tran_end_with_mb (tran)
  /* Force end transaction; a transaction is expected */
#define LF_END_TRAN_FORCE() \
  assert (is_tran_started); lf_tran_end_with_mb (tran); is_tran_started = false

#if defined (UNITTEST_LF)
  /* Lock current entry (using mutex is expected); unit test builds also track the locked mutex in tran */
#define LF_LOCK_ENTRY(tolock) \
  assert (tran->locked_mutex == NULL); \
  assert (edesc->using_mutex); \
  assert ((tolock) != NULL); \
  assert (entry_mutex == NULL); \
  /* entry has a mutex protecting its members; lock it */ \
  entry_mutex = (pthread_mutex_t *) OF_GET_PTR (tolock, edesc->of_mutex); \
  tran->locked_mutex_line = __LINE__; \
  tran->locked_mutex = entry_mutex; \
  rv = pthread_mutex_lock (entry_mutex)

  /* Unlock current entry (if it was locked). */
#define LF_UNLOCK_ENTRY() \
  if (edesc->using_mutex && entry_mutex) \
    { \
      assert (tran->locked_mutex == entry_mutex); \
      tran->locked_mutex = NULL; \
      pthread_mutex_unlock (entry_mutex); \
      entry_mutex = NULL; \
    }
  /* Force unlocking current entry (it is expected to be locked). */
#define LF_UNLOCK_ENTRY_FORCE() \
  assert (edesc->using_mutex && entry_mutex != NULL); \
  assert (tran->locked_mutex == entry_mutex); \
  tran->locked_mutex = NULL; \
  pthread_mutex_unlock (entry_mutex); \
  entry_mutex = NULL
#else /* !UNITTEST_LF */
  /* Lock current entry (using mutex is expected) */
#define LF_LOCK_ENTRY(tolock) \
  assert (edesc->using_mutex); \
  assert ((tolock) != NULL); \
  assert (entry_mutex == NULL); \
  /* entry has a mutex protecting its members; lock it */ \
  entry_mutex = (pthread_mutex_t *) OF_GET_PTR (tolock, edesc->of_mutex); \
  rv = pthread_mutex_lock (entry_mutex)

  /* Unlock current entry (if it was locked). */
#define LF_UNLOCK_ENTRY() \
  if (edesc->using_mutex && entry_mutex) \
    { \
      pthread_mutex_unlock (entry_mutex); \
      entry_mutex = NULL; \
    }
  /* Force unlocking current entry (it is expected to be locked). */
#define LF_UNLOCK_ENTRY_FORCE() \
  assert (edesc->using_mutex && entry_mutex != NULL); \
  pthread_mutex_unlock (entry_mutex); \
  entry_mutex = NULL
#endif /* !UNITTEST_LF */

  pthread_mutex_t *entry_mutex = NULL;  /* Locked entry mutex when not NULL */
  void **curr_p;		/* link being followed: list head or an entry's next pointer */
  void *curr;			/* entry curr_p points to, with the delete mark stripped */
  int rv;
  bool is_tran_started = false;	/* true while a local lf transaction protects curr */

  assert (tran != NULL);
  assert (list_p != NULL && edesc != NULL);
  assert (key != NULL && entry != NULL);
  assert (freelist != NULL);
  assert (behavior_flags != NULL);

  /* an entry is provided if and only if the caller forces its own entry (LF_LIST_BF_INSERT_GIVEN) */
  assert ((*entry != NULL) == LF_LIST_BF_IS_FLAG_SET (behavior_flags, LF_LIST_BF_INSERT_GIVEN));

  if (inserted != NULL)
    {
      *inserted = 0;
    }

restart_search:

  LF_UNITTEST_INC (&lf_list_inserts, 1);

  LF_START_TRAN_FORCE ();

  curr_p = list_p;
  curr = ADDR_STRIP_MARK (*((void *volatile *) curr_p));

  /* search; each iteration either matches the key, advances, or (at end of bucket) links the new entry */
  while (curr_p != NULL)
    {
      assert (is_tran_started);
      assert (entry_mutex == NULL);

      if (curr != NULL)
    {
      if (edesc->f_key_cmp (key, OF_GET_PTR (curr, edesc->of_key)) == 0)
        {
          /* found an entry with the same key. */

          LF_UNITTEST_INC (&lf_list_inserts_found, 1);

          if (!LF_LIST_BF_IS_FLAG_SET (behavior_flags, LF_LIST_BF_INSERT_GIVEN) && *entry != NULL)
        {
          /* save this for further (local) use. */
          assert (tran->temp_entry == NULL);
          tran->temp_entry = *entry;

          LF_UNITTEST_INC (&lf_list_inserts_save_temp_1, 1);
          LF_UNITTEST_INC (&lf_temps, 1);

          /* don't keep the entry around. */
          *entry = NULL;
        }

          if (edesc->using_mutex)
        {
          /* entry has a mutex protecting its members; lock it */
          LF_LOCK_ENTRY (curr);

          /* mutex has been locked, no need to keep transaction alive */
          LF_END_TRAN_FORCE ();

          if (ADDR_HAS_MARK (OF_GET_PTR_DEREF (curr, edesc->of_next)))
            {
              /* while waiting for lock, somebody else deleted the entry; restart the search */
              LF_UNLOCK_ENTRY_FORCE ();

              if (behavior_flags && (*behavior_flags & LF_LIST_BF_RETURN_ON_RESTART))
            {
              *behavior_flags = (*behavior_flags) | LF_LIST_BR_RESTARTED;
              return NO_ERROR;
            }
              else
            {
              goto restart_search;
            }
            }
        }

          LF_ASSERT_USE_MUTEX_OR_TRAN_STARTED ();
          if (edesc->f_duplicate != NULL)
        {
          /* we have a duplicate key callback. */
          if (edesc->f_duplicate (key, curr) != NO_ERROR)
            {
              ASSERT_ERROR ();
              /* NOTE(review): an error has been set but NO_ERROR is returned; presumably callers
               * check er_errid () / the NULL *entry output -- confirm */
              LF_END_TRAN ();
              LF_UNLOCK_ENTRY ();
              return NO_ERROR;
            }
#if 1
          LF_LIST_BR_SET_FLAG (behavior_flags, LF_LIST_BR_RESTARTED);
          LF_END_TRAN ();
          LF_UNLOCK_ENTRY ();
          return NO_ERROR;
#else /* !1 = 0 */
          /* Could we have such cases that we just update existing entry without modifying anything else?
           * And would it be usable with just a flag?
           * Then this code may be used.
           * So far we have only one usage for f_duplicate, which increment SESSION_ID and requires
           * restarting hash search. This will be the usual approach if f_duplicate.
           * If we just increment a counter in existing entry, we don't need to do anything else. This however
           * most likely depends on f_duplicate implementation. Maybe it is more useful to give behavior_flags
           * argument to f_duplicate to tell us if restart is or is not needed.
           */
          if (LF_LIST_BF_IS_FLAG_SET (behavior_flags, LF_LIST_BF_RESTART_ON_DUPLICATE))
            {
              LF_LIST_BR_SET_FLAG (behavior_flags, LF_LIST_BR_RESTARTED);
              LF_END_TRAN ();
              LF_UNLOCK_ENTRY ();
              return NO_ERROR;
            }
          else
            {
              /* duplicate does not require restarting search. */
              if (LF_LIST_BF_IS_FLAG_SET (behavior_flags, LF_LIST_BF_INSERT_GIVEN))
            {
              /* Could not be inserted. Retire the entry. */
              lf_freelist_retire (tran, freelist, *entry);
              *entry = NULL;
            }

              /* fall through to output current entry. */
            }
#endif /* 0 */
        }
          else
        {
          if (LF_LIST_BF_IS_FLAG_SET (behavior_flags, LF_LIST_BF_INSERT_GIVEN))
            {
              /* the given entry could not be inserted. retire it. */
              lf_freelist_retire (tran, freelist, *entry);
              *entry = NULL;
            }

          if (!LF_LIST_BF_IS_FLAG_SET (behavior_flags, LF_LIST_BF_FIND_OR_INSERT))
            {
              /* found entry is not accepted */
              LF_END_TRAN ();
              LF_UNLOCK_ENTRY ();
              return NO_ERROR;
            }

          /* fall through to output current entry. */
        }

          assert (*entry == NULL);
          LF_ASSERT_USE_MUTEX_OR_TRAN_STARTED ();
          /* We don't end transaction or unlock mutex here. */
          *entry = curr;
          return NO_ERROR;
        }

      /* advance */
      curr_p = (void **) OF_GET_REF (curr, edesc->of_next);
      curr = ADDR_STRIP_MARK (*((void *volatile *) curr_p));
    }
      else
    {
      /* end of bucket, we must insert */
      if (*entry == NULL)
        {
          assert (!LF_LIST_BF_IS_FLAG_SET (behavior_flags, LF_LIST_BF_INSERT_GIVEN));

          *entry = lf_freelist_claim (tran, freelist);
          if (*entry == NULL)
        {
          assert (false);
          LF_END_TRAN_FORCE ();
          return ER_FAILED;
        }

          LF_UNITTEST_INC (&lf_list_inserts_claim, 1);

          /* set its key */
          if (edesc->f_key_copy (key, OF_GET_PTR (*entry, edesc->of_key)) != NO_ERROR)
        {
          assert (false);
          LF_END_TRAN_FORCE ();
          return ER_FAILED;
        }
        }

      if (edesc->using_mutex)
        {
          /* entry has a mutex protecting its members; lock it */
          LF_LOCK_ENTRY (*entry);
        }

      /* attempt an add */
      if (!ATOMIC_CAS_ADDR (curr_p, (void *) NULL, (*entry)))
        {
          if (edesc->using_mutex)
        {
          /* link failed, unlock mutex */
          LF_UNLOCK_ENTRY_FORCE ();
        }

          LF_UNITTEST_INC (&lf_list_inserts_fail_link, 1);

          /* someone added before us, restart process */
          if (LF_LIST_BF_IS_FLAG_SET (behavior_flags, LF_LIST_BF_RETURN_ON_RESTART))
        {
          if (!LF_LIST_BF_IS_FLAG_SET (behavior_flags, LF_LIST_BF_INSERT_GIVEN))
            {
              assert (tran->temp_entry == NULL);
              tran->temp_entry = *entry;
              *entry = NULL;

              LF_UNITTEST_INC (&lf_list_inserts_save_temp_2, 1);
              LF_UNITTEST_INC (&lf_temps, 1);
            }
          LF_LIST_BR_SET_FLAG (behavior_flags, LF_LIST_BR_RESTARTED);
          LF_END_TRAN_FORCE ();
          return NO_ERROR;
        }
          else
        {
          LF_END_TRAN_FORCE ();
          goto restart_search;
        }
        }

      LF_UNITTEST_INC (&lf_list_inserts_success_link, 1);

      /* end transaction if mutex is acquired */
      if (edesc->using_mutex)
        {
          LF_END_TRAN_FORCE ();
        }
      if (inserted)
        {
          *inserted = 1;
        }
      LF_UNITTEST_INC (&lf_hash_size, 1);

      /* done! */
      return NO_ERROR;
    }
    }

  /* impossible case */
  assert (false);
  return ER_FAILED;

#undef LF_ASSERT_USE_MUTEX_OR_TRAN_STARTED
#undef LF_START_TRAN
#undef LF_START_TRAN_FORCE
#undef LF_END_TRAN
#undef LF_END_TRAN_FORCE
#undef LF_LOCK_ENTRY
#undef LF_UNLOCK_ENTRY
#undef LF_UNLOCK_ENTRY_FORCE
}

/*
 * lf_list_delete () - delete an entry from a list
 *   returns: error code or NO_ERROR
 *   tran(in): lock free transaction system
 *   list_p(in): address of list head
 *   key(in): key to search for
 *   locked_entry(in): entry already locked.
 *   behavior_flags(in/out): flags that control restart behavior
 *   edesc(in): entry descriptor
 *   freelist(in): freelist to place deleted entries to
 *   success(out): 1 if entry was deleted, 0 otherwise
 */
int
lf_list_delete (LF_TRAN_ENTRY * tran, void **list_p, void *key, void *locked_entry, int *behavior_flags,
        LF_ENTRY_DESCRIPTOR * edesc, LF_FREELIST * freelist, int *success)
{
  /* Start a transaction */
#define LF_START_TRAN_FORCE() \
  assert (!is_tran_started); lf_tran_start_with_mb (tran, false); is_tran_started = true

  /* Promote from transaction without incremented transaction ID to transaction with incremented transaction ID. */
#define LF_PROMOTE_TRAN_FORCE() \
  assert (is_tran_started); MEMORY_BARRIER (); lf_tran_start_with_mb (tran, true)

  /* End a transaction */
  /* Force end transaction; a transaction is expected */
#define LF_END_TRAN_FORCE() \
  assert (is_tran_started); lf_tran_end_with_mb (tran); is_tran_started = false

#if defined (UNITTEST_LF)
  /* Lock current entry (using mutex is expected); unit test builds also track the locked mutex in tran */
#define LF_LOCK_ENTRY(tolock) \
  assert (edesc->using_mutex); \
  assert ((tolock) != NULL); \
  assert (entry_mutex == NULL); \
  /* entry has a mutex protecting its members; lock it */ \
  entry_mutex = (pthread_mutex_t *) OF_GET_PTR (tolock, edesc->of_mutex); \
  assert (tran->locked_mutex == NULL); \
  tran->locked_mutex = entry_mutex; \
  tran->locked_mutex_line = __LINE__; \
  rv = pthread_mutex_lock (entry_mutex)

  /* Force unlocking current entry (it is expected to be locked). */
#define LF_UNLOCK_ENTRY_FORCE() \
  assert (edesc->using_mutex && entry_mutex != NULL); \
  assert (tran->locked_mutex == entry_mutex); \
  tran->locked_mutex = NULL; \
  pthread_mutex_unlock (entry_mutex); \
  entry_mutex = NULL
#else /* !UNITTEST_LF */
  /* Lock current entry (using mutex is expected) */
#define LF_LOCK_ENTRY(tolock) \
  assert (edesc->using_mutex); \
  assert ((tolock) != NULL); \
  assert (entry_mutex == NULL); \
  /* entry has a mutex protecting its members; lock it */ \
  entry_mutex = (pthread_mutex_t *) OF_GET_PTR (tolock, edesc->of_mutex); \
  rv = pthread_mutex_lock (entry_mutex)

  /* Force unlocking current entry (it is expected to be locked). */
#define LF_UNLOCK_ENTRY_FORCE() \
  assert (edesc->using_mutex && entry_mutex != NULL); \
  pthread_mutex_unlock (entry_mutex); \
  entry_mutex = NULL
#endif /* !UNITTEST_LF */

  pthread_mutex_t *entry_mutex = NULL;	/* locked entry mutex when not NULL */
  void **curr_p, **next_p;	/* current link and the victim entry's next link */
  void *curr, *next;		/* entries those links point to, with delete marks stripped */
  int rv;
  bool is_tran_started = false;	/* true while a local lf transaction is active */

  /* reset success flag */
  if (success != NULL)
    {
      *success = 0;
    }

  assert (list_p != NULL && edesc != NULL && key != NULL);
  assert (freelist != NULL);
  assert (tran != NULL && tran->tran_system != NULL);

restart_search:

  LF_UNITTEST_INC (&lf_list_deletes, 1);

  /* read transaction; we start a write transaction only after remove */
  LF_START_TRAN_FORCE ();

  curr_p = list_p;
  curr = ADDR_STRIP_MARK (*((void *volatile *) curr_p));

  /* search */
  while (curr != NULL)
    {
      /* is this the droid we are looking for? */
      if (edesc->f_key_cmp (key, OF_GET_PTR (curr, edesc->of_key)) == 0)
    {
      if (locked_entry != NULL && locked_entry != curr)
        {
          assert (edesc->using_mutex && !LF_LIST_BF_IS_FLAG_SET (behavior_flags, LF_LIST_BF_LOCK_ON_DELETE));

          /* We are here because lf_hash_delete_already_locked was called. The entry found by matching key is
           * different from the entry we were trying to delete.
           * This is possible (please find the description of lf_hash_delete_already_locked). */
          LF_UNITTEST_INC (&lf_list_deletes_not_match, 1);
          LF_END_TRAN_FORCE ();
          return NO_ERROR;
        }

      /* fetch next entry */
      next_p = (void **) OF_GET_REF (curr, edesc->of_next);
      next = ADDR_STRIP_MARK (*((void *volatile *) next_p));

      LF_UNITTEST_INC (&lf_list_deletes_found, 1);

      /* set mark on next pointer; this way, if anyone else is trying to delete the next entry, it will fail */
      if (!ATOMIC_CAS_ADDR (next_p, next, ADDR_WITH_MARK (next)))
        {
          /* joke's on us, this time; somebody else marked it before */

          LF_UNITTEST_INC (&lf_list_deletes_fail_mark_next, 1);

          LF_END_TRAN_FORCE ();
          if (behavior_flags && (*behavior_flags & LF_LIST_BF_RETURN_ON_RESTART))
        {
          *behavior_flags = (*behavior_flags) | LF_LIST_BR_RESTARTED;
          assert ((*behavior_flags) & LF_LIST_BR_RESTARTED);
          return NO_ERROR;
        }
          else
        {
          goto restart_search;
        }
        }

      /* lock mutex if necessary */
      if (edesc->using_mutex)
        {
          if (LF_LIST_BF_IS_FLAG_SET (behavior_flags, LF_LIST_BF_LOCK_ON_DELETE))
        {
          LF_LOCK_ENTRY (curr);
        }
          else
        {
          /* Must be already locked! */
#if defined (UNITTEST_LF)
          assert (locked_entry != NULL && locked_entry == curr);
#endif /* UNITTEST_LF */
          entry_mutex = (pthread_mutex_t *) OF_GET_PTR (curr, edesc->of_mutex);

#if defined (UNITTEST_LF)
          assert (tran->locked_mutex != NULL && tran->locked_mutex == entry_mutex);
#endif /* UNITTEST_LF */
        }

          /* since we set the mark, nobody else can delete it, so we have nothing else to check */
        }

      /* unlink */
      if (!ATOMIC_CAS_ADDR (curr_p, curr, next))
        {
          /* unlink failed; first step is to remove lock (if applicable) */
          if (edesc->using_mutex && LF_LIST_BF_IS_FLAG_SET (behavior_flags, LF_LIST_BF_LOCK_ON_DELETE))
        {
          LF_UNLOCK_ENTRY_FORCE ();
        }

          LF_UNITTEST_INC (&lf_list_deletes_fail_unlink, 1);

          /* remove mark and restart search */
          if (!ATOMIC_CAS_ADDR (next_p, ADDR_WITH_MARK (next), next))
        {
          /* impossible case */
          assert (false);
          LF_END_TRAN_FORCE ();
          return ER_FAILED;
        }

          LF_END_TRAN_FORCE ();
          if (behavior_flags && (*behavior_flags & LF_LIST_BF_RETURN_ON_RESTART))
        {
          *behavior_flags = (*behavior_flags) | LF_LIST_BR_RESTARTED;
          assert ((*behavior_flags) & LF_LIST_BR_RESTARTED);
          return NO_ERROR;
        }
          else
        {
          goto restart_search;
        }
        }
      /* unlink successful */

      LF_UNITTEST_INC (&lf_list_deletes_success_unlink, 1);

      /* unlock mutex */
      if (edesc->using_mutex)
        {
          LF_UNLOCK_ENTRY_FORCE ();
        }

      /* promote to a transaction with an incremented transaction id before retiring the unlinked entry */
      LF_PROMOTE_TRAN_FORCE ();

      /* now we can feed the entry to the freelist and forget about it */
      if (lf_freelist_retire (tran, freelist, curr) != NO_ERROR)
        {
          assert (false);
          LF_END_TRAN_FORCE ();
          return ER_FAILED;
        }

      /* end the transaction */
      LF_END_TRAN_FORCE ();

      /* set success flag */
      if (success != NULL)
        {
          *success = 1;
        }
      LF_UNITTEST_INC (&lf_hash_size, -1);

      /* success! */
      return NO_ERROR;
    }

      /* advance */
      curr_p = (void **) OF_GET_REF (curr, edesc->of_next);
      curr = ADDR_STRIP_MARK (*((void *volatile *) curr_p));
    }

  LF_UNITTEST_INC (&lf_list_deletes_not_found, 1);

  /* search yielded no result so no delete was performed */
  LF_END_TRAN_FORCE ();
  return NO_ERROR;

#undef LF_START_TRAN_FORCE
#undef LF_PROMOTE_TRAN_FORCE
#undef LF_END_TRAN_FORCE
#undef LF_LOCK_ENTRY
#undef LF_UNLOCK_ENTRY_FORCE
}

/*
 * lf_hash_init () - initialize a lock free hash table
 *   returns: error code or NO_ERROR
 *   table(in): hash table structure to initialize
 *   freelist(in): freelist to use for entries
 *   hash_size(in): size of hash table
 *   edesc(in): entry descriptor
 */
int
lf_hash_init (LF_HASH_TABLE * table, LF_FREELIST * freelist, unsigned int hash_size, LF_ENTRY_DESCRIPTOR * edesc)
{
  unsigned int i;

  assert (table != NULL && freelist != NULL && edesc != NULL);
  assert (hash_size > 0);

  if (table->buckets != NULL)
    {
      /* already initialized */
      return NO_ERROR;
    }

  /* register values */
  table->freelist = freelist;
  table->hash_size = hash_size;
  table->entry_desc = edesc;

  /* allocate bucket space; calloc both zero-initializes the buckets and checks the element-count
   * multiplication for overflow */
  table->buckets = (void **) calloc (hash_size, sizeof (void *));
  if (table->buckets == NULL)
    {
      er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_OUT_OF_VIRTUAL_MEMORY, 1, sizeof (void *) * hash_size);
      return ER_OUT_OF_VIRTUAL_MEMORY;
    }

  /* allocate backbuffer */
  table->backbuffer = (void **) malloc (sizeof (void *) * hash_size);
  if (table->backbuffer == NULL)
    {
      free (table->buckets);
      /* reset buckets; otherwise a later lf_hash_init would see the dangling pointer and report
       * "already initialized", and lf_hash_destroy would double-free it */
      table->buckets = NULL;
      er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_OUT_OF_VIRTUAL_MEMORY, 1, sizeof (void *) * hash_size);
      return ER_OUT_OF_VIRTUAL_MEMORY;
    }

  /* put backbuffer in a "locked" state */
  for (i = 0; i < hash_size; i++)
    {
      table->backbuffer[i] = ADDR_WITH_MARK (NULL);
    }

  /* initialize mutex */
  pthread_mutex_init (&table->backbuffer_mutex, NULL);

  /* all ok */
  return NO_ERROR;
}

/*
 * lf_hash_destroy () - destroy a hash table
 *   table(in): hash table to destroy
 */
void
lf_hash_destroy (LF_HASH_TABLE * table)
{
  if (table == NULL)
    {
      return;
    }

  if (table->buckets != NULL)
    {
      LF_ENTRY_DESCRIPTOR *edesc = table->entry_desc;
      unsigned int bucket_idx;

      /* walk every chain and release each entry (optional uninit callback, then free callback) */
      for (bucket_idx = 0; bucket_idx < table->hash_size; bucket_idx++)
	{
	  void *it = table->buckets[bucket_idx];

	  while (it != NULL)
	    {
	      void *succ = OF_GET_PTR_DEREF (it, edesc->of_next);

	      if (edesc->f_uninit != NULL)
		{
		  edesc->f_uninit (it);
		}
	      edesc->f_free (it);

	      it = succ;
	    }
	}

      /* release the bucket array itself */
      free (table->buckets);
      table->buckets = NULL;
    }

  pthread_mutex_destroy (&table->backbuffer_mutex);

  if (table->backbuffer != NULL)
    {
      free (table->backbuffer);
      table->backbuffer = NULL;
    }

  table->hash_size = 0;
}

/*
 * lf_hash_find () - find an entry in the hash table given a key
 *   returns: error code or NO_ERROR
 *   tran(in): LF transaction entry
 *   table(in): hash table
 *   key(in): key of entry that we seek
 *   entry(out): existing or NULL otherwise
 */
int
lf_hash_find (LF_TRAN_ENTRY * tran, LF_HASH_TABLE * table, void *key, void **entry)
{
  LF_ENTRY_DESCRIPTOR *edesc;
  unsigned int hidx;
  int flags, err;

  assert (table != NULL && key != NULL && entry != NULL);
  edesc = table->entry_desc;
  assert (edesc != NULL);

  /* repeat the bucket search as long as the list code reports a restart */
  do
    {
      *entry = NULL;

      hidx = edesc->f_hash (key, table->hash_size);
      if (hidx >= table->hash_size)
	{
	  /* hash function produced an out-of-range bucket index */
	  assert (false);
	  return ER_FAILED;
	}

      flags = LF_LIST_BF_RETURN_ON_RESTART;
      err = lf_list_find (tran, &table->buckets[hidx], key, &flags, edesc, entry);
    }
  while (err == NO_ERROR && (flags & LF_LIST_BR_RESTARTED));

  return err;
}

/*
 * lf_hash_insert_internal () - hash insert function.
 *   returns: error code or NO_ERROR
 *   tran(in): LF transaction entry
 *   table(in): hash table
 *   key(in): key of entry that we seek
 *   bflags(in): behavior flags
 *   entry(out): existing or new entry
 *   inserted(out): returns 1 if inserted, 0 if found or not inserted.
 *
 * Behavior flags:
 *
 * LF_LIST_BF_RETURN_ON_RESTART - When insert fails because last entry in bucket was deleted, if this flag is set,
 *                then the operation is restarted from here, instead of looping inside
 *                lf_list_insert_internal (as a consequence, hash key is recalculated).
 *                NOTE: Currently, this flag is always used (I must find out why).
 *
 * LF_LIST_BF_INSERT_GIVEN  - If this flag is set, the caller means to force its own entry into hash table.
 *                When the flag is not set, a new entry is claimed from freelist.
 *                NOTE: If an entry with the same key already exists, the entry given as argument is
 *                  automatically retired.
 *
 * LF_LIST_BF_FIND_OR_INSERT    - If this flag is set and an entry for the same key already exists, the existing
 *                key will be output. If the flag is not set and key exists, insert just gives up
 *                and a NULL entry is output.
 *                NOTE: insert will not give up when key exists, if edesc->f_update is provided.
 *                  a new key is generated and insert is restarted.
 */
static int
lf_hash_insert_internal (LF_TRAN_ENTRY * tran, LF_HASH_TABLE * table, void *key, int bflags, void **entry,
			 int *inserted)
{
  LF_ENTRY_DESCRIPTOR *edesc;
  unsigned int hidx;
  int err;

  assert (table != NULL && key != NULL && entry != NULL);
  edesc = table->entry_desc;
  assert (edesc != NULL);

  LF_UNITTEST_INC (&lf_inserts, 1);

  for (;;)
    {
      /* with LF_LIST_BF_INSERT_GIVEN the caller supplies the entry to link; otherwise start clean */
      if (LF_LIST_BF_IS_FLAG_SET (&bflags, LF_LIST_BF_INSERT_GIVEN))
	{
	  assert (*entry != NULL);
	}
      else
	{
	  *entry = NULL;
	}

      hidx = edesc->f_hash (key, table->hash_size);
      if (hidx >= table->hash_size)
	{
	  /* hash function produced an out-of-range bucket index */
	  assert (false);
	  return ER_FAILED;
	}

      err = lf_list_insert_internal (tran, &table->buckets[hidx], key, &bflags, edesc, table->freelist, entry,
				     inserted);
      if (err != NO_ERROR || !(bflags & LF_LIST_BR_RESTARTED))
	{
	  return err;
	}

      /* the list operation asked for a restart; clear the flag and retry (hash is recomputed) */
      bflags &= ~LF_LIST_BR_RESTARTED;
      LF_UNITTEST_INC (&lf_inserts_restart, 1);
    }
}

/*
 * lf_hash_find_or_insert () - find or insert an entry in the hash table
 *   returns: error code or NO_ERROR
 *   tran(in): LF transaction entry
 *   table(in): hash table
 *   key(in): key of entry that we seek
 *   entry(out): existing or new entry
 *   inserted(out): returns 1 if inserted, 0 if found or not inserted.
 *
 */
int
lf_hash_find_or_insert (LF_TRAN_ENTRY * tran, LF_HASH_TABLE * table, void *key, void **entry, int *inserted)
{
  /* find-or-insert mode: an existing entry with the same key is accepted and returned */
  const int flags = LF_LIST_BF_RETURN_ON_RESTART | LF_LIST_BF_FIND_OR_INSERT;

  return lf_hash_insert_internal (tran, table, key, flags, entry, inserted);
}

/*
 * lf_hash_insert () - insert a new entry with a specified key.
 *   returns: error code or NO_ERROR
 *   tran(in): LF transaction entry
 *   table(in): hash table
 *   key(in): key of entry to insert
 *   entry(out): new entry
 *   inserted(out): returns 1 if inserted, 0 if found or not inserted.
 *
 */
int
lf_hash_insert (LF_TRAN_ENTRY * tran, LF_HASH_TABLE * table, void *key, void **entry, int *inserted)
{
  /* plain insert: the new entry is claimed from the table's freelist */
  const int flags = LF_LIST_BF_RETURN_ON_RESTART;

  return lf_hash_insert_internal (tran, table, key, flags, entry, inserted);
}

/*
 * lf_hash_insert_given () - insert entry given as argument. if same key exists however, replace it with existing key.
 *   returns: error code or NO_ERROR
 *   tran(in): LF transaction entry
 *   table(in): hash table
 *   key(in): key of entry to insert
 *   entry(in/out): new entry
 *   inserted(out): returns 1 if inserted, 0 if found or not inserted.
 *
 */
int
lf_hash_insert_given (LF_TRAN_ENTRY * tran, LF_HASH_TABLE * table, void *key, void **entry, int *inserted)
{
  /* caller provides the entry to link; on duplicate key it is retired and the existing entry returned */
  const int flags = LF_LIST_BF_RETURN_ON_RESTART | LF_LIST_BF_INSERT_GIVEN | LF_LIST_BF_FIND_OR_INSERT;

  assert (entry != NULL && *entry != NULL);

  return lf_hash_insert_internal (tran, table, key, flags, entry, inserted);
}

/*
 * lf_hash_delete_already_locked () - Delete hash entry without locking mutex.
 *
 * return        : error code or NO_ERROR
 * tran (in)         : LF transaction entry
 * table (in)        : hash table
 * key (in)      : key to seek
 * locked_entry (in) : locked entry
 * success (out)     : 1 if entry is deleted, 0 otherwise
 *
 * NOTE: Careful when calling this function. The typical scenario to call this function is to first find entry using
 *   lf_hash_find and then call lf_hash_delete on the found entry.
 * NOTE: lf_hash_delete_already_locked can be called only if entry has mutexes.
 * NOTE: The delete will be successful only if the entry found by key matches the given entry.
 *   Usually, the entry will match. However, we do have a limited scenario when a different entry with the same
 *   key may be found:
 *   1. Entry was found or inserted by this transaction.
 *   2. Another transaction cleared the hash. All current entries are moved to back buffer and will be soon retired.
 *   3. A third transaction inserts a new entry with the same key.
 *   4. This transaction tries to delete the entry but the entry inserted by the third transaction is found.
 */
int
lf_hash_delete_already_locked (LF_TRAN_ENTRY * tran, LF_HASH_TABLE * table, void *key, void *locked_entry, int *success)
{
  /* the caller already owns the entry mutex, so LF_LIST_BF_LOCK_ON_DELETE is intentionally not set */
  assert (locked_entry != NULL);
  assert (table->entry_desc->using_mutex);

  return lf_hash_delete_internal (tran, table, key, locked_entry, LF_LIST_BF_RETURN_ON_RESTART, success);
}

/*
 * lf_hash_delete () - Delete hash entry. If the entries have mutex, it will lock the mutex before deleting.
 *
 * return    : error code or NO_ERROR
 * tran (in)     : LF transaction entry
 * table (in)    : hash table
 * key (in)  : key to seek
 * success (out) : 1 if entry is deleted, 0 otherwise
 */
int
lf_hash_delete (LF_TRAN_ENTRY * tran, LF_HASH_TABLE * table, void *key, int *success)
{
  /* no entry is pre-locked; the list code locks the entry mutex (if any) before unlinking */
  const int flags = LF_LIST_BF_RETURN_ON_RESTART | LF_LIST_BF_LOCK_ON_DELETE;

  return lf_hash_delete_internal (tran, table, key, NULL, flags, success);
}


/*
 * lf_hash_delete_internal () - hash delete function; deletes the entry matching key
 *   returns: error code or NO_ERROR
 *   tran(in): LF transaction entry
 *   table(in): hash table
 *   key(in): key to seek
 *   locked_entry(in): entry already locked by the caller (NULL if none)
 *   bflags(in): behavior flags
 *   success(out): 1 if entry is deleted, 0 otherwise.
 */
static int
lf_hash_delete_internal (LF_TRAN_ENTRY * tran, LF_HASH_TABLE * table, void *key, void *locked_entry, int bflags,
			 int *success)
{
  LF_ENTRY_DESCRIPTOR *edesc;
  unsigned int hidx;
  int err;

  assert (table != NULL && key != NULL);
  edesc = table->entry_desc;
  assert (edesc != NULL);

  LF_UNITTEST_INC (&lf_deletes, 1);

  for (;;)
    {
      if (success != NULL)
	{
	  *success = 0;
	}

      hidx = edesc->f_hash (key, table->hash_size);
      if (hidx >= table->hash_size)
	{
	  /* hash function produced an out-of-range bucket index */
	  assert (false);
	  return ER_FAILED;
	}

      err = lf_list_delete (tran, &table->buckets[hidx], key, locked_entry, &bflags, edesc, table->freelist, success);
      if (err != NO_ERROR || !(bflags & LF_LIST_BR_RESTARTED))
	{
	  return err;
	}

      /* the list operation asked for a restart; clear the flag and retry (hash is recomputed) */
      bflags &= ~LF_LIST_BR_RESTARTED;
      LF_UNITTEST_INC (&lf_deletes_restart, 1);
    }
}

/*
 * lf_hash_clear () - clear the hash table
 *   returns: Void
 *   tran(in): LF transaction entry
 *   table(in): hash table to clear
 *
 * NOTE: This function is NOT lock free. Concurrent clears are serialized on
 *       the table's backbuffer mutex; readers/writers on the old buckets are
 *       not disturbed (see the retirement comment below).
 */
void
lf_hash_clear (LF_TRAN_ENTRY * tran, LF_HASH_TABLE * table)
{
  LF_ENTRY_DESCRIPTOR *edesc;
  void **old_buckets, *curr, **next_p, *next;
  void *ret_head = NULL, *ret_tail = NULL;	/* chain of entries to retire */
  pthread_mutex_t *mutex_p;
  int rv, i, ret_count = 0;

  assert (tran != NULL && table != NULL && table->freelist != NULL);
  edesc = table->entry_desc;
  assert (edesc != NULL);

#if defined (UNITTEST_LF)
  assert (tran->locked_mutex == NULL);
#endif /* UNITTEST_LF */

  /* lock mutex */
  rv = pthread_mutex_lock (&table->backbuffer_mutex);

  /* swap bucket pointer with current backbuffer */
  do
    {
      old_buckets = VOLATILE_ACCESS (table->buckets, void **);
    }
  while (!ATOMIC_CAS_ADDR (&table->buckets, old_buckets, table->backbuffer));

  /* clear bucket buffer, containing remains of old entries marked for delete */
  for (i = 0; i < (int) table->hash_size; i++)
    {
      /* each backbuffer slot was left as a marked NULL by a previous clear */
      assert (table->backbuffer[i] == ADDR_WITH_MARK (NULL));
      table->buckets[i] = NULL;
    }

  /* register new backbuffer */
  table->backbuffer = old_buckets;

  /* retire all entries from old buckets; note that threads currently operating on the entries will not be disturbed
   * since the actual deletion is performed when the entries are no longer handled by active transactions */
  for (i = 0; i < (int) table->hash_size; i++)
    {
      /* detach the whole chain from the old bucket, leaving the marked-NULL
       * sentinel expected by the backbuffer assertion above */
      do
	{
	  curr = ADDR_STRIP_MARK (VOLATILE_ACCESS (old_buckets[i], void *));
	}
      while (!ATOMIC_CAS_ADDR (&old_buckets[i], curr, ADDR_WITH_MARK (NULL)));

      while (curr)
	{
	  next_p = (void **) OF_GET_REF (curr, edesc->of_next);

	  /* unlink from hash chain */
	  do
	    {
	      next = ADDR_STRIP_MARK (VOLATILE_ACCESS (*next_p, void *));
	    }
	  while (!ATOMIC_CAS_ADDR (next_p, next, ADDR_WITH_MARK (next)));

	  /* wait for mutex */
	  if (edesc->using_mutex)
	    {
	      mutex_p = (pthread_mutex_t *) OF_GET_PTR (curr, edesc->of_mutex);

	      rv = pthread_mutex_lock (mutex_p);
	      pthread_mutex_unlock (mutex_p);

	      /* there should be only one mutex lock-unlock per entry per access via bucket array, so locking/unlocking
	       * once while the entry is inaccessible should be enough to guarantee nobody will be using it afterwards */
	    }

	  /* save and advance */
	  if (ret_head == NULL)
	    {
	      /* first retired entry: start the local chain */
	      ret_head = curr;
	      ret_tail = curr;
	      ret_count = 1;
	    }
	  else
	    {
	      /* append to local chain via the entry's local-next link */
	      OF_GET_PTR_DEREF (ret_tail, edesc->of_local_next) = curr;
	      ret_tail = curr;
	      ret_count++;
	    }

	  /* advance */
	  curr = next;
	}
    }

  if (ret_head != NULL)
    {
      /* reuse entries */
      lf_tran_start_with_mb (tran, true);

      /* stamp every retired entry with this transaction's id so reclamation
       * waits until no active transaction can still reference it */
      for (curr = ret_head; curr != NULL; curr = OF_GET_PTR_DEREF (curr, edesc->of_local_next))
	{
	  UINT64 *del_id = (UINT64 *) OF_GET_PTR (curr, edesc->of_del_tran_id);
	  *del_id = tran->transaction_id;
	}

      /* splice the whole chain onto the transaction's retired list */
      OF_GET_PTR_DEREF (ret_tail, edesc->of_local_next) = tran->retired_list;
      tran->retired_list = ret_head;

      ATOMIC_INC_32 (&table->freelist->retired_cnt, ret_count);

      LF_UNITTEST_INC (&lf_clears, ret_count);
      LF_UNITTEST_INC (&lf_hash_size, -ret_count);

      lf_tran_end_with_mb (tran);
    }

  /* unlock mutex and return to caller */
  pthread_mutex_unlock (&table->backbuffer_mutex);
}

/*
 * lf_hash_create_iterator () - initialize an iterator over a hash table
 *   iterator(out): iterator structure to set up
 *   tran_entry(in): LF transaction entry the iteration will run under
 *   table(in): hash table that will be iterated
 *   returns: void
 *
 * NOTE: traversal itself is performed by lf_hash_iterate ().
 */
void
lf_hash_create_iterator (LF_HASH_TABLE_ITERATOR * iterator, LF_TRAN_ENTRY * tran_entry, LF_HASH_TABLE * table)
{
  assert (iterator != NULL && table != NULL);

  /* position before the first bucket; lf_hash_iterate () advances to bucket 0 */
  iterator->bucket_index = -1;
  iterator->curr = NULL;
  iterator->tran_entry = tran_entry;
  iterator->hash_table = table;
}

/*
 * lf_hash_iterate () - iterate using a pre-created iterator
 *   it(in): iterator
 *   returns: next entry or NULL on end
 *
 * NOTE: Absolutely no order guaranteed.
 * NOTE: Caller must not change HP_LEADER until end of iteration.
 * NOTE: This function will change HP_TRAILER.
 * NOTE: When the entry descriptor uses mutexes, the returned entry is handed
 *       back with its mutex locked; it is released on the next call (when the
 *       iterator advances past it).
 */
void *
lf_hash_iterate (LF_HASH_TABLE_ITERATOR * it)
{
  LF_ENTRY_DESCRIPTOR *edesc;
  LF_TRAN_ENTRY *tran_entry;
  void **next_p;

  if (it == NULL || it->hash_table == NULL)
    {
      assert (false);
      return NULL;
    }

  edesc = it->hash_table->entry_desc;
  tran_entry = it->tran_entry;
  assert (edesc != NULL && tran_entry != NULL);

  do
    {
      /* save current leader as trailer */
      if (it->curr != NULL)
	{
	  if (edesc->using_mutex)
	    {
	      /* follow house rules: release the mutex acquired when this
	       * entry became current (locked further below) */
	      pthread_mutex_t *mx;
	      mx = (pthread_mutex_t *) OF_GET_PTR (it->curr, edesc->of_mutex);
	      pthread_mutex_unlock (mx);
	    }

	  /* load next entry */
	  next_p = (void **) OF_GET_REF (it->curr, edesc->of_next);
	  it->curr = *(void *volatile *) next_p;
	  it->curr = ADDR_STRIP_MARK (it->curr);
	}
      else
	{
	  /* reset transaction for each bucket */
	  if (it->bucket_index >= 0)
	    {
	      lf_tran_end_with_mb (tran_entry);
	    }
	  lf_tran_start_with_mb (tran_entry, false);

	  /* load next bucket */
	  it->bucket_index++;
	  if (it->bucket_index < (int) it->hash_table->hash_size)
	    {
	      it->curr = VOLATILE_ACCESS (it->hash_table->buckets[it->bucket_index], void *);
	      it->curr = ADDR_STRIP_MARK (it->curr);
	    }
	  else
	    {
	      /* end */
	      lf_tran_end_with_mb (tran_entry);
	      return NULL;
	    }
	}

      if (it->curr != NULL)
	{
	  if (edesc->using_mutex)
	    {
	      pthread_mutex_t *mx;
	      int rv;

	      mx = (pthread_mutex_t *) OF_GET_PTR (it->curr, edesc->of_mutex);
	      rv = pthread_mutex_lock (mx);

	      /* a marked next pointer means the entry was logically deleted */
	      if (ADDR_HAS_MARK (OF_GET_PTR_DEREF (it->curr, edesc->of_next)))
		{
		  /* deleted in the meantime, skip it; the mutex just taken is
		   * released at the top of the loop when we advance past it */
		  continue;
		}
	    }
	}
    }
  while (it->curr == NULL);

  /* we have a valid entry */
  return it->curr;
}

#if defined (UNITTEST_LF)
/*
 * lf_reset_counters () - Reset all unit-test statistics counters to zero.
 *
 * return : void
 */
void
lf_reset_counters (void)
{
  /* table size counter */
  lf_hash_size = 0;

  /* insert path counters */
  lf_inserts = 0;
  lf_inserts_restart = 0;
  lf_list_inserts = lf_list_inserts_found = 0;
  lf_list_inserts_save_temp_1 = lf_list_inserts_save_temp_2 = 0;
  lf_list_inserts_claim = 0;
  lf_list_inserts_fail_link = lf_list_inserts_success_link = 0;

  /* delete path counters */
  lf_deletes = 0;
  lf_deletes_restart = 0;
  lf_list_deletes = lf_list_deletes_found = 0;
  lf_list_deletes_fail_mark_next = lf_list_deletes_fail_unlink = 0;
  lf_list_deletes_success_unlink = 0;
  lf_list_deletes_not_found = lf_list_deletes_not_match = 0;

  /* clear counter */
  lf_clears = 0;

  /* entry lifecycle counters */
  lf_retires = 0;
  lf_claims = lf_claims_temp = 0;
  lf_transports = 0;
  lf_temps = 0;
}
#endif /* UNITTEST_LF */