File lockfree_hashmap.hpp¶
File List > base > lockfree_hashmap.hpp
Go to the documentation of this file
/*
* Copyright 2008 Search Solution Corporation
* Copyright 2016 CUBRID Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
//
// lock-free hash map structure
//
#ifndef _LOCKFREE_HASHMAP_HPP_
#define _LOCKFREE_HASHMAP_HPP_
#include "error_code.h"
#include "lock_free.h" // for lf_entry_descriptor
#include "lockfree_address_marker.hpp"
#include "lockfree_freelist.hpp"
#include "lockfree_transaction_reclaimable.hpp"
#include "monitor_collect.hpp"
#include "porting.h"
#include <cassert>
#include <limits>
#include <mutex>
#include <ostream>
namespace lockfree
{
template <class Key, class T>
class hashmap
{
public:
class iterator;
hashmap ();
~hashmap ();
void init (tran::system &transys, size_t hash_size, size_t freelist_block_size, size_t freelist_block_count,
lf_entry_descriptor &edesc);
void destroy ();
T *find (tran::index tran_index, Key &key);
bool find_or_insert (tran::index tran_index, Key &key, T *&entry);
bool insert (tran::index tran_index, Key &key, T *&entry);
bool insert_given (tran::index tran_index, Key &key, T *&entry);
bool erase (tran::index tran_index, Key &key);
bool erase_locked (tran::index tran_index, Key &key, T *&locked_entry);
void unlock (tran::index tran_index, T *&entry);
void clear (tran::index tran_index); // NOT LOCK-FREE
T *freelist_claim (tran::index tran_index);
void freelist_retire (tran::index tran_index, T *&entry);
void start_tran (tran::index tran_index);
void end_tran (tran::index tran_index);
template <typename D>
void dump_stats (std::ostream &os) const;
void activate_stats ();
void deactivate_stats ();
size_t get_size () const;
size_t get_element_count () const;
size_t get_alloc_element_count () const;
private:
using address_type = address_marker<T>;
// wrap T with on_reclaim functionality based on edesc.f_uninit
struct freelist_node_data
{
T m_entry;
lf_entry_descriptor *m_edesc;
freelist_node_data () = default;
~freelist_node_data () = default;
void on_reclaim ()
{
if (m_edesc->f_uninit != NULL)
{
(void) m_edesc->f_uninit (&m_entry);
}
}
};
using freelist_type = freelist<freelist_node_data>;
using free_node_type = typename freelist_type::free_node;
freelist_type *m_freelist;
T **m_buckets;
size_t m_size;
T **m_backbuffer;
std::mutex m_backbuffer_mutex;
lf_entry_descriptor *m_edesc;
// statistics
using ct_stat_type = cubmonitor::atomic_counter_timer_stat;
ct_stat_type m_stat_find;
ct_stat_type m_stat_insert;
ct_stat_type m_stat_erase;
ct_stat_type m_stat_unlock;
ct_stat_type m_stat_clear;
ct_stat_type m_stat_iterates;
ct_stat_type m_stat_claim;
ct_stat_type m_stat_retire;
bool m_active_stats;
void *volatile *get_ref (T *p, size_t offset);
void *get_ptr (T *p, size_t offset);
void *get_ptr_deref (T *p, size_t offset);
void *get_keyp (T *p);
T *get_nextp (T *p);
T *&get_nextp_ref (T *p);
pthread_mutex_t *get_pthread_mutexp (T *p);
free_node_type *to_free_node (T *p);
T *from_free_node (free_node_type *fn);
void save_temporary (tran::descriptor &tdes, T *&p);
T *claim_temporary (tran::descriptor &tdes);
T *freelist_claim (tran::descriptor &tdes);
void freelist_retire (tran::descriptor &tdes, T *&p);
void safeguard_use_mutex_or_tran_started (const tran::descriptor &tdes, const pthread_mutex_t *mtx);
void start_tran_if_not_started (tran::descriptor &tdes);
void start_tran_force (tran::descriptor &tdes);
void promote_tran_force (tran::descriptor &tdes);
void end_tran_if_started (tran::descriptor &tdes);
void end_tran_force (tran::descriptor &tdes);
void lock_entry (T &tolock);
void unlock_entry (T &tounlock);
void lock_entry_mutex (T &tolock, pthread_mutex_t *&mtx);
void unlock_entry_mutex_if_locked (pthread_mutex_t *&mtx);
void unlock_entry_mutex_force (pthread_mutex_t *&mtx);
size_t get_hash (Key &key) const;
T *&get_bucket (Key &key);
tran::descriptor &get_tran_descriptor (tran::index tran_index);
void list_find (tran::index tran_index, T *list_head, Key &key, int *behavior_flags, T *&found_node);
bool list_insert_internal (tran::index tran_index, T *&list_head, Key &key, int *behavior_flags,
T *&found_node);
bool list_delete (tran::index tran_index, T *&list_head, Key &key, T *locked_entry, int *behavior_flags);
bool hash_insert_internal (tran::index tran_index, Key &key, int bflags, T *&entry);
bool hash_erase_internal (tran::index tran_index, Key &key, int bflags, T *locked_entry);
template <typename D>
void dump_stat (std::ostream &os, const char *name, const ct_stat_type &ct_stat) const;
static constexpr std::ptrdiff_t free_node_offset_of_data (free_node_type fn)
{
return ((char *) (&fn.get_data ().m_entry)) - ((char *) (&fn));
}
}; // class hashmap
template <class Key, class T>
class hashmap<Key, T>::iterator
{
public:
iterator () = default;
iterator (tran::index tran_index, hashmap &hash);
~iterator () = default;
T *iterate ();
void restart ();
iterator &operator= (iterator &&o);
private:
const size_t INVALID_INDEX = std::numeric_limits<size_t>::max ();
hashmap *m_hashmap;
tran::descriptor *m_tdes;
size_t m_bucket_index;
T *m_curr;
};
} // namespace lockfree
//
// implementation
//
namespace lockfree
{
template <class Key, class T>
hashmap<Key, T>::hashmap ()
: m_freelist (NULL)
, m_buckets (NULL)
, m_size (0)
, m_backbuffer (NULL)
, m_backbuffer_mutex ()
, m_edesc (NULL)
, m_stat_find ()
, m_stat_insert ()
, m_stat_erase ()
, m_stat_unlock ()
, m_stat_clear ()
, m_stat_iterates ()
, m_stat_claim ()
, m_stat_retire ()
, m_active_stats ()
{
}
template <class Key, class T>
hashmap<Key, T>::~hashmap ()
{
destroy ();
}
template <class Key, class T>
void
hashmap<Key, T>::destroy ()
{
if (m_buckets != NULL)
{
T *node_iter;
T *save_next;
tran::descriptor &tdes = get_tran_descriptor (0);
for (size_t i = 0; i < m_size; ++i)
{
for (node_iter = m_buckets[i]; node_iter != NULL; node_iter = save_next)
{
assert (!address_type::is_address_marked (node_iter));
save_next = get_nextp (node_iter);
freelist_retire (tdes, node_iter);
}
}
}
delete [] m_buckets;
m_buckets = NULL;
delete [] m_backbuffer;
m_backbuffer = NULL;
delete m_freelist;
m_freelist = NULL;
}
template <class Key, class T>
void
hashmap<Key, T>::init (tran::system &transys, size_t hash_size, size_t freelist_block_size,
size_t freelist_block_count, lf_entry_descriptor &edesc)
{
m_freelist = new freelist_type (transys, freelist_block_size, freelist_block_count);
m_edesc = &edesc;
m_size = hash_size;
m_buckets = new T *[m_size] ();
m_backbuffer = new T *[m_size] ();
for (size_t i = 0; i < m_size; i++)
{
m_backbuffer[i] = address_type::set_adress_mark (NULL);
}
}
template <class Key, class T>
T *
hashmap<Key, T>::find (tran::index tran_index, Key &key)
{
ct_stat_type::autotimer stat_autotimer (m_stat_find, m_active_stats);
int bflags = 0;
bool restart = true;
T *entry = NULL;
while (restart)
{
T *&list_head = get_bucket (key);
entry = NULL;
bflags = LF_LIST_BF_RETURN_ON_RESTART;
list_find (tran_index, list_head, key, &bflags, entry);
restart = (bflags & LF_LIST_BR_RESTARTED) != 0;
}
return entry;
}
template <class Key, class T>
bool
hashmap<Key, T>::find_or_insert (tran::index tran_index, Key &key, T *&entry)
{
return hash_insert_internal (tran_index, key, LF_LIST_BF_RETURN_ON_RESTART | LF_LIST_BF_FIND_OR_INSERT, entry);
}
template <class Key, class T>
bool
hashmap<Key, T>::insert (tran::index tran_index, Key &key, T *&entry)
{
return hash_insert_internal (tran_index, key, LF_LIST_BF_RETURN_ON_RESTART, entry);
}
template <class Key, class T>
bool
hashmap<Key, T>::insert_given (tran::index tran_index, Key &key, T *&entry)
{
assert (entry != NULL);
return hash_insert_internal (tran_index, key,
LF_LIST_BF_RETURN_ON_RESTART | LF_LIST_BF_INSERT_GIVEN | LF_LIST_BF_FIND_OR_INSERT,
entry);
}
template <class Key, class T>
bool
hashmap<Key, T>::erase (tran::index tran_index, Key &key)
{
return hash_erase_internal (tran_index, key, LF_LIST_BF_RETURN_ON_RESTART | LF_LIST_BF_LOCK_ON_DELETE, NULL);
}
/*
* NOTE: Careful when calling this function. The typical scenario to call this function is to first find entry using
* lf_hash_find and then call lf_hash_delete on the found entry.
* NOTE: lf_hash_delete_already_locks can be called only if entry has mutexes.
* NOTE: The delete will be successful only if the entry found by key matches the given entry.
* Usually, the entry will match. However, we do have a limited scenario when a different entry with the same
* key may be found:
* 1. Entry was found or inserted by this transaction.
* 2. Another transaction cleared the hash. All current entries are moved to back buffer and will be soon
* retired.
* 3. A third transaction inserts a new entry with the same key.
* 4. This transaction tries to delete the entry but the entry inserted by the third transaction si found.
*/
template <class Key, class T>
bool
hashmap<Key, T>::erase_locked (tran::index tran_index, Key &key, T *&locked_entry)
{
assert (locked_entry != NULL);
assert (m_edesc->using_mutex);
bool success = hash_erase_internal (tran_index, key, LF_LIST_BF_RETURN_ON_RESTART, locked_entry);
if (success)
{
locked_entry = NULL;
}
return success;
}
template <class Key, class T>
void
hashmap<Key, T>::unlock (tran::index tran_index, T *&entry)
{
ct_stat_type::autotimer stat_autotimer (m_stat_unlock, m_active_stats);
assert (entry != NULL);
if (m_edesc->using_mutex)
{
unlock_entry (*entry);
}
else
{
get_tran_descriptor (tran_index).end_tran ();
}
entry = NULL;
}
template <class Key, class T>
void
hashmap<Key, T>::clear (tran::index tran_index)
{
ct_stat_type::autotimer stat_autotimer (m_stat_clear, m_active_stats);
tran::descriptor &tdes = get_tran_descriptor (tran_index);
/* lock mutex */
std::unique_lock<std::mutex> ulock (m_backbuffer_mutex);
/* swap bucket pointer with current backbuffer */
T **old_buckets = ATOMIC_TAS_ADDR (&m_buckets, m_backbuffer);
/* clear bucket buffer, containing remains of old entries marked for delete */
for (size_t i = 0; i < m_size; ++i)
{
assert (m_backbuffer[i] == address_type::set_adress_mark (NULL));
m_buckets[i] = NULL;
}
/* register new backbuffer */
m_backbuffer = old_buckets;
/* retire all entries from old buckets; note that threads currently operating on the entries will not be disturbed
* since the actual deletion is performed when the entries are no longer handled by active transactions */
T *curr = NULL;
T **next_p = NULL;
T *next = NULL;
pthread_mutex_t *mutex_p = NULL;
for (size_t i = 0; i < m_size; ++i)
{
// remove list from bucket
// warning: this may spin
do
{
curr = address_type::strip_address_mark (old_buckets[i]);
}
while (!ATOMIC_CAS_ADDR (&old_buckets[i], curr, address_type::set_adress_mark (NULL)));
while (curr != NULL)
{
next_p = &get_nextp_ref (curr);
/* unlink from hash chain */
// warning: this may spin
do
{
next = address_type::strip_address_mark (*next_p);
}
while (!ATOMIC_CAS_ADDR (next_p, next, address_type::set_adress_mark (next)));
/* wait for mutex */
if (m_edesc->using_mutex)
{
lock_entry_mutex (*curr, mutex_p);
unlock_entry_mutex_force (mutex_p);
/* there should be only one mutex lock-unlock per entry per access via bucket array, so locking/unlocking
* once while the entry is inaccessible should be enough to guarantee nobody will be using it afterwards */
}
/* retire */
freelist_retire (tdes, curr);
/* advance */
curr = next;
}
}
}
template <class Key, class T>
size_t
hashmap<Key, T>::get_size () const
{
return m_size;
}
template <class Key, class T>
size_t
hashmap<Key, T>::get_element_count () const
{
return m_freelist->get_claimed_count ();
}
template <class Key, class T>
size_t
hashmap<Key, T>::get_alloc_element_count () const
{
return m_freelist->get_alloc_count ();
}
template <class Key, class T>
size_t
hashmap<Key, T>::get_hash (Key &key) const
{
return m_edesc->f_hash (&key, (int) m_size);
}
template <class Key, class T>
T *&
hashmap<Key, T>::get_bucket (Key &key)
{
return m_buckets[get_hash (key)];
}
template <class Key, class T>
tran::descriptor &
hashmap<Key, T>::get_tran_descriptor (tran::index tran_index)
{
return m_freelist->get_transaction_table ().get_descriptor (tran_index);
}
template <class Key, class T>
void *
hashmap<Key, T>::get_ptr (T *p, size_t offset)
{
assert (p != NULL);
assert (!address_type::is_address_marked (p));
return (void *) (((char *) p) + offset);
}
template <class Key, class T>
void *volatile *
hashmap<Key, T>::get_ref (T *p, size_t offset)
{
assert (p != NULL);
assert (!address_type::is_address_marked (p));
return (void *volatile *) (((char *) p) + offset);
}
template <class Key, class T>
void *
hashmap<Key, T>::get_ptr_deref (T *p, size_t offset)
{
return *get_ref (p, offset);
}
template <class Key, class T>
void *
hashmap<Key, T>::get_keyp (T *p)
{
return get_ptr (p, m_edesc->of_key);
}
template <class Key, class T>
T *
hashmap<Key, T>::get_nextp (T *p)
{
return (T *) get_ptr_deref (p, m_edesc->of_next);
}
template <class Key, class T>
T *&
hashmap<Key, T>::get_nextp_ref (T *p)
{
return * (T **) get_ref (p, m_edesc->of_next);
}
template <class Key, class T>
pthread_mutex_t *
hashmap<Key, T>::get_pthread_mutexp (T *p)
{
return (pthread_mutex_t *) get_ptr (p, m_edesc->of_mutex);
}
template <class Key, class T>
T *
hashmap<Key, T>::from_free_node (free_node_type *fn)
{
assert (fn != NULL);
return &fn->get_data ().m_entry;
}
template <class Key, class T>
typename hashmap<Key, T>::freelist_type::free_node *
hashmap<Key, T>::to_free_node (T *p)
{
// not nice, but necessary until we fully refactor lockfree hashmap
const std::ptrdiff_t off = free_node_offset_of_data (free_node_type ());
char *cp = (char *) p;
cp -= off;
return (free_node_type *) cp;
}
template <class Key, class T>
void
hashmap<Key, T>::save_temporary (tran::descriptor &tdes, T *&p)
{
tran::reclaimable_node *fn = to_free_node (p);
tdes.save_reclaimable (fn);
p = NULL;
}
template <class Key, class T>
T *
hashmap<Key, T>::claim_temporary (tran::descriptor &tdes)
{
return from_free_node (reinterpret_cast<free_node_type *> (tdes.pull_saved_reclaimable ()));
}
template <class Key, class T>
void
hashmap<Key, T>::freelist_retire (tran::index tran_index, T *&entry)
{
return freelist_retire (get_tran_descriptor (tran_index), entry);
}
template <class Key, class T>
void
hashmap<Key, T>::start_tran (tran::index tran_index)
{
get_tran_descriptor (tran_index).start_tran ();
}
template <class Key, class T>
void
hashmap<Key, T>::end_tran (tran::index tran_index)
{
get_tran_descriptor (tran_index).end_tran ();
}
template <class Key, class T>
template <typename D>
void
hashmap<Key, T>::dump_stat (std::ostream &os, const char *name, const ct_stat_type &ct_stat) const
{
if (ct_stat.get_count () != 0)
{
os << name;
os << "[" << ct_stat.get_count ();
os << ", " << std::chrono::duration_cast<D> (ct_stat.get_time ()).count ();
os << "] ";
}
}
template <class Key, class T>
template <typename D>
void
hashmap<Key, T>::dump_stats (std::ostream &os) const
{
dump_stat<D> (os, "find", m_stat_find);
dump_stat<D> (os, "insert", m_stat_insert);
dump_stat<D> (os, "erase", m_stat_erase);
dump_stat<D> (os, "unlock", m_stat_unlock);
dump_stat<D> (os, "clear", m_stat_clear);
dump_stat<D> (os, "iter", m_stat_iterates);
dump_stat<D> (os, "claim", m_stat_claim);
dump_stat<D> (os, "retire", m_stat_retire);
}
template <class Key, class T>
void
hashmap<Key, T>::activate_stats ()
{
m_active_stats = true;
}
template <class Key, class T>
void
hashmap<Key, T>::deactivate_stats ()
{
m_active_stats = false;
}
template <class Key, class T>
void
hashmap<Key, T>::freelist_retire (tran::descriptor &tdes, T *&p)
{
ct_stat_type::autotimer stat_autotimer (m_stat_retire, m_active_stats);
assert (p != NULL);
m_freelist->retire (tdes, *to_free_node (p));
p = NULL;
}
template <class Key, class T>
T *
hashmap<Key, T>::freelist_claim (tran::index tran_index)
{
return freelist_claim (get_tran_descriptor (tran_index));
}
template <class Key, class T>
T *
hashmap<Key, T>::freelist_claim (tran::descriptor &tdes)
{
ct_stat_type::autotimer stat_autotimer (m_stat_claim, m_active_stats);
T *claimed = NULL;
free_node_type *fn = reinterpret_cast<free_node_type *> (tdes.pull_saved_reclaimable ());
bool is_local_tran = false;
if (!tdes.is_tran_started ())
{
tdes.start_tran ();
is_local_tran = true;
}
if (fn == NULL)
{
fn = m_freelist->claim (tdes);
assert (fn != NULL);
// make sure m_edesc is initialized
fn->get_data ().m_edesc = m_edesc;
claimed = from_free_node (fn);
// call f_init
if (m_edesc->f_init != NULL)
{
m_edesc->f_init (claimed);
}
}
else
{
claimed = from_free_node (fn);
// already initialized
}
get_nextp_ref (claimed) = NULL; // make sure link is removed
if (is_local_tran)
{
tdes.end_tran ();
}
return claimed;
}
template <class Key, class T>
void
hashmap<Key, T>::safeguard_use_mutex_or_tran_started (const tran::descriptor &tdes, const pthread_mutex_t *mtx)
{
/* Assert used to make sure the current entry is protected by either transaction or mutex. */
/* The transaction is started if and only if we don't use mutex */
assert (tdes.is_tran_started () == !m_edesc->using_mutex);
/* If we use mutex, we have a mutex locked. */
assert (!m_edesc->using_mutex || mtx != NULL);
}
template <class Key, class T>
void
hashmap<Key, T>::start_tran_if_not_started (tran::descriptor &tdes)
{
tdes.start_tran (); // same result if it was started or not
}
template <class Key, class T>
void
hashmap<Key, T>::start_tran_force (tran::descriptor &tdes)
{
assert (!tdes.is_tran_started ());
tdes.start_tran ();
}
template <class Key, class T>
void
hashmap<Key, T>::promote_tran_force (tran::descriptor &tdes)
{
assert (tdes.is_tran_started ());
tdes.start_tran_and_increment_id ();
}
template <class Key, class T>
void
hashmap<Key, T>::end_tran_if_started (tran::descriptor &tdes)
{
if (tdes.is_tran_started ())
{
tdes.end_tran ();
}
}
template <class Key, class T>
void
hashmap<Key, T>::end_tran_force (tran::descriptor &tdes)
{
assert (tdes.is_tran_started ());
tdes.end_tran ();
}
template <class Key, class T>
void
hashmap<Key, T>::lock_entry (T &tolock)
{
pthread_mutex_t *no_output_mtx = NULL;
lock_entry_mutex (tolock, no_output_mtx);
}
template <class Key, class T>
void
hashmap<Key, T>::unlock_entry (T &tounlock)
{
pthread_mutex_t *mtx = get_pthread_mutexp (&tounlock);
unlock_entry_mutex_force (mtx);
}
template <class Key, class T>
void
hashmap<Key, T>::lock_entry_mutex (T &tolock, pthread_mutex_t *&mtx)
{
assert (m_edesc->using_mutex);
assert (mtx == NULL);
mtx = get_pthread_mutexp (&tolock);
#if defined (SERVER_MODE)
pthread_mutex_lock (mtx);
#endif // SERVER_MODE
}
template <class Key, class T>
void
hashmap<Key, T>::unlock_entry_mutex_if_locked (pthread_mutex_t *&mtx)
{
if (m_edesc->using_mutex && mtx != NULL)
{
#if defined (SERVER_MODE)
pthread_mutex_unlock (mtx);
#endif // SERVER_MODE
mtx = NULL;
}
}
template <class Key, class T>
void
hashmap<Key, T>::unlock_entry_mutex_force (pthread_mutex_t *&mtx)
{
assert (m_edesc->using_mutex && mtx != NULL);
#if defined (SERVER_MODE)
pthread_mutex_unlock (mtx);
#endif // SERVER_MODE
mtx = NULL;
}
template <class Key, class T>
void
hashmap<Key, T>::list_find (tran::index tran_index, T *list_head, Key &key, int *behavior_flags, T *&entry)
{
tran::descriptor &tdes = get_tran_descriptor (tran_index);
T *curr = NULL;
pthread_mutex_t *entry_mutex = NULL;
/* by default, not found */
entry = NULL;
tdes.start_tran ();
bool restart_search = true;
while (restart_search) // restart_search:
{
restart_search = false;
curr = address_type::atomic_strip_address_mark (list_head);
restart_search = false;
while (curr != NULL)
{
if (m_edesc->f_key_cmp (&key, get_keyp (curr)) == 0)
{
/* found! */
if (m_edesc->using_mutex)
{
/* entry has a mutex protecting it's members; lock it */
lock_entry_mutex (*curr, entry_mutex);
/* mutex has been locked, no need to keep transaction */
tdes.end_tran ();
if (address_type::is_address_marked (get_nextp_ref (curr)))
{
/* while waiting for lock, somebody else deleted the entry; restart the search */
unlock_entry_mutex_force (entry_mutex);
if (behavior_flags != NULL && (*behavior_flags & LF_LIST_BF_RETURN_ON_RESTART))
{
*behavior_flags = (*behavior_flags) | LF_LIST_BR_RESTARTED;
return;
}
else
{
// restart
restart_search = true;
break;
}
}
} // m_edesc->using_mutex
entry = curr;
return;
}
/* advance */
curr = address_type::strip_address_mark (get_nextp_ref (curr));
}
} // while (restart_search)
/* all ok but not found */
tdes.end_tran ();
}
/*
* Behavior flags:
*
* LF_LIST_BF_RETURN_ON_RESTART - When insert fails because last entry in bucket was deleted, if this flag is set,
* then the operation is restarted from here, instead of looping inside
* lf_list_insert_internal (as a consequence, hash key is recalculated).
* NOTE: Currently, this flag is always used (I must find out why).
*
* LF_LIST_BF_INSERT_GIVEN - If this flag is set, the caller means to force its own entry into hash table.
* When the flag is not set, a new entry is claimed from freelist.
* NOTE: If an entry with the same key already exists, the entry given as argument is
* automatically retired.
*
* LF_LIST_BF_FIND_OR_INSERT - If this flag is set and an entry for the same key already exists, the existing
* key will be output. If the flag is not set and key exists, insert just gives up
* and a NULL entry is output.
* NOTE: insert will not give up when key exists, if m_edesc->f_update is provided.
* a new key is generated and insert is restarted.
*/
template <class Key, class T>
bool
hashmap<Key, T>::list_insert_internal (tran::index tran_index, T *&list_head, Key &key, int *behavior_flags,
T *&entry)
{
pthread_mutex_t *entry_mutex = NULL; /* Locked entry mutex when not NULL */
T **curr_p = NULL;
T *curr = NULL;
tran::descriptor &tdes = get_tran_descriptor (tran_index);
bool restart_search = true;
while (restart_search)
{
restart_search = false;
start_tran_force (tdes);
curr_p = &list_head;
curr = address_type::atomic_strip_address_mark (*curr_p);
/* search */
while (curr_p != NULL) // this is always true actually...
{
assert (tdes.is_tran_started ());
assert (entry_mutex == NULL);
if (curr != NULL)
{
if (m_edesc->f_key_cmp (&key, get_keyp (curr)) == 0)
{
/* found an entry with the same key. */
if (!LF_LIST_BF_IS_FLAG_SET (behavior_flags, LF_LIST_BF_INSERT_GIVEN) && entry != NULL)
{
/* save this for further (local) use. */
save_temporary (tdes, entry);
}
if (m_edesc->using_mutex)
{
/* entry has a mutex protecting it's members; lock it */
lock_entry_mutex (*curr, entry_mutex);
/* mutex has been locked, no need to keep transaction alive */
end_tran_force (tdes);
if (address_type::is_address_marked (get_nextp_ref (curr)))
{
/* while waiting for lock, somebody else deleted the entry; restart the search */
unlock_entry_mutex_force (entry_mutex);
if (behavior_flags && (*behavior_flags & LF_LIST_BF_RETURN_ON_RESTART))
{
*behavior_flags = (*behavior_flags) | LF_LIST_BR_RESTARTED;
return false;
}
else
{
restart_search = true;
break;
}
}
}
safeguard_use_mutex_or_tran_started (tdes, entry_mutex);
if (m_edesc->f_duplicate != NULL)
{
/* we have a duplicate key callback. */
if (m_edesc->f_duplicate (&key, curr) != NO_ERROR)
{
end_tran_if_started (tdes);
unlock_entry_mutex_if_locked (entry_mutex);
return false;
}
#if 1
LF_LIST_BR_SET_FLAG (behavior_flags, LF_LIST_BR_RESTARTED);
end_tran_if_started (tdes);
unlock_entry_mutex_if_locked (entry_mutex);
return false;
#else /* !1 = 0 */
/* Could we have such cases that we just update existing entry without modifying anything else?
* And would it be usable with just a flag?
* Then this code may be used.
* So far we have only one usage for f_duplicate, which increment SESSION_ID and requires
* restarting hash search. This will be the usual approach if f_duplicate.
* If we just increment a counter in existing entry, we don't need to do anything else. This
* however most likely depends on f_duplicate implementation. Maybe it is more useful to give
* behavior_flags argument to f_duplicate to tell us if restart is or is not needed.
*/
if (LF_LIST_BF_IS_FLAG_SET (behavior_flags, LF_LIST_BF_RESTART_ON_DUPLICATE))
{
LF_LIST_BR_SET_FLAG (behavior_flags, LF_LIST_BR_RESTARTED);
end_tran_if_started (tdes);
unlock_entry_mutex_if_locked (entry_mutex);
return false;
}
else
{
/* duplicate does not require restarting search. */
if (LF_LIST_BF_IS_FLAG_SET (behavior_flags, LF_LIST_BF_INSERT_GIVEN))
{
/* Could not be inserted. Retire the entry. */
freelist_retire (tdes, entry);
}
/* fall through to output current entry. */
}
#endif /* 0 */
}
else // m_edesc->f_duplicate == NULL
{
if (LF_LIST_BF_IS_FLAG_SET (behavior_flags, LF_LIST_BF_INSERT_GIVEN))
{
/* the given entry could not be inserted. retire it. */
freelist_retire (tdes, entry);
}
if (!LF_LIST_BF_IS_FLAG_SET (behavior_flags, LF_LIST_BF_FIND_OR_INSERT))
{
/* found entry is not accepted */
end_tran_if_started (tdes);
unlock_entry_mutex_if_locked (entry_mutex);
return false;
}
/* fall through to output current entry. */
} // m_edesc->f_duplicate == NULL
assert (entry == NULL);
safeguard_use_mutex_or_tran_started (tdes, entry_mutex);
entry = curr;
return false;
} // m_edesc->f_key_cmp (&key, get_keyp (curr)) == 0
/* advance */
curr_p = &get_nextp_ref (curr);
curr = address_type::strip_address_mark (*curr_p);
}
else // curr == NULL
{
/* end of bucket, we must insert */
if (entry == NULL)
{
assert (!LF_LIST_BF_IS_FLAG_SET (behavior_flags, LF_LIST_BF_INSERT_GIVEN));
entry = freelist_claim (tdes);
assert (entry != NULL);
/* set it's key */
if (m_edesc->f_key_copy (&key, get_keyp (entry)) != NO_ERROR)
{
assert (false);
end_tran_force (tdes);
return false;
}
}
if (m_edesc->using_mutex)
{
/* entry has a mutex protecting it's members; lock it */
lock_entry_mutex (*entry, entry_mutex);
}
/* attempt an add */
if (!ATOMIC_CAS_ADDR (curr_p, (T *) NULL, entry))
{
if (m_edesc->using_mutex)
{
/* link failed, unlock mutex */
unlock_entry_mutex_force (entry_mutex);
}
/* someone added before us, restart process */
if (LF_LIST_BF_IS_FLAG_SET (behavior_flags, LF_LIST_BF_RETURN_ON_RESTART))
{
if (!LF_LIST_BF_IS_FLAG_SET (behavior_flags, LF_LIST_BF_INSERT_GIVEN))
{
save_temporary (tdes, entry);
}
LF_LIST_BR_SET_FLAG (behavior_flags, LF_LIST_BR_RESTARTED);
end_tran_force (tdes);
return false;
}
else
{
end_tran_force (tdes);
restart_search = true;
break;
}
}
/* end transaction if mutex is acquired */
if (m_edesc->using_mutex)
{
end_tran_force (tdes);
}
/* done! */
return true;
} // else of if (curr != NULL)
} // while (curr_p != NULL)
// only way to exit while (curr_p != NULL) loop is to restart search
assert (restart_search);
} // while (restart_search)
/* impossible case */
assert (false);
return false;
}
template <class Key, class T>
bool
hashmap<Key, T>::list_delete (tran::index tran_index, T *&list_head, Key &key, T *locked_entry, int *behavior_flags)
{
pthread_mutex_t *entry_mutex = NULL;
tran::descriptor &tdes = get_tran_descriptor (tran_index);
T **curr_p;
T *curr;
T **next_p;
T *next;
bool restart_search = true;
while (restart_search)
{
restart_search = false;
start_tran_force (tdes);
curr_p = &list_head;
curr = address_type::atomic_strip_address_mark (*curr_p);
/* search */
while (curr != NULL)
{
/* is this the droid we are looking for? */
if (m_edesc->f_key_cmp (&key, get_keyp (curr)) == 0)
{
if (locked_entry != NULL && locked_entry != curr)
{
assert (m_edesc->using_mutex
&& !LF_LIST_BF_IS_FLAG_SET (behavior_flags, LF_LIST_BF_LOCK_ON_DELETE));
/* We are here because lf_hash_delete_already_locked was called. The entry found by matching key is
* different from the entry we were trying to delete.
* This is possible (please find the description of hash_delete_already_locked). */
end_tran_force (tdes);
return false;
}
/* fetch next entry */
next_p = &get_nextp_ref (curr);
next = address_type::strip_address_mark (*next_p);
/* set mark on next pointer; this way, if anyone else is trying to delete the next entry, it will fail */
if (!ATOMIC_CAS_ADDR (next_p, next, address_type::set_adress_mark (next)))
{
/* joke's on us, this time; somebody else marked it before */
end_tran_force (tdes);
if (behavior_flags && (*behavior_flags & LF_LIST_BF_RETURN_ON_RESTART))
{
*behavior_flags = (*behavior_flags) | LF_LIST_BR_RESTARTED;
assert ((*behavior_flags) & LF_LIST_BR_RESTARTED);
return false;
}
else
{
restart_search = true;
break;
}
}
/* lock mutex if necessary */
if (m_edesc->using_mutex)
{
if (LF_LIST_BF_IS_FLAG_SET (behavior_flags, LF_LIST_BF_LOCK_ON_DELETE))
{
lock_entry_mutex (*curr, entry_mutex);
}
else
{
/* Must be already locked! */
entry_mutex = get_pthread_mutexp (curr);
}
/* since we set the mark, nobody else can delete it, so we have nothing else to check */
}
/* unlink */
if (!ATOMIC_CAS_ADDR (curr_p, curr, next))
{
/* unlink failed; first step is to remove lock (if applicable) */
if (m_edesc->using_mutex && LF_LIST_BF_IS_FLAG_SET (behavior_flags, LF_LIST_BF_LOCK_ON_DELETE))
{
unlock_entry_mutex_force (entry_mutex);
}
/* remove mark and restart search */
if (!ATOMIC_CAS_ADDR (next_p, address_type::set_adress_mark (next), next))
{
/* impossible case */
assert (false);
end_tran_force (tdes);
return false;
}
end_tran_force (tdes);
if (behavior_flags && (*behavior_flags & LF_LIST_BF_RETURN_ON_RESTART))
{
*behavior_flags = (*behavior_flags) | LF_LIST_BR_RESTARTED;
assert ((*behavior_flags) & LF_LIST_BR_RESTARTED);
return false;
}
else
{
restart_search = true;
break;
}
}
/* unlink successful */
/* unlock mutex */
if (m_edesc->using_mutex)
{
unlock_entry_mutex_force (entry_mutex);
}
promote_tran_force (tdes);
/* now we can feed the entry to the freelist and forget about it */
freelist_retire (tdes, curr);
/* end the transaction */
end_tran_force (tdes);
/* success! */
return true;
} // m_edesc->f_key_cmp (&key, get_keyp (curr)) == 0
/* advance */
curr_p = &get_nextp_ref (curr);
curr = address_type::strip_address_mark (*curr_p);
} // while (curr != NULL)
} // while (restart_search)
/* search yielded no result so no delete was performed */
end_tran_force (tdes);
return false;
}
/*
* Behavior flags:
*
* LF_LIST_BF_RETURN_ON_RESTART - When insert fails because last entry in bucket was deleted, if this flag is set,
* then the operation is restarted from here, instead of looping inside
* lf_list_insert_internal (as a consequence, hash key is recalculated).
* NOTE: Currently, this flag is always used (I must find out why).
*
* LF_LIST_BF_INSERT_GIVEN - If this flag is set, the caller means to force its own entry into hash table.
* When the flag is not set, a new entry is claimed from freelist.
* NOTE: If an entry with the same key already exists, the entry given as argument is
* automatically retired.
*
* LF_LIST_BF_FIND_OR_INSERT - If this flag is set and an entry for the same key already exists, the existing
* key will be output. If the flag is not set and key exists, insert just gives up
* and a NULL entry is output.
* NOTE: insert will not give up when key exists, if edesc->f_update is provided.
* a new key is generated and insert is restarted.
*/
template <class Key, class T>
bool
hashmap<Key, T>::hash_insert_internal (tran::index tran_index, Key &key, int bflags, T *&entry)
{
ct_stat_type::autotimer stat_autotimer (m_stat_insert, m_active_stats);
bool inserted = false;
while (true)
{
T *&list_head = get_bucket (key);
if (LF_LIST_BF_IS_FLAG_SET (&bflags, LF_LIST_BF_INSERT_GIVEN))
{
assert (entry != NULL);
}
else
{
entry = NULL;
}
inserted = list_insert_internal (tran_index, list_head, key, &bflags, entry);
if ((bflags & LF_LIST_BR_RESTARTED) != 0)
{
// restart
bflags &= ~LF_LIST_BR_RESTARTED;
}
else
{
// done
break;
}
}
return inserted;
}
template <class Key, class T>
bool
hashmap<Key, T>::hash_erase_internal (tran::index tran_index, Key &key, int bflags, T *locked_entry)
{
ct_stat_type::autotimer stat_autotimer (m_stat_erase, m_active_stats);
bool erased = false;
while (true)
{
T *&list_head = get_bucket (key);
erased = list_delete (tran_index, list_head, key, locked_entry, &bflags);
if ((bflags & LF_LIST_BR_RESTARTED) != 0)
{
// restart
bflags &= ~LF_LIST_BR_RESTARTED;
}
else
{
// done
break;
}
}
return erased;
}
//
// hashmap::iterator
//
template <class Key, class T>
hashmap<Key, T>::iterator::iterator (tran::index tran_index, hashmap &hash)
: m_hashmap (&hash)
, m_tdes (&hash.get_tran_descriptor (tran_index))
, m_bucket_index (INVALID_INDEX)
, m_curr (NULL)
{
}
template <class Key, class T>
T *
hashmap<Key, T>::iterator::iterate ()
{
if (m_hashmap == NULL && m_tdes == NULL)
{
assert (false);
return NULL;
}
ct_stat_type::autotimer stat_autotimer (m_hashmap->m_stat_iterates, m_hashmap->m_active_stats);
T **next_p = NULL;
do
{
/* save current leader as trailer */
if (m_curr != NULL)
{
if (m_hashmap->m_edesc->using_mutex)
{
/* follow house rules: lock mutex */
m_hashmap->unlock_entry (*m_curr);
}
/* load next entry */
next_p = &m_hashmap->get_nextp_ref (m_curr);
m_curr = address_type::strip_address_mark (*next_p);
}
else
{
/* reset transaction for each bucket */
if (m_bucket_index != INVALID_INDEX)
{
m_tdes->end_tran ();
}
m_tdes->start_tran ();
/* load next bucket */
m_bucket_index++;
if (m_bucket_index < m_hashmap->m_size)
{
m_curr = address_type::atomic_strip_address_mark (m_hashmap->m_buckets[m_bucket_index]);
}
else
{
/* end */
assert (m_bucket_index == m_hashmap->m_size);
m_tdes->end_tran ();
return NULL;
}
}
if (m_curr != NULL)
{
if (m_hashmap->m_edesc->using_mutex)
{
m_hashmap->lock_entry (*m_curr);
if (address_type::is_address_marked (m_hashmap->get_nextp_ref (m_curr)))
{
/* deleted in the meantime, skip it */
continue;
}
}
}
}
while (m_curr == NULL);
/* we have a valid entry */
return m_curr;
}
template <class Key, class T>
void
hashmap<Key, T>::iterator::restart ()
{
if (m_tdes->is_tran_started ())
{
m_tdes->end_tran();
}
m_curr = NULL;
}
template <class Key, class T>
typename hashmap<Key, T>::iterator &
hashmap<Key, T>::iterator::operator= (iterator &&o)
{
m_hashmap = o.m_hashmap;
o.m_hashmap = NULL;
m_tdes = o.m_tdes;
o.m_tdes = NULL;
m_bucket_index = o.m_bucket_index;
o.m_bucket_index = INVALID_INDEX;
m_curr = o.m_curr;
o.m_curr = NULL;
return *this;
}
} // namespace lockfree
#endif // !_LOCKFREE_HASHMAP_HPP_