CUBRID Engine  latest
double_write_buffer.c
Go to the documentation of this file.
1 /*
2  * Copyright 2008 Search Solution Corporation
3  * Copyright 2016 CUBRID Corporation
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 /*
20  * double_write_buffer.c - double write buffer
21  */
22 
23 #ident "$Id$"
24 
25 #include <assert.h>
26 #include <math.h>
27 
28 #include "double_write_buffer.h"
29 
30 #include "system_parameter.h"
31 #include "thread_daemon.hpp"
32 #include "thread_entry_task.hpp"
34 #include "thread_manager.hpp"
35 #include "log_append.hpp"
36 #include "log_impl.h"
37 #include "log_volids.hpp"
38 #include "boot_sr.h"
39 #include "perf_monitor.h"
40 #include "porting_inline.hpp"
41 
42 
43 #define DWB_SLOTS_HASH_SIZE 1000
44 #define DWB_SLOTS_FREE_LIST_SIZE 100
45 
46 #define DWB_MIN_SIZE (512 * 1024)
47 #define DWB_MAX_SIZE (32 * 1024 * 1024)
48 #define DWB_MIN_BLOCKS 1
49 #define DWB_MAX_BLOCKS 32
50 
51 /* The total number of blocks. */
52 #define DWB_NUM_TOTAL_BLOCKS (dwb_Global.num_blocks)
53 
54 /* The total number of pages. */
55 #define DWB_NUM_TOTAL_PAGES (dwb_Global.num_pages)
56 
57 /* The number of pages in each block. */
58 #define DWB_BLOCK_NUM_PAGES (dwb_Global.num_block_pages)
59 
60 /* Log2 of the number of pages in each block. */
61 #define DWB_LOG2_BLOCK_NUM_PAGES (dwb_Global.log2_num_block_pages)
62 
63 
/*
 * Layout of the 64-bit position_with_flags word, as defined by the masks below:
 *   bits  0..29 : current position in the double write buffer
 *   bit      30 : DWB created flag
 *   bit      31 : DWB structure modification flag
 *   bits 32..63 : blocks status
 */

/* Position mask. */
#define DWB_POSITION_MASK 0x000000003fffffff

/* Block status mask. */
#define DWB_BLOCKS_STATUS_MASK 0xffffffff00000000

/*
 * Structure modification mask.
 * The ULL suffix is required: an unsuffixed 0x80000000 has type unsigned int, so ~DWB_MODIFY_STRUCTURE would be the
 * 32-bit value 0x7fffffff and, once widened for a 64-bit AND, would also clear every block status bit.
 */
#define DWB_MODIFY_STRUCTURE 0x0000000080000000ULL

/*
 * Create mask.
 * Kept 64-bit wide as well, so that ~DWB_CREATE does not depend on sign extension of a negative int.
 */
#define DWB_CREATE 0x0000000040000000ULL

/* Create or modify mask. */
#define DWB_CREATE_OR_MODIFY_MASK (DWB_CREATE | DWB_MODIFY_STRUCTURE)

/* Flags mask. */
#define DWB_FLAG_MASK (DWB_BLOCKS_STATUS_MASK | DWB_MODIFY_STRUCTURE | DWB_CREATE)
81 
82 
83 /* Get DWB position. */
84 #define DWB_GET_POSITION(position_with_flags) \
85  ((position_with_flags) & DWB_POSITION_MASK)
86 
87 /* Reset DWB position. */
88 #define DWB_RESET_POSITION(position_with_flags) \
89  ((position_with_flags) & DWB_FLAG_MASK)
90 
91 /* Get DWB block status. */
92 #define DWB_GET_BLOCK_STATUS(position_with_flags) \
93  ((position_with_flags) & DWB_BLOCKS_STATUS_MASK)
94 
95 /* Get block number from DWB position. */
96 #define DWB_GET_BLOCK_NO_FROM_POSITION(position_with_flags) \
97  ((unsigned int) DWB_GET_POSITION (position_with_flags) >> (DWB_LOG2_BLOCK_NUM_PAGES))
98 
/*
 * Checks whether writing in a DWB block was started.
 * The status bit of block N is bit (63 - N) of position_with_flags. The block_no argument is parenthesized in the
 * assert too, so that an expression argument is not re-associated by operator precedence.
 */
#define DWB_IS_BLOCK_WRITE_STARTED(position_with_flags, block_no) \
  (assert ((block_no) < DWB_MAX_BLOCKS), ((position_with_flags) & (1ULL << (63 - (block_no)))) != 0)

/* Checks whether any DWB block was started. */
#define DWB_IS_ANY_BLOCK_WRITE_STARTED(position_with_flags) \
  (((position_with_flags) & DWB_BLOCKS_STATUS_MASK) != 0)

/* Starts DWB block writing: yields position_with_flags with the status bit of block_no set. */
#define DWB_STARTS_BLOCK_WRITING(position_with_flags, block_no) \
  (assert ((block_no) < DWB_MAX_BLOCKS), (position_with_flags) | (1ULL << (63 - (block_no))))

/* Ends DWB block writing: yields position_with_flags with the status bit of block_no cleared. */
#define DWB_ENDS_BLOCK_WRITING(position_with_flags, block_no) \
  (assert (DWB_IS_BLOCK_WRITE_STARTED (position_with_flags, block_no)), \
   (position_with_flags) & ~(1ULL << (63 - (block_no))))
115 
116 /* Starts modifying DWB structure. */
117 #define DWB_STARTS_MODIFYING_STRUCTURE(position_with_flags) \
118  ((position_with_flags) | DWB_MODIFY_STRUCTURE)
119 
120 /* Ends modifying DWB structure. */
121 #define DWB_ENDS_MODIFYING_STRUCTURE(position_with_flags) \
122  (assert (DWB_IS_MODIFYING_STRUCTURE (position_with_flags)), (position_with_flags) & ~DWB_MODIFY_STRUCTURE)
123 
124 /* Check whether the DWB structure is modifying. */
125 #define DWB_IS_MODIFYING_STRUCTURE(position_with_flags) \
126  (((position_with_flags) & DWB_MODIFY_STRUCTURE) != 0)
127 
128 /* Starts DWB creation. */
129 #define DWB_STARTS_CREATION(position_with_flags) \
130  ((position_with_flags) | DWB_CREATE)
131 
132 /* Ends DWB creation. */
133 #define DWB_ENDS_CREATION(position_with_flags) \
134  (assert (DWB_IS_CREATED (position_with_flags)), (position_with_flags) & ~DWB_CREATE)
135 
136 /* Check whether the DWB was created. */
137 #define DWB_IS_CREATED(position_with_flags) \
138  (((position_with_flags) & DWB_CREATE) != 0)
139 
140 /* Check whether the DWB structure is not created or is modifying. */
141 #define DWB_NOT_CREATED_OR_MODIFYING(position_with_flags) \
142  (((position_with_flags) & DWB_CREATE_OR_MODIFY_MASK) != DWB_CREATE)
143 
144 /* Get next DWB position with flags. */
145 #define DWB_GET_NEXT_POSITION_WITH_FLAGS(position_with_flags) \
146  ((DWB_GET_POSITION (position_with_flags)) == (DWB_NUM_TOTAL_PAGES - 1) \
147  ? ((position_with_flags) & DWB_FLAG_MASK) : ((position_with_flags) + 1))
148 
149 /* Get position in DWB block from DWB position with flags. */
150 #define DWB_GET_POSITION_IN_BLOCK(position_with_flags) \
151  ((DWB_GET_POSITION (position_with_flags)) & (DWB_BLOCK_NUM_PAGES - 1))
152 
153 /* Get DWB previous block number. */
154 #define DWB_GET_PREV_BLOCK_NO(block_no) \
155  ((block_no) > 0 ? ((block_no) - 1) : (DWB_NUM_TOTAL_BLOCKS - 1))
156 
157 /* Get DWB previous block. */
158 #define DWB_GET_PREV_BLOCK(block_no) \
159  (&(dwb_Global.blocks[DWB_GET_PREV_BLOCK_NO(block_no)]))
160 
161 /* Get DWB next block number. */
162 #define DWB_GET_NEXT_BLOCK_NO(block_no) \
163  ((block_no) == (DWB_NUM_TOTAL_BLOCKS - 1) ? 0 : ((block_no) + 1))
164 
/*
 * Get block version.
 * Reads block->version through ATOMIC_INC_64 with an increment of 0. The block argument is parenthesized so that
 * an expression argument (for instance a pointer sum) is dereferenced as a whole.
 */
#define DWB_GET_BLOCK_VERSION(block) \
  (ATOMIC_INC_64 (&(block)->version, 0ULL))
168 
169 /* Queue entry. */
172 {
173  void *data; /* The data field. */
174  DWB_WAIT_QUEUE_ENTRY *next; /* The next queue entry field. */
175 };
176 
177 /* DWB queue. */
180 {
181  DWB_WAIT_QUEUE_ENTRY *head; /* Queue head. */
182  DWB_WAIT_QUEUE_ENTRY *tail; /* Queue tail. */
183  DWB_WAIT_QUEUE_ENTRY *free_list; /* Queue free list */
184 
185  int count; /* Count queue elements. */
186  int free_count; /* Count free list elements. */
187 };
188 #define DWB_WAIT_QUEUE_INITIALIZER {NULL, NULL, NULL, 0, 0}
189 
190 /* Flush volume status. */
191 typedef enum
192 {
197 
200 {
201  int vdes; /* The volume descriptor. */
202  volatile int num_pages; /* The number of pages to flush in volume. */
203  volatile bool all_pages_written; /* True, if all pages written. */
204  volatile FLUSH_VOLUME_STATUS flushed_status; /* Flush status. */
205 };
206 
207 /* DWB block. */
210 {
211  FLUSH_VOLUME_INFO *flush_volumes_info; /* Information about volumes to flush. */
212  volatile unsigned int count_flush_volumes_info; /* Count volumes to flush. */
213  unsigned int max_to_flush_vdes; /* Maximum volumes to flush. */
214 
215  pthread_mutex_t mutex; /* The mutex to protect the queue. */
216  DWB_WAIT_QUEUE wait_queue; /* The wait queue for the current block. */
217 
218  char *write_buffer; /* The block write buffer, used to write all pages once. */
219  DWB_SLOT *slots; /* The slots containing the data. Used to write individual pages. */
220  volatile unsigned int count_wb_pages; /* Count the pages added to write buffer. */
221 
222  unsigned int block_no; /* The block number. */
223  volatile UINT64 version; /* The block version. */
224  volatile bool all_pages_written; /* True, if all pages are written */
225 };
226 
227 /* Slots hash entry. */
230 {
231  VPID vpid; /* Page VPID. */
232 
233  DWB_SLOTS_HASH_ENTRY *stack; /* Used in freelist. */
234  DWB_SLOTS_HASH_ENTRY *next; /* Used in hash table. */
235  pthread_mutex_t mutex; /* The mutex. */
236  UINT64 del_id; /* Delete transaction ID (for lock free). */
237 
238  DWB_SLOT *slot; /* DWB slot containing a page. */
239 
240  // *INDENT-OFF*
242  {
243  pthread_mutex_init (&mutex, NULL);
244  }
246  {
247  pthread_mutex_destroy (&mutex);
248  }
249  // *INDENT-ON*
250 };
251 
252 /* Hash that store all pages in DWB. */
253 // *INDENT-OFF*
255 // *INDENT-ON*
256 
257 /* The double write buffer structure. */
260 {
262  DWB_BLOCK *blocks; /* The blocks in DWB. */
263  unsigned int num_blocks; /* The total number of blocks in DWB - power of 2. */
264  unsigned int num_pages; /* The total number of pages in DWB - power of 2. */
265  unsigned int num_block_pages; /* The number of pages in a block - power of 2. */
266  unsigned int log2_num_block_pages; /* Logarithm from block number of pages. */
267  volatile unsigned int blocks_flush_counter; /* The blocks flush counter. */
268  volatile unsigned int next_block_to_flush; /* Next block to flush */
269 
270  pthread_mutex_t mutex; /* The mutex to protect the wait queue. */
271  DWB_WAIT_QUEUE wait_queue; /* The wait queue, used when the DWB structure changed. */
272 
273  UINT64 volatile position_with_flags; /* The current position in double write buffer and flags. Flags keep the
274  * state of each block (started, ended), create DWB status, modify DWB status.
275  */
276  dwb_hashmap_type slots_hashmap; /* The slots hash. */
277  int vdes; /* The volume file descriptor. */
278 
279  DWB_BLOCK *volatile file_sync_helper_block; /* The block that will be sync by helper thread. */
280 
281  // *INDENT-OFF*
283  : logging_enabled (false)
284  , blocks (NULL)
285  , num_blocks (0)
286  , num_pages (0)
287  , num_block_pages (0)
288  , log2_num_block_pages (0)
289  , blocks_flush_counter (0)
290  , next_block_to_flush (0)
291  , mutex PTHREAD_MUTEX_INITIALIZER
292  , wait_queue DWB_WAIT_QUEUE_INITIALIZER
293  , position_with_flags (0)
294  , slots_hashmap {}
295  , vdes (NULL_VOLDES)
296  , file_sync_helper_block (NULL)
297  {
298  }
299  // *INDENT-ON*
300 };
301 
302 /* DWB volume name. */
303 char dwb_Volume_name[PATH_MAX];
304 
305 /* DWB. */
307 
/* Shortcut to the DWB logging flag. */
#define dwb_Log dwb_Global.logging_enabled

/* Reloads the logging flag from the PRM_ID_DWB_LOGGING system parameter. */
#define dwb_check_logging() (dwb_Log = prm_get_bool_value (PRM_ID_DWB_LOGGING))

/*
 * Debug logging helpers, active only when dwb_Log is set.
 * The body is wrapped in do { } while (0) so that each macro expands to exactly one statement; with a bare
 * "if (dwb_Log) ..." body, an else written after "if (cond) dwb_log (...);" would bind to the macro's own if.
 */
#define dwb_log(...) \
  do \
    { \
      if (dwb_Log) \
        { \
          _er_log_debug (ARG_FILE_LINE, "DWB: " __VA_ARGS__); \
        } \
    } \
  while (0)
#define dwb_log_error(...) \
  do \
    { \
      if (dwb_Log) \
        { \
          _er_log_debug (ARG_FILE_LINE, "DWB ERROR: " __VA_ARGS__); \
        } \
    } \
  while (0)
313 
314 #if !defined(SERVER_MODE)
315 #define pthread_mutex_init(a, b)
316 #define pthread_mutex_destroy(a)
317 #define pthread_mutex_lock(a) 0
318 #define pthread_mutex_unlock(a)
319 #endif /* !SERVER_MODE */
320 
321 /* DWB wait queue functions */
324  void *data) __attribute__ ((ALWAYS_INLINE));
326  void *data) __attribute__ ((ALWAYS_INLINE));
328  wait_queue, void *data)
331  DWB_WAIT_QUEUE_ENTRY * wait_queue_entry,
332  int (*func) (void *)) __attribute__ ((ALWAYS_INLINE));
333 STATIC_INLINE void dwb_remove_wait_queue_entry (DWB_WAIT_QUEUE * wait_queue, pthread_mutex_t * mutex,
334  void *data, int (*func) (void *)) __attribute__ ((ALWAYS_INLINE));
335 STATIC_INLINE void dwb_signal_waiting_threads (DWB_WAIT_QUEUE * wait_queue, pthread_mutex_t * mutex)
336  __attribute__ ((ALWAYS_INLINE));
337 STATIC_INLINE void dwb_destroy_wait_queue (DWB_WAIT_QUEUE * wait_queue, pthread_mutex_t * mutex);
338 
339 /* DWB functions */
340 STATIC_INLINE void dwb_adjust_write_buffer_values (unsigned int *p_double_write_buffer_size,
341  unsigned int *p_num_blocks) __attribute__ ((ALWAYS_INLINE));
342 STATIC_INLINE int dwb_wait_for_block_completion (THREAD_ENTRY * thread_p, unsigned int block_no)
343  __attribute__ ((ALWAYS_INLINE));
344 STATIC_INLINE int dwb_signal_waiting_thread (void *data) __attribute__ ((ALWAYS_INLINE));
345 STATIC_INLINE int dwb_set_status_resumed (void *data) __attribute__ ((ALWAYS_INLINE));
347  __attribute__ ((ALWAYS_INLINE));
348 STATIC_INLINE int dwb_block_create_ordered_slots (DWB_BLOCK * block, DWB_SLOT ** p_dwb_ordered_slots,
349  unsigned int *p_ordered_slots_length) __attribute__ ((ALWAYS_INLINE));
350 static int dwb_compare_vol_fd (const void *v1, const void *v2);
352  int vol_fd) __attribute__ ((ALWAYS_INLINE));
353 STATIC_INLINE int dwb_write_block (THREAD_ENTRY * thread_p, DWB_BLOCK * block, DWB_SLOT * p_dwb_slots,
354  unsigned int ordered_slots_length, bool file_sync_helper_can_flush,
355  bool remove_from_hash) __attribute__ ((ALWAYS_INLINE));
356 STATIC_INLINE int dwb_flush_block (THREAD_ENTRY * thread_p, DWB_BLOCK * block, bool file_sync_helper_can_flush,
357  UINT64 * current_position_with_flags) __attribute__ ((ALWAYS_INLINE));
358 STATIC_INLINE void dwb_init_slot (DWB_SLOT * slot) __attribute__ ((ALWAYS_INLINE));
359 STATIC_INLINE int dwb_acquire_next_slot (THREAD_ENTRY * thread_p, bool can_wait, DWB_SLOT ** p_dwb_slot)
360  __attribute__ ((ALWAYS_INLINE));
361 STATIC_INLINE void dwb_set_slot_data (THREAD_ENTRY * thread_p, DWB_SLOT * dwb_slot,
362  FILEIO_PAGE * io_page_p) __attribute__ ((ALWAYS_INLINE));
365 STATIC_INLINE int dwb_starts_structure_modification (THREAD_ENTRY * thread_p, UINT64 * current_position_with_flags)
366  __attribute__ ((ALWAYS_INLINE));
367 STATIC_INLINE void dwb_ends_structure_modification (THREAD_ENTRY * thread_p, UINT64 current_position_with_flags)
368  __attribute__ ((ALWAYS_INLINE));
369 STATIC_INLINE void dwb_destroy_internal (THREAD_ENTRY * thread_p, UINT64 * current_position_with_flags)
370  __attribute__ ((ALWAYS_INLINE));
372  unsigned int position_in_block, unsigned int block_no)
373  __attribute__ ((ALWAYS_INLINE));
374 STATIC_INLINE void dwb_initialize_block (DWB_BLOCK * block, unsigned int block_no,
375  unsigned int count_wb_pages, char *write_buffer, DWB_SLOT * slots,
376  FLUSH_VOLUME_INFO * flush_volumes_info, unsigned int count_flush_volumes_info,
377  unsigned int max_to_flush_vdes) __attribute__ ((ALWAYS_INLINE));
378 STATIC_INLINE int dwb_create_blocks (THREAD_ENTRY * thread_p, unsigned int num_blocks, unsigned int num_block_pages,
379  DWB_BLOCK ** p_blocks) __attribute__ ((ALWAYS_INLINE));
380 STATIC_INLINE void dwb_finalize_block (DWB_BLOCK * block) __attribute__ ((ALWAYS_INLINE));
381 STATIC_INLINE int dwb_create_internal (THREAD_ENTRY * thread_p, const char *dwb_volume_name,
382  UINT64 * current_position_with_flags) __attribute__ ((ALWAYS_INLINE));
383 STATIC_INLINE void dwb_get_next_block_for_flush (THREAD_ENTRY * thread_p, unsigned int *block_no)
384  __attribute__ ((ALWAYS_INLINE));
385 
386 /* Slots hash functions. */
387 static void *dwb_slots_hash_entry_alloc (void);
388 static int dwb_slots_hash_entry_free (void *entry);
389 static int dwb_slots_hash_entry_init (void *entry);
390 static int dwb_slots_hash_key_copy (void *src, void *dest);
391 static int dwb_slots_hash_compare_key (void *key1, void *key2);
392 static unsigned int dwb_slots_hash_key (void *key, int hash_table_size);
393 
394 STATIC_INLINE int dwb_slots_hash_insert (THREAD_ENTRY * thread_p, VPID * vpid, DWB_SLOT * slot, bool * inserted)
395  __attribute__ ((ALWAYS_INLINE));
397 
398 // *INDENT-OFF*
399 #if defined (SERVER_MODE)
400 static cubthread::daemon *dwb_flush_block_daemon = NULL;
401 static cubthread::daemon *dwb_file_sync_helper_daemon = NULL;
402 #endif
403 // *INDENT-ON*
404 
405 static bool dwb_is_flush_block_daemon_available (void);
407 
408 static bool dwb_flush_block_daemon_is_running (void);
409 static bool dwb_file_sync_helper_daemon_is_running (void);
410 
411 static int dwb_file_sync_helper (THREAD_ENTRY * thread_p);
412 static int dwb_flush_next_block (THREAD_ENTRY * thread_p);
413 
414 #if !defined (NDEBUG)
415 static int dwb_debug_check_dwb (THREAD_ENTRY * thread_p, DWB_SLOT * p_dwb_ordered_slots, unsigned int num_pages);
416 #endif // DEBUG
417 static int dwb_check_data_page_is_sane (THREAD_ENTRY * thread_p, DWB_BLOCK * rcv_block, DWB_SLOT * p_dwb_ordered_slots,
418  int *p_num_recoverable_pages);
419 
420 /* Slots entry descriptor */
422  /* offsets */
423  offsetof (DWB_SLOTS_HASH_ENTRY, stack),
424  offsetof (DWB_SLOTS_HASH_ENTRY, next),
425  offsetof (DWB_SLOTS_HASH_ENTRY, del_id),
426  offsetof (DWB_SLOTS_HASH_ENTRY, vpid),
427  offsetof (DWB_SLOTS_HASH_ENTRY, mutex),
428 
429  /* using mutex? */
431 
435  NULL,
439  NULL /* no inserts */
440 };
441 
442 /*
443  * dwb_init_wait_queue () - Initialize wait queue.
444  *
445  * return : Nothing.
446  * wait_queue (in/out) : The wait queue.
447  */
448 STATIC_INLINE void
450 {
451  wait_queue->head = NULL;
452  wait_queue->tail = NULL;
453  wait_queue->count = 0;
454 
455  wait_queue->free_list = NULL;
456  wait_queue->free_count = 0;
457 }
458 
459 /*
460  * dwb_make_wait_queue_entry () - Make wait queue entry.
461  *
462  * return : The queue entry.
463  * wait_queue (in/out) : The wait queue.
464  * data (in): The data.
465  */
467 dwb_make_wait_queue_entry (DWB_WAIT_QUEUE * wait_queue, void *data)
468 {
469  DWB_WAIT_QUEUE_ENTRY *wait_queue_entry;
470 
471  assert (wait_queue != NULL && data != NULL);
472 
473  if (wait_queue->free_list != NULL)
474  {
475  wait_queue_entry = wait_queue->free_list;
476  wait_queue->free_list = wait_queue->free_list->next;
477  wait_queue->free_count--;
478  }
479  else
480  {
481  wait_queue_entry = (DWB_WAIT_QUEUE_ENTRY *) malloc (sizeof (DWB_WAIT_QUEUE_ENTRY));
482  if (wait_queue_entry == NULL)
483  {
485  return NULL;
486  }
487  }
488 
489  wait_queue_entry->data = data;
490  wait_queue_entry->next = NULL;
491  return wait_queue_entry;
492 }
493 
494 /*
495  * dwb_block_add_wait_queue_entry () - Add entry to wait queue.
496  *
497  * return : Added queue entry.
498  * wait_queue (in/out) : The wait queue.
499  * data (in): The data.
500  *
501  * Note: It is the caller's responsibility to protect the queue against concurrent access.
502  */
505 {
506  DWB_WAIT_QUEUE_ENTRY *wait_queue_entry = NULL;
507 
508  assert (wait_queue != NULL && data != NULL);
509 
510  wait_queue_entry = dwb_make_wait_queue_entry (wait_queue, data);
511  if (wait_queue_entry == NULL)
512  {
513  return NULL;
514  }
515 
516  if (wait_queue->head == NULL)
517  {
518  wait_queue->tail = wait_queue->head = wait_queue_entry;
519  }
520  else
521  {
522  wait_queue->tail->next = wait_queue_entry;
523  wait_queue->tail = wait_queue_entry;
524  }
525  wait_queue->count++;
526 
527  return wait_queue_entry;
528 }
529 
530 /*
531  * dwb_block_disconnect_wait_queue_entry () - Disconnect entry from wait queue.
532  *
533  * return : Disconnected queue entry.
534  * wait_queue (in/out) : The wait queue.
535  * data (in): The data.
536  * Note: If data is NULL, then first entry will be disconnected.
537  */
540 {
541  DWB_WAIT_QUEUE_ENTRY *wait_queue_entry = NULL, *prev_wait_queue_entry = NULL;
542 
543  assert (wait_queue != NULL);
544 
545  if (wait_queue->head == NULL)
546  {
547  return NULL;
548  }
549 
550  prev_wait_queue_entry = NULL;
551  wait_queue_entry = wait_queue->head;
552  if (data != NULL)
553  {
554  /* search for data */
555  while (wait_queue_entry != NULL)
556  {
557  if (wait_queue_entry->data == data)
558  {
559  break;
560  }
561  prev_wait_queue_entry = wait_queue_entry;
562  wait_queue_entry = wait_queue_entry->next;
563  }
564  }
565 
566  if (wait_queue_entry == NULL)
567  {
568  return NULL;
569  }
570 
571  if (prev_wait_queue_entry != NULL)
572  {
573  prev_wait_queue_entry->next = wait_queue_entry->next;
574  }
575  else
576  {
577  assert (wait_queue_entry == wait_queue->head);
578  wait_queue->head = wait_queue_entry->next;
579  }
580 
581  if (wait_queue_entry == wait_queue->tail)
582  {
583  wait_queue->tail = prev_wait_queue_entry;
584  }
585 
586  wait_queue_entry->next = NULL;
587  wait_queue->count--;
588 
589  return wait_queue_entry;
590 }
591 
592 /*
593  * dwb_block_free_wait_queue_entry () - Free a wait queue entry.
594  *
595  * return : Nothing.
596  * wait_queue (in/out) : The wait queue.
597  * wait_queue_entry (in): The wait queue entry.
598  * func(in): The function to apply on entry.
599  */
600 STATIC_INLINE void
602  int (*func) (void *))
603 {
604  if (wait_queue_entry == NULL)
605  {
606  return;
607  }
608 
609  if (func != NULL)
610  {
611  (void) func (wait_queue_entry);
612  }
613 
614  /* Reuse the entry. Do not set data field to NULL. It may be used at debugging. */
615  wait_queue_entry->next = wait_queue->free_list;
616  wait_queue->free_list = wait_queue_entry;
617  wait_queue->free_count++;
618 }
619 
620 /*
621  * dwb_remove_wait_queue_entry () - Remove the wait queue entry.
622  *
623  * return : True, if entry removed, false otherwise.
624  * wait_queue (in/out): The wait queue.
625  * mutex (in): The mutex to protect the wait queue.
626  * data (in): The data.
627  * func(in): The function to apply on each entry.
628  *
629  * Note: If the data is NULL, the first entry will be removed.
630  */
631 STATIC_INLINE void
632 dwb_remove_wait_queue_entry (DWB_WAIT_QUEUE * wait_queue, pthread_mutex_t * mutex, void *data, int (*func) (void *))
633 {
634  DWB_WAIT_QUEUE_ENTRY *wait_queue_entry = NULL;
635 
636  if (mutex != NULL)
637  {
638  (void) pthread_mutex_lock (mutex);
639  }
640 
641  wait_queue_entry = dwb_block_disconnect_wait_queue_entry (wait_queue, data);
642  if (wait_queue_entry != NULL)
643  {
644  dwb_block_free_wait_queue_entry (wait_queue, wait_queue_entry, func);
645  }
646 
647  if (mutex != NULL)
648  {
649  pthread_mutex_unlock (mutex);
650  }
651 }
652 
653 /*
654  * dwb_signal_waiting_threads () - Signal waiting threads.
655  *
656  * return : Nothing.
657  * wait_queue (in/out): The wait queue.
658  * mutex(in): The mutex to protect the queue.
659  */
660 STATIC_INLINE void
661 dwb_signal_waiting_threads (DWB_WAIT_QUEUE * wait_queue, pthread_mutex_t * mutex)
662 {
663  assert (wait_queue != NULL);
664 
665  if (mutex != NULL)
666  {
667  (void) pthread_mutex_lock (mutex);
668  }
669 
670  while (wait_queue->head != NULL)
671  {
672  dwb_remove_wait_queue_entry (wait_queue, NULL, NULL, dwb_signal_waiting_thread);
673  }
674 
675  if (mutex != NULL)
676  {
677  pthread_mutex_unlock (mutex);
678  }
679 }
680 
681 /*
682  * dwb_destroy_wait_queue () - Destroy the wait queue.
683  *
684  * return : Nothing.
685  * wait_queue (in/out): The wait queue.
686  * mutex(in): The mutex to protect the queue.
687  */
688 STATIC_INLINE void
689 dwb_destroy_wait_queue (DWB_WAIT_QUEUE * wait_queue, pthread_mutex_t * mutex)
690 {
691  DWB_WAIT_QUEUE_ENTRY *wait_queue_entry;
692 
693  assert (wait_queue != NULL);
694 
695  if (mutex != NULL)
696  {
697  (void) pthread_mutex_lock (mutex);
698  }
699 
700  dwb_signal_waiting_threads (wait_queue, NULL);
701 
702  if (wait_queue->free_count > 0)
703  {
704  while (wait_queue->free_list != NULL)
705  {
706  wait_queue_entry = wait_queue->free_list;
707  wait_queue->free_list = wait_queue_entry->next;
708  free_and_init (wait_queue_entry);
709  wait_queue->free_count--;
710  }
711  }
712 
713  if (mutex != NULL)
714  {
715  pthread_mutex_unlock (mutex);
716  }
717 }
718 
719 /*
720  * dwb_adjust_write_buffer_values () - Adjust double write buffer values.
721  *
722  * return : Error code.
723  * p_double_write_buffer_size (in/out) : Double write buffer size.
724  * p_num_blocks (in/out): The number of blocks.
725  *
726  * Note: The buffer size must be a multiple of 512 K. The number of blocks must be a power of 2.
727  */
728 STATIC_INLINE void
729 dwb_adjust_write_buffer_values (unsigned int *p_double_write_buffer_size, unsigned int *p_num_blocks)
730 {
731  unsigned int min_size;
732  unsigned int max_size;
733 
734  assert (p_double_write_buffer_size != NULL && p_num_blocks != NULL
735  && *p_double_write_buffer_size > 0 && *p_num_blocks > 0);
736 
737  min_size = DWB_MIN_SIZE;
738  max_size = DWB_MAX_SIZE;
739 
740  if (*p_double_write_buffer_size < min_size)
741  {
742  *p_double_write_buffer_size = min_size;
743  }
744  else if (*p_double_write_buffer_size > min_size)
745  {
746  if (*p_double_write_buffer_size > max_size)
747  {
748  *p_double_write_buffer_size = max_size;
749  }
750  else
751  {
752  /* find smallest number multiple of 512 k */
753  unsigned int limit1 = min_size;
754 
755  while (*p_double_write_buffer_size > limit1)
756  {
757  assert (limit1 <= DWB_MAX_SIZE);
758  if (limit1 == DWB_MAX_SIZE)
759  {
760  break;
761  }
762  limit1 = limit1 << 1;
763  }
764 
765  *p_double_write_buffer_size = limit1;
766  }
767  }
768 
769  min_size = DWB_MIN_BLOCKS;
770  max_size = DWB_MAX_BLOCKS;
771 
772  assert (*p_num_blocks >= min_size);
773 
774  if (*p_num_blocks > min_size)
775  {
776  if (*p_num_blocks > max_size)
777  {
778  *p_num_blocks = max_size;
779  }
780  else if (!IS_POWER_OF_2 (*p_num_blocks))
781  {
782  unsigned int num_blocks = *p_num_blocks;
783 
784  do
785  {
786  num_blocks = num_blocks & (num_blocks - 1);
787  }
788  while (!IS_POWER_OF_2 (num_blocks));
789 
790  *p_num_blocks = num_blocks << 1;
791 
792  assert (*p_num_blocks <= max_size);
793  }
794  }
795 }
796 
797 /*
798  * dwb_starts_structure_modification () - Starts structure modifications.
799  *
800  * return : Error code
801  * thread_p (in): The thread entry.
802  * current_position_with_flags(out): The current position with flags.
803  *
804  * Note: This function must be called before changing structure of DWB.
805  */
806 STATIC_INLINE int
807 dwb_starts_structure_modification (THREAD_ENTRY * thread_p, UINT64 * current_position_with_flags)
808 {
809  UINT64 local_current_position_with_flags, new_position_with_flags, min_version;
810  unsigned int block_no;
811  int error_code = NO_ERROR;
812  unsigned int start_block_no, blocks_count;
813  DWB_BLOCK *file_sync_helper_block;
814 
815  assert (current_position_with_flags != NULL);
816 
817  do
818  {
819  local_current_position_with_flags = ATOMIC_INC_64 (&dwb_Global.position_with_flags, 0ULL);
820  if (DWB_IS_MODIFYING_STRUCTURE (local_current_position_with_flags))
821  {
822  /* Only one thread can change the structure */
823  return ER_FAILED;
824  }
825 
826  new_position_with_flags = DWB_STARTS_MODIFYING_STRUCTURE (local_current_position_with_flags);
827  /* Start structure modifications, the threads that want to flush afterwards, have to wait. */
828  }
829  while (!ATOMIC_CAS_64 (&dwb_Global.position_with_flags, local_current_position_with_flags, new_position_with_flags));
830 
831 #if defined(SERVER_MODE)
832  while ((ATOMIC_INC_32 (&dwb_Global.blocks_flush_counter, 0) > 0)
834  {
835  /* Can't modify structure while flush thread can access DWB. */
836  thread_sleep (20);
837  }
838 #endif
839 
840  /* Since we set the modify structure flag, I'm the only thread that access the DWB. */
841  file_sync_helper_block = dwb_Global.file_sync_helper_block;
842  if (file_sync_helper_block != NULL)
843  {
844  /* All remaining blocks are flushed by me. */
845  (void) ATOMIC_TAS_ADDR (&dwb_Global.file_sync_helper_block, (DWB_BLOCK *) NULL);
846  dwb_log ("Structure modification, needs to flush DWB block = %d having version %lld\n",
847  file_sync_helper_block->block_no, file_sync_helper_block->version);
848  }
849 
850  local_current_position_with_flags = ATOMIC_INC_64 (&dwb_Global.position_with_flags, 0ULL);
851 
852  /* Need to flush incomplete blocks, ordered by version. */
853  start_block_no = DWB_NUM_TOTAL_BLOCKS;
854  min_version = 0xffffffffffffffff;
855  blocks_count = 0;
856  for (block_no = 0; block_no < DWB_NUM_TOTAL_BLOCKS; block_no++)
857  {
858  if (DWB_IS_BLOCK_WRITE_STARTED (local_current_position_with_flags, block_no))
859  {
860  if (dwb_Global.blocks[block_no].version < min_version)
861  {
862  min_version = dwb_Global.blocks[block_no].version;
863  start_block_no = block_no;
864  }
865  blocks_count++;
866  }
867  }
868 
869  block_no = start_block_no;
870  while (blocks_count > 0)
871  {
872  if (DWB_IS_BLOCK_WRITE_STARTED (local_current_position_with_flags, block_no))
873  {
874  /* Flush all pages from current block. I must flush all remaining data. */
875  error_code =
876  dwb_flush_block (thread_p, &dwb_Global.blocks[block_no], false, &local_current_position_with_flags);
877  if (error_code != NO_ERROR)
878  {
879  /* Something wrong happened. */
880  dwb_log_error ("Can't flush block = %d having version %lld\n", block_no,
881  dwb_Global.blocks[block_no].version);
882 
883  return error_code;
884  }
885 
886  dwb_log_error ("DWB flushed %d block having version %lld\n", block_no, dwb_Global.blocks[block_no].version);
887  blocks_count--;
888  }
889 
890  block_no = (block_no + 1) % DWB_NUM_TOTAL_BLOCKS;
891  }
892 
893  local_current_position_with_flags = ATOMIC_INC_64 (&dwb_Global.position_with_flags, 0ULL);
894  assert (DWB_GET_BLOCK_STATUS (local_current_position_with_flags) == 0);
895 
896  *current_position_with_flags = local_current_position_with_flags;
897 
898  return NO_ERROR;
899 }
900 
901 /*
902  * dwb_ends_structure_modification () - Ends structure modifications.
903  *
904  * return : Nothing.
905  * thread_p (in): The thread entry.
906  * current_position_with_flags(in): The current position with flags.
907  */
908 STATIC_INLINE void
909 dwb_ends_structure_modification (THREAD_ENTRY * thread_p, UINT64 current_position_with_flags)
910 {
911  UINT64 new_position_with_flags;
912 
913  new_position_with_flags = DWB_ENDS_MODIFYING_STRUCTURE (current_position_with_flags);
914 
915  /* Ends structure modifications. */
916  assert (dwb_Global.position_with_flags == current_position_with_flags);
917 
918  ATOMIC_TAS_64 (&dwb_Global.position_with_flags, new_position_with_flags);
919 
920  /* Signal the other threads. */
922 }
923 
924 /*
925  * dwb_initialize_slot () - Initialize a DWB slot.
926  *
927  * return : Error code.
928  * slot (in/out) : The slot to initialize.
929  * io_page (in) : The page.
930  * position_in_block(in): The position in DWB block.
931  * block_no(in): The number of the block where the slot reside.
932  */
933 STATIC_INLINE void
934 dwb_initialize_slot (DWB_SLOT * slot, FILEIO_PAGE * io_page, unsigned int position_in_block, unsigned int block_no)
935 {
936  assert (slot != NULL && io_page != NULL);
937 
938  slot->io_page = io_page;
939  if (io_page != NULL)
940  {
941  VPID_SET (&slot->vpid, io_page->prv.volid, io_page->prv.pageid);
942  LSA_COPY (&slot->lsa, &io_page->prv.lsa);
943  }
944 
945  slot->position_in_block = position_in_block;
946  slot->block_no = block_no;
947 }
948 
949 /*
950  * dwb_initialize_block () - Initialize a block.
951  *
952  * return : Nothing.
953  * block (in/out) : Double write buffer block.
954  * block_no (in) : The block number.
955  * count_wb_pages (in): Count DWB pages.
956  * write_buffer(in): The write buffer.
957  * slots(in): The slots.
958  * flush_volumes_info(in): The area containing volume descriptors to flush.
959  * count_flush_volumes_info(in): Count volumes to flush.
960  * max_to_flush_vdes(in): The maximum volumes to flush.
961  */
962 STATIC_INLINE void
963 dwb_initialize_block (DWB_BLOCK * block, unsigned int block_no, unsigned int count_wb_pages, char *write_buffer,
964  DWB_SLOT * slots, FLUSH_VOLUME_INFO * flush_volumes_info, unsigned int count_flush_volumes_info,
965  unsigned int max_to_flush_vdes)
966 {
967  assert (block != NULL);
968 
969  block->flush_volumes_info = flush_volumes_info;
970  block->count_flush_volumes_info = count_flush_volumes_info; /* Current volume descriptors to flush. */
971  block->max_to_flush_vdes = max_to_flush_vdes; /* Maximum volume descriptors to flush. */
972 
973  pthread_mutex_init (&block->mutex, NULL);
975 
976  block->write_buffer = write_buffer;
977  block->slots = slots;
978  block->count_wb_pages = count_wb_pages;
979  block->block_no = block_no;
980  block->version = 0;
981  block->all_pages_written = false;
982 }
983 
984 /*
985  * dwb_create_blocks () - Create the blocks.
986  *
987  * return : Error code (NO_ERROR, or ER_OUT_OF_VIRTUAL_MEMORY when an allocation fails).
988  * thread_p (in) : The thread entry.
989  * num_blocks(in): The number of blocks (asserted to be at most DWB_MAX_BLOCKS).
990  * num_block_pages(in): The number of block pages.
991  * p_blocks(out): The created blocks; set to NULL on entry and left NULL when an error is returned.
 *
 * Note: For every block, three zero-filled arrays are allocated: a write buffer of num_block_pages * IO_PAGESIZE
 *       bytes, num_block_pages slots and num_block_pages FLUSH_VOLUME_INFO entries. On failure, everything that
 *       was allocated so far is released before returning.
992  */
993 STATIC_INLINE int
994 dwb_create_blocks (THREAD_ENTRY * thread_p, unsigned int num_blocks, unsigned int num_block_pages,
995  DWB_BLOCK ** p_blocks)
996 {
997  DWB_BLOCK *blocks = NULL;
998  char *blocks_write_buffer[DWB_MAX_BLOCKS];
999  FLUSH_VOLUME_INFO *flush_volumes_info[DWB_MAX_BLOCKS];
1000  DWB_SLOT *slots[DWB_MAX_BLOCKS];
1001  unsigned int block_buffer_size, i, j;
1002  int error_code;
1003  FILEIO_PAGE *io_page;
1004 
1005  assert (num_blocks <= DWB_MAX_BLOCKS);
1006 
1007  *p_blocks = NULL;
1008 
 /* Start with all per-block pointers NULL, so the error path can tell what was really allocated. */
1009  for (i = 0; i < DWB_MAX_BLOCKS; i++)
1010  {
1011  blocks_write_buffer[i] = NULL;
1012  slots[i] = NULL;
1013  flush_volumes_info[i] = NULL;
1014  }
1015 
1016  blocks = (DWB_BLOCK *) malloc (num_blocks * sizeof (DWB_BLOCK));
1017  if (blocks == NULL)
1018  {
 /* NOTE(review): the embedded listing numbers jump from 1018 to 1020 here, so one source line is missing from
  * this copy (the other allocation failures report through er_set) - restore it from the original file. */
1020  error_code = ER_OUT_OF_VIRTUAL_MEMORY;
1021  goto exit_on_error;
1022  }
1023  memset (blocks, 0, num_blocks * sizeof (DWB_BLOCK));
1024 
 /* One contiguous write buffer per block. */
1025  block_buffer_size = num_block_pages * IO_PAGESIZE;
1026  for (i = 0; i < num_blocks; i++)
1027  {
1028  blocks_write_buffer[i] = (char *) malloc (block_buffer_size * sizeof (char));
1029  if (blocks_write_buffer[i] == NULL)
1030  {
1031  er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_OUT_OF_VIRTUAL_MEMORY, 1, block_buffer_size * sizeof (char));
1032  error_code = ER_OUT_OF_VIRTUAL_MEMORY;
1033  goto exit_on_error;
1034  }
1035  memset (blocks_write_buffer[i], 0, block_buffer_size * sizeof (char));
1036  }
1037 
 /* One slot per page of each block. */
1038  for (i = 0; i < num_blocks; i++)
1039  {
1040  slots[i] = (DWB_SLOT *) malloc (num_block_pages * sizeof (DWB_SLOT));
1041  if (slots[i] == NULL)
1042  {
1043  er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_OUT_OF_VIRTUAL_MEMORY, 1, num_block_pages * sizeof (DWB_SLOT));
1044  error_code = ER_OUT_OF_VIRTUAL_MEMORY;
1045  goto exit_on_error;
1046  }
1047  memset (slots[i], 0, num_block_pages * sizeof (DWB_SLOT));
1048  }
1049 
 /* One flush descriptor per page of each block. */
1050  for (i = 0; i < num_blocks; i++)
1051  {
1052  flush_volumes_info[i] = (FLUSH_VOLUME_INFO *) malloc (num_block_pages * sizeof (FLUSH_VOLUME_INFO));
1053  if (flush_volumes_info[i] == NULL)
1054  {
 /* NOTE(review): the embedded listing numbers jump from 1054 to 1056 here; the first line of the statement
  * that the next line continues is missing from this copy - restore it from the original file. */
1056  num_block_pages * sizeof (FLUSH_VOLUME_INFO));
1057  error_code = ER_OUT_OF_VIRTUAL_MEMORY;
1058  goto exit_on_error;
1059  }
1060  memset (flush_volumes_info[i], 0, num_block_pages * sizeof (FLUSH_VOLUME_INFO));
1061  }
1062 
 /* Bind slot j of block i to the j-th page of that block's write buffer, then initialize the block itself. */
1063  for (i = 0; i < num_blocks; i++)
1064  {
1065  /* No need to initialize FILEIO_PAGE header here, since it is overwritten before flushing */
1066  for (j = 0; j < num_block_pages; j++)
1067  {
1068  io_page = (FILEIO_PAGE *) (blocks_write_buffer[i] + j * IO_PAGESIZE);
1069 
1070  fileio_initialize_res (thread_p, io_page, IO_PAGESIZE);
1071  dwb_initialize_slot (&slots[i][j], io_page, j, i);
1072  }
1073 
1074  dwb_initialize_block (&blocks[i], i, 0, blocks_write_buffer[i], slots[i], flush_volumes_info[i], 0,
1075  num_block_pages);
1076  }
1077 
1078  *p_blocks = blocks;
1079 
1080  return NO_ERROR;
1081 
1082 exit_on_error:
 /* Release whatever was allocated; entries that were never allocated are still NULL and are skipped. */
1083  for (i = 0; i < DWB_MAX_BLOCKS; i++)
1084  {
1085  if (slots[i] != NULL)
1086  {
1087  free_and_init (slots[i]);
1088  }
1089 
1090  if (blocks_write_buffer[i] != NULL)
1091  {
1092  free_and_init (blocks_write_buffer[i]);
1093  }
1094 
1095  if (flush_volumes_info[i] != NULL)
1096  {
1097  free_and_init (flush_volumes_info[i]);
1098  }
1099  }
1100 
1101  if (blocks != NULL)
1102  {
1103  free_and_init (blocks);
1104  }
1105 
1106  return error_code;
1107 }
1108 
1109 /*
1110  * dwb_finalize_block () - Finalize a block.
1111  *
1112  * return : Nothing
1113  * block (in) : Double write buffer block.
 *
 * Note: Releases the block's slots and write buffer, destroys its wait queue and finally its mutex.
1114  */
1115 STATIC_INLINE void
 /* NOTE(review): the embedded listing numbers jump from 1115 to 1117 here, so the line carrying the function name
  * and parameter list is missing from this copy - restore it from the original file. */
1117 {
1118  if (block->slots != NULL)
1119  {
1120  free_and_init (block->slots);
1121  }
1122  /* destroy block write buffer */
1123  if (block->write_buffer != NULL)
1124  {
1125  free_and_init (block->write_buffer);
1126  }
1127  if (block->flush_volumes_info != NULL)
1128  {
 /* NOTE(review): the embedded listing numbers jump from 1128 to 1130 here; the body of this branch is missing
  * from this copy (presumably it releases block->flush_volumes_info - confirm against the original file). */
1130  }
1131 
 /* The wait queue is destroyed before the mutex that is passed along with it. */
1132  dwb_destroy_wait_queue (&block->wait_queue, &block->mutex);
1133 
1134  pthread_mutex_destroy (&block->mutex);
1135 }
1136 
1137 /*
1138  * dwb_create_internal () - Create double write buffer.
1139  *
1140  * return : Error code.
1141  * thread_p (in): The thread entry.
1142  * dwb_volume_name (in) : The double write buffer volume name.
1143  * current_position_with_flags (in/out): Current position with flags.
1144  *
1145  * Note: It is the caller's responsibility to ensure that no other transaction can access the DWB structure
 *       during creation.
 *       Returns NO_ERROR without creating anything when the configured size or number of blocks is 0.
1146  */
1147 STATIC_INLINE int
1148 dwb_create_internal (THREAD_ENTRY * thread_p, const char *dwb_volume_name, UINT64 * current_position_with_flags)
1149 {
1150  int error_code = NO_ERROR;
1151  unsigned int double_write_buffer_size, num_blocks = 0;
1152  unsigned int i, num_pages, num_block_pages;
1153  int vdes = NULL_VOLDES;
1154  DWB_BLOCK *blocks = NULL;
1155  UINT64 new_position_with_flags;
1156 
1157  const int freelist_block_count = 2;
1158  const int freelist_block_size = DWB_SLOTS_FREE_LIST_SIZE;
1159 
1160  assert (dwb_volume_name != NULL && current_position_with_flags != NULL);
1161 
1162  double_write_buffer_size = prm_get_integer_value (PRM_ID_DWB_SIZE);
 /* NOTE(review): the embedded listing numbers jump from 1162 to 1164 here, so one source line is missing from this
  * copy. As shown, num_blocks is still 0 at the check below (the function would always return early), so the
  * lost line presumably loads num_blocks - restore it from the original file. */
1164  if (double_write_buffer_size == 0 || num_blocks == 0)
1165  {
1166  /* Do not use double write buffer. */
1167  return NO_ERROR;
1168  }
1169 
1170  dwb_adjust_write_buffer_values (&double_write_buffer_size, &num_blocks);
1171 
1172  num_pages = double_write_buffer_size / IO_PAGESIZE;
1173  num_block_pages = num_pages / num_blocks;
1174 
1175  assert (IS_POWER_OF_2 (num_blocks));
1176  assert (IS_POWER_OF_2 (num_pages));
1177  assert (IS_POWER_OF_2 (num_block_pages));
1178  assert (num_blocks <= DWB_MAX_BLOCKS);
1179 
1180  /* Create and open DWB volume first */
1181  vdes = fileio_format (thread_p, boot_db_full_name (), dwb_volume_name, LOG_DBDWB_VOLID, num_block_pages, true,
1182  false, false, IO_PAGESIZE, 0, false);
1183  if (vdes == NULL_VOLDES)
1184  {
 /* NOTE(review): error_code is still NO_ERROR at this point, so this failure path returns NO_ERROR to the
  * caller although the volume could not be created - confirm whether an error code should be set here. */
1185  goto exit_on_error;
1186  }
1187 
1188  /* Needs to flush dirty page before activating DWB. */
1189  fileio_synchronize_all (thread_p, false);
1190 
1191  /* Create DWB blocks */
1192  error_code = dwb_create_blocks (thread_p, num_blocks, num_block_pages, &blocks);
1193  if (error_code != NO_ERROR)
1194  {
1195  goto exit_on_error;
1196  }
1197 
 /* Publish the new configuration in the global DWB descriptor. */
1198  dwb_Global.blocks = blocks;
1199  dwb_Global.num_blocks = num_blocks;
1200  dwb_Global.num_pages = num_pages;
1201  dwb_Global.num_block_pages = num_block_pages;
 /* log2 is computed in floating point and truncated; num_block_pages is asserted above to be a power of 2.
  * NOTE(review): this relies on log (x) / log (2) not falling just below the exact integer - confirm. */
1202  dwb_Global.log2_num_block_pages = (unsigned int) (log ((float) num_block_pages) / log ((float) 2));
1203  dwb_Global.blocks_flush_counter = 0;
1204  dwb_Global.next_block_to_flush = 0;
1205  pthread_mutex_init (&dwb_Global.mutex, NULL);
1206  dwb_init_wait_queue (&dwb_Global.wait_queue);
1207  dwb_Global.vdes = vdes;
1208  dwb_Global.file_sync_helper_block = NULL;
1209 
1210  dwb_Global.slots_hashmap.init (dwb_slots_Ts, THREAD_TS_DWB_SLOTS, DWB_SLOTS_HASH_SIZE, freelist_block_size,
1211  freelist_block_count, slots_entry_Descriptor);
1212 
1213  /* Set creation flag. */
1214  new_position_with_flags = DWB_RESET_POSITION (*current_position_with_flags);
1215  new_position_with_flags = DWB_STARTS_CREATION (new_position_with_flags);
1216  if (!ATOMIC_CAS_64 (&dwb_Global.position_with_flags, *current_position_with_flags, new_position_with_flags))
1217  {
1218  /* Impossible. */
1219  assert (false);
1220  }
1221  *current_position_with_flags = new_position_with_flags;
1222 
1223  return NO_ERROR;
1224 
1225 exit_on_error:
 /* Undo the volume creation. Note that fileio_unformat is called with a NULL thread entry here. */
1226  if (vdes != NULL_VOLDES)
1227  {
1228  fileio_dismount (thread_p, vdes);
1229  fileio_unformat (NULL, dwb_volume_name);
1230  }
1231 
1232  if (blocks != NULL)
1233  {
1234  for (i = 0; i < num_blocks; i++)
1235  {
1236  dwb_finalize_block (&blocks[i]);
1237  }
1238  free_and_init (blocks);
1239  }
1240 
1241  return error_code;
1242 }
1243 
1244 /*
1245  * dwb_slots_hash_entry_alloc () - Allocate a new entry in slots hash.
1246  *
1247  * returns: New pointer or NULL on error.
 *
 * Note: Only the entry mutex is initialized here; the other fields keep whatever malloc returned.
1248  */
1249 static void *
 /* NOTE(review): the embedded listing numbers jump from 1249 to 1251 here, so the line carrying the function name
  * and parameter list is missing from this copy - restore it from the original file. */
1251 {
1252  DWB_SLOTS_HASH_ENTRY *slots_hash_entry;
1253 
1254  slots_hash_entry = (DWB_SLOTS_HASH_ENTRY *) malloc (sizeof (DWB_SLOTS_HASH_ENTRY));
1255  if (slots_hash_entry == NULL)
1256  {
 /* NOTE(review): the embedded listing numbers jump from 1256 to 1258 here, so one source line is missing from
  * this copy (presumably the error report for the failed allocation) - restore it from the original file. */
1258  return NULL;
1259  }
1260 
1261  pthread_mutex_init (&slots_hash_entry->mutex, NULL);
1262 
1263  return (void *) slots_hash_entry;
1264 }
1265 
1266 /*
1267  * dwb_slots_hash_entry_free () - Free an entry in slots hash.
1268  * returns: Error code (NO_ERROR, or ER_FAILED when the entry is NULL).
1269  * entry(in): The entry to free.
 *
 * Note: The entry mutex is destroyed before the memory is released.
1270  */
1271 static int
 /* NOTE(review): the embedded listing numbers jump from 1271 to 1273 here, so the line carrying the function name
  * and parameter list is missing from this copy - restore it from the original file. */
1273 {
1274  DWB_SLOTS_HASH_ENTRY *slots_hash_entry = (DWB_SLOTS_HASH_ENTRY *) entry;
1275 
1276  if (slots_hash_entry == NULL)
1277  {
1278  return ER_FAILED;
1279  }
1280 
1281  pthread_mutex_destroy (&slots_hash_entry->mutex);
1282  free (entry);
1283 
1284  return NO_ERROR;
1285 }
1286 
1287 /*
1288  * dwb_slots_hash_entry_init () - Initialize slots hash entry.
1289  * returns: Error code (NO_ERROR, or ER_FAILED when the entry is NULL).
1290  * entry(in): The entry to initialize.
 *
 * Note: Resets the entry to "no page": the VPID is set to NULL and the slot pointer is cleared.
 *       The entry mutex is not touched.
1291  */
1292 static int
 /* NOTE(review): the embedded listing numbers jump from 1292 to 1294 here, so the line carrying the function name
  * and parameter list is missing from this copy - restore it from the original file. */
1294 {
1295  DWB_SLOTS_HASH_ENTRY *p_entry = (DWB_SLOTS_HASH_ENTRY *) entry;
1296 
1297  if (p_entry == NULL)
1298  {
1299  return ER_FAILED;
1300  }
1301 
1302  VPID_SET_NULL (&p_entry->vpid);
1303  p_entry->slot = NULL;
1304 
1305  return NO_ERROR;
1306 }
1307 
1308 /*
1309  * dwb_slots_hash_key_copy () - Copy a slots hash key.
1310  * returns: Error code.
1311  * src(in): The source.
1312  * dest(out): The destination.
1313  */
1314 static int
1315 dwb_slots_hash_key_copy (void *src, void *dest)
1316 {
1317  if (src == NULL || dest == NULL)
1318  {
1319  return ER_FAILED;
1320  }
1321 
1322  VPID_COPY ((VPID *) dest, (VPID *) src);
1323  return NO_ERROR;
1324 }
1325 
1326 /*
1327  * dwb_slots_hash_compare_key () - Compare slots hash keys.
1328  *
1329  * return : 0 if equal, -1 otherwise.
1330  * key1 (in) : Pointer to first VPID value.
1331  * key2 (in) : Pointer to second VPID value.
1332  */
1333 static int
1334 dwb_slots_hash_compare_key (void *key1, void *key2)
1335 {
1336  if (VPID_EQ ((VPID *) key1, (VPID *) key2))
1337  {
1338  return 0;
1339  }
1340 
1341  return -1;
1342 }
1343 
1344 /*
1345  * dwb_slots_hash_key () - Slots hash function.
1346  *
1347  * return : Hash index.
1348  * key (in) : Key value.
1349  * hash_table_size (in) : Hash size.
1350  */
1351 static unsigned int
1352 dwb_slots_hash_key (void *key, int hash_table_size)
1353 {
1354  const VPID *vpid = (VPID *) key;
1355 
1356  return ((vpid->pageid | ((unsigned int) vpid->volid) << 24) % hash_table_size);
1357 }
1358 
1359 /*
1360  * dwb_slots_hash_insert () - Insert entry in slots hash.
1361  *
1362  * return : Error code.
1363  * thread_p (in): The thread entry.
1364  * vpid(in): The page identifier.
1365  * slot(in): The DWB slot.
1366  * inserted (out): 1, if slot inserted in hash.
1367  */
1368 STATIC_INLINE int
1369 dwb_slots_hash_insert (THREAD_ENTRY * thread_p, VPID * vpid, DWB_SLOT * slot, bool * inserted)
1370 {
1371  int error_code = NO_ERROR;
1372  DWB_SLOTS_HASH_ENTRY *slots_hash_entry = NULL;
1373 
1374  assert (vpid != NULL && slot != NULL && inserted != NULL);
1375 
1376  *inserted = dwb_Global.slots_hashmap.find_or_insert (thread_p, *vpid, slots_hash_entry);
1377 
1378  assert (VPID_EQ (&slots_hash_entry->vpid, &slot->vpid));
1379  assert (slots_hash_entry->vpid.pageid == slot->io_page->prv.pageid
1380  && slots_hash_entry->vpid.volid == slot->io_page->prv.volid);
1381 
1382  if (!(*inserted))
1383  {
1384  assert (slots_hash_entry->slot != NULL);
1385 
1386  if (LSA_LT (&slot->lsa, &slots_hash_entry->slot->lsa))
1387  {
1388  dwb_log ("DWB hash find key (%d, %d), the LSA=(%lld,%d), better than (%lld,%d): \n",
1389  vpid->volid, vpid->pageid, slots_hash_entry->slot->lsa.pageid,
1390  slots_hash_entry->slot->lsa.offset, slot->lsa.pageid, slot->lsa.offset);
1391 
1392  /* The older slot is better than mine - leave it in hash. */
1393  pthread_mutex_unlock (&slots_hash_entry->mutex);
1394  return NO_ERROR;
1395  }
1396  else if (LSA_EQ (&slot->lsa, &slots_hash_entry->slot->lsa))
1397  {
1398  /*
1399  * If LSA's are equals, still replace slot in hash. We are in "flushing to disk without logging" case.
1400  * The page was modified but not logged. We have to flush this version since is the latest one.
1401  */
1402  if (slots_hash_entry->slot->block_no == slot->block_no)
1403  {
1404  /* Invalidate the old slot, if is in the same block. We want to avoid duplicates in block at flush. */
1405  assert (slots_hash_entry->slot->position_in_block < slot->position_in_block);
1406  VPID_SET_NULL (&slots_hash_entry->slot->vpid);
1407  fileio_initialize_res (thread_p, slots_hash_entry->slot->io_page, IO_PAGESIZE);
1408 
1409  dwb_log ("Found same page with same LSA in same block - %d - at positions (%d, %d) \n",
1410  slots_hash_entry->slot->position_in_block, slot->position_in_block);
1411  }
1412  else
1413  {
1414 #if !defined (NDEBUG)
1415  int old_block_no = ATOMIC_INC_32 (&slots_hash_entry->slot->block_no, 0);
1416  if (old_block_no > 0)
1417  {
1418  /* Be sure that the block containing old page version is flushed first. */
1419  DWB_BLOCK *old_block = &dwb_Global.blocks[old_block_no];
1420  DWB_BLOCK *new_block = &dwb_Global.blocks[slot->block_no];
1421 
1422  /* Maybe we will check that the slot is still in old block. */
1423  assert ((old_block->version < new_block->version)
1424  || (old_block->version == new_block->version && old_block->block_no < new_block->block_no));
1425 
1426  dwb_log ("Found same page with same LSA in 2 different blocks old = (%d, %d), new = (%d,%d) \n",
1427  old_block_no, slots_hash_entry->slot->position_in_block, new_block->block_no,
1428  slot->position_in_block);
1429  }
1430 #endif
1431  }
1432  }
1433 
1434  dwb_log ("Replace hash key (%d, %d), the new LSA=(%lld,%d), the old LSA = (%lld,%d)",
1435  vpid->volid, vpid->pageid, slot->lsa.pageid, slot->lsa.offset,
1436  slots_hash_entry->slot->lsa.pageid, slots_hash_entry->slot->lsa.offset);
1437  }
1438  else
1439  {
1440  dwb_log ("Inserted hash key (%d, %d), LSA=(%lld,%d)", vpid->volid, vpid->pageid, slot->lsa.pageid,
1441  slot->lsa.offset);
1442  }
1443 
1444  slots_hash_entry->slot = slot;
1445  pthread_mutex_unlock (&slots_hash_entry->mutex);
1446  *inserted = true;
1447 
1448  return NO_ERROR;
1449 }
1450 
1451 /*
1452  * dwb_destroy_internal () - Destroy DWB.
1453  *
1454  * return : Error code.
1455  * thread_p (in): The thread entry.
1456  *
1457  * Note: Is user responsibility to ensure that no other transaction can access DWB structure, during destroy.
1458  */
1459 STATIC_INLINE void
1460 dwb_destroy_internal (THREAD_ENTRY * thread_p, UINT64 * current_position_with_flags)
1461 {
1462  UINT64 new_position_with_flags;
1463  unsigned int block_no;
1464 
1465  assert (current_position_with_flags != NULL);
1466 
1467  dwb_destroy_wait_queue (&dwb_Global.wait_queue, &dwb_Global.mutex);
1468  pthread_mutex_destroy (&dwb_Global.mutex);
1469 
1470  if (dwb_Global.blocks != NULL)
1471  {
1472  for (block_no = 0; block_no < DWB_NUM_TOTAL_BLOCKS; block_no++)
1473  {
1474  dwb_finalize_block (&dwb_Global.blocks[block_no]);
1475  }
1476  free_and_init (dwb_Global.blocks);
1477  }
1478 
1479  dwb_Global.slots_hashmap.destroy ();
1480 
1481  if (dwb_Global.vdes != NULL_VOLDES)
1482  {
1483  fileio_dismount (thread_p, dwb_Global.vdes);
1484  dwb_Global.vdes = NULL_VOLDES;
1485  fileio_unformat (thread_p, dwb_Volume_name);
1486  }
1487 
1488  /* Set creation flag. */
1489  new_position_with_flags = DWB_RESET_POSITION (*current_position_with_flags);
1490  new_position_with_flags = DWB_ENDS_CREATION (new_position_with_flags);
1491  if (!ATOMIC_CAS_64 (&dwb_Global.position_with_flags, *current_position_with_flags, new_position_with_flags))
1492  {
1493  /* Impossible. */
1494  assert (false);
1495  }
1496 
1497  *current_position_with_flags = new_position_with_flags;
1498 }
1499 
1500 /*
1501  * dwb_set_status_resumed () - Set status resumed.
1502  *
1503  * return : Error code (the visible code always returns NO_ERROR).
1504  * data (in): The wait queue entry; its data field holds the thread entry to mark.
 *
 * Note: In SERVER_MODE, the resume status of the queued thread is set to THREAD_DWB_QUEUE_RESUMED while the
 *       thread entry is locked. In the other modes this function does nothing.
1505  */
1506 STATIC_INLINE int
 /* NOTE(review): the embedded listing numbers jump from 1506 to 1508 here, so the line carrying the function name
  * and parameter list is missing from this copy - restore it from the original file. */
1508 {
1509 #if defined (SERVER_MODE)
1510  DWB_WAIT_QUEUE_ENTRY *queue_entry = (DWB_WAIT_QUEUE_ENTRY *) data;
1511  THREAD_ENTRY *woken_thread_p;
1512 
1513  if (queue_entry != NULL)
1514  {
1515  woken_thread_p = (THREAD_ENTRY *) queue_entry->data;
1516  if (woken_thread_p != NULL)
1517  {
1518  thread_lock_entry (woken_thread_p);
1519  woken_thread_p->resume_status = THREAD_DWB_QUEUE_RESUMED;
1520  thread_unlock_entry (woken_thread_p);
1521  }
1522  }
1523 
1524  return NO_ERROR;
1525 #else /* !SERVER_MODE */
1526  return NO_ERROR;
1527 #endif /* !SERVER_MODE */
1528 }
1529 
1530 /*
1531  * dwb_wait_for_block_completion () - Wait for DWB block to complete.
1532  *
1533  * return : Error code.
1534  * thread_p (in): The thread entry.
1535  * block_no (in): The DWB block number.
 *
 * Note: In SERVER_MODE, returns NO_ERROR immediately when the block is no longer in "write started" state;
 *       otherwise the thread is added to the block wait queue and waits with a 20 milliseconds timeout.
 *       In the other modes this function does nothing and returns NO_ERROR.
1536  */
1537 STATIC_INLINE int
1538 dwb_wait_for_block_completion (THREAD_ENTRY * thread_p, unsigned int block_no)
1539 {
1540 #if defined (SERVER_MODE)
1541  int error_code = NO_ERROR;
1542  DWB_WAIT_QUEUE_ENTRY *double_write_queue_entry = NULL;
1543  DWB_BLOCK *dwb_block = NULL;
1544  PERF_UTIME_TRACKER time_track;
1545  UINT64 current_position_with_flags;
1546  int r;
1547  struct timeval timeval_crt, timeval_timeout;
1548  struct timespec to;
1549  bool save_check_interrupt;
1550 
1551  assert (thread_p != NULL && block_no < DWB_NUM_TOTAL_BLOCKS);
1552 
1553  PERF_UTIME_TRACKER_START (thread_p, &time_track);
1554 
 /* Lock order: first the block mutex, then the thread entry. */
1555  dwb_block = &dwb_Global.blocks[block_no];
1556  (void) pthread_mutex_lock (&dwb_block->mutex);
1557 
1558  thread_lock_entry (thread_p);
1559 
1560  /* Check again after acquiring mutexes. */
1561  current_position_with_flags = ATOMIC_INC_64 (&dwb_Global.position_with_flags, 0ULL);
1562  if (!DWB_IS_BLOCK_WRITE_STARTED (current_position_with_flags, block_no))
1563  {
1564  thread_unlock_entry (thread_p);
1565  pthread_mutex_unlock (&dwb_block->mutex);
1566 
1567  PERF_UTIME_TRACKER_TIME (thread_p, &time_track, PSTAT_DWB_WAIT_FLUSH_BLOCK_TIME_COUNTERS);
1568 
1569  return NO_ERROR;
1570  }
1571 
1572  double_write_queue_entry = dwb_block_add_wait_queue_entry (&dwb_block->wait_queue, thread_p);
1573  if (double_write_queue_entry == NULL)
1574  {
1575  /* allocation error */
1576  thread_unlock_entry (thread_p);
1577  pthread_mutex_unlock (&dwb_block->mutex);
1578 
1579  ASSERT_ERROR_AND_SET (error_code);
1580  return error_code;
1581  }
1582 
 /* The block mutex is released here; the thread entry is still locked at this point. */
1583  pthread_mutex_unlock (&dwb_block->mutex);
1584 
 /* Interrupt checking is switched off for the duration of the wait and restored afterwards. */
1585  save_check_interrupt = logtb_set_check_interrupt (thread_p, false);
1586  /* Waits for maximum 20 milliseconds. */
1587  gettimeofday (&timeval_crt, NULL);
1588  timeval_add_msec (&timeval_timeout, &timeval_crt, 20);
1589  timeval_to_timespec (&to, &timeval_timeout);
1590 
 /* NOTE(review): the embedded listing numbers jump from 1590 to 1592 here, so one source line is missing from this
  * copy. 'r' is never assigned and the thread entry is never unlocked in the visible text, so the lost line is
  * presumably the timed suspend call - restore it from the original file. */
1592 
1593  (void) logtb_set_check_interrupt (thread_p, save_check_interrupt);
1594 
1595  PERF_UTIME_TRACKER_TIME (thread_p, &time_track, PSTAT_DWB_WAIT_FLUSH_BLOCK_TIME_COUNTERS);
1596 
 /* NOTE(review): the embedded listing numbers jump from 1596 to 1598 here; the condition that opens the timeout
  * branch below is missing from this copy - restore it from the original file. */
1598  {
1599  /* timeout, remove the entry from queue */
1600  dwb_remove_wait_queue_entry (&dwb_block->wait_queue, &dwb_block->mutex, thread_p, dwb_set_status_resumed);
1601  return r;
1602  }
1603  else if (thread_p->resume_status != THREAD_DWB_QUEUE_RESUMED)
1604  {
1605  /* interruption, remove the entry from queue */
1606  assert (thread_p->resume_status == THREAD_RESUME_DUE_TO_SHUTDOWN);
1607 
1608  dwb_remove_wait_queue_entry (&dwb_block->wait_queue, &dwb_block->mutex, thread_p, NULL);
 /* NOTE(review): the embedded listing numbers jump from 1608 to 1610 here, so one source line is missing from
  * this copy - restore it from the original file. */
1610  return ER_INTERRUPTED;
1611  }
1612  else
1613  {
1614  assert (thread_p->resume_status == THREAD_DWB_QUEUE_RESUMED);
1615  return NO_ERROR;
1616  }
1617 #else /* SERVER_MODE */
1618  return NO_ERROR;
1619 #endif /* SERVER_MODE */
1620 }
1621 
1622 /*
1623  * dwb_signal_waiting_thread () - Signal waiting thread.
1624  *
1625  * return : Error code (the visible code always returns NO_ERROR).
1626  * data (in): Queue entry containing the waiting thread.
 *
 * Note: In SERVER_MODE, the waiting thread is considered only if its resume status is still
 *       THREAD_DWB_QUEUE_SUSPENDED; the check is done with the thread entry locked.
 *       In the other modes this function does nothing.
1627  */
1628 STATIC_INLINE int
 /* NOTE(review): the embedded listing numbers jump from 1628 to 1630 here, so the line carrying the function name
  * and parameter list is missing from this copy - restore it from the original file. */
1630 {
1631 #if defined (SERVER_MODE)
1632  THREAD_ENTRY *wait_thread_p;
1633  DWB_WAIT_QUEUE_ENTRY *wait_queue_entry = (DWB_WAIT_QUEUE_ENTRY *) data;
1634 
1635  assert (wait_queue_entry != NULL);
1636 
1637  wait_thread_p = (THREAD_ENTRY *) wait_queue_entry->data;
1638  if (wait_thread_p)
1639  {
1640  thread_lock_entry (wait_thread_p);
1641  if (wait_thread_p->resume_status == THREAD_DWB_QUEUE_SUSPENDED)
1642  {
 /* NOTE(review): the embedded listing numbers jump from 1642 to 1644 here; the body of this branch is missing
  * from this copy (presumably the call that wakes the thread - confirm against the original file). */
1644  }
1645  thread_unlock_entry (wait_thread_p);
1646  }
1647 
1648  return NO_ERROR;
1649 #else /* !SERVER_MODE */
1650  return NO_ERROR;
1651 #endif /* !SERVER_MODE */
1652 }
1653 
1654 /*
1655  * dwb_signal_block_completion () - Signal double write buffer block completion.
1656  *
1657  * return : Nothing.
1658  * thread_p (in): The thread entry.
1659  * dwb_block (in): The double write buffer block.
1660  */
1661 STATIC_INLINE void
 /* NOTE(review): the embedded listing numbers jump from 1661 to 1663 here, so the line carrying the function name
  * and parameter list is missing from this copy - restore it from the original file. */
1663 {
1664  assert (dwb_block != NULL);
1665 
1666  /* Signal the threads waiting in the wait queue of this block; the block mutex is passed along with the queue. */
1667  dwb_signal_waiting_threads (&dwb_block->wait_queue, &dwb_block->mutex);
1668 }
1669 
1670 /*
1671  * dwb_signal_structure_modificated () - Signal DWB structure changed.
1672  *
1673  * return : Nothing.
1674  * thread_p (in): The thread entry.
1675  */
1676 STATIC_INLINE void
 /* NOTE(review): the embedded listing numbers jump from 1676 to 1678 here, so the line carrying the function name
  * and parameter list is missing from this copy - restore it from the original file. */
1678 {
1679  /* Signal the threads waiting in the global DWB wait queue; the global mutex is passed along with the queue. */
1680  dwb_signal_waiting_threads (&dwb_Global.wait_queue, &dwb_Global.mutex);
1681 }
1682 
1683 /*
1684  * dwb_wait_for_strucure_modification () - Wait for double write buffer structure modification.
1685  *
1686  * return : Error code.
1687  * thread_p (in): The thread entry.
 *
 * Note: In SERVER_MODE, returns NO_ERROR immediately when the structure is not being modified; otherwise the
 *       thread is added to the global wait queue and waits with a 10 milliseconds timeout.
 *       In the other modes this function does nothing and returns NO_ERROR.
1688  */
1689 STATIC_INLINE int
 /* NOTE(review): the embedded listing numbers jump from 1689 to 1691 here, so the line carrying the function name
  * and parameter list is missing from this copy - restore it from the original file. */
1691 {
1692 #if defined (SERVER_MODE)
1693  int error_code = NO_ERROR;
1694  DWB_WAIT_QUEUE_ENTRY *double_write_queue_entry = NULL;
1695  UINT64 current_position_with_flags;
1696  int r;
1697  struct timeval timeval_crt, timeval_timeout;
1698  struct timespec to;
1699  bool save_check_interrupt;
1700 
1701  (void) pthread_mutex_lock (&dwb_Global.mutex);
1702 
1703  /* Check the actual flags, to avoid unnecessary waits. */
1704  current_position_with_flags = ATOMIC_INC_64 (&dwb_Global.position_with_flags, 0ULL);
1705  if (!DWB_IS_MODIFYING_STRUCTURE (current_position_with_flags))
1706  {
1707  pthread_mutex_unlock (&dwb_Global.mutex);
1708  return NO_ERROR;
1709  }
1710 
 /* The thread entry is locked only after the flags check, while the global mutex is still held. */
1711  thread_lock_entry (thread_p);
1712 
1713  double_write_queue_entry = dwb_block_add_wait_queue_entry (&dwb_Global.wait_queue, thread_p);
1714  if (double_write_queue_entry == NULL)
1715  {
1716  /* allocation error */
1717  thread_unlock_entry (thread_p);
1718  pthread_mutex_unlock (&dwb_Global.mutex);
1719 
1720  ASSERT_ERROR_AND_SET (error_code);
1721  return error_code;
1722  }
1723 
1724  pthread_mutex_unlock (&dwb_Global.mutex);
1725 
 /* Interrupt checking is switched off for the duration of the wait and restored afterwards. */
1726  save_check_interrupt = logtb_set_check_interrupt (thread_p, false);
1727  /* Waits for maximum 10 milliseconds. */
1728  gettimeofday (&timeval_crt, NULL);
1729  timeval_add_msec (&timeval_timeout, &timeval_crt, 10);
1730  timeval_to_timespec (&to, &timeval_timeout);
1731 
 /* NOTE(review): the embedded listing numbers jump from 1731 to 1733 here, so one source line is missing from this
  * copy. 'r' is never assigned and the thread entry is never unlocked in the visible text, so the lost line is
  * presumably the timed suspend call - restore it from the original file. */
1733 
1734  (void) logtb_set_check_interrupt (thread_p, save_check_interrupt);
 /* NOTE(review): the embedded listing numbers jump from 1734 to 1736 here; the condition that opens the timeout
  * branch below is missing from this copy - restore it from the original file. */
1736  {
1737  /* timeout, remove the entry from queue */
1738  dwb_remove_wait_queue_entry (&dwb_Global.wait_queue, &dwb_Global.mutex, thread_p, NULL);
1739  return r;
1740  }
1741  else if (thread_p->resume_status != THREAD_DWB_QUEUE_RESUMED)
1742  {
1743  /* interruption, remove the entry from queue */
1744  assert (thread_p->resume_status == THREAD_RESUME_DUE_TO_SHUTDOWN);
1745 
1746  dwb_remove_wait_queue_entry (&dwb_Global.wait_queue, &dwb_Global.mutex, thread_p, NULL);
 /* NOTE(review): the embedded listing numbers jump from 1746 to 1748 here, so one source line is missing from
  * this copy - restore it from the original file. */
1748  return ER_INTERRUPTED;
1749  }
1750  else
1751  {
1752  assert (thread_p->resume_status == THREAD_DWB_QUEUE_RESUMED);
1753  return NO_ERROR;
1754  }
1755 #else /* !SERVER_MODE */
1756  return NO_ERROR;
1757 #endif /* !SERVER_MODE */
1758 }
1759 
1760 /*
1761  * dwb_compare_slots () - Compare DWB slots.
1762  * return: arg1 - arg2.
1763  * arg1(in): Slot 1.
1764  * arg2(in): Slot 2.
1765  */
1766 static int
1767 dwb_compare_slots (const void *arg1, const void *arg2)
1768 {
1769  int diff;
1770  INT64 diff2;
1771  DWB_SLOT *slot1, *slot2;
1772 
1773  assert (arg1 != NULL);
1774  assert (arg2 != NULL);
1775 
1776  slot1 = (DWB_SLOT *) arg1;
1777  slot2 = (DWB_SLOT *) arg2;
1778 
1779  diff = slot1->vpid.volid - slot2->vpid.volid;
1780  if (diff > 0)
1781  {
1782  return 1;
1783  }
1784  else if (diff < 0)
1785  {
1786  return -1;
1787  }
1788 
1789  diff = slot1->vpid.pageid - slot2->vpid.pageid;
1790  if (diff > 0)
1791  {
1792  return 1;
1793  }
1794  else if (diff < 0)
1795  {
1796  return -1;
1797  }
1798 
1799  diff2 = slot1->lsa.pageid - slot2->lsa.pageid;
1800  if (diff2 > 0)
1801  {
1802  return 1;
1803  }
1804  else if (diff2 < 0)
1805  {
1806  return -1;
1807  }
1808 
1809  diff2 = slot1->lsa.offset - slot2->lsa.offset;
1810  if (diff2 > 0)
1811  {
1812  return 1;
1813  }
1814  else if (diff2 < 0)
1815  {
1816  return -1;
1817  }
1818 
1819  return 0;
1820 }
1821 
1822 /*
1823  * dwb_block_create_ordered_slots () - Create ordered slots from block slots.
1824  *
1825  * return : Error code (NO_ERROR, or ER_OUT_OF_VIRTUAL_MEMORY when the allocation fails).
1826  * block(in): The block.
1827  * p_dwb_ordered_slots(out): The ordered slots; a malloc-ed copy of the block slots (presumably released by the
 *                           caller - confirm).
1828  * p_ordered_slots_length(out): The ordered slots array length (block->count_wb_pages + 1, sentinel included).
 *
 * Note: Only the first count_wb_pages elements are sorted; the sentinel stays at the end of the array.
1829  */
1830 STATIC_INLINE int
1831 dwb_block_create_ordered_slots (DWB_BLOCK * block, DWB_SLOT ** p_dwb_ordered_slots,
1832  unsigned int *p_ordered_slots_length)
1833 {
1834  DWB_SLOT *p_local_dwb_ordered_slots = NULL;
1835 
 /* NOTE(review): p_ordered_slots_length is written below but is not covered by this assert. */
1836  assert (block != NULL && p_dwb_ordered_slots != NULL);
1837 
1838  /* including sentinel */
1839  p_local_dwb_ordered_slots = (DWB_SLOT *) malloc ((block->count_wb_pages + 1) * sizeof (DWB_SLOT));
1840  if (p_local_dwb_ordered_slots == NULL)
1841  {
 /* NOTE(review): the embedded listing numbers jump from 1841 to 1843 here; the first line of the statement
  * that the next line continues is missing from this copy - restore it from the original file. */
1843  (block->count_wb_pages + 1) * sizeof (DWB_SLOT));
1844  return ER_OUT_OF_VIRTUAL_MEMORY;
1845  }
1846 
1847  memcpy (p_local_dwb_ordered_slots, block->slots, block->count_wb_pages * sizeof (DWB_SLOT));
1848 
1849  /* init sentinel slot */
1850  dwb_init_slot (&p_local_dwb_ordered_slots[block->count_wb_pages]);
1851 
1852  /* Order pages by (VPID, LSA) */
1853  qsort ((void *) p_local_dwb_ordered_slots, block->count_wb_pages, sizeof (DWB_SLOT), dwb_compare_slots);
1854 
1855  *p_dwb_ordered_slots = p_local_dwb_ordered_slots;
1856  *p_ordered_slots_length = block->count_wb_pages + 1;
1857 
1858  return NO_ERROR;
1859 }
1860 
1861 /*
1862  * dwb_slots_hash_delete () - Delete entry in slots hash.
1863  *
1864  * return : Error code.
1865  * thread_p (in): The thread entry.
1866  * slot(in): The DWB slot.
 *
 * Note: The entry is erased only if it still references this very slot; if the hash entry references another
 *       slot, it is left in place. A slot with a NULL VPID is not in hash and is ignored.
1867  */
1868 STATIC_INLINE int
 /* NOTE(review): the embedded listing numbers jump from 1868 to 1870 here, so the line carrying the function name
  * and parameter list is missing from this copy - restore it from the original file. */
1870 {
1871  int error_code = NO_ERROR;
1872  VPID *vpid;
1873  DWB_SLOTS_HASH_ENTRY *slots_hash_entry = NULL;
1874 
1875  vpid = &slot->vpid;
1876  if (VPID_ISNULL (vpid))
1877  {
1878  /* Nothing to do, the slot is not in hash. */
1879  return NO_ERROR;
1880  }
1881 
1882  /* Remove the old vpid from hash. */
 /* The entry mutex is unlocked on the paths below; presumably find returns the entry locked - confirm. */
1883  slots_hash_entry = dwb_Global.slots_hashmap.find (thread_p, *vpid);
1884  if (slots_hash_entry == NULL)
1885  {
1886  /* Already removed from hash by others, nothing to do. */
1887  return NO_ERROR;
1888  }
1889 
1890  assert (VPID_ISNULL (&(slots_hash_entry->slot->vpid)) || VPID_EQ (&(slots_hash_entry->slot->vpid), vpid));
1891 
1892  /* Check the slot. */
1893  if (slots_hash_entry->slot == slot)
1894  {
1895  assert (VPID_EQ (&(slots_hash_entry->slot->vpid), vpid));
1896  assert (slots_hash_entry->slot->io_page->prv.pageid == vpid->pageid
1897  && slots_hash_entry->slot->io_page->prv.volid == vpid->volid);
1898 
1899  if (!dwb_Global.slots_hashmap.erase_locked (thread_p, *vpid, slots_hash_entry))
1900  {
1901  assert_release (false);
1902  /* Should not happen. */
1903  pthread_mutex_unlock (&slots_hash_entry->mutex);
1904  dwb_log_error ("DWB hash delete key (%d, %d) with %d error: \n", vpid->volid, vpid->pageid, error_code);
 /* NOTE(review): error_code is never changed from NO_ERROR, so this failure path logs error 0 and returns
  * NO_ERROR - confirm whether an error code should be set here. */
1905  return error_code;
1906  }
1907  }
1908  else
1909  {
 /* The hash entry references another slot: keep it and just release the entry. */
1910  pthread_mutex_unlock (&slots_hash_entry->mutex);
1911  }
1912 
1913  return NO_ERROR;
1914 }
1915 
1916 /*
1917  * dwb_compare_vol_fd () - Compare two volume file descriptors.
1918  *
1919  * return : v1 - v2.
1920  * v1(in) : Pointer to volume1.
1921  * v2(in) : Pointer to volume2.
1922  */
1923 static int
1924 dwb_compare_vol_fd (const void *v1, const void *v2)
1925 {
1926  int *vol_fd1 = (int *) v1;
1927  int *vol_fd2 = (int *) v2;
1928 
1929  assert (vol_fd1 != NULL && vol_fd2 != NULL);
1930 
1931  return ((*vol_fd1) - (*vol_fd2));
1932 }
1933 
1934 /*
1935  * dwb_add_volume_to_block_flush_area () - Add a volume to block flush area.
1936  *
1937  * return : Pointer to the flush area entry that was filled for the volume.
1938  * thread_p (in): The thread entry.
1939  * block(in): The block where the flush area reside.
1940  * vol_fd(in): The volume to add.
1941  *
1942  * Note: The volume is always appended at position count_flush_volumes_info; this function itself does not check
 *       whether the volume is already in the block flush area, nor whether the area is full (the limit is only
 *       asserted after the entry was written).
1943  */
 /* NOTE(review): the embedded listing numbers jump from 1943 to 1946 here, so the lines carrying the return type,
  * the function name and the parameter list are missing from this copy - restore them from the original file. */
1946 {
1947  FLUSH_VOLUME_INFO *flush_new_volume_info;
1948 #if !defined (NDEBUG)
1949  unsigned int old_count_flush_volumes_info;
1950 #endif
1951 
1952  assert (block != NULL);
1953  assert (vol_fd != NULL_VOLDES);
1954 
1955 #if !defined (NDEBUG)
 /* Debug builds only: remember the count, to verify below that it grew by exactly one. */
1956  old_count_flush_volumes_info = block->count_flush_volumes_info;
1957 #endif
1958 
1959  flush_new_volume_info = &block->flush_volumes_info[block->count_flush_volumes_info];
1960 
1961  flush_new_volume_info->vdes = vol_fd;
1962  flush_new_volume_info->num_pages = 0;
1963  flush_new_volume_info->all_pages_written = false;
1964  flush_new_volume_info->flushed_status = VOLUME_NOT_FLUSHED;
1965 
1966  /* There is only a writer and several (currently 2) readers on flush_new_volume_info array.
1967  * I need to be sure that size is incremented after setting element. Uses atomic to prevent code reordering.
1968  */
1969  ATOMIC_INC_32 (&block->count_flush_volumes_info, 1);
1970 
1971  assert (((old_count_flush_volumes_info + 1) == block->count_flush_volumes_info)
1972  && (block->count_flush_volumes_info <= block->max_to_flush_vdes));
1973 
1974  return flush_new_volume_info;
1975 }
1976 
1977 /*
1978  * dwb_write_block () - Write block pages in specified order.
1979  *
1980  * return : Error code. ER_FAILED when writing a page fails, the error returned by dwb_slots_hash_delete () when
 *          removing a slot from hash fails, NO_ERROR otherwise.
1981  * thread_p (in): The thread entry.
1982  * block(in): The block that is written.
1983  * p_dwb_ordered_slots(in): The slots that give the pages flush order.
1984  * ordered_slots_length(in): The ordered slots array length. Asserted to be greater than block->count_wb_pages.
1985  * file_sync_helper_can_flush(in): True, if helper can flush.
1986  * remove_from_hash(in): True, if needs to remove entries from hash.
1987  *
1988  * Note: Each volume that receives at least one page is registered in the block flush area through
 *       dwb_add_volume_to_block_flush_area (); its flush info is marked with all_pages_written once the loop moves
 *       to another volume (or finishes). Slots having NULL VPID are skipped. Pages of a volume whose descriptor
 *       can't be obtained are skipped, too.
1989  */
1990 STATIC_INLINE int
1991 dwb_write_block (THREAD_ENTRY * thread_p, DWB_BLOCK * block, DWB_SLOT * p_dwb_ordered_slots,
1992  unsigned int ordered_slots_length, bool file_sync_helper_can_flush, bool remove_from_hash)
1993 {
1994  VOLID last_written_volid;
1995  unsigned int i;
1996  int last_written_vol_fd, vol_fd;
1997  VPID *vpid;
1998  int error_code = NO_ERROR;
1999  int count_writes = 0, num_pages_to_sync;
2000  FLUSH_VOLUME_INFO *current_flush_volume_info = NULL;
2001  bool can_flush_volume = false;
2002 
2003  assert (block != NULL && p_dwb_ordered_slots != NULL);
2004 
2005  /*
2006  * Write the whole slots data first and then remove it from hash. It is better to do it this way, since
2007  * fileio_write may be slow. While the current transaction is delayed by fileio_write, the concurrent transactions
2008  * can still access the data from memory instead of disk.
2009  */
2010 
      /* The write loop looks at slot i + 1, so the ordered array must have at least one extra slot. */
2011  assert (block->count_wb_pages < ordered_slots_length);
2012  assert (block->count_flush_volumes_info == 0);
2013 
2014  num_pages_to_sync = prm_get_integer_value (PRM_ID_PB_SYNC_ON_NFLUSH);
2015 
2016  last_written_volid = NULL_VOLID;
2017  last_written_vol_fd = NULL_VOLDES;
2018 
2019  for (i = 0; i < block->count_wb_pages; i++)
2020  {
2021  vpid = &p_dwb_ordered_slots[i].vpid;
2022  if (VPID_ISNULL (vpid))
2023  {
      /* Invalidated slot, nothing to write. */
2024  continue;
2025  }
2026 
      /* Valid slots are expected in strictly increasing VPID order. */
2027  assert (VPID_ISNULL (&p_dwb_ordered_slots[i + 1].vpid) || VPID_LT (vpid, &p_dwb_ordered_slots[i + 1].vpid));
2028 
2029  if (last_written_volid != vpid->volid)
2030  {
2031  /* Volume changed. Close the flush info of the previous volume, then get the new volume descriptor. */
2032  if (current_flush_volume_info != NULL)
2033  {
2034  assert_release (current_flush_volume_info->vdes == last_written_vol_fd);
2035  current_flush_volume_info->all_pages_written = true;
2036  can_flush_volume = true;
2037 
2038  current_flush_volume_info = NULL; /* reset */
2039  }
2040 
2041  vol_fd = fileio_get_volume_descriptor (vpid->volid);
2042  if (vol_fd == NULL_VOLDES)
2043  {
2044  /* probably it was removed meanwhile. skip it! */
      /* last_written_volid is not updated, so the descriptor is looked up again for the next page. */
2045  continue;
2046  }
2047 
2048  last_written_volid = vpid->volid;
2049  last_written_vol_fd = vol_fd;
2050 
2051  current_flush_volume_info = dwb_add_volume_to_block_flush_area (thread_p, block, last_written_vol_fd);
2052  }
2053 
2054  assert (last_written_vol_fd != NULL_VOLDES);
2055 
2056  assert (p_dwb_ordered_slots[i].io_page->prv.p_reserve_2 == 0);
2057  assert (p_dwb_ordered_slots[i].vpid.pageid == p_dwb_ordered_slots[i].io_page->prv.pageid
2058  && p_dwb_ordered_slots[i].vpid.volid == p_dwb_ordered_slots[i].io_page->prv.volid);
2059 
2060  /* Write the data. */
2061  if (fileio_write (thread_p, last_written_vol_fd, p_dwb_ordered_slots[i].io_page, vpid->pageid, IO_PAGESIZE,
2063  {
2064  ASSERT_ERROR ();
2065  dwb_log_error ("DWB write page VPID=(%d, %d) LSA=(%lld,%d) with %d error: \n",
2066  vpid->volid, vpid->pageid, p_dwb_ordered_slots[i].io_page->prv.lsa.pageid,
2067  (int) p_dwb_ordered_slots[i].io_page->prv.lsa.offset, er_errid ());
2068  assert (false);
2069  /* Something wrong happened. */
2070  return ER_FAILED;
2071  }
2072 
2073  dwb_log ("dwb_write_block: written page = (%d,%d) LSA=(%lld,%d)\n",
2074  vpid->volid, vpid->pageid, p_dwb_ordered_slots[i].io_page->prv.lsa.pageid,
2075  (int) p_dwb_ordered_slots[i].io_page->prv.lsa.offset);
2076 
2077 #if defined (SERVER_MODE)
2078  assert (current_flush_volume_info != NULL);
2079 
      /* Count the page on its volume; dwb_flush_block () reads num_pages to decide which volumes still need sync. */
2080  ATOMIC_INC_32 (&current_flush_volume_info->num_pages, 1);
2081  count_writes++;
2082 
      /* Enough pages were written since the last wakeup, or a whole volume was completed. */
2083  if (file_sync_helper_can_flush && (count_writes >= num_pages_to_sync || can_flush_volume == true)
2085  {
      /* Publish the block to the helper only if no block is currently published. */
2086  if (ATOMIC_CAS_ADDR (&dwb_Global.file_sync_helper_block, (DWB_BLOCK *) NULL, block))
2087  {
2088  dwb_file_sync_helper_daemon->wakeup ();
2089  }
2090 
2091  /* Add statistics. */
2092  perfmon_add_stat (thread_p, PSTAT_PB_NUM_IOWRITES, count_writes);
2093  count_writes = 0;
2094  can_flush_volume = false;
2095  }
2096 #endif
2097  }
2098 
2099  /* the last written volume */
2100  if (current_flush_volume_info != NULL)
2101  {
2102  current_flush_volume_info->all_pages_written = true;
2103  }
2104 
2105 #if !defined (NDEBUG)
      /* Every registered volume must be complete and have a valid descriptor. */
2106  for (i = 0; i < block->count_flush_volumes_info; i++)
2107  {
2108  assert (block->flush_volumes_info[i].all_pages_written == true);
2109  assert (block->flush_volumes_info[i].vdes != NULL_VOLDES);
2110  }
2111 #endif
2112 
2113 #if defined (SERVER_MODE)
2114  if (file_sync_helper_can_flush && (dwb_Global.file_sync_helper_block == NULL)
2115  && (block->count_flush_volumes_info > 0))
2116  {
2117  /* If file_sync_helper_block is NULL, it means that the file sync helper thread does not run and was not woken yet. */
2119  && ATOMIC_CAS_ADDR (&dwb_Global.file_sync_helper_block, (DWB_BLOCK *) NULL, block))
2120  {
2121  dwb_file_sync_helper_daemon->wakeup ();
2122  }
2123  }
2124 #endif
2125 
2126  /* Add statistics for the writes not reported yet inside the loop. */
2127  perfmon_add_stat (thread_p, PSTAT_PB_NUM_IOWRITES, count_writes);
2128 
2129  /* Remove the corresponding entries from hash. */
2130  if (remove_from_hash)
2131  {
2132  PERF_UTIME_TRACKER time_track;
2133 
2134  PERF_UTIME_TRACKER_START (thread_p, &time_track);
2135 
2136  for (i = 0; i < block->count_wb_pages; i++)
2137  {
2138  vpid = &p_dwb_ordered_slots[i].vpid;
2139  if (VPID_ISNULL (vpid))
2140  {
2141  continue;
2142  }
2143 
      /* The ordered slot is a copy; the hash entry is deleted using the slot stored in the block. */
2144  assert (p_dwb_ordered_slots[i].position_in_block < DWB_BLOCK_NUM_PAGES);
2145  error_code = dwb_slots_hash_delete (thread_p, &block->slots[p_dwb_ordered_slots[i].position_in_block]);
2146  if (error_code != NO_ERROR)
2147  {
      /* NOTE(review): the time tracker started above is not recorded on this path. */
2148  return error_code;
2149  }
2150  }
2151 
2152  PERF_UTIME_TRACKER_TIME (thread_p, &time_track, PSTAT_DWB_DECACHE_PAGES_AFTER_WRITE);
2153  }
2154 
2155  return NO_ERROR;
2156 }
2157 
2158 /*
2159  * dwb_flush_block () - Flush pages from specified block.
2160  *
2161  * return : Error code. ER_FAILED when the ordered slots can't be created or when writing/synchronizing the DWB
 *          volume fails; the dwb_write_block () error when writing to data volumes fails; NO_ERROR otherwise.
2162  * thread_p (in): Thread entry.
2163  * block(in): The block that needs flush. Must contain at least one page (asserted).
2164  * file_sync_helper_can_flush(in): True, if file sync helper thread can flush.
2165  * current_position_with_flags(out): Current position with flags. May be NULL. Set only when the flush succeeds.
2166  *
2167  * Note: The block pages can't be modified by others during flush.
 *       Steps, in order: sort the slots by VPID and invalidate duplicates; make sure the volumes of the block
 *       previously given to the file sync helper are synchronized; write and synchronize the block in the DWB
 *       volume; write the pages at their original location (dwb_write_block) and synchronize the touched volumes;
 *       reset the block counters and its write started bit; advance next_block_to_flush; signal the waiters.
 *       On error paths the steps after the failure are skipped; only the flush counter is decremented and the
 *       ordered slots are released.
2168  */
2169 STATIC_INLINE int
2170 dwb_flush_block (THREAD_ENTRY * thread_p, DWB_BLOCK * block, bool file_sync_helper_can_flush,
2171  UINT64 * current_position_with_flags)
2172 {
2173  UINT64 local_current_position_with_flags, new_position_with_flags;
2174  int error_code = NO_ERROR;
2175  DWB_SLOT *p_dwb_ordered_slots = NULL;
2176  unsigned int i, ordered_slots_length;
2177  PERF_UTIME_TRACKER time_track;
2178  int num_pages;
2179  unsigned int current_block_to_flush, next_block_to_flush;
2180  int max_pages_to_sync;
2181 #if defined (SERVER_MODE)
2182  bool flush = false;
2183  PERF_UTIME_TRACKER time_track_file_sync_helper;
2184 #endif
2185 #if !defined (NDEBUG)
2186  DWB_BLOCK *saved_file_sync_helper_block = NULL;
2187  LOG_LSA nxio_lsa;
2188 #endif
2189 
2190  assert (block != NULL && block->count_wb_pages > 0 && dwb_is_created ());
2191 
2192  PERF_UTIME_TRACKER_START (thread_p, &time_track);
2193 
2194  /* Currently we allow only one block to be flushed. */
2195  ATOMIC_INC_32 (&dwb_Global.blocks_flush_counter, 1);
2196  assert (dwb_Global.blocks_flush_counter <= 1);
2197 
2198  /* Order slots by VPID, to flush faster. */
2199  error_code = dwb_block_create_ordered_slots (block, &p_dwb_ordered_slots, &ordered_slots_length);
2200  if (error_code != NO_ERROR)
2201  {
2202  error_code = ER_FAILED;
2203  goto end;
2204  }
2205 
2206  /* Remove duplicates. Compares each ordered slot with its successor. */
2207  for (i = 0; i < block->count_wb_pages - 1; i++)
2208  {
2209  DWB_SLOT *s1, *s2;
2210 
2211  s1 = &p_dwb_ordered_slots[i];
2212  s2 = &p_dwb_ordered_slots[i + 1];
2213 
2214  assert (s1->io_page->prv.p_reserve_2 == 0);
2215 
2216  if (!VPID_ISNULL (&s1->vpid) && VPID_EQ (&s1->vpid, &s2->vpid))
2217  {
2218  /* Next slot contains the same page, but that page is newer than the current one. Invalidate the VPID to
2219  * avoid flushing the page twice. I'm sure that the current slot is not in hash.
2220  */
2221  assert (LSA_LE (&s1->lsa, &s2->lsa));
2222 
2223  VPID_SET_NULL (&s1->vpid);
2224 
      /* Invalidate the slot stored in the block as well, not only its ordered copy. */
2226  VPID_SET_NULL (&(block->slots[s1->position_in_block].vpid));
2227 
2228  fileio_initialize_res (thread_p, s1->io_page, IO_PAGESIZE);
2229  }
2230 
2231  /* Check for WAL protocol. */
2232 #if !defined (NDEBUG)
2233  if (s1->io_page->prv.pageid != NULL_PAGEID && logpb_need_wal (&s1->io_page->prv.lsa))
2234  {
2235  /* Need WAL. Check whether log buffer pool was destroyed. */
2236  nxio_lsa = log_Gl.append.get_nxio_lsa ();
2237  assert (LSA_ISNULL (&nxio_lsa));
2238  }
2239 #endif
2240  }
2241 
2242  PERF_UTIME_TRACKER_TIME (thread_p, &time_track, PSTAT_DWB_FLUSH_BLOCK_SORT_TIME_COUNTERS);
2243 
2244 #if !defined (NDEBUG)
      /* Remember the block currently given to the helper, to verify below that it was fully synchronized. */
2245  saved_file_sync_helper_block = (DWB_BLOCK *) dwb_Global.file_sync_helper_block;
2246 #endif
2247 
2248 #if defined (SERVER_MODE)
2249  PERF_UTIME_TRACKER_START (thread_p, &time_track_file_sync_helper);
2250 
      /* Loop until no block is published to the file sync helper anymore. */
2251  while (dwb_Global.file_sync_helper_block != NULL)
2252  {
2253  flush = true;
2254 
2255  /* Be sure that the previous block was written on disk, before writing the current block. */
2257  {
2258  /* Wait for file sync helper. */
2259  thread_sleep (1);
2260  }
2261  else
2262  {
2263  /* Helper not available, flush the volumes from previous block. */
2264  for (i = 0; i < dwb_Global.file_sync_helper_block->count_flush_volumes_info; i++)
2265  {
2267 
      /* ATOMIC_INC_32 with 0 increment is used here as an atomic read of num_pages. */
2268  if (ATOMIC_INC_32 (&(dwb_Global.file_sync_helper_block->flush_volumes_info[i].num_pages), 0) >= 0)
2269  {
2270  (void) fileio_synchronize (thread_p,
2271  dwb_Global.file_sync_helper_block->flush_volumes_info[i].vdes, NULL,
2273 
2274  dwb_log ("dwb_flush_block: Synchronized volume %d\n",
2276  }
2277  }
      /* Previous block handled by this thread, un-publish it to leave the loop. */
2278  (void) ATOMIC_TAS_ADDR (&dwb_Global.file_sync_helper_block, (DWB_BLOCK *) NULL);
2279  }
2280  }
2281 
2282 #if !defined (NDEBUG)
2283  if (saved_file_sync_helper_block)
2284  {
2285  for (i = 0; i < saved_file_sync_helper_block->count_flush_volumes_info; i++)
2286  {
2287  assert (saved_file_sync_helper_block->flush_volumes_info[i].all_pages_written == true
2288  && saved_file_sync_helper_block->flush_volumes_info[i].num_pages == 0);
2289  }
2290  }
2291 #endif
2292 
      /* Record the waiting time only if a previously published block was found. */
2293  if (flush == true)
2294  {
2295  PERF_UTIME_TRACKER_TIME (thread_p, &time_track_file_sync_helper, PSTAT_DWB_WAIT_FILE_SYNC_HELPER_TIME_COUNTERS);
2296  }
2297 #endif /* SERVER_MODE */
2298 
      /* Start with an empty flush area; dwb_write_block () asserts that no volume is registered yet. */
2299  ATOMIC_TAS_32 (&block->count_flush_volumes_info, 0);
2300  block->all_pages_written = false;
2301 
2302  /* First, write and flush the double write file buffer. */
2303  if (fileio_write_pages (thread_p, dwb_Global.vdes, block->write_buffer, 0, block->count_wb_pages,
2305  {
2306  /* Something wrong happened. */
2307  assert (false);
2308  error_code = ER_FAILED;
2309  goto end;
2310  }
2311 
2312  /* Increment statistics after writing in double write volume. */
2314 
2315  if (fileio_synchronize (thread_p, dwb_Global.vdes, dwb_Volume_name, FILEIO_SYNC_ONLY) != dwb_Global.vdes)
2316  {
2317  assert (false);
2318  /* Something wrong happened. */
2319  error_code = ER_FAILED;
2320  goto end;
2321  }
2322  dwb_log ("dwb_flush_block: DWB synchronized\n");
2323 
2324  /* Now, write and flush the original location. */
2325  error_code =
2326  dwb_write_block (thread_p, block, p_dwb_ordered_slots, ordered_slots_length, file_sync_helper_can_flush, true);
2327  if (error_code != NO_ERROR)
2328  {
2329  assert (false);
2330  goto end;
2331  }
2332 
      /* Volumes having more pages than this threshold may be left to the file sync helper. */
2333  max_pages_to_sync = prm_get_integer_value (PRM_ID_PB_SYNC_ON_NFLUSH) / 2;
2334 
2335  /* Now, flush only the volumes having pages in current block. */
2336  for (i = 0; i < block->count_flush_volumes_info; i++)
2337  {
2338  assert (block->flush_volumes_info[i].vdes != NULL_VOLDES);
2339 
2340  num_pages = ATOMIC_INC_32 (&block->flush_volumes_info[i].num_pages, 0);
2341  if (num_pages == 0)
2342  {
2343  /* Flushed by helper. */
2344  continue;
2345  }
2346 
2347 #if defined (SERVER_MODE)
2348  if (file_sync_helper_can_flush == true)
2349  {
2350  if ((num_pages > max_pages_to_sync) && dwb_is_file_sync_helper_daemon_available ())
2351  {
2352  /* Let the helper thread to flush volumes having many pages. */
2353  assert (dwb_Global.file_sync_helper_block != NULL);
2354  continue;
2355  }
2356  }
2357  else
2358  {
2359  assert (dwb_Global.file_sync_helper_block == NULL);
2360  }
2361 #endif
2362 
      /* Claim the volume by changing its flushed status; only the thread that wins the CAS synchronizes it. */
2363  if (!ATOMIC_CAS_32 (&block->flush_volumes_info[i].flushed_status, VOLUME_NOT_FLUSHED,
2365  {
2366  /* Flushed by helper. */
2367  continue;
2368  }
2369 
2370  num_pages = ATOMIC_TAS_32 (&block->flush_volumes_info[i].num_pages, 0);
2371  assert (num_pages != 0);
2372 
2373  (void) fileio_synchronize (thread_p, block->flush_volumes_info[i].vdes, NULL, FILEIO_SYNC_ONLY);
2374 
2375  dwb_log ("dwb_flush_block: Synchronized volume %d\n", block->flush_volumes_info[i].vdes);
2376  }
2377 
2378  /* Allow to file sync helper thread to finish. */
2379  block->all_pages_written = true;
2380 
2382  {
2383  perfmon_db_flushed_block_volumes (thread_p, block->count_flush_volumes_info);
2384  }
2385 
2386  /* The block is full or there is only one thread that access DWB. */
2388  || DWB_IS_MODIFYING_STRUCTURE (ATOMIC_INC_64 (&dwb_Global.position_with_flags, 0LL)));
2389 
      /* The block becomes empty and gets a new version. */
2390  ATOMIC_TAS_32 (&block->count_wb_pages, 0);
2391  ATOMIC_INC_64 (&block->version, 1ULL);
2392 
2393  /* Reset block bit, since the block was flushed. Retried until the CAS on the global position succeeds. */
2394 reset_bit_position:
2395  local_current_position_with_flags = ATOMIC_INC_64 (&dwb_Global.position_with_flags, 0LL);
2396  new_position_with_flags = DWB_ENDS_BLOCK_WRITING (local_current_position_with_flags, block->block_no);
2397 
2398  if (!ATOMIC_CAS_64 (&dwb_Global.position_with_flags, local_current_position_with_flags, new_position_with_flags))
2399  {
2400  /* The position was changed by others, try again. */
2401  goto reset_bit_position;
2402  }
2403 
2404  /* Advance flushing to next block. */
2405  current_block_to_flush = dwb_Global.next_block_to_flush;
2406  next_block_to_flush = DWB_GET_NEXT_BLOCK_NO (current_block_to_flush);
2407 
2408  if (!ATOMIC_CAS_32 (&dwb_Global.next_block_to_flush, current_block_to_flush, next_block_to_flush))
2409  {
2410  /* I'm the only thread that can advance next block to flush. */
2411  assert_release (false);
2412  }
2413 
2414  /* Release locked threads, if any. */
2415  dwb_signal_block_completion (thread_p, block);
2416  if (current_position_with_flags)
2417  {
2418  *current_position_with_flags = new_position_with_flags;
2419  }
2420 
2421 end:
      /* Common exit, reached on success and on every failure after the flush counter was incremented. */
2422  ATOMIC_INC_32 (&dwb_Global.blocks_flush_counter, -1);
2423 
2424  if (p_dwb_ordered_slots != NULL)
2425  {
2426  free_and_init (p_dwb_ordered_slots);
2427  }
2428 
2429  PERF_UTIME_TRACKER_TIME (thread_p, &time_track, PSTAT_DWB_FLUSH_BLOCK_TIME_COUNTERS);
2430 
2431  return error_code;
2432 }
2433 
2434 /*
2435  * dwb_acquire_next_slot () - Acquire the next slot in DWB.
2436  *
2437  * return : Error code. ER_DWB_DISABLED when DWB is not created but a block write is still marked as started;
 *          the error of the wait functions (other than timeout) when waiting fails; NO_ERROR otherwise.
2438  * thread_p(in): The thread entry.
2439  * can_wait(in): True, if can wait to get the next slot.
2440  * p_dwb_slot(out): The pointer to the next slot in DWB. Set to NULL on entry and left NULL, with NO_ERROR
 *                  returned, when DWB is not created or when waiting would be needed but can_wait is false.
2441  *
 * Note: The slot is reserved by advancing dwb_Global.position_with_flags with compare and swap. Any failure to
 *       swap, and any completed or timed out wait, restarts the function from reading the global position.
 *       The VPID of the returned slot is reset to NULL.
 */
2442 STATIC_INLINE int
2443 dwb_acquire_next_slot (THREAD_ENTRY * thread_p, bool can_wait, DWB_SLOT ** p_dwb_slot)
2444 {
2445  UINT64 current_position_with_flags, current_position_with_block_write_started, new_position_with_flags;
2446  unsigned int current_block_no, position_in_current_block;
2447  int error_code = NO_ERROR;
2448  DWB_BLOCK *block;
2449 
2450  assert (p_dwb_slot != NULL);
2451  *p_dwb_slot = NULL;
2452 
2453 start:
2454  /* Get the current position in double write buffer. */
2455  current_position_with_flags = ATOMIC_INC_64 (&dwb_Global.position_with_flags, 0ULL);
2456 
2457  if (DWB_NOT_CREATED_OR_MODIFYING (current_position_with_flags))
2458  {
2459  /* Rarely happens. */
2460  if (DWB_IS_MODIFYING_STRUCTURE (current_position_with_flags))
2461  {
2462  if (can_wait == false)
2463  {
      /* No slot acquired; *p_dwb_slot remains NULL. */
2464  return NO_ERROR;
2465  }
2466 
2467  /* DWB structure change started, needs to wait. */
2468  error_code = dwb_wait_for_strucure_modification (thread_p);
2469  if (error_code != NO_ERROR)
2470  {
2471  if (error_code == ER_CSS_PTHREAD_COND_TIMEDOUT)
2472  {
2473  /* timeout, try again */
2474  goto start;
2475  }
2476  return error_code;
2477  }
2478 
2479  /* Probably someone else advanced the position, try again. */
2480  goto start;
2481  }
2482  else if (!DWB_IS_CREATED (current_position_with_flags))
2483  {
2484  if (DWB_IS_ANY_BLOCK_WRITE_STARTED (current_position_with_flags))
2485  {
2486  /* Someone deleted the DWB, before flushing the data. */
2488  return ER_DWB_DISABLED;
2489  }
2490 
2491  /* Someone deleted the DWB */
2492  return NO_ERROR;
2493  }
2494  else
2495  {
      /* Created and not modifying contradicts the outer condition. */
2496  assert (false);
2497  }
2498  }
2499 
2500  current_block_no = DWB_GET_BLOCK_NO_FROM_POSITION (current_position_with_flags);
2501  position_in_current_block = DWB_GET_POSITION_IN_BLOCK (current_position_with_flags);
2502 
2503  assert (current_block_no < DWB_NUM_TOTAL_BLOCKS && position_in_current_block < DWB_BLOCK_NUM_PAGES);
2504 
2505  if (position_in_current_block == 0)
2506  {
2507  /* This is the first write on current block. Before writing, check whether the previous iteration finished. */
2508  if (DWB_IS_BLOCK_WRITE_STARTED (current_position_with_flags, current_block_no))
2509  {
2510  if (can_wait == false)
2511  {
      /* No slot acquired; *p_dwb_slot remains NULL. */
2512  return NO_ERROR;
2513  }
2514 
2515  dwb_log ("Waits for flushing block=%d having version=%lld) \n",
2516  current_block_no, dwb_Global.blocks[current_block_no].version);
2517 
2518  /*
2519  * The previous iteration did not finish, needs to wait, in order to avoid buffer overwriting.
2520  * Should happen relatively rarely, except the case when the buffer consists of only one block.
2521  */
2522  error_code = dwb_wait_for_block_completion (thread_p, current_block_no);
2523  if (error_code != NO_ERROR)
2524  {
2525  if (error_code == ER_CSS_PTHREAD_COND_TIMEDOUT)
2526  {
2527  /* timeout, try again */
2528  goto start;
2529  }
2530 
2531  dwb_log_error ("Error %d while waiting for flushing block=%d having version %lld \n",
2532  error_code, current_block_no, dwb_Global.blocks[current_block_no].version);
2533  return error_code;
2534  }
2535 
2536  /* Probably someone else advanced the position, try again. */
2537  goto start;
2538  }
2539 
2540  /* First write in the current block. */
2541  assert (!DWB_IS_BLOCK_WRITE_STARTED (current_position_with_flags, current_block_no));
2542 
      /* The new position carries both the write started flag of the block and the advanced position. */
2543  current_position_with_block_write_started =
2544  DWB_STARTS_BLOCK_WRITING (current_position_with_flags, current_block_no);
2545 
2546  new_position_with_flags = DWB_GET_NEXT_POSITION_WITH_FLAGS (current_position_with_block_write_started);
2547  }
2548  else
2549  {
2550  /* I'm sure that nobody else can delete the buffer */
2551  assert (DWB_IS_CREATED (dwb_Global.position_with_flags));
2553 
2554  /* Compute the next position with flags */
2555  new_position_with_flags = DWB_GET_NEXT_POSITION_WITH_FLAGS (current_position_with_flags);
2556  }
2557 
2558  /* Compute and advance the global position in double write buffer. */
2559  if (!ATOMIC_CAS_64 (&dwb_Global.position_with_flags, current_position_with_flags, new_position_with_flags))
2560  {
2561  /* Someone else advanced the global position in double write buffer, try again. */
2562  goto start;
2563  }
2564 
      /* The position was reserved by this thread; hand out the slot found at the old position. */
2565  block = dwb_Global.blocks + current_block_no;
2566 
2567  *p_dwb_slot = block->slots + position_in_current_block;
2568 
2569  /* Invalidate slot content. */
2570  VPID_SET_NULL (&(*p_dwb_slot)->vpid);
2571 
2572  assert ((*p_dwb_slot)->position_in_block == position_in_current_block);
2573 
2574  return NO_ERROR;
2575 }
2576 
2577 /*
2578  * dwb_set_slot_data () - Set DWB data at the location indicated by the slot.
2579  *
2580  * return : Error code.
2581  * thread_p(in): Thread entry
2582  * dwb_slot(in/out): DWB slot that contains the location where the data must be set.
2583  * io_page_p(in): The data.
2584  */
2585 STATIC_INLINE void
2586 dwb_set_slot_data (THREAD_ENTRY * thread_p, DWB_SLOT * dwb_slot, FILEIO_PAGE * io_page_p)
2587 {
2588  assert (dwb_slot != NULL && io_page_p != NULL);
2589 
2590  assert (io_page_p->prv.p_reserve_2 == 0);
2591 
2592  if (io_page_p->prv.pageid != NULL_PAGEID)
2593  {
2594  memcpy (dwb_slot->io_page, (char *) io_page_p, IO_PAGESIZE);
2595  }
2596  else
2597  {
2598  /* Initialize page for consistency. */
2599  fileio_initialize_res (thread_p, dwb_slot->io_page, IO_PAGESIZE);
2600  }
2601 
2602  assert (fileio_is_page_sane (io_page_p, IO_PAGESIZE));
2603  LSA_COPY (&dwb_slot->lsa, &io_page_p->prv.lsa);
2604  VPID_SET (&dwb_slot->vpid, io_page_p->prv.volid, io_page_p->prv.pageid);
2605 }
2606 
2607 /*
2608  * dwb_init_slot () - Initialize DWB slot.
2609  *
2610  * return : Nothing.
2611  * slot (in/out) : The DWB slot. Must not be NULL (asserted).
2612  *
 * Note: Resets the page pointer to NULL and sets NULL VPID and NULL LSA. Other slot fields are not touched.
 */
2613 STATIC_INLINE void
2615 {
2616  assert (slot != NULL);
2617 
2618  slot->io_page = NULL;
2619  VPID_SET_NULL (&slot->vpid);
2620  LSA_SET_NULL (&slot->lsa);
2621 }
2622 
2623 /*
2624  * dwb_get_next_block_for_flush(): Get next block for flush.
2625  *
2626  * returns: Nothing
2627  * thread_p (in): The thread entry.
2628  * block_no(out): The next block for flush if found, otherwise DWB_NUM_TOTAL_BLOCKS.
2629  */
2630 STATIC_INLINE void
2631 dwb_get_next_block_for_flush (THREAD_ENTRY * thread_p, unsigned int *block_no)
2632 {
2633  assert (block_no != NULL);
2634 
2635  *block_no = DWB_NUM_TOTAL_BLOCKS;
2636 
2637  /* check whether the next block can be flushed. */
2638  if (dwb_Global.blocks[dwb_Global.next_block_to_flush].count_wb_pages != DWB_BLOCK_NUM_PAGES)
2639  {
2640  /* Next block is not full yet. */
2641  return;
2642  }
2643 
2644  *block_no = dwb_Global.next_block_to_flush;
2645 }
2646 
2647 /*
2648  * dwb_set_data_on_next_slot () - Sets data at the next DWB slot, if possible.
2649  *
2650  * return : Error code.
2651  * thread_p(in): The thread entry.
2652  * io_page_p(in): The data that will be set on next slot.
2653  * can_wait(in): True, if waiting is allowed.
2654  * p_dwb_slot(out): Pointer to the next free DWB slot.
2655  */
2656 int
2657 dwb_set_data_on_next_slot (THREAD_ENTRY * thread_p, FILEIO_PAGE * io_page_p, bool can_wait, DWB_SLOT ** p_dwb_slot)
2658 {
2659  int error_code;
2660 
2661  assert (p_dwb_slot != NULL && io_page_p != NULL);
2662 
2663  /* Acquire the slot before setting the data. */
2664  error_code = dwb_acquire_next_slot (thread_p, can_wait, p_dwb_slot);
2665  if (error_code != NO_ERROR)
2666  {
2667  return error_code;
2668  }
2669 
2670  assert (can_wait == false || *p_dwb_slot != NULL);
2671  if (*p_dwb_slot == NULL)
2672  {
2673  /* Can't acquire next slot. */
2674  return NO_ERROR;
2675  }
2676 
2677  /* Set data on slot. */
2678  dwb_set_slot_data (thread_p, *p_dwb_slot, io_page_p);
2679 
2680  return NO_ERROR;
2681 }
2682 
2683 /*
2684  * dwb_add_page () - Add page content to DWB.
2685  *
2686  * return : Error code. NO_ERROR is also returned when no slot could be acquired (*p_dwb_slot stays NULL).
2687  * thread_p (in): The thread entry. If NULL, the current thread entry is used.
2688  * io_page_p(in): In-memory address where the current content of page resides.
2689  * vpid(in): Page identifier. Asserted to be equal to the slot VPID.
2690  * p_dwb_slot(in/out): DWB slot where the page content must be added. If *p_dwb_slot is NULL, the next slot is
 *                     acquired (waiting allowed) and io_page_p is stored on it.
2691  *
2692  * Note: thread may flush the block, if flush thread is not available or we are in stand alone.
 *       Only the thread whose increment brings the block page counter to DWB_BLOCK_NUM_PAGES triggers the flush.
2693  */
2694 int
2695 dwb_add_page (THREAD_ENTRY * thread_p, FILEIO_PAGE * io_page_p, VPID * vpid, DWB_SLOT ** p_dwb_slot)
2696 {
2697  unsigned int count_wb_pages;
2698  int error_code = NO_ERROR;
2699  bool inserted = false;
2700  DWB_BLOCK *block = NULL;
2701  DWB_SLOT *dwb_slot = NULL;
2702  bool needs_flush;
2703 
      /* NOTE(review): with io_page_p == NULL this assert dereferences *p_dwb_slot, so the slot must be given. */
2704  assert (p_dwb_slot != NULL && (io_page_p != NULL || (*p_dwb_slot)->io_page != NULL) && vpid != NULL);
2705 
2706  if (thread_p == NULL)
2707  {
2708  thread_p = thread_get_thread_entry_info ();
2709  }
2710 
2711  if (*p_dwb_slot == NULL)
2712  {
      /* The caller did not reserve a slot; acquire one now and copy the page on it. */
2713  error_code = dwb_set_data_on_next_slot (thread_p, io_page_p, true, p_dwb_slot);
2714  if (error_code != NO_ERROR)
2715  {
2716  return error_code;
2717  }
2718 
2719  if (*p_dwb_slot == NULL)
2720  {
      /* No slot available, the page is not added to DWB. */
2721  return NO_ERROR;
2722  }
2723  }
2724 
2725  dwb_slot = *p_dwb_slot;
2726 
2727  assert (VPID_EQ (vpid, &dwb_slot->vpid));
2728  if (!VPID_ISNULL (vpid))
2729  {
2730  error_code = dwb_slots_hash_insert (thread_p, vpid, dwb_slot, &inserted);
2731  if (error_code != NO_ERROR)
2732  {
2733  return error_code;
2734  }
2735 
2736  if (!inserted)
2737  {
2738  /* Invalidate the slot to avoid flushing the same data twice. */
2739  VPID_SET_NULL (&dwb_slot->vpid);
2740  fileio_initialize_res (thread_p, dwb_slot->io_page, IO_PAGESIZE);
2741  }
2742  }
2743 
2744  dwb_log ("dwb_add_page: added page = (%d,%d) on block (%d) position (%d)\n", vpid->volid, vpid->pageid,
2745  dwb_slot->block_no, dwb_slot->position_in_block);
2746 
      /* The slot is counted as written even when it was invalidated above. */
2747  block = &dwb_Global.blocks[dwb_slot->block_no];
2748  count_wb_pages = ATOMIC_INC_32 (&block->count_wb_pages, 1);
2749  assert_release (count_wb_pages <= DWB_BLOCK_NUM_PAGES);
2750 
2751  if (count_wb_pages < DWB_BLOCK_NUM_PAGES)
2752  {
2753  needs_flush = false;
2754  }
2755  else
2756  {
2757  needs_flush = true;
2758  }
2759 
2760  if (needs_flush == false)
2761  {
      /* The block is not full yet. */
2762  return NO_ERROR;
2763  }
2764 
2765  /*
2766  * The blocks must be flushed in the order they are filled to have consistent data. The flush block thread knows
2767  * how to flush the blocks in the order they are filled. So, we don't care anymore about the flushing order here.
2768  * Initially, we waited here if the previous block was not flushed. That approach created delays.
2769  */
2770 
2771 #if defined (SERVER_MODE)
2772  /*
2773  * Wakes up the flush block thread to flush the current block. The current block will be flushed after flushing
2774  * the previous block.
2775  */
2777  {
2778  /* Wakeup the thread that will flush the block. */
2779  dwb_flush_block_daemon->wakeup ();
2780 
2781  return NO_ERROR;
2782  }
2783 #endif /* SERVER_MODE */
2784 
2785  /* Flush all pages from current block. The file sync helper is not used on this path. */
2786  error_code = dwb_flush_block (thread_p, block, false, NULL);
2787  if (error_code != NO_ERROR)
2788  {
2789  dwb_log_error ("Can't flush block = %d having version %lld\n", block->block_no, block->version);
2790 
2791  return error_code;
2792  }
2793 
2794  dwb_log ("Successfully flushed DWB block = %d having version %lld\n", block->block_no, block->version);
2795 
2796  return NO_ERROR;
2797 }
2798 
2799 /*
2800  * dwb_is_created () - Checks whether double write buffer was created.
2801  *
2802  * return : True, if created.
2803  *
 * Note: The decision is taken on a single atomic read of dwb_Global.position_with_flags, so the state may
 *       already be different when the caller uses the result.
 */
2804 bool
2806 {
      /* ATOMIC_INC_64 with 0 increment is used as an atomic read. */
2807  UINT64 position_with_flags = ATOMIC_INC_64 (&dwb_Global.position_with_flags, 0ULL);
2808 
2809  return DWB_IS_CREATED (position_with_flags);
2810 }
2811 
2812 /*
2813  * dwb_create () - Create DWB.
2814  *
2815  * return : Error code.
2816  * thread_p (in): The thread entry.
2817  * dwb_path_p (in) : The double write buffer volume path.
2818  * db_name_p (in) : The database name.
2819  */
2820 int
2821 dwb_create (THREAD_ENTRY * thread_p, const char *dwb_path_p, const char *db_name_p)
2822 {
2823  UINT64 current_position_with_flags;
2824  int error_code = NO_ERROR;
2825 
2826  error_code = dwb_starts_structure_modification (thread_p, &current_position_with_flags);
2827  if (error_code != NO_ERROR)
2828  {
2829  dwb_log_error ("Can't create DWB: error = %d\n", error_code);
2830  return error_code;
2831  }
2832 
2833  /* DWB structure modification started, no other transaction can modify the global position with flags */
2834  if (DWB_IS_CREATED (current_position_with_flags))
2835  {
2836  /* Already created, restore the modification flag. */
2837  goto end;
2838  }
2839 
2840  fileio_make_dwb_name (dwb_Volume_name, dwb_path_p, db_name_p);
2841 
2842  error_code = dwb_create_internal (thread_p, dwb_Volume_name, &current_position_with_flags);
2843  if (error_code != NO_ERROR)
2844  {
2845  dwb_log_error ("Can't create DWB: error = %d\n", error_code);
2846  goto end;
2847  }
2848 
2849 end:
2850  /* Ends the modification, allowing to others to modify global position with flags. */
2851  dwb_ends_structure_modification (thread_p, current_position_with_flags);
2852 
2853  return error_code;
2854 }
2855 
2856 /*
2857  * dwb_recreate () - Recreate double write buffer with new user parameter values.
2858  *
2859  * return : Error code. The dwb_starts_structure_modification () error when the modification can't start, the
 *          dwb_create_internal () error when the creation fails, NO_ERROR otherwise.
2860  * thread_p (in): The thread entry. If NULL, the current thread entry is used.
2861  *
 * Note: An existing DWB is destroyed first. The volume name already stored in dwb_Volume_name is reused.
 */
2862 int
2864 {
2865  int error_code = NO_ERROR;
2866  UINT64 current_position_with_flags;
2867 
2868  if (thread_p == NULL)
2869  {
2870  thread_p = thread_get_thread_entry_info ();
2871  }
2872 
2873  error_code = dwb_starts_structure_modification (thread_p, &current_position_with_flags);
2874  if (error_code != NO_ERROR)
2875  {
2876  return error_code;
2877  }
2878 
2879  /* DWB structure modification started, no other transaction can modify the global position with flags */
2880  if (DWB_IS_CREATED (current_position_with_flags))
2881  {
2882  dwb_destroy_internal (thread_p, &current_position_with_flags);
2883  }
2884 
2885  error_code = dwb_create_internal (thread_p, dwb_Volume_name, &current_position_with_flags);
2886  if (error_code != NO_ERROR)
2887  {
      /* Nothing else to do; the label follows immediately. */
2888  goto end;
2889  }
2890 
2891 end:
2892  /* Ends the modification, allowing to others to modify global position with flags. */
2893  dwb_ends_structure_modification (thread_p, current_position_with_flags);
2894 
2895  return error_code;
2896 }
2897 
2898 #if !defined (NDEBUG)
2899 /*
2900  * dwb_debug_check_dwb () - check sanity of ordered slots
2901  *
2902  * return : Error code
2903  * thread_p (in): The thread entry.
2904  * p_dwb_ordered_slots(in):
2905  * num_dwb_pages(in):
2906  *
2907  */
2908 static int
2909 dwb_debug_check_dwb (THREAD_ENTRY * thread_p, DWB_SLOT * p_dwb_ordered_slots, unsigned int num_dwb_pages)
2910 {
2911  char page_buf[IO_MAX_PAGE_SIZE + MAX_ALIGNMENT];
2912  FILEIO_PAGE *iopage;
2913  int error_code;
2914  unsigned int i;
2915  bool is_page_corrupted;
2916 
2917  iopage = (FILEIO_PAGE *) PTR_ALIGN (page_buf, MAX_ALIGNMENT);
2918  memset (iopage, 0, IO_PAGESIZE);
2919 
2920  /* Check for duplicates in DWB. */
2921  for (i = 1; i < num_dwb_pages; i++)
2922  {
2923  if (VPID_ISNULL (&p_dwb_ordered_slots[i].vpid))
2924  {
2925  continue;
2926  }
2927 
2928  if (VPID_EQ (&p_dwb_ordered_slots[i].vpid, &p_dwb_ordered_slots[i - 1].vpid))
2929  {
2930  /* Same VPID, at least one is corrupted. */
2931  error_code = fileio_page_check_corruption (thread_p, p_dwb_ordered_slots[i - 1].io_page, &is_page_corrupted);
2932  if (error_code != NO_ERROR)
2933  {
2934  return error_code;
2935  }
2936 
2937  if (is_page_corrupted)
2938  {
2939  continue;
2940  }
2941 
2942  error_code = fileio_page_check_corruption (thread_p, p_dwb_ordered_slots[i].io_page, &is_page_corrupted);
2943  if (error_code != NO_ERROR)
2944  {
2945  return error_code;
2946  }
2947 
2948  if (is_page_corrupted)
2949  {
2950  continue;
2951  }
2952 
2953  if (memcmp (p_dwb_ordered_slots[i - 1].io_page, iopage, IO_PAGESIZE) == 0)
2954  {
2955  /* Skip not initialized pages. */
2956  continue;
2957  }
2958 
2959  if (memcmp (p_dwb_ordered_slots[i].io_page, iopage, IO_PAGESIZE) == 0)
2960  {
2961  /* Skip not initialized pages. */
2962  continue;
2963  }
2964 
2965  /* Found duplicates - something is wrong. We may still can check for same LSAs.
2966  * But, duplicates occupies disk space so is better to avoid it.
2967  */
2968  assert (false);
2969  }
2970  }
2971 
2972  return NO_ERROR;
2973 }
2974 #endif // DEBUG
2975 
2976 /*
2977  * dwb_check_data_page_is_sane () - Check whether the data page is corrupted.
2978  *
2979  * return : Error code
2980  * thread_p (in): The thread entry.
2981  * block(in): DWB recovery block.
2982  * p_dwb_ordered_slots(in): DWB ordered slots
2983  * p_num_recoverable_pages(out): number of recoverable corrupted pages
2984  *
2985  */
2986 static int
2987 dwb_check_data_page_is_sane (THREAD_ENTRY * thread_p, DWB_BLOCK * rcv_block, DWB_SLOT * p_dwb_ordered_slots,
2988  int *p_num_recoverable_pages)
2989 {
2990  char page_buf[IO_MAX_PAGE_SIZE + MAX_ALIGNMENT];
2991  FILEIO_PAGE *iopage;
2992  VPID *vpid;
2993  int vol_fd = NULL_VOLDES, temp_vol_fd = NULL_VOLDES, vol_pages = 0;
2994  INT16 volid;
2995  int error_code;
2996  unsigned int i;
2997  int num_recoverable_pages = 0;
2998  bool is_page_corrupted;
2999 
3000  assert (rcv_block != NULL && p_dwb_ordered_slots != NULL && p_num_recoverable_pages != NULL);
3001  iopage = (FILEIO_PAGE *) PTR_ALIGN (page_buf, MAX_ALIGNMENT);
3002  memset (iopage, 0, IO_PAGESIZE);
3003 
3004  volid = NULL_VOLID;
3005 
3006  /* Check whether the data page is corrupted. If true, replaced with the DWB page. */
3007  for (i = 0; i < rcv_block->count_wb_pages; i++)
3008  {
3009  vpid = &p_dwb_ordered_slots[i].vpid;
3010  if (VPID_ISNULL (vpid))
3011  {
3012  continue;
3013  }
3014 
3015  if (volid != vpid->volid)
3016  {
3017  /* Update the current VPID and get the volume descriptor. */
3018  temp_vol_fd = fileio_get_volume_descriptor (vpid->volid);
3019  if (temp_vol_fd == NULL_VOLDES)
3020  {
3021  continue;
3022  }
3023  vol_fd = temp_vol_fd;
3024  volid = vpid->volid;
3025  vol_pages = fileio_get_number_of_volume_pages (vol_fd, IO_PAGESIZE);
3026  }
3027 
3028  assert (vol_fd != NULL_VOLDES);
3029 
3030  if (vpid->pageid >= vol_pages)
3031  {
3032  /* The page was written in DWB, not in data volume. */
3033  continue;
3034  }
3035 
3036  /* Read the page from data volume. */
3037  if (fileio_read (thread_p, vol_fd, iopage, vpid->pageid, IO_PAGESIZE) == NULL)
3038  {
3039  /* There was an error in reading the page. */
3040  ASSERT_ERROR_AND_SET (error_code);
3041  return error_code;
3042  }
3043 
3044  error_code = fileio_page_check_corruption (thread_p, iopage, &is_page_corrupted);
3045  if (error_code != NO_ERROR)
3046  {
3047  return error_code;
3048  }
3049 
3050  if (!is_page_corrupted)
3051  {
3052  /* The page in data volume is not corrupted. Do not overwrite its content - reset slot VPID. */
3053  VPID_SET_NULL (&p_dwb_ordered_slots[i].vpid);
3054  fileio_initialize_res (thread_p, p_dwb_ordered_slots[i].io_page, IO_PAGESIZE);
3055  continue;
3056  }
3057 
3058  /* Corrupted page in data volume. Check DWB. */
3059  error_code = fileio_page_check_corruption (thread_p, p_dwb_ordered_slots[i].io_page, &is_page_corrupted);
3060  if (error_code != NO_ERROR)
3061  {
3062  return error_code;
3063  }
3064 
3065  if (is_page_corrupted)
3066  {
3067  /* The page is corrupted in data volume and DWB. Something wrong happened. */
3068  assert_release (false);
3069  dwb_log_error ("Can't recover page = (%d,%d)\n", vpid->volid, vpid->pageid);
3070  return ER_FAILED;
3071  }
3072 
3073  /* The page content in data volume will be replaced later with the DWB page content. */
3074  dwb_log ("page = (%d,%d) is recovered with DWB.\n", vpid->volid, vpid->pageid);
3075  num_recoverable_pages++;
3076  }
3077 
3078  *p_num_recoverable_pages = num_recoverable_pages;
3079  return NO_ERROR;
3080 }
3081 
3082 /*
3083  * dwb_load_and_recover_pages () - Load and recover pages from DWB.
3084  *
3085  * return : Error code. ER_IO_MOUNT_FAIL when the DWB volume cannot be mounted; otherwise the error of the
 *          first failing step, or the result of dwb_create ().
3086  * thread_p (in): The thread entry.
3087  * dwb_path_p (in): The double write buffer path.
3088  * db_name_p (in): The database name.
3089  *
3090  * Note: This function is called at recovery. The corrupted pages are recovered from double write volume buffer disk.
3091  * Then, double write volume buffer disk is recreated according to user specifications.
3092  * Currently we use a DWB block in memory to recover corrupted pages.
 *
 * Steps: mount the DWB volume, read all its pages into a temporary block, order the slots by VPID,
 * invalidate duplicated pages, check each page against the data volume, write back and synchronize
 * the recoverable pages, then dismount and remove the old DWB volume and create a new one.
3093  */
3094 int
3095 dwb_load_and_recover_pages (THREAD_ENTRY * thread_p, const char *dwb_path_p, const char *db_name_p)
3096 {
3097  int error_code = NO_ERROR, read_fd = NULL_VOLDES;
3098  unsigned int num_dwb_pages, ordered_slots_length, i;
3099  DWB_BLOCK *rcv_block = NULL;
3100  DWB_SLOT *p_dwb_ordered_slots = NULL;
3101  FILEIO_PAGE *iopage;
3102  int num_recoverable_pages;
3103 
3104  assert (dwb_Global.vdes == NULL_VOLDES);
3105 
3106  dwb_check_logging ();
3107 
3108  fileio_make_dwb_name (dwb_Volume_name, dwb_path_p, db_name_p);
3109 
3111  {
3112  /* Open DWB volume first */
3113  read_fd = fileio_mount (thread_p, boot_db_full_name (), dwb_Volume_name, LOG_DBDWB_VOLID, false, false);
3114  if (read_fd == NULL_VOLDES)
3115  {
3116  return ER_IO_MOUNT_FAIL;
3117  }
3118 
3119  num_dwb_pages = fileio_get_number_of_volume_pages (read_fd, IO_PAGESIZE);
3120  dwb_log ("dwb_load_and_recover_pages: The number of pages in DWB %d\n", num_dwb_pages);
3121 
3122  /* We are in recovery phase. The system may be restarted with DWB size different than parameter value.
3123  * There may be one of the following two reasons:
3124  * - the user may have intentionally modified the DWB size, before restarting.
3125  * - DWB size didn't change, but the previous DWB was created, partially flushed and the system crashed.
3126  * We know that a valid DWB size must be a power of 2. In this case we recover from DWB. Otherwise, skip
3127  * recovering from DWB - the modifications are not reflected in data pages.
3128  * Another approach would be to recover, even if the DWB size is not a power of 2 (DWB partially flushed).
3129  */
3130  if ((num_dwb_pages > 0) && IS_POWER_OF_2 (num_dwb_pages))
3131  {
3132  /* Create DWB block for recovery purpose: a single block holding all the pages of the DWB volume. */
3133  error_code = dwb_create_blocks (thread_p, 1, num_dwb_pages, &rcv_block);
3134  if (error_code != NO_ERROR)
3135  {
3136  goto end;
3137  }
3138 
3139  /* Read pages in block write area. This means that slot pages are set. */
3140  if (fileio_read_pages (thread_p, read_fd, rcv_block->write_buffer, 0, num_dwb_pages, IO_PAGESIZE) == NULL)
3141  {
3142  error_code = ER_FAILED;
3143  goto end;
3144  }
3145 
3146  /* Set slots VPID and LSA from the page headers just read. */
3147  for (i = 0; i < num_dwb_pages; i++)
3148  {
3149  iopage = rcv_block->slots[i].io_page;
3150 
3151  VPID_SET (&rcv_block->slots[i].vpid, iopage->prv.volid, iopage->prv.pageid);
3152  LSA_COPY (&rcv_block->slots[i].lsa, &iopage->prv.lsa);
3153  }
3154  rcv_block->count_wb_pages = num_dwb_pages;
3155 
3156  /* Order slots by VPID, to flush faster. */
3157  error_code = dwb_block_create_ordered_slots (rcv_block, &p_dwb_ordered_slots, &ordered_slots_length);
3158  if (error_code != NO_ERROR)
3159  {
 /* The specific error code is replaced with the generic ER_FAILED. */
3160  error_code = ER_FAILED;
3161  goto end;
3162  }
3163 
3164  /* Remove duplicates. Normally, we do not expect duplicates in DWB. However, this happens if
3165  * the system crashes in the middle of flushing into double write file. In this case, some pages in DWB
3166  * are from the last DWB flush and the other from the previous DWB flush.
 * Only adjacent ordered slots are compared; count_wb_pages is greater than zero here, so the
 * unsigned "count - 1" bound cannot wrap.
3167  */
3168  for (i = 0; i < rcv_block->count_wb_pages - 1; i++)
3169  {
3170  DWB_SLOT *s1, *s2;
3171 
3172  s1 = &p_dwb_ordered_slots[i];
3173  s2 = &p_dwb_ordered_slots[i + 1];
3174 
3175  if (!VPID_ISNULL (&s1->vpid) && VPID_EQ (&s1->vpid, &s2->vpid))
3176  {
3177  /* Next slot contains the same page. Search for the oldest version. */
3178  assert (LSA_LE (&s1->lsa, &s2->lsa));
3179 
3180  dwb_log ("dwb_load_and_recover_pages: Found duplicates in DWB at positions = (%d,%d) %d\n",
3182 
3183  if (LSA_LT (&s1->lsa, &s2->lsa))
3184  {
3185  /* Invalidate the oldest page version. */
3186  VPID_SET_NULL (&s1->vpid);
3187  dwb_log ("dwb_load_and_recover_pages: Invalidated the page at position = (%d)\n",
3188  s1->position_in_block);
3189  }
3190  else
3191  {
3192  /* Same LSA. This is the case when page was modified without setting LSA.
3193  * The first appearance in DWB contains the oldest page modification - last flush in DWB!
 * Therefore the slot with the smaller position in block is kept and the other one is invalidated.
3194  */
3196 
3197  if (s1->position_in_block < s2->position_in_block)
3198  {
3199  /* Page of s1 is valid. */
3200  VPID_SET_NULL (&s2->vpid);
3201  dwb_log ("dwb_load_and_recover_pages: Invalidated the page at position = (%d)\n",
3202  s2->position_in_block);
3203  }
3204  else
3205  {
3206  /* Page of s2 is valid. */
3207  VPID_SET_NULL (&s1->vpid);
3208  dwb_log ("dwb_load_and_recover_pages: Invalidated the page at position = (%d)\n",
3209  s1->position_in_block);
3210  }
3211  }
3212  }
3213  }
3214 
3215 #if !defined (NDEBUG)
3216  // check sanity of ordered slots
3217  error_code = dwb_debug_check_dwb (thread_p, p_dwb_ordered_slots, num_dwb_pages);
3218  if (error_code != NO_ERROR)
3219  {
3220  goto end;
3221  }
3222 #endif // DEBUG
3223 
3224  /* Check whether the data page is corrupted. If so, it will be replaced with the DWB page. */
3225  error_code = dwb_check_data_page_is_sane (thread_p, rcv_block, p_dwb_ordered_slots, &num_recoverable_pages);
3226  if (error_code != NO_ERROR)
3227  {
3228  goto end;
3229  }
3230 
3231  if (0 < num_recoverable_pages)
3232  {
3233  /* Replace the corrupted pages in data volume with the DWB content. */
3234  error_code =
3235  dwb_write_block (thread_p, rcv_block, p_dwb_ordered_slots, ordered_slots_length, false, false);
3236  if (error_code != NO_ERROR)
3237  {
3238  goto end;
3239  }
3240 
3241  /* Now, flush the volumes having pages in current block. */
3242  for (i = 0; i < rcv_block->count_flush_volumes_info; i++)
3243  {
3244  if (fileio_synchronize (thread_p, rcv_block->flush_volumes_info[i].vdes, NULL,
3246  {
3247  error_code = ER_FAILED;
3248  goto end;
3249  }
3250 
3251  dwb_log ("dwb_load_and_recover_pages: Synchronized volume %d\n",
3252  rcv_block->flush_volumes_info[i].vdes);
3253  }
3254 
3255  rcv_block->count_flush_volumes_info = 0;
3256  }
3257 
3258  assert (rcv_block->count_flush_volumes_info == 0);
3259  }
3260 
3261  /* Dismount the file. */
3262  fileio_dismount (thread_p, read_fd);
3263 
3264  /* Destroy the old file, since data recovered. */
3265  fileio_unformat (thread_p, dwb_Volume_name);
3266  read_fd = NULL_VOLDES;
3267  }
3268 
3269  /* Since old file destroyed, now we can rebuild the new double write buffer with user specifications. */
3270  error_code = dwb_create (thread_p, dwb_path_p, db_name_p);
3271  if (error_code != NO_ERROR)
3272  {
3273  dwb_log_error ("Can't create DWB \n");
3274  }
3275 
3276 end:
3277  /* Do not remove the old file if an error occurs.
 * NOTE(review): on the error paths that jump here, read_fd is not dismounted by this function either.
 */
3278  if (p_dwb_ordered_slots != NULL)
3279  {
3280  free_and_init (p_dwb_ordered_slots);
3281  }
3282 
3283  if (rcv_block != NULL)
3284  {
3285  dwb_finalize_block (rcv_block);
3286  free_and_init (rcv_block);
3287  }
3288 
3289  return error_code;
3290 }
3291 
3292 /*
3293  * dwb_destroy () - Destroy DWB.
3294  *
3295  * return : Error code. Only a failure to start the structure modification is reported; in that case nothing
 *          else is done.
3296  * thread_p (in): The thread entry.
 *
 * Note: When DWB is not created, nothing is destroyed, but the structure modification is still ended and, in
 *       SERVER_MODE, the DWB daemons are still destroyed.
3297  */
3298 int
3300 {
3301  int error_code = NO_ERROR;
3302  UINT64 current_position_with_flags;
3303 
3304  error_code = dwb_starts_structure_modification (thread_p, &current_position_with_flags);
3305  if (error_code != NO_ERROR)
3306  {
3307  return error_code;
3308  }
3309 
3310  /* DWB structure modification started, no other transaction can modify the global position with flags */
3311  if (!DWB_IS_CREATED (current_position_with_flags))
3312  {
3313  /* Not created, nothing to destroy, restore the modification flag. */
3314  goto end;
3315  }
3316 
3317  dwb_destroy_internal (thread_p, &current_position_with_flags);
3318 
3319 end:
3320  /* Ends the modification, allowing others to modify global position with flags. */
3321  dwb_ends_structure_modification (thread_p, current_position_with_flags);
3322 
3323  /* DWB is destroyed (or was never created); the daemons are no longer needed. */
3324 #if defined(SERVER_MODE)
3325  dwb_daemons_destroy ();
3326 #endif
3327 
3328  return error_code;
3329 }
3330 
3331 /*
3332  * dwb_get_volume_name - Get the double write volume name.
3333  * return : DWB volume name (the dwb_Volume_name buffer, not a copy), or NULL when DWB is not created.
3334  */
3335 char *
3337 {
3338  if (dwb_is_created ())
3339  {
3340  return dwb_Volume_name;
3341  }
3342  else
3343  {
3344  return NULL;
3345  }
3346 }
3347 
3348 /*
3349  * dwb_flush_next_block(): Flush the next block, then keep flushing while another block is available.
3350  *
3351  * returns: error code. NO_ERROR when DWB is not created, its structure is being modified, or no block is
 *          available for flush; otherwise the error returned by dwb_flush_block ().
3352  * thread_p (in): The thread entry.
3353  */
3354 static int
3356 {
3357  unsigned int block_no;
3358  DWB_BLOCK *flush_block = NULL;
3359  int error_code = NO_ERROR;
3360  UINT64 position_with_flags;
3361 
3362 start:
 /* Adding 0 is used to read the current value of position_with_flags atomically. */
3363  position_with_flags = ATOMIC_INC_64 (&dwb_Global.position_with_flags, 0ULL);
3364  if (!DWB_IS_CREATED (position_with_flags) || DWB_IS_MODIFYING_STRUCTURE (position_with_flags))
3365  {
3366  return NO_ERROR;
3367  }
3368 
 /* A block number outside [0, DWB_NUM_TOTAL_BLOCKS) means there is nothing to flush. */
3369  dwb_get_next_block_for_flush (thread_p, &block_no);
3370  if (block_no < DWB_NUM_TOTAL_BLOCKS)
3371  {
3372  flush_block = &dwb_Global.blocks[block_no];
3373 
3374  /* Flush all pages from current block */
3375  assert (flush_block != NULL && flush_block->count_wb_pages == DWB_BLOCK_NUM_PAGES);
3376 
 /* Sanity check: when the previous block precedes this one (lower version, or same version and lower
  * block number), it is expected to hold no pages.
  */
3377  if (DWB_GET_PREV_BLOCK (flush_block->block_no)->version < flush_block->version
3378  || ((DWB_GET_PREV_BLOCK (flush_block->block_no)->version == flush_block->version)
3379  && (flush_block->block_no > DWB_GET_PREV_BLOCK_NO (flush_block->block_no))))
3380  {
3381  if (DWB_GET_PREV_BLOCK (flush_block->block_no)->count_wb_pages != 0)
3382  {
3383  assert_release (false);
3384  }
3385  }
3386 
3387  error_code = dwb_flush_block (thread_p, flush_block, true, NULL);
3388  if (error_code != NO_ERROR)
3389  {
3390  /* Something wrong happened. */
3391  dwb_log_error ("Can't flush block = %d having version %lld\n", flush_block->block_no, flush_block->version);
3392 
3393  return error_code;
3394  }
3395 
3396  dwb_log ("Successfully flushed DWB block = %d having version %lld\n",
3397  flush_block->block_no, flush_block->version);
3398 
3399  /* Check whether another block is available for flush. */
3400  goto start;
3401  }
3402 
3403  return NO_ERROR;
3404 }
3405 
3406 /*
3407  * dwb_flush_force () - Force flushing the current content of DWB.
3408  *
3409  * return : Error code. NO_ERROR when the function reaches its end; otherwise the error returned while waiting
 *          (other than a timeout, which is retried) or while adding a page.
3410  * thread_p (in): The thread entry.
3411  * all_sync (out): True, if everything synchronized. Set to false on entry and left false on the error returns.
 *
 * Note: The function picks the not yet flushed block having the highest version (the "initial" block) and
 *       loops until that block is no longer marked as write started or its content was overwritten. While the
 *       global position does not advance, null pages are added (at most the number of free pages of the
 *       initial block) to fill the block. Finally, in SERVER_MODE, it waits until the file sync helper block
 *       is no longer the initial block.
3412  */
3413 int
3414 dwb_flush_force (THREAD_ENTRY * thread_p, bool * all_sync)
3415 {
3416  UINT64 initial_position_with_flags, current_position_with_flags, prev_position_with_flags;
3417  UINT64 initial_block_version, current_block_version;
3418  int initial_block_no, current_block_no = DWB_NUM_TOTAL_BLOCKS;
3419  char page_buf[IO_MAX_PAGE_SIZE + MAX_ALIGNMENT];
3420  FILEIO_PAGE *iopage = NULL;
3421  VPID null_vpid = { NULL_VOLID, NULL_PAGEID };
3422  int error_code = NO_ERROR;
3423  DWB_SLOT *dwb_slot = NULL;
3424  unsigned int count_added_pages = 0, max_pages_to_add = 0, initial_num_pages = 0;
3425  DWB_BLOCK *initial_block;
3426  PERF_UTIME_TRACKER time_track;
3427  int block_no;
3428 
3429  assert (all_sync != NULL);
3430 
3431  PERF_UTIME_TRACKER_START (thread_p, &time_track);
3432 
3433  *all_sync = false;
3434 
3435 start:
 /* Adding 0 is used to read the current value of position_with_flags atomically. */
3436  initial_position_with_flags = ATOMIC_INC_64 (&dwb_Global.position_with_flags, 0ULL);
3437  dwb_log ("dwb_flush_force: Started with initital position = %lld\n", initial_position_with_flags);
3438 
3439 #if !defined (NDEBUG)
 /* Debug only: dump the state of every block (reported through dwb_log_error). */
3440  if (dwb_Global.blocks != NULL)
3441  {
3442  for (block_no = 0; block_no < (int) DWB_NUM_TOTAL_BLOCKS; block_no++)
3443  {
3444  dwb_log_error ("dwb_flush_force start: Block %d, Num pages = %d, version = %lld\n",
3445  block_no, dwb_Global.blocks[block_no].count_wb_pages, dwb_Global.blocks[block_no].version);
3446  }
3447  }
3448 #endif
3449 
3450  if (DWB_NOT_CREATED_OR_MODIFYING (initial_position_with_flags))
3451  {
3452  if (!DWB_IS_CREATED (initial_position_with_flags))
3453  {
3454  /* Nothing to do. Everything flushed. */
3455  assert (dwb_Global.file_sync_helper_block == NULL);
3456  dwb_log ("dwb_flush_force: Everything flushed\n");
3457  goto end;
3458  }
3459 
3460  if (DWB_IS_MODIFYING_STRUCTURE (initial_position_with_flags))
3461  {
3462  /* DWB structure change started, needs to wait for flush. */
3463  error_code = dwb_wait_for_strucure_modification (thread_p);
3464  if (error_code != NO_ERROR)
3465  {
3466  if (error_code == ER_CSS_PTHREAD_COND_TIMEDOUT)
3467  {
3468  /* timeout, try again */
3469  goto start;
3470  }
3471  dwb_log_error ("dwb_flush_force : Error %d while waiting for structure modification=%lld\n",
3472  error_code, ATOMIC_INC_64 (&dwb_Global.position_with_flags, 0ULL));
3473  return error_code;
3474  }
3475  }
3476  }
3477 
3478  if (DWB_GET_BLOCK_STATUS (initial_position_with_flags) == 0)
3479  {
3480  /* No block is marked in the status flags. Check helper flush block. */
3481  initial_block_no = DWB_GET_BLOCK_NO_FROM_POSITION (initial_position_with_flags);
3482  initial_block = dwb_Global.file_sync_helper_block;
3483  if (initial_block == NULL)
3484  {
3485  /* Nothing to flush. */
3486  goto end;
3487  }
3488 
3489  goto wait_for_file_sync_helper_block;
3490  }
3491 
3492  /* Search for latest not flushed block - not flushed yet, having highest version. */
3493  initial_block_no = -1;
3494  initial_block_version = 0;
3495  for (block_no = 0; block_no < (int) DWB_NUM_TOTAL_BLOCKS; block_no++)
3496  {
3497  if (DWB_IS_BLOCK_WRITE_STARTED (initial_position_with_flags, block_no)
3498  && (dwb_Global.blocks[block_no].version >= initial_block_version))
3499  {
3500  initial_block_no = block_no;
3501  initial_block_version = dwb_Global.blocks[initial_block_no].version;
3502  }
3503  }
3504 
3505  /* At least one block was not flushed. */
3506  assert (initial_block_no != -1);
3507 
3508  initial_num_pages = dwb_Global.blocks[initial_block_no].count_wb_pages;
3509  if (initial_position_with_flags != ATOMIC_INC_64 (&dwb_Global.position_with_flags, 0ULL))
3510  {
3511  /* The position_with_flags was modified meanwhile by concurrent threads. */
3512  goto start;
3513  }
3514  prev_position_with_flags = initial_position_with_flags;
3515 
 /* Upper bound for the null pages added below: the free pages of the initial block. */
3516  max_pages_to_add = DWB_BLOCK_NUM_PAGES - initial_num_pages;
3517 
 /* Prepare the page that is added with a null VPID to fill the block. */
3518  iopage = (FILEIO_PAGE *) PTR_ALIGN (page_buf, MAX_ALIGNMENT);
3519  memset (iopage, 0, IO_MAX_PAGE_SIZE);
3520  fileio_initialize_res (thread_p, iopage, IO_PAGESIZE);
3521 
3522  dwb_log ("dwb_flush_force: Waits for flushing the block %d having version %lld and %d pages\n",
3523  initial_block_no, initial_block_version, initial_num_pages);
3524 
3525  /* Check whether the initial block was flushed */
3526 check_flushed_blocks:
3527 
3528  assert (initial_block_no >= 0);
3529  if ((ATOMIC_INC_32 (&dwb_Global.blocks_flush_counter, 0) > 0)
3530  && (ATOMIC_INC_32 (&dwb_Global.next_block_to_flush, 0) == (unsigned int) initial_block_no)
3531  && (ATOMIC_INC_32 (&dwb_Global.blocks[initial_block_no].count_wb_pages, 0) == DWB_BLOCK_NUM_PAGES))
3532  {
3533  /* The initial block is currently flushing, wait for it. */
3534  error_code = dwb_wait_for_block_completion (thread_p, initial_block_no);
3535  if (error_code != NO_ERROR)
3536  {
3537  if (error_code == ER_CSS_PTHREAD_COND_TIMEDOUT)
3538  {
3539  /* timeout, try again */
3540  goto check_flushed_blocks;
3541  }
3542 
3543  dwb_log_error ("dwb_flush_force : Error %d while waiting for block completion = %lld\n",
3544  error_code, ATOMIC_INC_64 (&dwb_Global.position_with_flags, 0ULL));
3545  return error_code;
3546  }
3547 
3548  goto check_flushed_blocks;
3549  }
3550 
3551  /* Read again the current position and check whether initial block was flushed. */
3552  current_position_with_flags = ATOMIC_INC_64 (&dwb_Global.position_with_flags, 0ULL);
3553  if (DWB_NOT_CREATED_OR_MODIFYING (current_position_with_flags))
3554  {
3555  if (!DWB_IS_CREATED (current_position_with_flags))
3556  {
3557  /* Nothing to do. Everything flushed. */
3558  assert (dwb_Global.file_sync_helper_block == NULL);
3559  dwb_log ("dwb_flush_force: Everything flushed\n");
3560  goto end;
3561  }
3562 
3563  if (DWB_IS_MODIFYING_STRUCTURE (current_position_with_flags))
3564  {
3565  /* DWB structure change started, needs to wait for flush. */
3566  error_code = dwb_wait_for_strucure_modification (thread_p);
3567  if (error_code != NO_ERROR)
3568  {
3569  if (error_code == ER_CSS_PTHREAD_COND_TIMEDOUT)
3570  {
3571  /* timeout, try again */
3572  goto check_flushed_blocks;
3573  }
3574 
3575  dwb_log_error ("dwb_flush_force : Error %d while waiting for structure modification = %lld\n",
3576  error_code, ATOMIC_INC_64 (&dwb_Global.position_with_flags, 0ULL));
3577  return error_code;
3578  }
3579  }
3580  }
3581 
3582  if (!DWB_IS_BLOCK_WRITE_STARTED (current_position_with_flags, initial_block_no))
3583  {
3584  /* The initial block is no longer marked as write started. Check helper flush block. */
3585  goto wait_for_file_sync_helper_block;
3586  }
3587 
3588  /* Check whether initial block content was overwritten. */
3589  current_block_no = DWB_GET_BLOCK_NO_FROM_POSITION (current_position_with_flags);
3590  current_block_version = dwb_Global.blocks[current_block_no].version;
3591 
3592  if ((current_block_no == initial_block_no) && (current_block_version != initial_block_version))
3593  {
3594  assert (current_block_version > initial_block_version);
3595 
3596  /* Check helper flush block. */
3597  goto wait_for_file_sync_helper_block;
3598  }
3599 
3600  if (current_position_with_flags == prev_position_with_flags && count_added_pages < max_pages_to_add)
3601  {
3602  /* The system didn't advance, add null pages to force flush block. */
3603  dwb_slot = NULL;
3604 
3605  error_code = dwb_add_page (thread_p, iopage, &null_vpid, &dwb_slot);
3606  if (error_code != NO_ERROR)
3607  {
3608  dwb_log_error ("dwb_flush_force : Error %d while adding page = %lld\n",
3609  error_code, ATOMIC_INC_64 (&dwb_Global.position_with_flags, 0ULL));
3610  return error_code;
3611  }
3612  else if (dwb_slot == NULL)
3613  {
3614  /* DWB disabled meanwhile, everything flushed. */
3615  assert (dwb_Global.file_sync_helper_block == NULL);
3616  assert (!DWB_IS_CREATED (ATOMIC_INC_64 (&dwb_Global.position_with_flags, 0ULL)));
3617  dwb_log ("dwb_flush_force: DWB disabled = %lld\n", ATOMIC_INC_64 (&dwb_Global.position_with_flags, 0ULL));
3618  goto end;
3619  }
3620 
3621  count_added_pages++;
3622  }
3623 
3624  prev_position_with_flags = current_position_with_flags;
3625  goto check_flushed_blocks;
3626 
3627 wait_for_file_sync_helper_block:
3628  dwb_log ("dwb_flush_force: Wait for helper flush = %lld\n", ATOMIC_INC_64 (&dwb_Global.position_with_flags, 0ULL));
 /* initial_block is always recomputed from initial_block_no here, whatever it was set to before the jump. */
3629  initial_block = &dwb_Global.blocks[initial_block_no];
3630 
3631 #if defined (SERVER_MODE)
3632  while (dwb_Global.file_sync_helper_block == initial_block)
3633  {
3634  /* Wait for file sync helper thread to finish. */
3635  thread_sleep (1);
3636  }
3637 #endif
3638 
3639 end:
3640  *all_sync = true;
3641 
3642  dwb_log ("dwb_flush_force: Ended with position = %lld\n", ATOMIC_INC_64 (&dwb_Global.position_with_flags, 0ULL));
3643  PERF_UTIME_TRACKER_TIME_AND_RESTART (thread_p, &time_track, PSTAT_DWB_FLUSH_FORCE_TIME_COUNTERS);
3644 
3645 #if !defined (NDEBUG)
 /* Debug only: dump the state of every block (reported through dwb_log_error). */
3646  if (dwb_Global.blocks != NULL)
3647  {
3648  for (block_no = 0; block_no < (int) DWB_NUM_TOTAL_BLOCKS; block_no++)
3649  {
3650  dwb_log_error ("dwb_flush_force end: Block %d, Num pages = %d, version = %lld\n",
3651  block_no, dwb_Global.blocks[block_no].count_wb_pages, dwb_Global.blocks[block_no].version);
3652  }
3653  }
3654 #endif
3655 
3656  return NO_ERROR;
3657 }
3658 
3659 /*
3660  * dwb_file_sync_helper () - Helps file sync.
3661  *
3662  * return : Error code. Every return visible in this function is NO_ERROR.
3663  * thread_p (in): Thread entry.
 *
 * Note: Works on dwb_Global.file_sync_helper_block. It repeatedly walks the block's flush_volumes_info array,
 *       synchronizes volumes with fileio_synchronize () (its result is ignored), and resets
 *       dwb_Global.file_sync_helper_block to NULL when done. Does nothing when DWB is not created, its
 *       structure is being modified, or there is no helper block.
3664  */
3665 static int
3667 {
3668  unsigned int i;
3669  int num_pages, num_pages2, num_pages_to_sync;
3670  DWB_BLOCK *block = NULL;
3671  UINT64 position_with_flags;
3672  FLUSH_VOLUME_INFO *current_flush_volume_info = NULL;
3673  unsigned int count_flush_volumes_info = 0;
3674  bool all_block_pages_written = false, need_wait = false, can_flush_volume = false;
3675  unsigned int start_flush_volume = 0;
3676  int first_partial_flushed_volume = -1;
3677  PERF_UTIME_TRACKER time_track;
3678 
3679  PERF_UTIME_TRACKER_START (thread_p, &time_track);
 /* Page count a volume must reach before it is synchronized while its pages are still being written. */
3680  num_pages_to_sync = prm_get_integer_value (PRM_ID_PB_SYNC_ON_NFLUSH);
3681 
3682  position_with_flags = ATOMIC_INC_64 (&dwb_Global.position_with_flags, 0ULL);
3683  if (!DWB_IS_CREATED (position_with_flags) || DWB_IS_MODIFYING_STRUCTURE (position_with_flags))
3684  {
3685  /* Needs to modify structure. Stop flushing. */
3686  return NO_ERROR;
3687  }
3688 
3689  block = (DWB_BLOCK *) dwb_Global.file_sync_helper_block;
3690  if (block == NULL)
3691  {
3692  return NO_ERROR;
3693  }
3694 
3695  do
3696  {
 /* Snapshot the block state for this iteration; it is compared with the live values further below. */
3697  count_flush_volumes_info = block->count_flush_volumes_info;
3698  all_block_pages_written = block->all_pages_written;
3699  need_wait = false;
3700  first_partial_flushed_volume = -1;
3701 
3702  for (i = start_flush_volume; i < count_flush_volumes_info; i++)
3703  {
3704  current_flush_volume_info = &block->flush_volumes_info[i];
3705 
3706  if (current_flush_volume_info->flushed_status != VOLUME_FLUSHED_BY_DWB_FILE_SYNC_HELPER_THREAD)
3707  {
3708  if (!ATOMIC_CAS_32 (&current_flush_volume_info->flushed_status, VOLUME_NOT_FLUSHED,
3710  {
3711  /* Flushed by DWB flusher, skip it. */
3712  assert_release (current_flush_volume_info->flushed_status == VOLUME_FLUSHED_BY_DWB_FLUSH_THREAD);
3713  continue;
3714  }
3715  }
3716 
3717  /* I'm the flusher of the volume. */
3719 
3720  num_pages = ATOMIC_INC_32 (&current_flush_volume_info->num_pages, 0);
3721  if (num_pages < num_pages_to_sync)
3722  {
3723  if (current_flush_volume_info->all_pages_written == true)
3724  {
3725  if (num_pages == 0)
3726  {
3727  /* Already flushed. */
3728  continue;
3729  }
3730 
3731  /* Needs flushing. */
3732  }
3733  else
3734  {
3735  /* Not enough pages, check the other volumes and retry. */
3736  assert (all_block_pages_written == false);
3737 
3738  if (first_partial_flushed_volume == -1)
3739  {
3740  first_partial_flushed_volume = i;
3741  }
3742 
3743  need_wait = true;
3744  break;
3745  }
3746  }
3747  else if (current_flush_volume_info->all_pages_written == false)
3748  {
 /* Enough pages to synchronize now, but more are expected: remember to come back to this volume. */
3749  if (first_partial_flushed_volume == -1)
3750  {
3751  first_partial_flushed_volume = i;
3752  }
3753  }
3754 
3755  /* Reset the number of pages in volume. */
3756  num_pages2 = ATOMIC_TAS_32 (&current_flush_volume_info->num_pages, 0);
3757  assert_release (num_pages2 >= num_pages);
3758 
3759  /*
3760  * Flush the volume. If not all volume pages are available now, continue with next volume, if any,
3761  * and then resume the current one.
3762  */
3763  (void) fileio_synchronize (thread_p, current_flush_volume_info->vdes, NULL, FILEIO_SYNC_ONLY);
3764 
3765  dwb_log ("dwb_file_sync_helper: Synchronized volume %d\n", current_flush_volume_info->vdes);
3766  }
3767 
3768  /* Set next volume to flush. */
3769  if (first_partial_flushed_volume != -1)
3770  {
3771  assert ((first_partial_flushed_volume >= 0)
3772  && ((unsigned int) first_partial_flushed_volume < count_flush_volumes_info));
3773 
3774  /* Continue with partial flushed volume. */
3775  start_flush_volume = first_partial_flushed_volume;
3776  }
3777  else
3778  {
3779  /* Continue with the next volume. */
3780  start_flush_volume = count_flush_volumes_info;
3781  }
3782 
 /* Decide whether another iteration is needed. */
3783  can_flush_volume = false;
3784  if (count_flush_volumes_info != block->count_flush_volumes_info)
3785  {
3786  /* Do not wait since a new volume arrived. */
3787  can_flush_volume = true;
3788  }
3789  else if (all_block_pages_written == false)
3790  {
3791  /* Not all pages written at the beginning of the iteration, check whether new data arrived. */
3792  if (first_partial_flushed_volume != -1)
3793  {
3794  current_flush_volume_info = &block->flush_volumes_info[first_partial_flushed_volume];
3795 
3796  if ((ATOMIC_INC_32 (&current_flush_volume_info->num_pages, 0) < num_pages_to_sync)
3797  && (current_flush_volume_info->all_pages_written == false))
3798  {
3799  /* Needs more data. */
3800  need_wait = true;
3801  }
3802  else
3803  {
3804  /* New data arrived. */
3805  can_flush_volume = true;
3806  }
3807  }
3808  else if (block->all_pages_written == false)
3809  {
3810  /* Not all pages were written and no volume available for flush yet. */
3811  need_wait = true;
3812  }
3813  else
3814  {
 /* All block pages were written meanwhile; run one more iteration. */
3815  can_flush_volume = true;
3816  }
3817  }
3818 
3819  if (!can_flush_volume)
3820  {
3821  /* Can't flush a volume. Not enough data available or nothing to flush. */
3822  if (need_wait == true)
3823  {
3824  /* Wait for new data. */
3825 #if defined (SERVER_MODE)
3826  thread_sleep (1);
3827 #endif
3828  /* Flush the newly arrived data, if any. */
3829  can_flush_volume = true;
3830  }
3831  }
3832  }
3833  while (can_flush_volume == true);
3834 
3835 #if !defined (NDEBUG)
 /* Debug only: every volume must have all its pages written and must be flushed by one of the threads. */
3836  if (count_flush_volumes_info != 0)
3837  {
3838  assert (count_flush_volumes_info == block->count_flush_volumes_info);
3839 
3840  for (i = 0; i < count_flush_volumes_info; i++)
3841  {
3842  current_flush_volume_info = &block->flush_volumes_info[i];
3843 
3844  assert ((current_flush_volume_info->all_pages_written == true)
3845  && (current_flush_volume_info->flushed_status != VOLUME_NOT_FLUSHED));
3846  }
3847  }
3848 #endif
3849 
3850  /* Be sure that the helper flush block was not changed by other thread. */
3851  assert (block == dwb_Global.file_sync_helper_block);
3852  (void) ATOMIC_TAS_ADDR (&dwb_Global.file_sync_helper_block, (DWB_BLOCK *) NULL);
3853 
3854  PERF_UTIME_TRACKER_TIME (thread_p, &time_track, PSTAT_DWB_FILE_SYNC_HELPER_TIME_COUNTERS);
3855 
3856  return NO_ERROR;
3857 }
3858 
3859 /*
3860  * dwb_read_page () - Reads page from DWB.
3861  *
3862  * return : Error code.
3863  * thread_p (in): The thread entry.
3864  * vpid(in): The page identifier.
3865  * io_page(out): In-memory address where the content of the page will be copied.
3866  * success(out): True, if found and read from DWB.
3867  */
3868 int
3869 dwb_read_page (THREAD_ENTRY * thread_p, const VPID * vpid, void *io_page, bool * success)
3870 {
3871  DWB_SLOTS_HASH_ENTRY *slots_hash_entry = NULL;
3872  int error_code = NO_ERROR;
3873 
3874  assert (vpid != NULL && io_page != NULL && success != NULL);
3875 
3876  *success = false;
3877 
3878  if (!dwb_is_created ())
3879  {
3880  return NO_ERROR;
3881  }
3882 
3883  VPID key_vpid = *vpid;
3884  slots_hash_entry = dwb_Global.slots_hashmap.find (thread_p, key_vpid);
3885  if (slots_hash_entry != NULL)
3886  {
3887  assert (slots_hash_entry->slot->io_page != NULL);
3888 
3889  /* Check whether the slot data changed meanwhile. */
3890  if (VPID_EQ (&slots_hash_entry->slot->vpid, vpid))
3891  {
3892  memcpy ((char *) io_page, (char *) slots_hash_entry->slot->io_page, IO_PAGESIZE);
3893 
3894  /* Be sure that no other transaction has modified slot data meanwhile. */
3895  assert (slots_hash_entry->slot->io_page->prv.pageid == vpid->pageid
3896  && slots_hash_entry->slot->io_page->prv.volid == vpid->volid);
3897 
3898  *success = true;
3899  }
3900 
3901  pthread_mutex_unlock (&slots_hash_entry->mutex);
3902  }
3903 
3904  return NO_ERROR;
3905 }
3906 
3907 // *INDENT-OFF*
3908 #if defined(SERVER_MODE)
3909 // class dwb_flush_block_daemon_task
3910 //
3911 // description:
3912 // dwb flush block daemon task; each execution flushes DWB blocks through dwb_flush_next_block ().
// m_perf_track is restarted at the end of each execution and read at the beginning of the next one,
// to account PSTAT_DWB_FLUSH_BLOCK_COND_WAIT.
3913 //
3914 class dwb_flush_block_daemon_task: public cubthread::entry_task
3915 {
3916  private:
3917  PERF_UTIME_TRACKER m_perf_track;
3918 
3919  public:
3920  dwb_flush_block_daemon_task ()
3921  {
3922  PERF_UTIME_TRACKER_START (NULL, &m_perf_track);
3923  }
3924 
3925  void execute (cubthread::entry &thread_ref) override
3926  {
3927  if (!BO_IS_SERVER_RESTARTED ())
3928  {
3929  // wait for boot to finish
3930  return;
3931  }
3932 
3933  /* performance tracking */
3934  PERF_UTIME_TRACKER_TIME (NULL, &m_perf_track, PSTAT_DWB_FLUSH_BLOCK_COND_WAIT);
3935 
3936  /* flush pages as long as necessary */
3938  {
3939  if (dwb_flush_next_block (&thread_ref) != NO_ERROR)
3940  {
 // a flush failure is not propagated; it only triggers the release assertion
3941  assert_release (false);
3942  }
3943  }
3944 
3945  PERF_UTIME_TRACKER_START (&thread_ref, &m_perf_track);
3946  }
3947 };
3948 
3949 // function dwb_file_sync_helper_execute
3950 //
3951 // description:
3952 // body of the dwb file sync helper daemon task; does nothing until the server is restarted, then calls
// dwb_file_sync_helper () and ignores its result.
3953 //
3954 void
3955 dwb_file_sync_helper_execute (cubthread::entry &thread_ref)
3956 {
3957  if (!BO_IS_SERVER_RESTARTED ())
3958  {
3959  // wait for boot to finish
3960  return;
3961  }
3962 
3963  /* flush pages as long as necessary */
3965  {
3966  dwb_file_sync_helper (&thread_ref);
3967  }
3968 }
3969 
3970 /*
3971  * dwb_flush_block_daemon_init () - initialize DWB flush block daemon thread
3972  */
3973 void
3974 dwb_flush_block_daemon_init ()
3975 {
3976  cubthread::looper looper = cubthread::looper (std::chrono::milliseconds (1));
3977  dwb_flush_block_daemon_task *daemon_task = new dwb_flush_block_daemon_task ();
3978 
3979  dwb_flush_block_daemon = cubthread::get_manager ()->create_daemon (looper, daemon_task);
3980 }
3981 
3982 /*
3983  * dwb_file_sync_helper_daemon_init () - initialize DWB file sync helper daemon thread
3984  */
3985 void
3986 dwb_file_sync_helper_daemon_init ()
3987 {
3988  cubthread::looper looper = cubthread::looper (std::chrono::milliseconds (10));
3989  cubthread::entry_callable_task *daemon_task = new cubthread::entry_callable_task (dwb_file_sync_helper_execute);
3990 
3991  dwb_file_sync_helper_daemon = cubthread::get_manager ()->create_daemon (looper, daemon_task);
3992 }
3993 
3994 /*
3995  * dwb_daemons_init () - initialize DWB daemon threads
3996  */
3997 void
3998 dwb_daemons_init ()
3999 {
4000  dwb_flush_block_daemon_init ();
4001  dwb_file_sync_helper_daemon_init ();
4002 }
4003 
4004 /*
4005  * dwb_daemons_destroy () - destroy DWB daemon threads
4006  */
4007 void
4008 dwb_daemons_destroy ()
4009 {
4010  cubthread::get_manager ()->destroy_daemon (dwb_flush_block_daemon);
4011  cubthread::get_manager ()->destroy_daemon (dwb_file_sync_helper_daemon);
4012 }
4013 #endif /* SERVER_MODE */
4014 // *INDENT-ON*
4015 
4016 /*
4017  * dwb_is_flush_block_daemon_available () - Check if flush block daemon is available
4018  * return: true if flush block daemon is available, false otherwise
 *
 * Note: In SERVER_MODE, available means that PRM_ID_ENABLE_DWB_FLUSH_THREAD is true and the daemon was
 *       created. Outside SERVER_MODE the result is always false.
4019  */
4020 static bool
4022 {
4023 #if defined (SERVER_MODE)
4024  return prm_get_bool_value (PRM_ID_ENABLE_DWB_FLUSH_THREAD) == true && dwb_flush_block_daemon != NULL;
4025 #else
4026  return false;
4027 #endif
4028 }
4029 
4030 /*
4031  * dwb_is_file_sync_helper_daemon_available () - Check if file sync helper daemon is available
4032  * return: true if file sync helper daemon is available, false otherwise
 *
 * Note: In SERVER_MODE, available means that PRM_ID_ENABLE_DWB_FLUSH_THREAD is true and the daemon was
 *       created. Outside SERVER_MODE the result is always false.
4033  */
4034 static bool
4036 {
4037 #if defined (SERVER_MODE)
4038  return prm_get_bool_value (PRM_ID_ENABLE_DWB_FLUSH_THREAD) == true && dwb_file_sync_helper_daemon != NULL;
4039 #else
4040  return false;
4041 #endif
4042 }
4043 
4044 /*
4045  * dwb_flush_block_daemon_is_running () - Check whether flush block daemon is running
4046  *
4047  * return: true, if flush block thread is running
 *
 * Note: In SERVER_MODE, running means that PRM_ID_ENABLE_DWB_FLUSH_THREAD is true, the daemon was created
 *       and its is_running () reports true. Outside SERVER_MODE the result is always false.
4048  */
4049 static bool
4051 {
4052 #if defined (SERVER_MODE)
4053  return (prm_get_bool_value (PRM_ID_ENABLE_DWB_FLUSH_THREAD) == true && (dwb_flush_block_daemon != NULL)
4054  && (dwb_flush_block_daemon->is_running ()));
4055 #else
4056  return false;
4057 #endif /* SERVER_MODE */
4058 }
4059 
4060 /*
4061  * dwb_file_sync_helper_daemon_is_running () - Check whether file sync helper daemon is running
4062  *
4063  * return: true, if file sync helper thread is running
 *
 * Note: In SERVER_MODE, running means that PRM_ID_ENABLE_DWB_FLUSH_THREAD is true, the daemon was created
 *       and its is_running () reports true. Outside SERVER_MODE the result is always false.
4064  */
4065 static bool
4067 {
4068 #if defined (SERVER_MODE)
4069  return (prm_get_bool_value (PRM_ID_ENABLE_DWB_FLUSH_THREAD) == true && (dwb_file_sync_helper_daemon != NULL)
4070  && (dwb_file_sync_helper_daemon->is_running ()));
4071 #else
4072  return false;
4073 #endif /* SERVER_MODE */
4074 }
#define DWB_NUM_TOTAL_BLOCKS
#define DWB_IS_ANY_BLOCK_WRITE_STARTED(position_with_flags)
bool logpb_need_wal(const LOG_LSA *lsa)
cubthread::entry * thread_get_thread_entry_info(void)
#define NO_ERROR
Definition: error_code.h:46
#define __attribute__(X)
Definition: porting.h:36
STATIC_INLINE int dwb_slots_hash_delete(THREAD_ENTRY *thread_p, DWB_SLOT *slot)
#define IO_PAGESIZE
#define BO_IS_SERVER_RESTARTED()
Definition: boot_sr.h:84
STATIC_INLINE void dwb_destroy_wait_queue(DWB_WAIT_QUEUE *wait_queue, pthread_mutex_t *mutex)
dwb_hashmap_type slots_hashmap
STATIC_INLINE int dwb_set_status_resumed(void *data) __attribute__((ALWAYS_INLINE))
#define ASSERT_ERROR()
pthread_mutex_t mutex
#define ER_CSS_PTHREAD_COND_TIMEDOUT
Definition: error_code.h:1428
bool LSA_EQ(const log_lsa *plsa1, const log_lsa *plsa2)
Definition: log_lsa.hpp:160
int dwb_recreate(THREAD_ENTRY *thread_p)
void LSA_COPY(log_lsa *plsa1, const log_lsa *plsa2)
Definition: log_lsa.hpp:139
#define DWB_MAX_SIZE
static int dwb_debug_check_dwb(THREAD_ENTRY *thread_p, DWB_SLOT *p_dwb_ordered_slots, unsigned int num_pages)
static int dwb_slots_hash_key_copy(void *src, void *dest)
#define VPID_COPY(dest_ptr, src_ptr)
Definition: dbtype_def.h:909
void * fileio_read(THREAD_ENTRY *thread_p, int vol_fd, void *io_page_p, PAGEID page_id, size_t page_size)
Definition: file_io.c:3950
volatile FLUSH_VOLUME_STATUS flushed_status
static API_MUTEX mutex
Definition: api_util.c:72
#define ER_FAILED
Definition: error_code.h:47
LOG_GLOBAL log_Gl
#define ALWAYS_INLINE
#define DWB_ENDS_BLOCK_WRITING(position_with_flags, block_no)
static int write_buffer(SOCKET sock_fd, const char *buf, int size)
Definition: cas_network.c:616
int dwb_load_and_recover_pages(THREAD_ENTRY *thread_p, const char *dwb_path_p, const char *db_name_p)
bool dwb_is_created(void)
void fileio_unformat(THREAD_ENTRY *thread_p, const char *vol_label_p)
Definition: file_io.c:2721
#define DWB_ENDS_MODIFYING_STRUCTURE(position_with_flags)
STATIC_INLINE int dwb_create_internal(THREAD_ENTRY *thread_p, const char *dwb_volume_name, UINT64 *current_position_with_flags) __attribute__((ALWAYS_INLINE))
#define ASSERT_ERROR_AND_SET(error_code)
static void * dwb_slots_hash_entry_alloc(void)
void thread_sleep(double millisec)
STATIC_INLINE int dwb_slots_hash_insert(THREAD_ENTRY *thread_p, VPID *vpid, DWB_SLOT *slot, bool *inserted) __attribute__((ALWAYS_INLINE))
Definition: lock_free.h:63
#define assert_release(e)
Definition: error_manager.h:96
void thread_wakeup_already_had_mutex(cubthread::entry *thread_p, thread_resume_suspend_status resume_reason)
dwb_slots_hash_entry()
STATIC_INLINE DWB_WAIT_QUEUE_ENTRY * dwb_make_wait_queue_entry(DWB_WAIT_QUEUE *wait_queue, void *data) __attribute__((ALWAYS_INLINE))
INT16 VOLID
static bool dwb_flush_block_daemon_is_running(void)
static int dwb_compare_vol_fd(const void *v1, const void *v2)
STATIC_INLINE void dwb_initialize_slot(DWB_SLOT *slot, FILEIO_PAGE *io_page, unsigned int position_in_block, unsigned int block_no) __attribute__((ALWAYS_INLINE))
STATIC_INLINE void dwb_initialize_block(DWB_BLOCK *block, unsigned int block_no, unsigned int count_wb_pages, char *write_buffer, DWB_SLOT *slots, FLUSH_VOLUME_INFO *flush_volumes_info, unsigned int count_flush_volumes_info, unsigned int max_to_flush_vdes) __attribute__((ALWAYS_INLINE))
bool fileio_is_volume_exist(const char *vol_label_p)
Definition: file_io.c:5094
int dwb_create(THREAD_ENTRY *thread_p, const char *dwb_path_p, const char *db_name_p)
int32_t pageid
Definition: dbtype_def.h:879
#define diff
Definition: mprec.h:352
int er_errid(void)
DWB_SLOTS_HASH_ENTRY * next
int dwb_read_page(THREAD_ENTRY *thread_p, const VPID *vpid, void *io_page, bool *success)
#define PTR_ALIGN(addr, boundary)
Definition: memory_alloc.h:77
#define IS_POWER_OF_2(x)
bool LSA_LT(const log_lsa *plsa1, const log_lsa *plsa2)
Definition: log_lsa.hpp:174
unsigned int log2_num_block_pages
#define DWB_MAX_BLOCKS
STATIC_INLINE void dwb_init_wait_queue(DWB_WAIT_QUEUE *wait_queue) __attribute__((ALWAYS_INLINE))
#define NULL_VOLDES
Definition: file_io.h:44
#define MAX_ALIGNMENT
Definition: memory_alloc.h:70
int fileio_mount(THREAD_ENTRY *thread_p, const char *db_full_name_p, const char *vol_label_p, VOLID vol_id, int lock_wait, bool is_do_sync)
Definition: file_io.c:2957
void * fileio_read_pages(THREAD_ENTRY *thread_p, int vol_fd, char *io_pages_p, PAGEID page_id, int num_pages, size_t page_size)
Definition: file_io.c:4227
bool find_or_insert(cubthread::entry *thread_p, Key &key, T *&t)
DWB_SLOT * slot
STATIC_INLINE void dwb_init_slot(DWB_SLOT *slot) __attribute__((ALWAYS_INLINE))
void THREAD_ENTRY
#define NULL_PAGEID
DWB_WAIT_QUEUE_ENTRY * free_list
volatile unsigned int next_block_to_flush
STATIC_INLINE void dwb_signal_block_completion(THREAD_ENTRY *thread_p, DWB_BLOCK *dwb_block) __attribute__((ALWAYS_INLINE))
STATIC_INLINE int dwb_signal_waiting_thread(void *data) __attribute__((ALWAYS_INLINE))
manager * get_manager(void)
const char * boot_db_full_name()
Definition: boot_sr.c:470
UINT64 volatile position_with_flags
static int dwb_slots_hash_compare_key(void *key1, void *key2)
#define VPID_SET(vpid_ptr, volid_value, pageid_value)
Definition: dbtype_def.h:899
void er_set(int severity, const char *file_name, const int line_no, int err_id, int num_args,...)
FLUSH_VOLUME_INFO * flush_volumes_info
#define assert(x)
char dwb_Volume_name[PATH_MAX]
static LF_ENTRY_DESCRIPTOR slots_entry_Descriptor
LOG_APPEND_INFO append
Definition: log_impl.h:651
static bool dwb_is_file_sync_helper_daemon_available(void)
#define DWB_STARTS_CREATION(position_with_flags)
static int dwb_check_data_page_is_sane(THREAD_ENTRY *thread_p, DWB_BLOCK *rcv_block, DWB_SLOT *p_dwb_ordered_slots, int *p_num_recoverable_pages)
#define DWB_IS_MODIFYING_STRUCTURE(position_with_flags)
DWB_WAIT_QUEUE_ENTRY * head
int prm_get_integer_value(PARAM_ID prm_id)
T * find(cubthread::entry *thread_p, Key &key)
#define STATIC_INLINE
#define pthread_mutex_destroy(a)
#define DWB_MIN_BLOCKS
STATIC_INLINE void dwb_remove_wait_queue_entry(DWB_WAIT_QUEUE *wait_queue, pthread_mutex_t *mutex, void *data, int(*func)(void *)) __attribute__((ALWAYS_INLINE))
bool erase_locked(cubthread::entry *thread_p, Key &key, T *&t)
#define ER_OUT_OF_VIRTUAL_MEMORY
Definition: error_code.h:50
#define DWB_GET_PREV_BLOCK(block_no)
#define DWB_STARTS_BLOCK_WRITING(position_with_flags, block_no)
STATIC_INLINE DWB_WAIT_QUEUE_ENTRY * dwb_block_disconnect_wait_queue_entry(DWB_WAIT_QUEUE *wait_queue, void *data) __attribute__((ALWAYS_INLINE))
#define DWB_SLOTS_FREE_LIST_SIZE
STATIC_INLINE int dwb_block_create_ordered_slots(DWB_BLOCK *block, DWB_SLOT **p_dwb_ordered_slots, unsigned int *p_ordered_slots_length) __attribute__((ALWAYS_INLINE))
bool LSA_LE(const log_lsa *plsa1, const log_lsa *plsa2)
Definition: log_lsa.hpp:167
LF_TRAN_SYSTEM dwb_slots_Ts
Definition: lock_free.c:56
char * dwb_get_volume_name(void)
#define VPID_EQ(vpid_ptr1, vpid_ptr2)
Definition: dbtype_def.h:915
short volid
Definition: dbtype_def.h:880
volatile bool all_pages_written
#define DWB_STARTS_MODIFYING_STRUCTURE(position_with_flags)
STATIC_INLINE void dwb_signal_waiting_threads(DWB_WAIT_QUEUE *wait_queue, pthread_mutex_t *mutex) __attribute__((ALWAYS_INLINE))
DWB_WAIT_QUEUE_ENTRY * next
void fileio_dismount(THREAD_ENTRY *thread_p, int vol_fd)
Definition: file_io.c:3134
#define DWB_RESET_POSITION(position_with_flags)
std::int64_t pageid
Definition: log_lsa.hpp:36
int fileio_synchronize_all(THREAD_ENTRY *thread_p, bool is_include)
Definition: file_io.c:4618
static int dwb_flush_next_block(THREAD_ENTRY *thread_p)
volatile UINT64 version
STATIC_INLINE void dwb_destroy_internal(THREAD_ENTRY *thread_p, UINT64 *current_position_with_flags) __attribute__((ALWAYS_INLINE))
#define NULL
Definition: freelistheap.h:34
STATIC_INLINE void dwb_ends_structure_modification(THREAD_ENTRY *thread_p, UINT64 current_position_with_flags) __attribute__((ALWAYS_INLINE))
if(extra_options)
Definition: dynamic_load.c:958
STATIC_INLINE bool perfmon_is_perf_tracking_and_active(int activation_flag) __attribute__((ALWAYS_INLINE))
#define DWB_GET_BLOCK_NO_FROM_POSITION(position_with_flags)
#define ER_IO_MOUNT_FAIL
Definition: error_code.h:59
static int success()
bool LSA_ISNULL(const log_lsa *lsa_ptr)
Definition: log_lsa.hpp:153
void init(lf_tran_system &transys, int entry_idx, int hash_size, int freelist_block_size, int freelist_block_count, lf_entry_descriptor &edesc)
#define pthread_mutex_init(a, b)
#define SERVER_MODE
STATIC_INLINE void dwb_finalize_block(DWB_BLOCK *block) __attribute__((ALWAYS_INLINE))
static DOUBLE_WRITE_BUFFER dwb_Global
STATIC_INLINE void dwb_signal_structure_modificated(THREAD_ENTRY *thread_p) __attribute__((ALWAYS_INLINE))
void thread_lock_entry(cubthread::entry *thread_p)
#define DWB_NOT_CREATED_OR_MODIFYING(position_with_flags)
volatile bool all_pages_written
#define DWB_BLOCK_NUM_PAGES
bool logtb_set_check_interrupt(THREAD_ENTRY *thread_p, bool flag)
const VOLID LOG_DBDWB_VOLID
Definition: log_volids.hpp:57
#define DWB_GET_BLOCK_STATUS(position_with_flags)
int timeval_to_timespec(struct timespec *to, const struct timeval *from)
Definition: porting.c:2173
~dwb_slots_hash_entry()
int dwb_set_data_on_next_slot(THREAD_ENTRY *thread_p, FILEIO_PAGE *io_page_p, bool can_wait, DWB_SLOT **p_dwb_slot)
static int dwb_file_sync_helper(THREAD_ENTRY *thread_p)
STATIC_INLINE int dwb_wait_for_strucure_modification(THREAD_ENTRY *thread_p) __attribute__((ALWAYS_INLINE))
DWB_WAIT_QUEUE_ENTRY * tail
#define DWB_IS_CREATED(position_with_flags)
STATIC_INLINE int dwb_wait_for_block_completion(THREAD_ENTRY *thread_p, unsigned int block_no) __attribute__((ALWAYS_INLINE))
static DB_VALUE * new_block(long n)
Definition: set_object.c:224
#define VPID_ISNULL(vpid_ptr)
Definition: dbtype_def.h:925
void thread_unlock_entry(cubthread::entry *thread_p)
void * fileio_write(THREAD_ENTRY *thread_p, int vol_fd, void *io_page_p, PAGEID page_id, size_t page_size, FILEIO_WRITE_MODE write_mode)
Definition: file_io.c:4150
DWB_SLOTS_HASH_ENTRY * stack
#define ER_INTERRUPTED
Definition: error_code.h:51
#define ARG_FILE_LINE
Definition: error_manager.h:44
void destroy_daemon(daemon *&daemon_arg)
STATIC_INLINE FLUSH_VOLUME_INFO * dwb_add_volume_to_block_flush_area(THREAD_ENTRY *thread_p, DWB_BLOCK *block, int vol_fd) __attribute__((ALWAYS_INLINE))
#define DWB_WAIT_QUEUE_INITIALIZER
#define DWB_SLOTS_HASH_SIZE
STATIC_INLINE int dwb_acquire_next_slot(THREAD_ENTRY *thread_p, bool can_wait, DWB_SLOT **p_dwb_slot) __attribute__((ALWAYS_INLINE))
int fileio_synchronize(THREAD_ENTRY *thread_p, int vol_fd, const char *vlabel, FILEIO_SYNC_OPTION sync_dwb)
Definition: file_io.c:4441
DKNPAGES fileio_get_number_of_volume_pages(int vol_fd, size_t page_size)
Definition: file_io.c:4918
#define dwb_check_logging()
void * data
#define free_and_init(ptr)
Definition: memory_alloc.h:147
#define DWB_IS_BLOCK_WRITE_STARTED(position_with_flags, block_no)
#define pthread_mutex_lock(a)
void LSA_SET_NULL(log_lsa *lsa_ptr)
Definition: log_lsa.hpp:146
STATIC_INLINE void dwb_block_free_wait_queue_entry(DWB_WAIT_QUEUE *wait_queue, DWB_WAIT_QUEUE_ENTRY *wait_queue_entry, int(*func)(void *)) __attribute__((ALWAYS_INLINE))
STATIC_INLINE void dwb_adjust_write_buffer_values(unsigned int *p_double_write_buffer_size, unsigned int *p_num_blocks) __attribute__((ALWAYS_INLINE))
#define const
Definition: cnvlex.c:77
void fileio_make_dwb_name(char *dwb_name_p, const char *dwb_path_p, const char *db_name_p)
Definition: file_io.c:5881
VPID vpid
volatile unsigned int blocks_flush_counter
STATIC_INLINE int dwb_write_block(THREAD_ENTRY *thread_p, DWB_BLOCK *block, DWB_SLOT *p_dwb_slots, unsigned int ordered_slots_length, bool file_sync_helper_can_flush, bool remove_from_hash) __attribute__((ALWAYS_INLINE))
volatile unsigned int count_flush_volumes_info
bool prm_get_bool_value(PARAM_ID prm_id)
UINT64 del_id
int thread_suspend_timeout_wakeup_and_unlock_entry(cubthread::entry *thread_p, struct timespec *time_p, thread_resume_suspend_status suspended_reason)
int fileio_format(THREAD_ENTRY *thread_p, const char *db_full_name_p, const char *vol_label_p, VOLID vol_id, DKNPAGES npages, bool is_sweep_clean, bool is_do_lock, bool is_do_sync, size_t page_size, int kbytes_to_be_written_per_sec, bool reuse_file)
Definition: file_io.c:2314
STATIC_INLINE void dwb_get_next_block_for_flush(THREAD_ENTRY *thread_p, unsigned int *block_no) __attribute__((ALWAYS_INLINE))
static int dwb_slots_hash_entry_init(void *entry)
unsigned int position_in_block
int i
Definition: dynamic_load.c:954
int dwb_flush_force(THREAD_ENTRY *thread_p, bool *all_sync)
static bool dwb_file_sync_helper_daemon_is_running(void)
volatile unsigned int count_wb_pages
static unsigned int dwb_slots_hash_key(void *key, int hash_table_size)
void * fileio_write_pages(THREAD_ENTRY *thread_p, int vol_fd, char *io_pages_p, PAGEID page_id, int num_pages, size_t page_size, FILEIO_WRITE_MODE write_mode)
Definition: file_io.c:4314
void fileio_initialize_res(THREAD_ENTRY *thread_p, FILEIO_PAGE *io_page, PGLENGTH page_size)
Definition: file_io.c:11577
STATIC_INLINE void perfmon_add_stat(THREAD_ENTRY *thread_p, PERF_STAT_ID psid, UINT64 amount) __attribute__((ALWAYS_INLINE))
#define NULL_VOLID
LOG_LSA get_nxio_lsa() const
Definition: log_append.cpp:106
STATIC_INLINE int fileio_is_page_sane(FILEIO_PAGE *io_page, PGLENGTH page_size)
Definition: file_io.h:237
#define IO_MAX_PAGE_SIZE
STATIC_INLINE int dwb_create_blocks(THREAD_ENTRY *thread_p, unsigned int num_blocks, unsigned int num_block_pages, DWB_BLOCK **p_blocks) __attribute__((ALWAYS_INLINE))
int dwb_destroy(THREAD_ENTRY *thread_p)
#define dwb_log(...)
#define DWB_ENDS_CREATION(position_with_flags)
unsigned int max_to_flush_vdes
STATIC_INLINE void dwb_set_slot_data(THREAD_ENTRY *thread_p, DWB_SLOT *dwb_slot, FILEIO_PAGE *io_page_p) __attribute__((ALWAYS_INLINE))
#define DWB_GET_PREV_BLOCK_NO(block_no)
DWB_WAIT_QUEUE wait_queue
STATIC_INLINE int dwb_starts_structure_modification(THREAD_ENTRY *thread_p, UINT64 *current_position_with_flags) __attribute__((ALWAYS_INLINE))
static int dwb_compare_slots(const void *arg1, const void *arg2)
DWB_BLOCK *volatile file_sync_helper_block
STATIC_INLINE DWB_WAIT_QUEUE_ENTRY * dwb_block_add_wait_queue_entry(DWB_WAIT_QUEUE *wait_queue, void *data) __attribute__((ALWAYS_INLINE))
struct double_write_slot DWB_SLOT
daemon * create_daemon(const looper &looper_arg, entry_task *exec_p, const char *daemon_name="", entry_manager *context_manager=NULL)
#define pthread_mutex_unlock(a)
int dwb_add_page(THREAD_ENTRY *thread_p, FILEIO_PAGE *io_page_p, VPID *vpid, DWB_SLOT **p_dwb_slot)
FILEIO_PAGE_RESERVED prv
Definition: file_io.h:194
#define DWB_GET_NEXT_BLOCK_NO(block_no)
#define VPID_LT(vpid_ptr1, vpid_ptr2)
Definition: dbtype_def.h:919
#define dwb_log_error(...)
int timeval_add_msec(struct timeval *added_time, const struct timeval *start_time, int msec)
Definition: porting.c:2152
callable_task< entry > entry_callable_task
#define DWB_MIN_SIZE
#define VPID_SET_NULL(vpid_ptr)
Definition: dbtype_def.h:906
STATIC_INLINE int dwb_flush_block(THREAD_ENTRY *thread_p, DWB_BLOCK *block, bool file_sync_helper_can_flush, UINT64 *current_position_with_flags) __attribute__((ALWAYS_INLINE))
FLUSH_VOLUME_STATUS
#define DWB_GET_POSITION_IN_BLOCK(position_with_flags)
#define LF_EM_USING_MUTEX
Definition: lock_free.h:60
int fileio_page_check_corruption(THREAD_ENTRY *thread_p, FILEIO_PAGE *io_page, bool *is_page_corrupted)
Definition: file_io.c:11831
std::int64_t offset
Definition: log_lsa.hpp:37
#define ER_DWB_DISABLED
Definition: error_code.h:1588
#define DWB_GET_NEXT_POSITION_WITH_FLAGS(position_with_flags)
static bool dwb_is_flush_block_daemon_available(void)
int fileio_get_volume_descriptor(VOLID vol_id)
Definition: file_io.c:6488
static int dwb_slots_hash_entry_free(void *entry)