lockfree_hashmap.hpp
1 /*
2  * Copyright 2008 Search Solution Corporation
3  * Copyright 2016 CUBRID Corporation
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 //
20 // lock-free hash map structure
21 //
22 
23 #ifndef _LOCKFREE_HASHMAP_HPP_
24 #define _LOCKFREE_HASHMAP_HPP_
25 
26 #include "error_code.h"
27 #include "lock_free.h" // for lf_entry_descriptor
28 #include "lockfree_address_marker.hpp"
29 #include "lockfree_freelist.hpp"
31 #include "monitor_collect.hpp"
32 #include "porting.h"
33 
34 #include <cassert>
35 #include <limits>
36 #include <mutex>
37 #include <ostream>
38 
39 namespace lockfree
40 {
41  template <class Key, class T>
42  class hashmap
43  {
44  public:
45  class iterator;
46 
47  hashmap ();
48  ~hashmap ();
49 
50  void init (tran::system &transys, size_t hash_size, size_t freelist_block_size, size_t freelist_block_count,
51  lf_entry_descriptor &edesc);
52  void destroy ();
53 
54  T *find (tran::index tran_index, Key &key);
55  bool find_or_insert (tran::index tran_index, Key &key, T *&entry);
56  bool insert (tran::index tran_index, Key &key, T *&entry);
57  bool insert_given (tran::index tran_index, Key &key, T *&entry);
58  bool erase (tran::index tran_index, Key &key);
59  bool erase_locked (tran::index tran_index, Key &key, T *&locked_entry);
60 
61  void unlock (tran::index tran_index, T *&entry);
62 
63  void clear (tran::index tran_index); // NOT LOCK-FREE
64 
65  T *freelist_claim (tran::index tran_index);
66  void freelist_retire (tran::index tran_index, T *&entry);
67 
68  void start_tran (tran::index tran_index);
69  void end_tran (tran::index tran_index);
70 
71  template <typename D>
72  void dump_stats (std::ostream &os) const;
73  void activate_stats ();
74  void deactivate_stats ();
75 
76  size_t get_size () const;
77  size_t get_element_count () const;
78 
79  private:
80  using address_type = address_marker<T>;
81 
82  // wrap T with on_reclaim functionality based on edesc.f_uninit
83  struct freelist_node_data
84  {
85  T m_entry;
86  lf_entry_descriptor *m_edesc;
87 
88  freelist_node_data () = default;
89  ~freelist_node_data () = default;
90 
91  void on_reclaim ()
92  {
93  if (m_edesc->f_uninit != NULL)
94  {
95  (void) m_edesc->f_uninit (&m_entry);
96  }
97  }
98  };
99  using freelist_type = freelist<freelist_node_data>;
100  using free_node_type = typename freelist_type::free_node;
101 
102  freelist_type *m_freelist;
103 
104  T **m_buckets;
105  size_t m_size;
106 
107  T **m_backbuffer;
108  std::mutex m_backbuffer_mutex;
109 
110  lf_entry_descriptor *m_edesc;
111 
112  // statistics
113  using ct_stat_type = cubmonitor::atomic_counter_timer_stat;
114  ct_stat_type m_stat_find;
115  ct_stat_type m_stat_insert;
116  ct_stat_type m_stat_erase;
117  ct_stat_type m_stat_unlock;
118  ct_stat_type m_stat_clear;
119  ct_stat_type m_stat_iterates;
120  ct_stat_type m_stat_claim;
121  ct_stat_type m_stat_retire;
122  bool m_active_stats;
123 
124  void *volatile *get_ref (T *p, size_t offset);
125  void *get_ptr (T *p, size_t offset);
126  void *get_ptr_deref (T *p, size_t offset);
127  void *get_keyp (T *p);
128  T *get_nextp (T *p);
129  T *&get_nextp_ref (T *p);
130  pthread_mutex_t *get_pthread_mutexp (T *p);
131 
132  T *from_free_node (free_node_type *fn);
133  free_node_type *to_free_node (T *p);
134  void save_temporary (tran::descriptor &tdes, T *&p);
135  T *claim_temporary (tran::descriptor &tdes);
136  T *freelist_claim (tran::descriptor &tdes);
137  void freelist_retire (tran::descriptor &tdes, T *&p);
138 
139  void safeguard_use_mutex_or_tran_started (const tran::descriptor &tdes, const pthread_mutex_t *mtx);
140  void start_tran_if_not_started (tran::descriptor &tdes);
141  void start_tran_force (tran::descriptor &tdes);
142  void promote_tran_force (tran::descriptor &tdes);
143  void end_tran_if_started (tran::descriptor &tdes);
144  void end_tran_force (tran::descriptor &tdes);
145  void lock_entry (T &tolock);
146  void unlock_entry (T &tounlock);
147  void lock_entry_mutex (T &tolock, pthread_mutex_t *&mtx);
148  void unlock_entry_mutex_if_locked (pthread_mutex_t *&mtx);
149  void unlock_entry_mutex_force (pthread_mutex_t *&mtx);
150 
151  size_t get_hash (Key &key) const;
152  T *&get_bucket (Key &key);
153  tran::descriptor &get_tran_descriptor (tran::index tran_index);
154 
155  void list_find (tran::index tran_index, T *list_head, Key &key, int *behavior_flags, T *&found_node);
156  bool list_insert_internal (tran::index tran_index, T *&list_head, Key &key, int *behavior_flags,
157  T *&found_node);
158  bool list_delete (tran::index tran_index, T *&list_head, Key &key, T *locked_entry, int *behavior_flags);
159 
160  bool hash_insert_internal (tran::index tran_index, Key &key, int bflags, T *&entry);
161  bool hash_erase_internal (tran::index tran_index, Key &key, int bflags, T *locked_entry);
162 
163  template <typename D>
164  void dump_stat (std::ostream &os, const char *name, const ct_stat_type &ct_stat) const;
165 
166  static constexpr std::ptrdiff_t free_node_offset_of_data (free_node_type fn)
167  {
168  return ((char *) (&fn.get_data ().m_entry)) - ((char *) (&fn));
169  }
170  }; // class hashmap
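// The template leans on lf_entry_descriptor (lock_free.h) for everything type-specific. Below is a
// minimal sketch of the fields this header actually reads; my_key, my_entry and the callbacks are
// illustrative placeholders, and the descriptor has further members that this header does not touch:
//
//   struct my_entry
//   {
//     my_entry *m_next;          // chained through edesc.of_next
//     my_key m_key;              // accessed through edesc.of_key
//     pthread_mutex_t m_mutex;   // only needed when edesc.using_mutex != 0
//     // ... payload ...
//   };
//
//   lf_entry_descriptor edesc;
//   edesc.of_next = offsetof (my_entry, m_next);
//   edesc.of_key = offsetof (my_entry, m_key);
//   edesc.of_mutex = offsetof (my_entry, m_mutex);
//   edesc.using_mutex = 1;             // protect entries with their own mutex instead of the transaction
//   edesc.f_hash = my_key_hash;        // maps a key into [0, hash_size)
//   edesc.f_key_cmp = my_key_compare;  // returns 0 when two keys are equal
//   edesc.f_key_copy = my_key_copy;    // returns NO_ERROR on success
//   edesc.f_init = NULL;               // optional: run when a node is claimed from the freelist
//   edesc.f_uninit = NULL;             // optional: run when a node is reclaimed (see on_reclaim below)
//   edesc.f_duplicate = NULL;          // optional: duplicate-key callback used by insert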
171 
172  template <class Key, class T>
173  class hashmap<Key, T>::iterator
174  {
175  public:
176  iterator () = default;
177  iterator (tran::index tran_index, hashmap &hash);
178  ~iterator () = default;
179 
180  T *iterate ();
181  void restart ();
182 
183  iterator &operator= (iterator &&o);
184 
185  private:
186  static const size_t INVALID_INDEX = std::numeric_limits<size_t>::max ();
187 
188  hashmap *m_hashmap;
189  tran::descriptor *m_tdes;
190  size_t m_bucket_index;
191  T *m_curr;
192  };
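// Typical traversal with the iterator above (my_map, tran_idx and my_entry are placeholders);
// iterate () advances to the next live entry and returns NULL once all buckets are exhausted:
//
//   lockfree::hashmap<my_key, my_entry>::iterator it { tran_idx, my_map };
//   for (my_entry *e = it.iterate (); e != NULL; e = it.iterate ())
//     {
//       // inspect e; the iterator keeps the transaction (or the entry mutex) active while e is current
//     }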
193 
194 } // namespace lockfree
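// A usage sketch of the public interface, assuming the descriptor from the sketch above and a
// tran::system obtained from the owning module; sizes and names are placeholders:
//
//   lockfree::hashmap<my_key, my_entry> map;
//   map.init (transys, 1024 /* hash_size */, 100 /* freelist_block_size */, 4 /* freelist_block_count */, edesc);
//
//   my_entry *ent = NULL;
//   bool inserted = map.find_or_insert (tran_idx, key, ent);
//   // ... use ent; it stays protected by its mutex or by the open transaction ...
//   map.unlock (tran_idx, ent);        // release a found/inserted entry when done with it
//
//   map.erase (tran_idx, key);
//   map.destroy ();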
195 
196 //
197 // implementation
198 //
199 
200 namespace lockfree
201 {
202  template <class Key, class T>
203  hashmap<Key, T>::hashmap ()
204  : m_freelist (NULL)
205  , m_buckets (NULL)
206  , m_size (0)
207  , m_backbuffer (NULL)
208  , m_backbuffer_mutex ()
209  , m_edesc (NULL)
210  , m_stat_find ()
211  , m_stat_insert ()
212  , m_stat_erase ()
213  , m_stat_unlock ()
214  , m_stat_clear ()
215  , m_stat_iterates ()
216  , m_stat_claim ()
217  , m_stat_retire ()
218  , m_active_stats ()
219  {
220  }
221 
222  template <class Key, class T>
223  hashmap<Key, T>::~hashmap ()
224  {
225  destroy ();
226  }
227 
228  template <class Key, class T>
229  void
230  hashmap<Key, T>::destroy ()
231  {
232  if (m_buckets != NULL)
233  {
234  T *node_iter;
235  T *save_next;
237 
238  for (size_t i = 0; i < m_size; ++i)
239  {
240  for (node_iter = m_buckets[i]; node_iter != NULL; node_iter = save_next)
241  {
243  save_next = get_nextp (node_iter);
244  freelist_retire (tdes, node_iter);
245  }
246  }
247  }
248 
249  delete [] m_buckets;
250  m_buckets = NULL;
251  delete [] m_backbuffer;
252  m_backbuffer = NULL;
253 
254  delete m_freelist;
255  m_freelist = NULL;
256  }
257 
258  template <class Key, class T>
259  void
260  hashmap<Key, T>::init (tran::system &transys, size_t hash_size, size_t freelist_block_size,
261  size_t freelist_block_count, lf_entry_descriptor &edesc)
262  {
263  m_freelist = new freelist_type (transys, freelist_block_size, freelist_block_count);
264 
265  m_edesc = &edesc;
266 
267  m_size = hash_size;
268  m_buckets = new T *[m_size] ();
269 
270  m_backbuffer = new T *[m_size] ();
271  for (size_t i = 0; i < m_size; i++)
272  {
273  m_backbuffer[i] = address_type::set_adress_mark (NULL);
274  }
275  }
276 
277  template <class Key, class T>
278  T *
279  hashmap<Key, T>::find (tran::index tran_index, Key &key)
280  {
282 
283  int bflags = 0;
284  bool restart = true;
285  T *entry = NULL;
286 
287  while (restart)
288  {
289  T *&list_head = get_bucket (key);
290  entry = NULL;
292  list_find (tran_index, list_head, key, &bflags, entry);
293  restart = (bflags & LF_LIST_BR_RESTARTED) != 0;
294  }
295  return entry;
296  }
297 
298  template <class Key, class T>
299  bool
300  hashmap<Key, T>::find_or_insert (tran::index tran_index, Key &key, T *&entry)
301  {
302  return hash_insert_internal (tran_index, key, LF_LIST_BF_RETURN_ON_RESTART | LF_LIST_BF_FIND_OR_INSERT, entry);
303  }
304 
305  template <class Key, class T>
306  bool
307  hashmap<Key, T>::insert (tran::index tran_index, Key &key, T *&entry)
308  {
309  return hash_insert_internal (tran_index, key, LF_LIST_BF_RETURN_ON_RESTART, entry);
310  }
311 
312  template <class Key, class T>
313  bool
314  hashmap<Key, T>::insert_given (tran::index tran_index, Key &key, T *&entry)
315  {
316  assert (entry != NULL);
317  return hash_insert_internal (tran_index, key,
318  LF_LIST_BF_RETURN_ON_RESTART | LF_LIST_BF_INSERT_GIVEN | LF_LIST_BF_FIND_OR_INSERT,
319  entry);
320  }
321 
322  template <class Key, class T>
323  bool
324  hashmap<Key, T>::erase (tran::index tran_index, Key &key)
325  {
326  return hash_erase_internal (tran_index, key, LF_LIST_BF_RETURN_ON_RESTART | LF_LIST_BF_LOCK_ON_DELETE, NULL);
327  }
328 
329  /*
330  * NOTE: Careful when calling this function. The typical scenario to call this function is to first find entry using
331  * lf_hash_find and then call lf_hash_delete on the found entry.
332  * NOTE: lf_hash_delete_already_locks can be called only if entry has mutexes.
333  * NOTE: The delete will be successful only if the entry found by key matches the given entry.
334  * Usually, the entry will match. However, we do have a limited scenario when a different entry with the same
335  * key may be found:
336  * 1. Entry was found or inserted by this transaction.
337  * 2. Another transaction cleared the hash. All current entries are moved to back buffer and will be soon
338  * retired.
339  * 3. A third transaction inserts a new entry with the same key.
340  * 4. This transaction tries to delete the entry, but the entry inserted by the third transaction is found.
341  */
342  template <class Key, class T>
343  bool
344  hashmap<Key, T>::erase_locked (tran::index tran_index, Key &key, T *&locked_entry)
345  {
346  assert (locked_entry != NULL);
348 
349  bool success = hash_erase_internal (tran_index, key, LF_LIST_BF_RETURN_ON_RESTART, locked_entry);
350  if (success)
351  {
352  locked_entry = NULL;
353  }
354  return success;
355  }
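// The intended call pattern for erase_locked, per the note above (entry types with using_mutex only);
// map, tran_idx and key are placeholders:
//
//   my_entry *ent = map.find (tran_idx, key);          // returns with the entry mutex held
//   if (ent != NULL)
//     {
//       if (map.erase_locked (tran_idx, key, ent))
//         {
//           // ent was unlinked and retired; the pointer has been reset to NULL
//         }
//       else
//         {
//           map.unlock (tran_idx, ent);                // a different entry with the same key was found
//         }
//     }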
356 
357  template <class Key, class T>
358  void
359  hashmap<Key, T>::unlock (tran::index tran_index, T *&entry)
360  {
362 
363  assert (entry != NULL);
364  if (m_edesc->using_mutex)
365  {
366  unlock_entry (*entry);
367  }
368  else
369  {
370  get_tran_descriptor (tran_index).end_tran ();
371  }
372  entry = NULL;
373  }
374 
375  template <class Key, class T>
376  void
377  hashmap<Key, T>::clear (tran::index tran_index)
378  {
380 
381  tran::descriptor &tdes = get_tran_descriptor (tran_index);
382 
383  /* lock mutex */
384  std::unique_lock<std::mutex> ulock (m_backbuffer_mutex);
385 
386  /* swap bucket pointer with current backbuffer */
387  T **old_buckets = ATOMIC_TAS_ADDR (&m_buckets, m_backbuffer);
388 
389  /* clear bucket buffer, containing remains of old entries marked for delete */
390  for (size_t i = 0; i < m_size; ++i)
391  {
393  m_buckets[i] = NULL;
394  }
395 
396  /* register new backbuffer */
397  m_backbuffer = old_buckets;
398 
399  /* retire all entries from old buckets; note that threads currently operating on the entries will not be disturbed
400  * since the actual deletion is performed when the entries are no longer handled by active transactions */
401  T *curr = NULL;
402  T **next_p = NULL;
403  T *next = NULL;
404  pthread_mutex_t *mutex_p = NULL;
405  for (size_t i = 0; i < m_size; ++i)
406  {
407  // remove list from bucket
408  // warning: this may spin
409  do
410  {
411  curr = address_type::strip_address_mark (old_buckets[i]);
412  }
413  while (!ATOMIC_CAS_ADDR (&old_buckets[i], curr, address_type::set_adress_mark (NULL)));
414 
415  while (curr != NULL)
416  {
417  next_p = &get_nextp_ref (curr);
418 
419  /* unlink from hash chain */
420  // warning: this may spin
421  do
422  {
423  next = address_type::strip_address_mark (*next_p);
424  }
425  while (!ATOMIC_CAS_ADDR (next_p, next, address_type::set_adress_mark (next)));
426 
427  /* wait for mutex */
428  if (m_edesc->using_mutex)
429  {
430  lock_entry_mutex (*curr, mutex_p);
431  unlock_entry_mutex_force (mutex_p);
432 
433  /* there should be only one mutex lock-unlock per entry per access via bucket array, so locking/unlocking
434  * once while the entry is inaccessible should be enough to guarantee nobody will be using it afterwards */
435  }
436 
437  /* retire */
438  freelist_retire (tdes, curr);
439 
440  /* advance */
441  curr = next;
442  }
443  }
444  }
445 
446  template <class Key, class T>
447  size_t
448  hashmap<Key, T>::get_size () const
449  {
450  return m_size;
451  }
452 
453  template <class Key, class T>
454  size_t
455  hashmap<Key, T>::get_element_count () const
456  {
457  return m_freelist->get_claimed_count ();
458  }
459 
460  template <class Key, class T>
461  size_t
462  hashmap<Key, T>::get_hash (Key &key) const
463  {
464  return m_edesc->f_hash (&key, (int) m_size);
465  }
466 
467  template <class Key, class T>
468  T *&
469  hashmap<Key, T>::get_bucket (Key &key)
470  {
471  return m_buckets[get_hash (key)];
472  }
473 
474  template <class Key, class T>
475  tran::descriptor &
476  hashmap<Key, T>::get_tran_descriptor (tran::index tran_index)
477  {
478  return m_freelist->get_transaction_table ().get_descriptor (tran_index);
479  }
480 
481  template <class Key, class T>
482  void *
483  hashmap<Key, T>::get_ptr (T *p, size_t offset)
484  {
485  assert (p != NULL);
487  return (void *) (((char *) p) + offset);
488  }
489 
490  template <class Key, class T>
491  void *volatile *
492  hashmap<Key, T>::get_ref (T *p, size_t offset)
493  {
494  assert (p != NULL);
496  return (void *volatile *) (((char *) p) + offset);
497  }
498 
499  template <class Key, class T>
500  void *
501  hashmap<Key, T>::get_ptr_deref (T *p, size_t offset)
502  {
503  return *get_ref (p, offset);
504  }
505 
506  template <class Key, class T>
507  void *
508  hashmap<Key, T>::get_keyp (T *p)
509  {
510  return get_ptr (p, m_edesc->of_key);
511  }
512 
513  template <class Key, class T>
514  T *
515  hashmap<Key, T>::get_nextp (T *p)
516  {
517  return (T *) get_ptr_deref (p, m_edesc->of_next);
518  }
519 
520  template <class Key, class T>
521  T *&
522  hashmap<Key, T>::get_nextp_ref (T *p)
523  {
524  return * (T **) get_ref (p, m_edesc->of_next);
525  }
526 
527  template <class Key, class T>
528  pthread_mutex_t *
529  hashmap<Key, T>::get_pthread_mutexp (T *p)
530  {
531  return (pthread_mutex_t *) get_ptr (p, m_edesc->of_mutex);
532  }
533 
534  template <class Key, class T>
535  T *
536  hashmap<Key, T>::from_free_node (free_node_type *fn)
537  {
538  assert (fn != NULL);
539  return &fn->get_data ().m_entry;
540  }
541 
542  template <class Key, class T>
543  typename hashmap<Key, T>::free_node_type *
544  hashmap<Key, T>::to_free_node (T *p)
545  {
546  // not nice, but necessary until we fully refactor lockfree hashmap
547  const std::ptrdiff_t off = free_node_offset_of_data (free_node_type ());
548  char *cp = (char *) p;
549  cp -= off;
550  return (free_node_type *) cp;
551  }
552 
553  template <class Key, class T>
554  void
555  hashmap<Key, T>::save_temporary (tran::descriptor &tdes, T *&p)
556  {
557  tran::reclaimable_node *fn = to_free_node (p);
558  tdes.save_reclaimable (fn);
559  p = NULL;
560  }
561 
562  template <class Key, class T>
563  T *
564  hashmap<Key, T>::claim_temporary (tran::descriptor &tdes)
565  {
566  return from_free_node (reinterpret_cast<free_node_type *> (tdes.pull_saved_reclaimable ()));
567  }
568 
569  template <class Key, class T>
570  void
571  hashmap<Key, T>::freelist_retire (tran::index tran_index, T *&entry)
572  {
573  return freelist_retire (get_tran_descriptor (tran_index), entry);
574  }
575 
576  template <class Key, class T>
577  void
578  hashmap<Key, T>::start_tran (tran::index tran_index)
579  {
580  get_tran_descriptor (tran_index).start_tran ();
581  }
582 
583  template <class Key, class T>
584  void
585  hashmap<Key, T>::end_tran (tran::index tran_index)
586  {
587  get_tran_descriptor (tran_index).end_tran ();
588  }
589 
590  template <class Key, class T>
591  template <typename D>
592  void
593  hashmap<Key, T>::dump_stat (std::ostream &os, const char *name, const ct_stat_type &ct_stat) const
594  {
595  if (ct_stat.get_count () != 0)
596  {
597  os << name;
598  os << "[" << ct_stat.get_count ();
599  os << ", " << std::chrono::duration_cast<D> (ct_stat.get_time ()).count ();
600  os << "] ";
601  }
602  }
603 
604  template <class Key, class T>
605  template <typename D>
606  void
607  hashmap<Key, T>::dump_stats (std::ostream &os) const
608  {
609  dump_stat<D> (os, "find", m_stat_find);
610  dump_stat<D> (os, "insert", m_stat_insert);
611  dump_stat<D> (os, "erase", m_stat_erase);
612 
613  dump_stat<D> (os, "unlock", m_stat_unlock);
614 
615  dump_stat<D> (os, "clear", m_stat_clear);
616  dump_stat<D> (os, "iter", m_stat_iterates);
617 
618  dump_stat<D> (os, "claim", m_stat_claim);
619  dump_stat<D> (os, "retire", m_stat_retire);
620  }
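// Collection is switched on and off with activate_stats ()/deactivate_stats (); the duration type for
// the time column is chosen by the caller, e.g.:
//
//   map.activate_stats ();
//   // ... workload ...
//   map.dump_stats<std::chrono::milliseconds> (std::cerr);   // prints "find[count, time] insert[...] ..." for non-zero stats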
621 
622  template <class Key, class T>
623  void
624  hashmap<Key, T>::activate_stats ()
625  {
626  m_active_stats = true;
627  }
628 
629  template <class Key, class T>
630  void
631  hashmap<Key, T>::deactivate_stats ()
632  {
633  m_active_stats = false;
634  }
635 
636  template <class Key, class T>
637  void
638  hashmap<Key, T>::freelist_retire (tran::descriptor &tdes, T *&p)
639  {
641  assert (p != NULL);
642  m_freelist->retire (tdes, *to_free_node (p));
643  p = NULL;
644  }
645 
646  template <class Key, class T>
647  T *
648  hashmap<Key, T>::freelist_claim (tran::index tran_index)
649  {
650  return freelist_claim (get_tran_descriptor (tran_index));
651  }
652 
653  template <class Key, class T>
654  T *
655  hashmap<Key, T>::freelist_claim (tran::descriptor &tdes)
656  {
658  T *claimed = NULL;
659  free_node_type *fn = reinterpret_cast<free_node_type *> (tdes.pull_saved_reclaimable ());
660  bool is_local_tran = false;
661 
662  if (!tdes.is_tran_started ())
663  {
664  tdes.start_tran ();
665  is_local_tran = true;
666  }
667  if (fn == NULL)
668  {
669  fn = m_freelist->claim (tdes);
670  assert (fn != NULL);
671  // make sure m_edesc is initialized
672  fn->get_data ().m_edesc = m_edesc;
673 
674  claimed = from_free_node (fn);
675  // call f_init
676  if (m_edesc->f_init != NULL)
677  {
678  m_edesc->f_init (claimed);
679  }
680  }
681  else
682  {
683  claimed = from_free_node (fn);
684  // already initialized
685  }
686  get_nextp_ref (claimed) = NULL; // make sure link is removed
687 
688  if (is_local_tran)
689  {
690  tdes.end_tran ();
691  }
692  return claimed;
693  }
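// Nodes claimed here can be handed back with freelist_retire or pushed into the table with
// insert_given; a sketch of that cycle (placeholders as before):
//
//   my_entry *ent = map.freelist_claim (tran_idx);   // f_init has run and the next link is cleared
//   // ... fill in key and payload ...
//   if (!map.insert_given (tran_idx, key, ent))
//     {
//       // the key already existed: the claimed node was retired automatically and ent now
//       // points at the entry that is in the table (see the behavior flags note further down)
//     }
//
//   // a claimed node that ends up unused goes straight back:
//   // map.freelist_retire (tran_idx, ent);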
694 
695  template <class Key, class T>
696  void
697  hashmap<Key, T>::safeguard_use_mutex_or_tran_started (const tran::descriptor &tdes, const pthread_mutex_t *mtx)
698  {
699  /* Assert used to make sure the current entry is protected by either transaction or mutex. */
700 
701  /* The transaction is started if and only if we don't use mutex */
702  assert (tdes.is_tran_started () == !m_edesc->using_mutex);
703 
704  /* If we use mutex, we have a mutex locked. */
705  assert (!m_edesc->using_mutex || mtx != NULL);
706  }
707 
708  template <class Key, class T>
709  void
710  hashmap<Key, T>::start_tran_if_not_started (tran::descriptor &tdes)
711  {
712  tdes.start_tran (); // same result if it was started or not
713  }
714 
715  template <class Key, class T>
716  void
717  hashmap<Key, T>::start_tran_force (tran::descriptor &tdes)
718  {
719  assert (!tdes.is_tran_started ());
720  tdes.start_tran ();
721  }
722 
723  template <class Key, class T>
724  void
725  hashmap<Key, T>::promote_tran_force (tran::descriptor &tdes)
726  {
727  assert (tdes.is_tran_started ());
729  }
730 
731  template <class Key, class T>
732  void
733  hashmap<Key, T>::end_tran_if_started (tran::descriptor &tdes)
734  {
735  if (tdes.is_tran_started ())
736  {
737  tdes.end_tran ();
738  }
739  }
740 
741  template <class Key, class T>
742  void
743  hashmap<Key, T>::end_tran_force (tran::descriptor &tdes)
744  {
745  assert (tdes.is_tran_started ());
746  tdes.end_tran ();
747  }
748 
749  template <class Key, class T>
750  void
751  hashmap<Key, T>::lock_entry (T &tolock)
752  {
753  pthread_mutex_t *no_output_mtx = NULL;
754  lock_entry_mutex (tolock, no_output_mtx);
755  }
756 
757  template <class Key, class T>
758  void
759  hashmap<Key, T>::unlock_entry (T &tounlock)
760  {
761  pthread_mutex_t *mtx = get_pthread_mutexp (&tounlock);
762  unlock_entry_mutex_force (mtx);
763  }
764 
765  template <class Key, class T>
766  void
767  hashmap<Key, T>::lock_entry_mutex (T &tolock, pthread_mutex_t *&mtx)
768  {
770  assert (mtx == NULL);
771 
772  mtx = get_pthread_mutexp (&tolock);
773 
774 #if defined (SERVER_MODE)
775  pthread_mutex_lock (mtx);
776 #endif // SERVER_MODE
777  }
778 
779  template <class Key, class T>
780  void
781  hashmap<Key, T>::unlock_entry_mutex_if_locked (pthread_mutex_t *&mtx)
782  {
783  if (m_edesc->using_mutex && mtx != NULL)
784  {
785 #if defined (SERVER_MODE)
786  pthread_mutex_unlock (mtx);
787 #endif // SERVER_MODE
788  mtx = NULL;
789  }
790  }
791 
792  template <class Key, class T>
793  void
794  hashmap<Key, T>::unlock_entry_mutex_force (pthread_mutex_t *&mtx)
795  {
796  assert (m_edesc->using_mutex && mtx != NULL);
797 #if defined (SERVER_MODE)
798  pthread_mutex_unlock (mtx);
799 #endif // SERVER_MODE
800  mtx = NULL;
801  }
802 
803  template <class Key, class T>
804  void
805  hashmap<Key, T>::list_find (tran::index tran_index, T *list_head, Key &key, int *behavior_flags, T *&entry)
806  {
807  tran::descriptor &tdes = get_tran_descriptor (tran_index);
808  T *curr = NULL;
809  pthread_mutex_t *entry_mutex = NULL;
810 
811  /* by default, not found */
812  entry = NULL;
813 
814  tdes.start_tran ();
815  bool restart_search = true;
816 
817  while (restart_search) // restart_search:
818  {
819  restart_search = false;
820 
821  curr = address_type::atomic_strip_address_mark (list_head);
822  restart_search = false;
823 
824  while (curr != NULL)
825  {
826  if (m_edesc->f_key_cmp (&key, get_keyp (curr)) == 0)
827  {
828  /* found! */
829  if (m_edesc->using_mutex)
830  {
831  /* entry has a mutex protecting its members; lock it */
832  lock_entry_mutex (*curr, entry_mutex);
833 
834  /* mutex has been locked, no need to keep transaction */
835  tdes.end_tran ();
836 
836 
837  if (address_type::is_address_marked (get_nextp (curr)))
838  {
839  /* while waiting for lock, somebody else deleted the entry; restart the search */
840  unlock_entry_mutex_force (entry_mutex);
841 
842  if (behavior_flags != NULL && (*behavior_flags & LF_LIST_BF_RETURN_ON_RESTART))
843  {
844  *behavior_flags = (*behavior_flags) | LF_LIST_BR_RESTARTED;
845  return;
846  }
847  else
848  {
849  // restart
850  restart_search = true;
851  break;
852  }
853  }
854  } // m_edesc->using_mutex
855 
856  entry = curr;
857  return;
858  }
859 
860  /* advance */
861  curr = address_type::strip_address_mark (get_nextp (curr));
862  }
863  } // while (restart_search)
864 
865  /* all ok but not found */
866  tdes.end_tran ();
867  }
868 
869 
870  /*
871  * Behavior flags:
872  *
873  * LF_LIST_BF_RETURN_ON_RESTART - When insert fails because last entry in bucket was deleted, if this flag is set,
874  * then the operation is restarted from here, instead of looping inside
875  * lf_list_insert_internal (as a consequence, hash key is recalculated).
876  * NOTE: Currently, this flag is always used (I must find out why).
877  *
878  * LF_LIST_BF_INSERT_GIVEN - If this flag is set, the caller means to force its own entry into hash table.
879  * When the flag is not set, a new entry is claimed from freelist.
880  * NOTE: If an entry with the same key already exists, the entry given as argument is
881  * automatically retired.
882  *
883  * LF_LIST_BF_FIND_OR_INSERT - If this flag is set and an entry for the same key already exists, the existing
884  * key will be output. If the flag is not set and key exists, insert just gives up
885  * and a NULL entry is output.
886  * NOTE: insert will not give up when key exists, if m_edesc->f_update is provided.
887  * a new key is generated and insert is restarted.
888  */
889  template <class Key, class T>
890  bool
891  hashmap<Key, T>::list_insert_internal (tran::index tran_index, T *&list_head, Key &key, int *behavior_flags,
892  T *&entry)
893  {
894  pthread_mutex_t *entry_mutex = NULL; /* Locked entry mutex when not NULL */
895  T **curr_p = NULL;
896  T *curr = NULL;
897  tran::descriptor &tdes = get_tran_descriptor (tran_index);
898  bool restart_search = true;
899 
900  while (restart_search)
901  {
902  restart_search = false;
903 
904  start_tran_force (tdes);
905 
906  curr_p = &list_head;
907  curr = address_type::atomic_strip_address_mark (*curr_p);
908 
909  /* search */
910  while (curr_p != NULL) // this is always true actually...
911  {
912  assert (tdes.is_tran_started ());
913  assert (entry_mutex == NULL);
914 
915  if (curr != NULL)
916  {
917  if (m_edesc->f_key_cmp (&key, get_keyp (curr)) == 0)
918  {
919  /* found an entry with the same key. */
920 
921  if (!LF_LIST_BF_IS_FLAG_SET (behavior_flags, LF_LIST_BF_INSERT_GIVEN) && entry != NULL)
922  {
923  /* save this for further (local) use. */
924  save_temporary (tdes, entry);
925  }
926 
927  if (m_edesc->using_mutex)
928  {
929  /* entry has a mutex protecting its members; lock it */
930  lock_entry_mutex (*curr, entry_mutex);
931 
932  /* mutex has been locked, no need to keep transaction alive */
933  end_tran_force (tdes);
934 
935  if (address_type::is_address_marked (get_nextp (curr)))
936  {
937  /* while waiting for lock, somebody else deleted the entry; restart the search */
938  unlock_entry_mutex_force (entry_mutex);
939 
940  if (behavior_flags && (*behavior_flags & LF_LIST_BF_RETURN_ON_RESTART))
941  {
942  *behavior_flags = (*behavior_flags) | LF_LIST_BR_RESTARTED;
943  return false;
944  }
945  else
946  {
947  restart_search = true;
948  break;
949  }
950  }
951  }
952 
953  safeguard_use_mutex_or_tran_started (tdes, entry_mutex);
954  if (m_edesc->f_duplicate != NULL)
955  {
956  /* we have a duplicate key callback. */
957  if (m_edesc->f_duplicate (&key, curr) != NO_ERROR)
958  {
959  end_tran_if_started (tdes);
960  unlock_entry_mutex_if_locked (entry_mutex);
961  return false;
962  }
963 
964 #if 1
965  LF_LIST_BR_SET_FLAG (behavior_flags, LF_LIST_BR_RESTARTED);
966  end_tran_if_started (tdes);
967  unlock_entry_mutex_if_locked (entry_mutex);
968  return false;
969 #else /* !1 = 0 */
970  /* Could we have such cases that we just update existing entry without modifying anything else?
971  * And would it be usable with just a flag?
972  * Then this code may be used.
973  * So far we have only one usage for f_duplicate, which increment SESSION_ID and requires
974  * restarting hash search. This will be the usual approach if f_duplicate.
975  * If we just increment a counter in existing entry, we don't need to do anything else. This
976  * however most likely depends on f_duplicate implementation. Maybe it is more useful to give
977  * behavior_flags argument to f_duplicate to tell us if restart is or is not needed.
978  */
980  {
981  LF_LIST_BR_SET_FLAG (behavior_flags, LF_LIST_BR_RESTARTED);
982  end_tran_if_started (tdes);
983  unlock_entry_mutex_if_locked (entry_mutex);
984  return false;
985  }
986  else
987  {
988  /* duplicate does not require restarting search. */
989  if (LF_LIST_BF_IS_FLAG_SET (behavior_flags, LF_LIST_BF_INSERT_GIVEN))
990  {
991  /* Could not be inserted. Retire the entry. */
992  freelist_retire (tdes, entry);
993  }
994 
995  /* fall through to output current entry. */
996  }
997 #endif /* 0 */
998  }
999  else // m_edesc->f_duplicate == NULL
1000  {
1001  if (LF_LIST_BF_IS_FLAG_SET (behavior_flags, LF_LIST_BF_INSERT_GIVEN))
1002  {
1003  /* the given entry could not be inserted. retire it. */
1004  freelist_retire (tdes, entry);
1005  }
1006 
1007  if (!LF_LIST_BF_IS_FLAG_SET (behavior_flags, LF_LIST_BF_FIND_OR_INSERT))
1008  {
1009  /* found entry is not accepted */
1010  end_tran_if_started (tdes);
1011  unlock_entry_mutex_if_locked (entry_mutex);
1012  return false;
1013  }
1014 
1015  /* fall through to output current entry. */
1016  } // m_edesc->f_duplicate == NULL
1017 
1018  assert (entry == NULL);
1019  safeguard_use_mutex_or_tran_started (tdes, entry_mutex);
1020  entry = curr;
1021  return false;
1022  } // m_edesc->f_key_cmp (&key, get_keyp (curr)) == 0
1023 
1024  /* advance */
1025  curr_p = &get_nextp_ref (curr);
1026  curr = address_type::strip_address_mark (*curr_p);
1027  }
1028  else // curr == NULL
1029  {
1030  /* end of bucket, we must insert */
1031  if (entry == NULL)
1032  {
1032  {
1034 
1035  entry = freelist_claim (tdes);
1036  assert (entry != NULL);
1037 
1038  /* set its key */
1039  if (m_edesc->f_key_copy (&key, get_keyp (entry)) != NO_ERROR)
1040  {
1041  assert (false);
1042  end_tran_force (tdes);
1043  return false;
1044  }
1045  }
1046 
1047  if (m_edesc->using_mutex)
1048  {
1049  /* entry has a mutex protecting its members; lock it */
1050  lock_entry_mutex (*entry, entry_mutex);
1051  }
1052 
1053  /* attempt an add */
1054  if (!ATOMIC_CAS_ADDR (curr_p, (T *) NULL, entry))
1055  {
1056  if (m_edesc->using_mutex)
1057  {
1058  /* link failed, unlock mutex */
1059  unlock_entry_mutex_force (entry_mutex);
1060  }
1061 
1062  /* someone added before us, restart process */
1063  if (LF_LIST_BF_IS_FLAG_SET (behavior_flags, LF_LIST_BF_RETURN_ON_RESTART))
1064  {
1065  if (!LF_LIST_BF_IS_FLAG_SET (behavior_flags, LF_LIST_BF_INSERT_GIVEN))
1066  {
1067  save_temporary (tdes, entry);
1068  }
1069  LF_LIST_BR_SET_FLAG (behavior_flags, LF_LIST_BR_RESTARTED);
1070  end_tran_force (tdes);
1071  return false;
1072  }
1073  else
1074  {
1075  end_tran_force (tdes);
1076  restart_search = true;
1077  break;
1078  }
1079  }
1080 
1081  /* end transaction if mutex is acquired */
1082  if (m_edesc->using_mutex)
1083  {
1084  end_tran_force (tdes);
1085  }
1086 
1087  /* done! */
1088  return true;
1089  } // else of if (curr != NULL)
1090  } // while (curr_p != NULL)
1091 
1092  // only way to exit while (curr_p != NULL) loop is to restart search
1093  assert (restart_search);
1094  } // while (restart_search)
1095 
1096  /* impossible case */
1097  assert (false);
1098  return false;
1099  }
1100 
1101  template <class Key, class T>
1102  bool
1103  hashmap<Key, T>::list_delete (tran::index tran_index, T *&list_head, Key &key, T *locked_entry, int *behavior_flags)
1104  {
1105  pthread_mutex_t *entry_mutex = NULL;
1106  tran::descriptor &tdes = get_tran_descriptor (tran_index);
1107  T **curr_p;
1108  T *curr;
1109  T **next_p;
1110  T *next;
1111  bool restart_search = true;
1112 
1113  while (restart_search)
1114  {
1115  restart_search = false;
1116 
1117  start_tran_force (tdes);
1118  curr_p = &list_head;
1119  curr = address_type::atomic_strip_address_mark (*curr_p);
1120 
1121  /* search */
1122  while (curr != NULL)
1123  {
1124  /* is this the droid we are looking for? */
1125  if (m_edesc->f_key_cmp (&key, get_keyp (curr)) == 0)
1126  {
1127  if (locked_entry != NULL && locked_entry != curr)
1128  {
1129  assert (m_edesc->using_mutex
1130  && !LF_LIST_BF_IS_FLAG_SET (behavior_flags, LF_LIST_BF_LOCK_ON_DELETE));
1131 
1132  /* We are here because lf_hash_delete_already_locked was called. The entry found by matching key is
1133  * different from the entry we were trying to delete.
1134  * This is possible (please find the description of hash_delete_already_locked). */
1135  end_tran_force (tdes);
1136  return false;
1137  }
1138 
1139  /* fetch next entry */
1140  next_p = &get_nextp_ref (curr);
1141  next = address_type::strip_address_mark (*next_p);
1142 
1143  /* set mark on next pointer; this way, if anyone else is trying to delete the next entry, it will fail */
1144  if (!ATOMIC_CAS_ADDR (next_p, next, address_type::set_adress_mark (next)))
1145  {
1146  /* joke's on us, this time; somebody else marked it before */
1147 
1148  end_tran_force (tdes);
1149  if (behavior_flags && (*behavior_flags & LF_LIST_BF_RETURN_ON_RESTART))
1150  {
1151  *behavior_flags = (*behavior_flags) | LF_LIST_BR_RESTARTED;
1152  assert ((*behavior_flags) & LF_LIST_BR_RESTARTED);
1153  return false;
1154  }
1155  else
1156  {
1157  restart_search = true;
1158  break;
1159  }
1160  }
1161 
1162  /* lock mutex if necessary */
1163  if (m_edesc->using_mutex)
1164  {
1165  if (LF_LIST_BF_IS_FLAG_SET (behavior_flags, LF_LIST_BF_LOCK_ON_DELETE))
1166  {
1167  lock_entry_mutex (*curr, entry_mutex);
1168  }
1169  else
1170  {
1171  /* Must be already locked! */
1172  entry_mutex = get_pthread_mutexp (curr);
1173  }
1174 
1175  /* since we set the mark, nobody else can delete it, so we have nothing else to check */
1176  }
1177 
1178  /* unlink */
1179  if (!ATOMIC_CAS_ADDR (curr_p, curr, next))
1180  {
1181  /* unlink failed; first step is to remove lock (if applicable) */
1182  if (m_edesc->using_mutex && LF_LIST_BF_IS_FLAG_SET (behavior_flags, LF_LIST_BF_LOCK_ON_DELETE))
1183  {
1184  unlock_entry_mutex_force (entry_mutex);
1185  }
1186 
1187  /* remove mark and restart search */
1188  if (!ATOMIC_CAS_ADDR (next_p, address_type::set_adress_mark (next), next))
1189  {
1190  /* impossible case */
1191  assert (false);
1192  end_tran_force (tdes);
1193  return false;
1194  }
1195 
1196  end_tran_force (tdes);
1197  if (behavior_flags && (*behavior_flags & LF_LIST_BF_RETURN_ON_RESTART))
1198  {
1199  *behavior_flags = (*behavior_flags) | LF_LIST_BR_RESTARTED;
1200  assert ((*behavior_flags) & LF_LIST_BR_RESTARTED);
1201  return false;
1202  }
1203  else
1204  {
1205  restart_search = true;
1206  break;
1207  }
1208  }
1209  /* unlink successful */
1210 
1211  /* unlock mutex */
1212  if (m_edesc->using_mutex)
1213  {
1214  unlock_entry_mutex_force (entry_mutex);
1215  }
1216 
1217  promote_tran_force (tdes);
1218 
1219  /* now we can feed the entry to the freelist and forget about it */
1220  freelist_retire (tdes, curr);
1221 
1222  /* end the transaction */
1223  end_tran_force (tdes);
1224 
1225  /* success! */
1226  return true;
1227  } // m_edesc->f_key_cmp (&key, get_keyp (curr)) == 0
1228 
1229  /* advance */
1230  curr_p = &get_nextp_ref (curr);
1231  curr = address_type::strip_address_mark (*curr_p);
1232  } // while (curr != NULL)
1233  } // while (restart_search)
1234 
1235  /* search yielded no result so no delete was performed */
1236  end_tran_force (tdes);
1237  return false;
1238  }
1239 
1240  /*
1241  * Behavior flags:
1242  *
1243  * LF_LIST_BF_RETURN_ON_RESTART - When insert fails because last entry in bucket was deleted, if this flag is set,
1244  * then the operation is restarted from here, instead of looping inside
1245  * lf_list_insert_internal (as a consequence, hash key is recalculated).
1246  * NOTE: Currently, this flag is always used (I must find out why).
1247  *
1248  * LF_LIST_BF_INSERT_GIVEN - If this flag is set, the caller means to force its own entry into hash table.
1249  * When the flag is not set, a new entry is claimed from freelist.
1250  * NOTE: If an entry with the same key already exists, the entry given as argument is
1251  * automatically retired.
1252  *
1253  * LF_LIST_BF_FIND_OR_INSERT - If this flag is set and an entry for the same key already exists, the existing
1254  * key will be output. If the flag is not set and key exists, insert just gives up
1255  * and a NULL entry is output.
1256  * NOTE: insert will not give up when key exists, if edesc->f_update is provided.
1257  * a new key is generated and insert is restarted.
1258  */
1259  template <class Key, class T>
1260  bool
1261  hashmap<Key, T>::hash_insert_internal (tran::index tran_index, Key &key, int bflags, T *&entry)
1262  {
1264 
1265  bool inserted = false;
1266 
1267  while (true)
1268  {
1269  T *&list_head = get_bucket (key);
1270  if ((bflags & LF_LIST_BF_INSERT_GIVEN) != 0)
1271  {
1272  assert (entry != NULL);
1273  }
1274  else
1275  {
1276  entry = NULL;
1277  }
1278 
1279  inserted = list_insert_internal (tran_index, list_head, key, &bflags, entry);
1280  if ((bflags & LF_LIST_BR_RESTARTED) != 0)
1281  {
1282  // restart
1283  bflags &= ~LF_LIST_BR_RESTARTED;
1284  }
1285  else
1286  {
1287  // done
1288  break;
1289  }
1290  }
1291  return inserted;
1292  }
1293 
1294 
1295  template <class Key, class T>
1296  bool
1297  hashmap<Key, T>::hash_erase_internal (tran::index tran_index, Key &key, int bflags, T *locked_entry)
1298  {
1300 
1301  bool erased = false;
1302 
1303  while (true)
1304  {
1305  T *&list_head = get_bucket (key);
1306  erased = list_delete (tran_index, list_head, key, locked_entry, &bflags);
1307  if ((bflags & LF_LIST_BR_RESTARTED) != 0)
1308  {
1309  // restart
1310  bflags &= ~LF_LIST_BR_RESTARTED;
1311  }
1312  else
1313  {
1314  // done
1315  break;
1316  }
1317  }
1318  return erased;
1319  }
1320 
1321  //
1322  // hashmap::iterator
1323  //
1324  template <class Key, class T>
1325  hashmap<Key, T>::iterator::iterator (tran::index tran_index, hashmap &hash)
1326  : m_hashmap (&hash)
1327  , m_tdes (&hash.get_tran_descriptor (tran_index))
1328  , m_bucket_index (INVALID_INDEX)
1329  , m_curr (NULL)
1330  {
1331  }
1332 
1333  template <class Key, class T>
1334  T *
1335  hashmap<Key, T>::iterator::iterate ()
1336  {
1337  if (m_hashmap == NULL && m_tdes == NULL)
1338  {
1339  assert (false);
1340  return NULL;
1341  }
1342 
1344 
1345  T **next_p = NULL;
1346  do
1347  {
1348  /* save current leader as trailer */
1349  if (m_curr != NULL)
1350  {
1351  if (m_hashmap->m_edesc->using_mutex)
1352  {
1353  /* follow house rules: lock mutex */
1354  m_hashmap->unlock_entry (*m_curr);
1355  }
1356 
1357  /* load next entry */
1358  next_p = &m_hashmap->get_nextp_ref (m_curr);
1359  m_curr = address_type::strip_address_mark (*next_p);
1360  }
1361  else
1362  {
1363  /* reset transaction for each bucket */
1364  if (m_tdes->is_tran_started ())
1365  {
1366  m_tdes->end_tran ();
1367  }
1368  m_tdes->start_tran ();
1369 
1370  /* load next bucket */
1371  m_bucket_index++;
1372 
1373  if (m_bucket_index < m_hashmap->m_size)
1374  {
1375  m_curr = address_type::strip_address_mark (m_hashmap->m_buckets[m_bucket_index]);
1376  }
1377  else
1378  {
1379  /* end */
1381  m_tdes->end_tran ();
1382  return NULL;
1383  }
1384  }
1385 
1386  if (m_curr != NULL)
1387  {
1388  if (m_hashmap->m_edesc->using_mutex)
1389  {
1390  m_hashmap->lock_entry (*m_curr);
1391 
1392  if (address_type::is_address_marked (m_hashmap->get_nextp (m_curr)))
1393  {
1394  /* deleted in the meantime, skip it */
1395  continue;
1396  }
1397  }
1398  }
1399  }
1400  while (m_curr == NULL);
1401 
1402  /* we have a valid entry */
1403  return m_curr;
1404  }
1405 
1406  template <class Key, class T>
1407  void
1408  hashmap<Key, T>::iterator::restart ()
1409  {
1410  if (m_tdes->is_tran_started ())
1411  {
1412  m_tdes->end_tran();
1413  }
1414  m_curr = NULL;
1415  }
1416 
1417  template <class Key, class T>
1418  typename hashmap<Key, T>::iterator &
1419  hashmap<Key, T>::iterator::operator= (iterator &&o)
1420  {
1421  m_hashmap = o.m_hashmap;
1422  o.m_hashmap = NULL;
1423 
1424  m_tdes = o.m_tdes;
1425  o.m_tdes = NULL;
1426 
1427  m_bucket_index = o.m_bucket_index;
1428  o.m_bucket_index = INVALID_INDEX;
1429 
1430  m_curr = o.m_curr;
1431  o.m_curr = NULL;
1432 
1433  return *this;
1434  }
1435 } // namespace lockfree
1436 
1437 #endif // !_LOCKFREE_HASHMAP_HPP_