File double_write_buffer.cpp¶
File List > cubrid > src > storage > double_write_buffer.cpp
Go to the documentation of this file
/*
* Copyright 2008 Search Solution Corporation
* Copyright 2016 CUBRID Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
/*
* double_write_buffer.cpp - double write buffer
*/
#ident "$Id$"
#include <assert.h>
#include <math.h>
#include "double_write_buffer.hpp"
#include "system_parameter.h"
#if defined (SERVER_MODE)
#include "thread_daemon.hpp"
#endif
#include "thread_entry_task.hpp"
#include "thread_lockfree_hash_map.hpp"
#include "thread_manager.hpp"
#include "log_append.hpp"
#include "log_impl.h"
#include "log_volids.hpp"
#include "boot_sr.h"
#include "perf_monitor.h"
#include "porting_inline.hpp"
// XXX: SHOULD BE THE LAST INCLUDE HEADER
#include "memory_wrapper.hpp"
#define DWB_SLOTS_HASH_SIZE 1000
#define DWB_SLOTS_FREE_LIST_SIZE 100
/* These values must be power of two. */
#define DWB_MIN_SIZE (512 * 1024)
#define DWB_MAX_SIZE (32 * 1024 * 1024)
#define DWB_MIN_BLOCKS 1
#define DWB_MAX_BLOCKS 32
/* The total number of blocks. */
#define DWB_NUM_TOTAL_BLOCKS (dwb_Global.num_blocks)
/* The total number of pages. */
#define DWB_NUM_TOTAL_PAGES (dwb_Global.num_pages)
/* The number of pages in each block. */
#define DWB_BLOCK_NUM_PAGES (dwb_Global.num_block_pages)
/* LOG2 from total number of blocks. */
#define DWB_LOG2_BLOCK_NUM_PAGES (dwb_Global.log2_num_block_pages)
/* Position mask. */
#define DWB_POSITION_MASK 0x000000003fffffff
/* Block status mask. */
#define DWB_BLOCKS_STATUS_MASK 0xffffffff00000000
/* Structure modification mask. */
#define DWB_MODIFY_STRUCTURE 0x0000000080000000
/* Create mask. */
#define DWB_CREATE 0x0000000040000000
/* Create or modify mask. */
#define DWB_CREATE_OR_MODIFY_MASK (DWB_CREATE | DWB_MODIFY_STRUCTURE)
/* Flags mask. */
#define DWB_FLAG_MASK (DWB_BLOCKS_STATUS_MASK | DWB_MODIFY_STRUCTURE | DWB_CREATE)
/* Get DWB position. */
#define DWB_GET_POSITION(position_with_flags) \
((position_with_flags) & DWB_POSITION_MASK)
/* Reset DWB position. */
#define DWB_RESET_POSITION(position_with_flags) \
((position_with_flags) & DWB_FLAG_MASK)
/* Get DWB block status. */
#define DWB_GET_BLOCK_STATUS(position_with_flags) \
((position_with_flags) & DWB_BLOCKS_STATUS_MASK)
/* Get block number from DWB position. */
#define DWB_GET_BLOCK_NO_FROM_POSITION(position_with_flags) \
((unsigned int) DWB_GET_POSITION (position_with_flags) >> (DWB_LOG2_BLOCK_NUM_PAGES))
/* Checks whether writing in a DWB block was started. */
#define DWB_IS_BLOCK_WRITE_STARTED(position_with_flags, block_no) \
(assert (block_no < DWB_MAX_BLOCKS), ((position_with_flags) & (1ULL << (63 - (block_no)))) != 0)
/* Checks whether any DWB block was started. */
#define DWB_IS_ANY_BLOCK_WRITE_STARTED(position_with_flags) \
(((position_with_flags) & DWB_BLOCKS_STATUS_MASK) != 0)
/* Starts DWB block writing. */
#define DWB_STARTS_BLOCK_WRITING(position_with_flags, block_no) \
(assert (block_no < DWB_MAX_BLOCKS), (position_with_flags) | (1ULL << (63 - (block_no))))
/* Ends DWB block writing. */
#define DWB_ENDS_BLOCK_WRITING(position_with_flags, block_no) \
(assert (DWB_IS_BLOCK_WRITE_STARTED (position_with_flags, block_no)), \
(position_with_flags) & ~(1ULL << (63 - (block_no))))
/* Starts modifying DWB structure. */
#define DWB_STARTS_MODIFYING_STRUCTURE(position_with_flags) \
((position_with_flags) | DWB_MODIFY_STRUCTURE)
/* Ends modifying DWB structure. */
#define DWB_ENDS_MODIFYING_STRUCTURE(position_with_flags) \
(assert (DWB_IS_MODIFYING_STRUCTURE (position_with_flags)), (position_with_flags) & ~DWB_MODIFY_STRUCTURE)
/* Check whether the DWB structure is modifying. */
#define DWB_IS_MODIFYING_STRUCTURE(position_with_flags) \
(((position_with_flags) & DWB_MODIFY_STRUCTURE) != 0)
/* Starts DWB creation. */
#define DWB_STARTS_CREATION(position_with_flags) \
((position_with_flags) | DWB_CREATE)
/* Starts DWB creation. */
#define DWB_ENDS_CREATION(position_with_flags) \
(assert (DWB_IS_CREATED (position_with_flags)), (position_with_flags) & ~DWB_CREATE)
/* Check whether the DWB was created. */
#define DWB_IS_CREATED(position_with_flags) \
(((position_with_flags) & DWB_CREATE) != 0)
/* Check whether the DWB structure is not created or is modifying. */
#define DWB_NOT_CREATED_OR_MODIFYING(position_with_flags) \
(((position_with_flags) & DWB_CREATE_OR_MODIFY_MASK) != DWB_CREATE)
/* Get next DWB position with flags. */
#define DWB_GET_NEXT_POSITION_WITH_FLAGS(position_with_flags) \
((DWB_GET_POSITION (position_with_flags)) == (DWB_NUM_TOTAL_PAGES - 1) \
? ((position_with_flags) & DWB_FLAG_MASK) : ((position_with_flags) + 1))
/* Get position in DWB block from DWB position with flags. */
#define DWB_GET_POSITION_IN_BLOCK(position_with_flags) \
((DWB_GET_POSITION (position_with_flags)) & (DWB_BLOCK_NUM_PAGES - 1))
/* Get DWB previous block number. */
#define DWB_GET_PREV_BLOCK_NO(block_no) \
((block_no) > 0 ? ((block_no) - 1) : (DWB_NUM_TOTAL_BLOCKS - 1))
/* Get DWB previous block. */
#define DWB_GET_PREV_BLOCK(block_no) \
(&(dwb_Global.blocks[DWB_GET_PREV_BLOCK_NO(block_no)]))
/* Get DWB next block number. */
#define DWB_GET_NEXT_BLOCK_NO(block_no) \
((block_no) == (DWB_NUM_TOTAL_BLOCKS - 1) ? 0 : ((block_no) + 1))
/* Get block version. */
#define DWB_GET_BLOCK_VERSION(block) \
(ATOMIC_INC_64 (&block->version, 0ULL))
/* Queue entry. */
typedef struct double_write_wait_queue_entry DWB_WAIT_QUEUE_ENTRY;
struct double_write_wait_queue_entry
{
void *data; /* The data field. */
DWB_WAIT_QUEUE_ENTRY *next; /* The next queue entry field. */
};
/* DWB queue. */
typedef struct double_write_wait_queue DWB_WAIT_QUEUE;
struct double_write_wait_queue
{
DWB_WAIT_QUEUE_ENTRY *head; /* Queue head. */
DWB_WAIT_QUEUE_ENTRY *tail; /* Queue tail. */
DWB_WAIT_QUEUE_ENTRY *free_list; /* Queue free list */
int count; /* Count queue elements. */
int free_count; /* Count free list elements. */
};
#define DWB_WAIT_QUEUE_INITIALIZER {NULL, NULL, NULL, 0, 0}
/* Flush volume status. */
typedef enum
{
VOLUME_NOT_FLUSHED,
VOLUME_FLUSHED_BY_DWB_FILE_SYNC_HELPER_THREAD,
VOLUME_FLUSHED_BY_DWB_FLUSH_THREAD
} FLUSH_VOLUME_STATUS;
typedef struct flush_volume_info FLUSH_VOLUME_INFO;
struct flush_volume_info
{
int vdes; /* The volume descriptor. */
volatile int num_pages; /* The number of pages to flush in volume. */
volatile bool all_pages_written; /* True, if all pages written. */
volatile bool metadata; /* Include metadata when syncing */
volatile FLUSH_VOLUME_STATUS flushed_status; /* Flush status. */
};
/* DWB block. */
typedef struct double_write_block DWB_BLOCK;
struct double_write_block
{
FLUSH_VOLUME_INFO *flush_volumes_info; /* Information about volumes to flush. */
volatile unsigned int count_flush_volumes_info; /* Count volumes to flush. */
unsigned int max_to_flush_vdes; /* Maximum volumes to flush. */
pthread_mutex_t mutex; /* The mutex to protect the queue. */
DWB_WAIT_QUEUE wait_queue; /* The wait queue for the current block. */
char *write_buffer; /* The block write buffer, used to write all pages once. */
DWB_SLOT *slots; /* The slots containing the data. Used to write individual pages. */
volatile unsigned int count_wb_pages; /* Count the pages added to write buffer. */
unsigned int block_no; /* The block number. */
volatile UINT64 version; /* The block version. */
volatile bool all_pages_written; /* True, if all pages are written */
};
/* Slots hash entry. */
typedef struct dwb_slots_hash_entry DWB_SLOTS_HASH_ENTRY;
struct dwb_slots_hash_entry
{
VPID vpid; /* Page VPID. */
DWB_SLOTS_HASH_ENTRY *stack; /* Used in freelist. */
DWB_SLOTS_HASH_ENTRY *next; /* Used in hash table. */
pthread_mutex_t mutex; /* The mutex. */
UINT64 del_id; /* Delete transaction ID (for lock free). */
DWB_SLOT *slot; /* DWB slot containing a page. */
dwb_slots_hash_entry ()
{
pthread_mutex_init (&mutex, NULL);
}
~dwb_slots_hash_entry ()
{
pthread_mutex_destroy (&mutex);
}
};
/* Hash that store all pages in DWB. */
using dwb_hashmap_type = cubthread::lockfree_hashmap<VPID, dwb_slots_hash_entry>;
/* The double write buffer structure. */
typedef struct double_write_buffer DOUBLE_WRITE_BUFFER;
struct double_write_buffer
{
bool logging_enabled;
DWB_BLOCK *blocks; /* The blocks in DWB. */
unsigned int num_blocks; /* The total number of blocks in DWB - power of 2. */
unsigned int num_pages; /* The total number of pages in DWB - power of 2. */
unsigned int num_block_pages; /* The number of pages in a block - power of 2. */
unsigned int log2_num_block_pages; /* Logarithm from block number of pages. */
volatile unsigned int blocks_flush_counter; /* The blocks flush counter. */
volatile unsigned int next_block_to_flush; /* Next block to flush */
pthread_mutex_t mutex; /* The mutex to protect the wait queue. */
DWB_WAIT_QUEUE wait_queue; /* The wait queue, used when the DWB structure changed. */
UINT64 volatile position_with_flags; /* The current position in double write buffer and flags. Flags keep the
* state of each block (started, ended), create DWB status, modify DWB status.
*/
dwb_hashmap_type slots_hashmap; /* The slots hash. */
int vdes; /* The volume file descriptor. */
DWB_BLOCK *volatile file_sync_helper_block; /* The block that will be sync by helper thread. */
double_write_buffer ()
: logging_enabled (false)
, blocks (NULL)
, num_blocks (0)
, num_pages (0)
, num_block_pages (0)
, log2_num_block_pages (0)
, blocks_flush_counter (0)
, next_block_to_flush (0)
, mutex PTHREAD_MUTEX_INITIALIZER
, wait_queue DWB_WAIT_QUEUE_INITIALIZER
, position_with_flags (0)
, slots_hashmap {}
, vdes (NULL_VOLDES)
, file_sync_helper_block (NULL)
{
}
};
/* DWB volume name. */
char dwb_Volume_name[PATH_MAX];
/* DWB. */
static DOUBLE_WRITE_BUFFER dwb_Global;
#define dwb_Log dwb_Global.logging_enabled
#define dwb_check_logging() (dwb_Log = prm_get_bool_value (PRM_ID_DWB_LOGGING))
#define dwb_log(...) if (dwb_Log) _er_log_debug (ARG_FILE_LINE, "DWB: " __VA_ARGS__)
#define dwb_log_error(...) if (dwb_Log) _er_log_debug (ARG_FILE_LINE, "DWB ERROR: " __VA_ARGS__)
#if !defined(SERVER_MODE)
#define pthread_mutex_init(a, b)
#define pthread_mutex_destroy(a)
#define pthread_mutex_lock(a) 0
#define pthread_mutex_unlock(a)
#endif /* !SERVER_MODE */
/* DWB wait queue functions */
STATIC_INLINE void dwb_init_wait_queue (DWB_WAIT_QUEUE *wait_queue) __attribute__ ((ALWAYS_INLINE));
STATIC_INLINE DWB_WAIT_QUEUE_ENTRY *dwb_make_wait_queue_entry (DWB_WAIT_QUEUE *wait_queue,
void *data) __attribute__ ((ALWAYS_INLINE));
STATIC_INLINE DWB_WAIT_QUEUE_ENTRY *dwb_block_add_wait_queue_entry (DWB_WAIT_QUEUE *wait_queue,
void *data) __attribute__ ((ALWAYS_INLINE));
STATIC_INLINE DWB_WAIT_QUEUE_ENTRY *dwb_block_disconnect_wait_queue_entry (DWB_WAIT_QUEUE *
wait_queue, void *data)
__attribute__ ((ALWAYS_INLINE));
STATIC_INLINE void dwb_block_free_wait_queue_entry (DWB_WAIT_QUEUE *wait_queue,
DWB_WAIT_QUEUE_ENTRY *wait_queue_entry,
int (*func) (void *)) __attribute__ ((ALWAYS_INLINE));
STATIC_INLINE void dwb_remove_wait_queue_entry (DWB_WAIT_QUEUE *wait_queue, pthread_mutex_t *mutex,
void *data, int (*func) (void *)) __attribute__ ((ALWAYS_INLINE));
STATIC_INLINE void dwb_signal_waiting_threads (DWB_WAIT_QUEUE *wait_queue, pthread_mutex_t *mutex)
__attribute__ ((ALWAYS_INLINE));
STATIC_INLINE void dwb_destroy_wait_queue (DWB_WAIT_QUEUE *wait_queue, pthread_mutex_t *mutex);
/* DWB functions */
STATIC_INLINE void dwb_power2_ceil (unsigned int min, unsigned int max, unsigned int *p_value)
__attribute__ ((ALWAYS_INLINE));
STATIC_INLINE bool dwb_load_buffer_size (unsigned int *p_double_write_buffer_size) __attribute__ ((ALWAYS_INLINE));
STATIC_INLINE bool dwb_load_block_count (unsigned int *p_num_blocks) __attribute__ ((ALWAYS_INLINE));
STATIC_INLINE int dwb_wait_for_block_completion (THREAD_ENTRY *thread_p, unsigned int block_no)
__attribute__ ((ALWAYS_INLINE));
STATIC_INLINE int dwb_signal_waiting_thread (void *data) __attribute__ ((ALWAYS_INLINE));
STATIC_INLINE int dwb_set_status_resumed (void *data) __attribute__ ((ALWAYS_INLINE));
STATIC_INLINE void dwb_signal_block_completion (THREAD_ENTRY *thread_p, DWB_BLOCK *dwb_block)
__attribute__ ((ALWAYS_INLINE));
STATIC_INLINE int dwb_block_create_ordered_slots (DWB_BLOCK *block, DWB_SLOT **p_dwb_ordered_slots,
unsigned int *p_ordered_slots_length) __attribute__ ((ALWAYS_INLINE));
static int dwb_compare_vol_fd (const void *v1, const void *v2);
STATIC_INLINE FLUSH_VOLUME_INFO *dwb_add_volume_to_block_flush_area (THREAD_ENTRY *thread_p, DWB_BLOCK *block,
int vol_fd, bool ensure_metadata) __attribute__ ((ALWAYS_INLINE));
STATIC_INLINE int dwb_write_block (THREAD_ENTRY *thread_p, DWB_BLOCK *block, DWB_SLOT *p_dwb_slots,
unsigned int ordered_slots_length, bool file_sync_helper_can_flush,
bool remove_from_hash) __attribute__ ((ALWAYS_INLINE));
STATIC_INLINE int dwb_flush_block (THREAD_ENTRY *thread_p, DWB_BLOCK *block, bool file_sync_helper_can_flush,
UINT64 *current_position_with_flags) __attribute__ ((ALWAYS_INLINE));
STATIC_INLINE void dwb_init_slot (DWB_SLOT *slot) __attribute__ ((ALWAYS_INLINE));
STATIC_INLINE int dwb_acquire_next_slot (THREAD_ENTRY *thread_p, bool can_wait, DWB_SLOT **p_dwb_slot)
__attribute__ ((ALWAYS_INLINE));
STATIC_INLINE void dwb_set_slot_data (THREAD_ENTRY *thread_p, DWB_SLOT *dwb_slot,
FILEIO_PAGE *io_page_p, bool ensure_metadata) __attribute__ ((ALWAYS_INLINE));
STATIC_INLINE int dwb_wait_for_strucure_modification (THREAD_ENTRY *thread_p) __attribute__ ((ALWAYS_INLINE));
STATIC_INLINE void dwb_signal_structure_modificated (THREAD_ENTRY *thread_p) __attribute__ ((ALWAYS_INLINE));
STATIC_INLINE int dwb_starts_structure_modification (THREAD_ENTRY *thread_p, UINT64 *current_position_with_flags)
__attribute__ ((ALWAYS_INLINE));
STATIC_INLINE void dwb_ends_structure_modification (THREAD_ENTRY *thread_p, UINT64 current_position_with_flags)
__attribute__ ((ALWAYS_INLINE));
STATIC_INLINE void dwb_destroy_internal (THREAD_ENTRY *thread_p, UINT64 *current_position_with_flags)
__attribute__ ((ALWAYS_INLINE));
STATIC_INLINE void dwb_initialize_slot (DWB_SLOT *slot, FILEIO_PAGE *io_page,
unsigned int position_in_block, unsigned int block_no)
__attribute__ ((ALWAYS_INLINE));
STATIC_INLINE void dwb_initialize_block (DWB_BLOCK *block, unsigned int block_no,
unsigned int count_wb_pages, char *write_buffer, DWB_SLOT *slots,
FLUSH_VOLUME_INFO *flush_volumes_info, unsigned int count_flush_volumes_info,
unsigned int max_to_flush_vdes) __attribute__ ((ALWAYS_INLINE));
STATIC_INLINE int dwb_create_blocks (THREAD_ENTRY *thread_p, unsigned int num_blocks, unsigned int num_block_pages,
DWB_BLOCK **p_blocks) __attribute__ ((ALWAYS_INLINE));
STATIC_INLINE void dwb_finalize_block (DWB_BLOCK *block) __attribute__ ((ALWAYS_INLINE));
STATIC_INLINE int dwb_create_internal (THREAD_ENTRY *thread_p, const char *dwb_volume_name,
UINT64 *current_position_with_flags) __attribute__ ((ALWAYS_INLINE));
STATIC_INLINE void dwb_get_next_block_for_flush (THREAD_ENTRY *thread_p, unsigned int *block_no)
__attribute__ ((ALWAYS_INLINE));
/* Slots hash functions. */
static void *dwb_slots_hash_entry_alloc (void);
static int dwb_slots_hash_entry_free (void *entry);
static int dwb_slots_hash_entry_init (void *entry);
static int dwb_slots_hash_key_copy (void *src, void *dest);
static int dwb_slots_hash_compare_key (void *key1, void *key2);
static unsigned int dwb_slots_hash_key (void *key, int hash_table_size);
STATIC_INLINE int dwb_slots_hash_insert (THREAD_ENTRY *thread_p, VPID *vpid, DWB_SLOT *slot, bool *inserted)
__attribute__ ((ALWAYS_INLINE));
STATIC_INLINE int dwb_slots_hash_delete (THREAD_ENTRY *thread_p, DWB_SLOT *slot);
#if defined (SERVER_MODE)
static cubthread::daemon *dwb_flush_block_daemon = NULL;
static cubthread::daemon *dwb_file_sync_helper_daemon = NULL;
#endif
static bool dwb_is_flush_block_daemon_available (void);
static bool dwb_is_file_sync_helper_daemon_available (void);
static bool dwb_flush_block_daemon_is_running (void);
static bool dwb_file_sync_helper_daemon_is_running (void);
static int dwb_file_sync_helper (THREAD_ENTRY *thread_p);
static int dwb_flush_next_block (THREAD_ENTRY *thread_p);
#if !defined (NDEBUG)
static int dwb_debug_check_dwb (THREAD_ENTRY *thread_p, DWB_SLOT *p_dwb_ordered_slots, unsigned int num_pages);
#endif // DEBUG
static int dwb_check_data_page_is_sane (THREAD_ENTRY *thread_p, DWB_BLOCK *rcv_block, DWB_SLOT *p_dwb_ordered_slots,
int *p_num_recoverable_pages);
/* Slots entry descriptor */
static LF_ENTRY_DESCRIPTOR slots_entry_Descriptor =
{
/* offsets */
offsetof (DWB_SLOTS_HASH_ENTRY, stack),
offsetof (DWB_SLOTS_HASH_ENTRY, next),
offsetof (DWB_SLOTS_HASH_ENTRY, del_id),
offsetof (DWB_SLOTS_HASH_ENTRY, vpid),
offsetof (DWB_SLOTS_HASH_ENTRY, mutex),
/* using mutex? */
LF_EM_USING_MUTEX,
LF_ENTRY_DESCRIPTOR_MAX_ALLOC,
dwb_slots_hash_entry_alloc,
dwb_slots_hash_entry_free,
dwb_slots_hash_entry_init,
NULL,
dwb_slots_hash_key_copy,
dwb_slots_hash_compare_key,
dwb_slots_hash_key,
NULL /* no inserts */
};
/*
* dwb_init_wait_queue () - Intialize wait queue.
*
* return : Nothing.
* wait_queue (in/out) : The wait queue.
*/
STATIC_INLINE void
dwb_init_wait_queue (DWB_WAIT_QUEUE *wait_queue)
{
wait_queue->head = NULL;
wait_queue->tail = NULL;
wait_queue->count = 0;
wait_queue->free_list = NULL;
wait_queue->free_count = 0;
}
/*
* dwb_make_wait_queue_entry () - Make wait queue entry.
*
* return : The queue entry.
* wait_queue (in/out) : The wait queue.
* data (in): The data.
*/
STATIC_INLINE DWB_WAIT_QUEUE_ENTRY *
dwb_make_wait_queue_entry (DWB_WAIT_QUEUE *wait_queue, void *data)
{
DWB_WAIT_QUEUE_ENTRY *wait_queue_entry;
assert (wait_queue != NULL && data != NULL);
if (wait_queue->free_list != NULL)
{
wait_queue_entry = wait_queue->free_list;
wait_queue->free_list = wait_queue->free_list->next;
wait_queue->free_count--;
}
else
{
wait_queue_entry = (DWB_WAIT_QUEUE_ENTRY *) malloc (sizeof (DWB_WAIT_QUEUE_ENTRY));
if (wait_queue_entry == NULL)
{
er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_OUT_OF_VIRTUAL_MEMORY, 1, sizeof (DWB_WAIT_QUEUE_ENTRY));
return NULL;
}
}
wait_queue_entry->data = data;
wait_queue_entry->next = NULL;
return wait_queue_entry;
}
/*
* dwb_block_add_wait_queue_entry () - Add entry to wait queue.
*
* return : Added queue entry.
* wait_queue (in/out) : The wait queue.
* data (in): The data.
*
* Note: Is user responsibility to protect against queue concurrent access.
*/
STATIC_INLINE DWB_WAIT_QUEUE_ENTRY *
dwb_block_add_wait_queue_entry (DWB_WAIT_QUEUE *wait_queue, void *data)
{
DWB_WAIT_QUEUE_ENTRY *wait_queue_entry = NULL;
assert (wait_queue != NULL && data != NULL);
wait_queue_entry = dwb_make_wait_queue_entry (wait_queue, data);
if (wait_queue_entry == NULL)
{
return NULL;
}
if (wait_queue->head == NULL)
{
wait_queue->tail = wait_queue->head = wait_queue_entry;
}
else
{
wait_queue->tail->next = wait_queue_entry;
wait_queue->tail = wait_queue_entry;
}
wait_queue->count++;
return wait_queue_entry;
}
/*
* dwb_block_disconnect_wait_queue_entry () - Disconnect entry from wait queue.
*
* return : Disconnected queue entry.
* wait_queue (in/out) : The wait queue.
* data (in): The data.
* Note: If data is NULL, then first entry will be disconnected.
*/
STATIC_INLINE DWB_WAIT_QUEUE_ENTRY *
dwb_block_disconnect_wait_queue_entry (DWB_WAIT_QUEUE *wait_queue, void *data)
{
DWB_WAIT_QUEUE_ENTRY *wait_queue_entry = NULL, *prev_wait_queue_entry = NULL;
assert (wait_queue != NULL);
if (wait_queue->head == NULL)
{
return NULL;
}
prev_wait_queue_entry = NULL;
wait_queue_entry = wait_queue->head;
if (data != NULL)
{
/* search for data */
while (wait_queue_entry != NULL)
{
if (wait_queue_entry->data == data)
{
break;
}
prev_wait_queue_entry = wait_queue_entry;
wait_queue_entry = wait_queue_entry->next;
}
}
if (wait_queue_entry == NULL)
{
return NULL;
}
if (prev_wait_queue_entry != NULL)
{
prev_wait_queue_entry->next = wait_queue_entry->next;
}
else
{
assert (wait_queue_entry == wait_queue->head);
wait_queue->head = wait_queue_entry->next;
}
if (wait_queue_entry == wait_queue->tail)
{
wait_queue->tail = prev_wait_queue_entry;
}
wait_queue_entry->next = NULL;
wait_queue->count--;
return wait_queue_entry;
}
/*
* dwb_block_free_wait_queue_entry () - Free a wait queue entry.
*
* return : Nothing.
* wait_queue (in/out) : The wait queue.
* wait_queue_entry (in): The wait queue entry.
* func(in): The function to apply on entry.
*/
STATIC_INLINE void
dwb_block_free_wait_queue_entry (DWB_WAIT_QUEUE *wait_queue, DWB_WAIT_QUEUE_ENTRY *wait_queue_entry,
int (*func) (void *))
{
if (wait_queue_entry == NULL)
{
return;
}
if (func != NULL)
{
(void) func (wait_queue_entry);
}
/* Reuse the entry. Do not set data field to NULL. It may be used at debugging. */
wait_queue_entry->next = wait_queue->free_list;
wait_queue->free_list = wait_queue_entry;
wait_queue->free_count++;
}
/*
* dwb_remove_wait_queue_entry () - Remove the wait queue entry.
*
* return : True, if entry removed, false otherwise.
* wait_queue (in/out): The wait queue.
* mutex (in): The mutex to protect the wait queue.
* data (in): The data.
* func(in): The function to apply on each entry.
*
* Note: If the data is NULL, the first entry will be removed.
*/
STATIC_INLINE void
dwb_remove_wait_queue_entry (DWB_WAIT_QUEUE *wait_queue, pthread_mutex_t *mutex, void *data, int (*func) (void *))
{
DWB_WAIT_QUEUE_ENTRY *wait_queue_entry = NULL;
if (mutex != NULL)
{
(void) pthread_mutex_lock (mutex);
}
wait_queue_entry = dwb_block_disconnect_wait_queue_entry (wait_queue, data);
if (wait_queue_entry != NULL)
{
dwb_block_free_wait_queue_entry (wait_queue, wait_queue_entry, func);
}
if (mutex != NULL)
{
pthread_mutex_unlock (mutex);
}
}
/*
* dwb_signal_waiting_threads () - Signal waiting threads.
*
* return : Nothing.
* wait_queue (in/out): The wait queue.
* mutex(in): The mutex to protect the queue.
*/
STATIC_INLINE void
dwb_signal_waiting_threads (DWB_WAIT_QUEUE *wait_queue, pthread_mutex_t *mutex)
{
assert (wait_queue != NULL);
if (mutex != NULL)
{
(void) pthread_mutex_lock (mutex);
}
while (wait_queue->head != NULL)
{
dwb_remove_wait_queue_entry (wait_queue, NULL, NULL, dwb_signal_waiting_thread);
}
if (mutex != NULL)
{
pthread_mutex_unlock (mutex);
}
}
/*
* dwb_destroy_wait_queue () - Destroy the wait queue.
*
* return : Nothing.
* wait_queue (in/out): The wait queue.
* mutex(in): The mutex to protect the queue.
*/
STATIC_INLINE void
dwb_destroy_wait_queue (DWB_WAIT_QUEUE *wait_queue, pthread_mutex_t *mutex)
{
DWB_WAIT_QUEUE_ENTRY *wait_queue_entry;
assert (wait_queue != NULL);
if (mutex != NULL)
{
(void) pthread_mutex_lock (mutex);
}
dwb_signal_waiting_threads (wait_queue, NULL);
if (wait_queue->free_count > 0)
{
while (wait_queue->free_list != NULL)
{
wait_queue_entry = wait_queue->free_list;
wait_queue->free_list = wait_queue_entry->next;
free_and_init (wait_queue_entry);
wait_queue->free_count--;
}
}
if (mutex != NULL)
{
pthread_mutex_unlock (mutex);
}
}
/*
* dwb_power2_ceil () - Adjust value to power of 2.
*
* return : Error code.
* min (in) : minimum of value to be adjusted.
* max (in) : maximum of value to be adjusted.
* p_value (in/out) : value to be adjusted.
*
* Note: The adjusted value is a power of 2 that is greater than the value.
*/
STATIC_INLINE void
dwb_power2_ceil (unsigned int min, unsigned int max, unsigned int *p_value)
{
unsigned int limit;
if (*p_value < min)
{
*p_value = min;
}
else if (*p_value > max)
{
*p_value = max;
}
else if (!IS_POWER_OF_2 (*p_value))
{
limit = min << 1;
while (limit < *p_value)
{
limit = limit << 1;
}
*p_value = limit;
}
assert (IS_POWER_OF_2 (*p_value));
assert (*p_value >= min && *p_value <= max);
}
/*
* dwb_load_buffer_size () - Load the buffer size value of double write buffer.
*
* return : true or false.
* p_double_write_buffer_size (in/out): the buffer size of double write buffer.
*
* Note: The buffer size must be a multiple of 512 K.
*/
STATIC_INLINE bool
dwb_load_buffer_size (unsigned int *p_double_write_buffer_size)
{
assert (IS_POWER_OF_2 (DWB_MIN_SIZE) && IS_POWER_OF_2 (DWB_MAX_SIZE));
assert (DWB_MAX_SIZE >= DWB_MIN_SIZE);
*p_double_write_buffer_size = prm_get_integer_value (PRM_ID_DWB_SIZE);
if (*p_double_write_buffer_size == 0)
{
/* Do not use double write buffer. */
return false;
}
dwb_power2_ceil (DWB_MIN_SIZE, DWB_MAX_SIZE, p_double_write_buffer_size);
return true;
}
/*
* dwb_load_block_count () - Load the block count value of double write buffer.
*
* return : true or false.
* p_num_blocks (in/out): the block count of double write buffer.
*
* Note: The number of blocks must be a power of 2.
*/
STATIC_INLINE bool
dwb_load_block_count (unsigned int *p_num_blocks)
{
assert (IS_POWER_OF_2 (DWB_MIN_BLOCKS) && IS_POWER_OF_2 (DWB_MAX_BLOCKS));
assert (DWB_MAX_BLOCKS >= DWB_MIN_BLOCKS);
*p_num_blocks = prm_get_integer_value (PRM_ID_DWB_BLOCKS);
if (*p_num_blocks == 0)
{
/* Do not use double write buffer. */
return false;
}
dwb_power2_ceil (DWB_MIN_BLOCKS, DWB_MAX_BLOCKS, p_num_blocks);
return true;
}
/*
* dwb_starts_structure_modification () - Starts structure modifications.
*
* return : Error code
* thread_p (in): The thread entry.
* current_position_with_flags(out): The current position with flags.
*
* Note: This function must be called before changing structure of DWB.
*/
STATIC_INLINE int
dwb_starts_structure_modification (THREAD_ENTRY *thread_p, UINT64 *current_position_with_flags)
{
UINT64 local_current_position_with_flags, new_position_with_flags, min_version;
unsigned int block_no;
int error_code = NO_ERROR;
unsigned int start_block_no, blocks_count;
DWB_BLOCK *file_sync_helper_block;
assert (current_position_with_flags != NULL);
do
{
local_current_position_with_flags = ATOMIC_INC_64 (&dwb_Global.position_with_flags, 0ULL);
if (DWB_IS_MODIFYING_STRUCTURE (local_current_position_with_flags))
{
/* Only one thread can change the structure */
return ER_FAILED;
}
new_position_with_flags = DWB_STARTS_MODIFYING_STRUCTURE (local_current_position_with_flags);
/* Start structure modifications, the threads that want to flush afterwards, have to wait. */
}
while (!ATOMIC_CAS_64 (&dwb_Global.position_with_flags, local_current_position_with_flags, new_position_with_flags));
#if defined(SERVER_MODE)
while ((ATOMIC_INC_32 (&dwb_Global.blocks_flush_counter, 0) > 0)
|| dwb_flush_block_daemon_is_running () || dwb_file_sync_helper_daemon_is_running ())
{
/* Can't modify structure while flush thread can access DWB. */
thread_sleep (20);
}
#endif
/* Since we set the modify structure flag, I'm the only thread that access the DWB. */
file_sync_helper_block = dwb_Global.file_sync_helper_block;
if (file_sync_helper_block != NULL)
{
/* All remaining blocks are flushed by me. */
(void) ATOMIC_TAS_ADDR (&dwb_Global.file_sync_helper_block, (DWB_BLOCK *) NULL);
dwb_log ("Structure modification, needs to flush DWB block = %d having version %lld\n",
file_sync_helper_block->block_no, file_sync_helper_block->version);
}
local_current_position_with_flags = ATOMIC_INC_64 (&dwb_Global.position_with_flags, 0ULL);
/* Need to flush incomplete blocks, ordered by version. */
start_block_no = DWB_NUM_TOTAL_BLOCKS;
min_version = 0xffffffffffffffff;
blocks_count = 0;
for (block_no = 0; block_no < DWB_NUM_TOTAL_BLOCKS; block_no++)
{
if (DWB_IS_BLOCK_WRITE_STARTED (local_current_position_with_flags, block_no))
{
if (dwb_Global.blocks[block_no].version < min_version)
{
min_version = dwb_Global.blocks[block_no].version;
start_block_no = block_no;
}
blocks_count++;
}
}
block_no = start_block_no;
while (blocks_count > 0)
{
if (DWB_IS_BLOCK_WRITE_STARTED (local_current_position_with_flags, block_no))
{
/* Flush all pages from current block. I must flush all remaining data. */
error_code =
dwb_flush_block (thread_p, &dwb_Global.blocks[block_no], false, &local_current_position_with_flags);
if (error_code != NO_ERROR)
{
/* Something wrong happened. */
dwb_log_error ("Can't flush block = %d having version %lld\n", block_no,
dwb_Global.blocks[block_no].version);
return error_code;
}
dwb_log_error ("DWB flushed %d block having version %lld\n", block_no, dwb_Global.blocks[block_no].version);
blocks_count--;
}
block_no = (block_no + 1) % DWB_NUM_TOTAL_BLOCKS;
}
local_current_position_with_flags = ATOMIC_INC_64 (&dwb_Global.position_with_flags, 0ULL);
assert (DWB_GET_BLOCK_STATUS (local_current_position_with_flags) == 0);
*current_position_with_flags = local_current_position_with_flags;
return NO_ERROR;
}
/*
* dwb_ends_structure_modification () - Ends structure modifications.
*
* return : Error code.
* thread_p (in): The thread entry.
* current_position_with_flags(in): The current position with flags.
*/
STATIC_INLINE void
dwb_ends_structure_modification (THREAD_ENTRY *thread_p, UINT64 current_position_with_flags)
{
UINT64 new_position_with_flags;
new_position_with_flags = DWB_ENDS_MODIFYING_STRUCTURE (current_position_with_flags);
/* Ends structure modifications. */
assert (dwb_Global.position_with_flags == current_position_with_flags);
ATOMIC_TAS_64 (&dwb_Global.position_with_flags, new_position_with_flags);
/* Signal the other threads. */
dwb_signal_structure_modificated (thread_p);
}
/*
* dwb_initialize_slot () - Initialize a DWB slot.
*
* return : Error code.
* slot (in/out) : The slot to initialize.
* io_page (in) : The page.
* position_in_block(in): The position in DWB block.
* block_no(in): The number of the block where the slot reside.
*/
STATIC_INLINE void
dwb_initialize_slot (DWB_SLOT *slot, FILEIO_PAGE *io_page, unsigned int position_in_block, unsigned int block_no)
{
assert (slot != NULL && io_page != NULL);
slot->io_page = io_page;
if (io_page != NULL)
{
VPID_SET (&slot->vpid, io_page->prv.volid, io_page->prv.pageid);
LSA_COPY (&slot->lsa, &io_page->prv.lsa);
}
slot->position_in_block = position_in_block;
slot->block_no = block_no;
}
/*
* dwb_initialize_block () - Initialize a block.
*
* return : Nothing.
* block (in/out) : Double write buffer block.
* block_no (in) : Database name.
* count_wb_pages (in): Count DWB pages.
* write_buffer(in): The write buffer.
* slots(in): The slots.
* flush_volumes_info(in): The area containing volume descriptors to flush.
* count_flush_volumes_info(in): Count volumes to flush.
* max_to_flush_vdes(in): The maximum volumes to flush.
*/
STATIC_INLINE void
dwb_initialize_block (DWB_BLOCK *block, unsigned int block_no, unsigned int count_wb_pages, char *write_buffer,
DWB_SLOT *slots, FLUSH_VOLUME_INFO *flush_volumes_info, unsigned int count_flush_volumes_info,
unsigned int max_to_flush_vdes)
{
assert (block != NULL);
block->flush_volumes_info = flush_volumes_info;
block->count_flush_volumes_info = count_flush_volumes_info; /* Current volume descriptors to flush. */
block->max_to_flush_vdes = max_to_flush_vdes; /* Maximum volume descriptors to flush. */
pthread_mutex_init (&block->mutex, NULL);
dwb_init_wait_queue (&block->wait_queue);
block->write_buffer = write_buffer;
block->slots = slots;
block->count_wb_pages = count_wb_pages;
block->block_no = block_no;
block->version = 0;
block->all_pages_written = false;
}
/*
* dwb_create_blocks () - Create the blocks.
*
* return : Error code.
* thread_p (in) : The thread entry.
* num_blocks(in): The number of blocks.
* num_block_pages(in): The number of block pages.
* p_blocks(out): The created blocks.
*/
STATIC_INLINE int
dwb_create_blocks (THREAD_ENTRY *thread_p, unsigned int num_blocks, unsigned int num_block_pages,
DWB_BLOCK **p_blocks)
{
DWB_BLOCK *blocks = NULL;
char *blocks_write_buffer[DWB_MAX_BLOCKS];
FLUSH_VOLUME_INFO *flush_volumes_info[DWB_MAX_BLOCKS];
DWB_SLOT *slots[DWB_MAX_BLOCKS];
unsigned int block_buffer_size, i, j;
int error_code;
FILEIO_PAGE *io_page;
assert (num_blocks <= DWB_MAX_BLOCKS);
*p_blocks = NULL;
for (i = 0; i < DWB_MAX_BLOCKS; i++)
{
blocks_write_buffer[i] = NULL;
slots[i] = NULL;
flush_volumes_info[i] = NULL;
}
blocks = (DWB_BLOCK *) malloc (num_blocks * sizeof (DWB_BLOCK));
if (blocks == NULL)
{
er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_OUT_OF_VIRTUAL_MEMORY, 1, num_blocks * sizeof (DWB_BLOCK));
error_code = ER_OUT_OF_VIRTUAL_MEMORY;
goto exit_on_error;
}
memset (blocks, 0, num_blocks * sizeof (DWB_BLOCK));
block_buffer_size = num_block_pages * IO_PAGESIZE;
for (i = 0; i < num_blocks; i++)
{
blocks_write_buffer[i] = (char *) malloc (block_buffer_size * sizeof (char));
if (blocks_write_buffer[i] == NULL)
{
er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_OUT_OF_VIRTUAL_MEMORY, 1, block_buffer_size * sizeof (char));
error_code = ER_OUT_OF_VIRTUAL_MEMORY;
goto exit_on_error;
}
memset (blocks_write_buffer[i], 0, block_buffer_size * sizeof (char));
}
for (i = 0; i < num_blocks; i++)
{
slots[i] = (DWB_SLOT *) malloc (num_block_pages * sizeof (DWB_SLOT));
if (slots[i] == NULL)
{
er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_OUT_OF_VIRTUAL_MEMORY, 1, num_block_pages * sizeof (DWB_SLOT));
error_code = ER_OUT_OF_VIRTUAL_MEMORY;
goto exit_on_error;
}
memset (slots[i], 0, num_block_pages * sizeof (DWB_SLOT));
}
for (i = 0; i < num_blocks; i++)
{
flush_volumes_info[i] = (FLUSH_VOLUME_INFO *) malloc (num_block_pages * sizeof (FLUSH_VOLUME_INFO));
if (flush_volumes_info[i] == NULL)
{
er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_OUT_OF_VIRTUAL_MEMORY, 1,
num_block_pages * sizeof (FLUSH_VOLUME_INFO));
error_code = ER_OUT_OF_VIRTUAL_MEMORY;
goto exit_on_error;
}
memset (flush_volumes_info[i], 0, num_block_pages * sizeof (FLUSH_VOLUME_INFO));
}
for (i = 0; i < num_blocks; i++)
{
/* No need to initialize FILEIO_PAGE header here, since is overwritten before flushing */
for (j = 0; j < num_block_pages; j++)
{
io_page = (FILEIO_PAGE *) (blocks_write_buffer[i] + j * IO_PAGESIZE);
fileio_initialize_res (thread_p, io_page, IO_PAGESIZE);
dwb_initialize_slot (&slots[i][j], io_page, j, i);
}
dwb_initialize_block (&blocks[i], i, 0, blocks_write_buffer[i], slots[i], flush_volumes_info[i], 0,
num_block_pages);
}
*p_blocks = blocks;
return NO_ERROR;
exit_on_error:
for (i = 0; i < DWB_MAX_BLOCKS; i++)
{
if (slots[i] != NULL)
{
free_and_init (slots[i]);
}
if (blocks_write_buffer[i] != NULL)
{
free_and_init (blocks_write_buffer[i]);
}
if (flush_volumes_info[i] != NULL)
{
free_and_init (flush_volumes_info[i]);
}
}
if (blocks != NULL)
{
free_and_init (blocks);
}
return error_code;
}
/*
* dwb_finalize_block () - Finalize a block.
*
* return : Nothing
* block (in) : Double write buffer block.
*/
STATIC_INLINE void
dwb_finalize_block (DWB_BLOCK *block)
{
if (block->slots != NULL)
{
free_and_init (block->slots);
}
/* destroy block write buffer */
if (block->write_buffer != NULL)
{
free_and_init (block->write_buffer);
}
if (block->flush_volumes_info != NULL)
{
free_and_init (block->flush_volumes_info);
}
dwb_destroy_wait_queue (&block->wait_queue, &block->mutex);
pthread_mutex_destroy (&block->mutex);
}
/*
* dwb_create_internal () - Create double write buffer.
*
* return : Error code.
* thread_p (in): The thread entry.
* dwb_volume_name (in) : The double write buffer volume name.
* current_position_with_flags (in/out): Current position with flags.
*
* Note: Is user responsibility to ensure that no other transaction can access DWB structure, during creation.
*/
STATIC_INLINE int
dwb_create_internal (THREAD_ENTRY *thread_p, const char *dwb_volume_name, UINT64 *current_position_with_flags)
{
int error_code = NO_ERROR;
unsigned int double_write_buffer_size, num_blocks = 0;
unsigned int i, num_pages, num_block_pages;
int vdes = NULL_VOLDES;
DWB_BLOCK *blocks = NULL;
UINT64 new_position_with_flags;
const int freelist_block_count = 2;
const int freelist_block_size = DWB_SLOTS_FREE_LIST_SIZE;
assert (dwb_volume_name != NULL && current_position_with_flags != NULL);
if (!dwb_load_buffer_size (&double_write_buffer_size) || !dwb_load_block_count (&num_blocks))
{
/* Do not use double write buffer. */
return NO_ERROR;
}
num_pages = double_write_buffer_size / IO_PAGESIZE;
num_block_pages = num_pages / num_blocks;
assert (IS_POWER_OF_2 (num_blocks));
assert (IS_POWER_OF_2 (num_pages));
assert (IS_POWER_OF_2 (num_block_pages));
assert (num_blocks <= DWB_MAX_BLOCKS);
/* Create and open DWB volume first */
vdes = fileio_format (thread_p, boot_db_full_name (), dwb_volume_name, LOG_DBDWB_VOLID, num_block_pages, true,
false, false, IO_PAGESIZE, 0, false);
if (vdes == NULL_VOLDES)
{
goto exit_on_error;
}
/* Needs to flush dirty page before activating DWB. */
fileio_synchronize_all (thread_p);
/* Create DWB blocks */
error_code = dwb_create_blocks (thread_p, num_blocks, num_block_pages, &blocks);
if (error_code != NO_ERROR)
{
goto exit_on_error;
}
dwb_Global.blocks = blocks;
dwb_Global.num_blocks = num_blocks;
dwb_Global.num_pages = num_pages;
dwb_Global.num_block_pages = num_block_pages;
dwb_Global.log2_num_block_pages = (unsigned int) (log ((float) num_block_pages) / log ((float) 2));
dwb_Global.blocks_flush_counter = 0;
dwb_Global.next_block_to_flush = 0;
pthread_mutex_init (&dwb_Global.mutex, NULL);
dwb_init_wait_queue (&dwb_Global.wait_queue);
dwb_Global.vdes = vdes;
dwb_Global.file_sync_helper_block = NULL;
dwb_Global.slots_hashmap.init (dwb_slots_Ts, THREAD_TS_DWB_SLOTS, DWB_SLOTS_HASH_SIZE, freelist_block_size,
freelist_block_count, slots_entry_Descriptor);
/* Set creation flag. */
new_position_with_flags = DWB_RESET_POSITION (*current_position_with_flags);
new_position_with_flags = DWB_STARTS_CREATION (new_position_with_flags);
if (!ATOMIC_CAS_64 (&dwb_Global.position_with_flags, *current_position_with_flags, new_position_with_flags))
{
/* Impossible. */
assert (false);
}
*current_position_with_flags = new_position_with_flags;
return NO_ERROR;
exit_on_error:
if (vdes != NULL_VOLDES)
{
fileio_dismount (thread_p, vdes);
fileio_unformat (NULL, dwb_volume_name);
}
if (blocks != NULL)
{
for (i = 0; i < num_blocks; i++)
{
dwb_finalize_block (&blocks[i]);
}
free_and_init (blocks);
}
return error_code;
}
/*
* dwb_slots_hash_entry_alloc () - Allocate a new entry in slots hash.
*
* returns: New pointer or NULL on error.
*/
static void *
dwb_slots_hash_entry_alloc (void)
{
DWB_SLOTS_HASH_ENTRY *slots_hash_entry;
slots_hash_entry = (DWB_SLOTS_HASH_ENTRY *) malloc (sizeof (DWB_SLOTS_HASH_ENTRY));
if (slots_hash_entry == NULL)
{
er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_OUT_OF_VIRTUAL_MEMORY, 1, sizeof (DWB_SLOTS_HASH_ENTRY));
return NULL;
}
pthread_mutex_init (&slots_hash_entry->mutex, NULL);
return (void *) slots_hash_entry;
}
/*
* dwb_slots_hash_entry_free () - Free an entry in slots hash.
* returns: Error code.
* entry(in): The entry to free.
*/
static int
dwb_slots_hash_entry_free (void *entry)
{
DWB_SLOTS_HASH_ENTRY *slots_hash_entry = (DWB_SLOTS_HASH_ENTRY *) entry;
if (slots_hash_entry == NULL)
{
return ER_FAILED;
}
pthread_mutex_destroy (&slots_hash_entry->mutex);
free (entry);
return NO_ERROR;
}
/*
* dwb_slots_hash_entry_init () - Initialize slots hash entry.
* returns: Error code.
* entry(in): The entry to initialize.
*/
static int
dwb_slots_hash_entry_init (void *entry)
{
DWB_SLOTS_HASH_ENTRY *p_entry = (DWB_SLOTS_HASH_ENTRY *) entry;
if (p_entry == NULL)
{
return ER_FAILED;
}
VPID_SET_NULL (&p_entry->vpid);
p_entry->slot = NULL;
return NO_ERROR;
}
/*
* dwb_slots_hash_key_copy () - Copy a slots hash key.
* returns: Error code.
* src(in): The source.
* dest(out): The destination.
*/
static int
dwb_slots_hash_key_copy (void *src, void *dest)
{
if (src == NULL || dest == NULL)
{
return ER_FAILED;
}
VPID_COPY ((VPID *) dest, (VPID *) src);
return NO_ERROR;
}
/*
* dwb_slots_hash_compare_key () - Compare slots hash keys.
*
* return : 0 if equal, -1 otherwise.
* key1 (in) : Pointer to first VPID value.
* key2 (in) : Pointer to second VPID value.
*/
static int
dwb_slots_hash_compare_key (void *key1, void *key2)
{
if (VPID_EQ ((VPID *) key1, (VPID *) key2))
{
return 0;
}
return -1;
}
/*
* dwb_slots_hash_key () - Slots hash function.
*
* return : Hash index.
* key (in) : Key value.
* hash_table_size (in) : Hash size.
*/
static unsigned int
dwb_slots_hash_key (void *key, int hash_table_size)
{
const VPID *vpid = (VPID *) key;
return ((vpid->pageid | ((unsigned int) vpid->volid) << 24) % hash_table_size);
}
/*
* dwb_slots_hash_insert () - Insert entry in slots hash.
*
* return : Error code.
* thread_p (in): The thread entry.
* vpid(in): The page identifier.
* slot(in): The DWB slot.
* inserted (out): 1, if slot inserted in hash.
*/
STATIC_INLINE int
dwb_slots_hash_insert (THREAD_ENTRY *thread_p, VPID *vpid, DWB_SLOT *slot, bool *inserted)
{
int error_code = NO_ERROR;
DWB_SLOTS_HASH_ENTRY *slots_hash_entry = NULL;
assert (vpid != NULL && slot != NULL && inserted != NULL);
*inserted = dwb_Global.slots_hashmap.find_or_insert (thread_p, *vpid, slots_hash_entry);
assert (VPID_EQ (&slots_hash_entry->vpid, &slot->vpid));
assert (slots_hash_entry->vpid.pageid == slot->io_page->prv.pageid
&& slots_hash_entry->vpid.volid == slot->io_page->prv.volid);
if (! (*inserted))
{
assert (slots_hash_entry->slot != NULL);
if (LSA_LT (&slot->lsa, &slots_hash_entry->slot->lsa))
{
dwb_log ("DWB hash find key (%d, %d), the LSA=(%lld,%d), better than (%lld,%d): \n",
vpid->volid, vpid->pageid, slots_hash_entry->slot->lsa.pageid,
slots_hash_entry->slot->lsa.offset, slot->lsa.pageid, slot->lsa.offset);
/* The older slot is better than mine - leave it in hash. */
pthread_mutex_unlock (&slots_hash_entry->mutex);
return NO_ERROR;
}
else if (LSA_EQ (&slot->lsa, &slots_hash_entry->slot->lsa))
{
/*
* If LSA's are equals, still replace slot in hash. We are in "flushing to disk without logging" case.
* The page was modified but not logged. We have to flush this version since is the latest one.
*/
if (slots_hash_entry->slot->block_no == slot->block_no)
{
/* Invalidate the old slot, if is in the same block. We want to avoid duplicates in block at flush. */
assert (slots_hash_entry->slot->position_in_block < slot->position_in_block);
VPID_SET_NULL (&slots_hash_entry->slot->vpid);
fileio_initialize_res (thread_p, slots_hash_entry->slot->io_page, IO_PAGESIZE);
dwb_log ("Found same page with same LSA in same block - %d - at positions (%d, %d) \n",
slots_hash_entry->slot->position_in_block, slot->position_in_block);
}
else
{
#if !defined (NDEBUG)
int old_block_no = ATOMIC_INC_32 (&slots_hash_entry->slot->block_no, 0);
if (old_block_no > 0)
{
/* Be sure that the block containing old page version is flushed first. */
DWB_BLOCK *old_block = &dwb_Global.blocks[old_block_no];
DWB_BLOCK *new_block = &dwb_Global.blocks[slot->block_no];
/* Maybe we will check that the slot is still in old block. */
assert ((old_block->version < new_block->version)
|| (old_block->version == new_block->version && old_block->block_no < new_block->block_no));
dwb_log ("Found same page with same LSA in 2 different blocks old = (%d, %d), new = (%d,%d) \n",
old_block_no, slots_hash_entry->slot->position_in_block, new_block->block_no,
slot->position_in_block);
}
#endif
}
}
/* If a metadata sync was requested, the metadata must be synced even if the slot is replaced */
slot->ensure_metadata = slot->ensure_metadata || slots_hash_entry->slot->ensure_metadata;
dwb_log ("Replace hash key (%d, %d), the new LSA=(%lld,%d), the old LSA = (%lld,%d)",
vpid->volid, vpid->pageid, slot->lsa.pageid, slot->lsa.offset,
slots_hash_entry->slot->lsa.pageid, slots_hash_entry->slot->lsa.offset);
}
else
{
dwb_log ("Inserted hash key (%d, %d), LSA=(%lld,%d)", vpid->volid, vpid->pageid, slot->lsa.pageid,
slot->lsa.offset);
}
slots_hash_entry->slot = slot;
pthread_mutex_unlock (&slots_hash_entry->mutex);
*inserted = true;
return NO_ERROR;
}
/*
* dwb_destroy_internal () - Destroy DWB.
*
* return : Error code.
* thread_p (in): The thread entry.
*
* Note: Is user responsibility to ensure that no other transaction can access DWB structure, during destroy.
*/
STATIC_INLINE void
dwb_destroy_internal (THREAD_ENTRY *thread_p, UINT64 *current_position_with_flags)
{
UINT64 new_position_with_flags;
unsigned int block_no;
assert (current_position_with_flags != NULL);
dwb_destroy_wait_queue (&dwb_Global.wait_queue, &dwb_Global.mutex);
pthread_mutex_destroy (&dwb_Global.mutex);
if (dwb_Global.blocks != NULL)
{
for (block_no = 0; block_no < DWB_NUM_TOTAL_BLOCKS; block_no++)
{
dwb_finalize_block (&dwb_Global.blocks[block_no]);
}
free_and_init (dwb_Global.blocks);
}
dwb_Global.slots_hashmap.destroy ();
if (dwb_Global.vdes != NULL_VOLDES)
{
fileio_dismount (thread_p, dwb_Global.vdes);
dwb_Global.vdes = NULL_VOLDES;
fileio_unformat (thread_p, dwb_Volume_name);
}
/* Set creation flag. */
new_position_with_flags = DWB_RESET_POSITION (*current_position_with_flags);
new_position_with_flags = DWB_ENDS_CREATION (new_position_with_flags);
if (!ATOMIC_CAS_64 (&dwb_Global.position_with_flags, *current_position_with_flags, new_position_with_flags))
{
/* Impossible. */
assert (false);
}
*current_position_with_flags = new_position_with_flags;
}
/*
* dwb_set_status_resumed () - Set status resumed.
*
* return : Error code.
* data (in): The thread entry.
*/
STATIC_INLINE int
dwb_set_status_resumed (void *data)
{
#if defined (SERVER_MODE)
DWB_WAIT_QUEUE_ENTRY *queue_entry = (DWB_WAIT_QUEUE_ENTRY *) data;
THREAD_ENTRY *woken_thread_p;
if (queue_entry != NULL)
{
woken_thread_p = (THREAD_ENTRY *) queue_entry->data;
if (woken_thread_p != NULL)
{
thread_lock_entry (woken_thread_p);
woken_thread_p->resume_status = THREAD_DWB_QUEUE_RESUMED;
thread_unlock_entry (woken_thread_p);
}
}
return NO_ERROR;
#else /* !SERVER_MODE */
return NO_ERROR;
#endif /* !SERVER_MODE */
}
/*
* dwb_wait_for_block_completion () - Wait for DWB block to complete.
*
* return : Error code.
* thread_p (in): The thread entry.
* dwb_block (in): The DWB block number.
*/
STATIC_INLINE int
dwb_wait_for_block_completion (THREAD_ENTRY *thread_p, unsigned int block_no)
{
#if defined (SERVER_MODE)
int error_code = NO_ERROR;
DWB_WAIT_QUEUE_ENTRY *double_write_queue_entry = NULL;
DWB_BLOCK *dwb_block = NULL;
PERF_UTIME_TRACKER time_track;
UINT64 current_position_with_flags;
int r;
struct timeval timeval_crt, timeval_timeout;
struct timespec to;
bool save_check_interrupt;
assert (thread_p != NULL && block_no < DWB_NUM_TOTAL_BLOCKS);
PERF_UTIME_TRACKER_START (thread_p, &time_track);
dwb_block = &dwb_Global.blocks[block_no];
(void) pthread_mutex_lock (&dwb_block->mutex);
thread_lock_entry (thread_p);
/* Check again after acquiring mutexes. */
current_position_with_flags = ATOMIC_INC_64 (&dwb_Global.position_with_flags, 0ULL);
if (!DWB_IS_BLOCK_WRITE_STARTED (current_position_with_flags, block_no))
{
thread_unlock_entry (thread_p);
pthread_mutex_unlock (&dwb_block->mutex);
PERF_UTIME_TRACKER_TIME (thread_p, &time_track, PSTAT_DWB_WAIT_FLUSH_BLOCK_TIME_COUNTERS);
return NO_ERROR;
}
double_write_queue_entry = dwb_block_add_wait_queue_entry (&dwb_block->wait_queue, thread_p);
if (double_write_queue_entry == NULL)
{
/* allocation error */
thread_unlock_entry (thread_p);
pthread_mutex_unlock (&dwb_block->mutex);
ASSERT_ERROR_AND_SET (error_code);
return error_code;
}
pthread_mutex_unlock (&dwb_block->mutex);
save_check_interrupt = logtb_set_check_interrupt (thread_p, false);
/* Waits for maximum 20 milliseconds. */
gettimeofday (&timeval_crt, NULL);
timeval_add_msec (&timeval_timeout, &timeval_crt, 20);
timeval_to_timespec (&to, &timeval_timeout);
r = thread_suspend_timeout_wakeup_and_unlock_entry (thread_p, &to, THREAD_DWB_QUEUE_SUSPENDED);
(void) logtb_set_check_interrupt (thread_p, save_check_interrupt);
PERF_UTIME_TRACKER_TIME (thread_p, &time_track, PSTAT_DWB_WAIT_FLUSH_BLOCK_TIME_COUNTERS);
if (r == ER_CSS_PTHREAD_COND_TIMEDOUT)
{
/* timeout, remove the entry from queue */
dwb_remove_wait_queue_entry (&dwb_block->wait_queue, &dwb_block->mutex, thread_p, dwb_set_status_resumed);
return r;
}
else if (thread_p->resume_status != THREAD_DWB_QUEUE_RESUMED)
{
/* interruption, remove the entry from queue */
assert (thread_p->resume_status == THREAD_RESUME_DUE_TO_SHUTDOWN);
dwb_remove_wait_queue_entry (&dwb_block->wait_queue, &dwb_block->mutex, thread_p, NULL);
er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_INTERRUPTED, 0);
return ER_INTERRUPTED;
}
else
{
assert (thread_p->resume_status == THREAD_DWB_QUEUE_RESUMED);
return NO_ERROR;
}
#else /* SERVER_MODE */
return NO_ERROR;
#endif /* SERVER_MODE */
}
/*
* dwb_signal_waiting_thread () - Signal waiting thread.
*
* return : Error code.
* data (in): Queue entry containing the waiting thread.
*/
STATIC_INLINE int
dwb_signal_waiting_thread (void *data)
{
#if defined (SERVER_MODE)
THREAD_ENTRY *wait_thread_p;
DWB_WAIT_QUEUE_ENTRY *wait_queue_entry = (DWB_WAIT_QUEUE_ENTRY *) data;
assert (wait_queue_entry != NULL);
wait_thread_p = (THREAD_ENTRY *) wait_queue_entry->data;
if (wait_thread_p)
{
thread_lock_entry (wait_thread_p);
if (wait_thread_p->resume_status == THREAD_DWB_QUEUE_SUSPENDED)
{
thread_wakeup_already_had_mutex (wait_thread_p, THREAD_DWB_QUEUE_RESUMED);
}
thread_unlock_entry (wait_thread_p);
}
return NO_ERROR;
#else /* !SERVER_MODE */
return NO_ERROR;
#endif /* !SERVER_MODE */
}
/*
* dwb_signal_block_completion () - Signal double write buffer block completion.
*
* return : Nothing.
* thread_p (in): The thread entry.
* dwb_block (in): The double write buffer block.
*/
STATIC_INLINE void
dwb_signal_block_completion (THREAD_ENTRY *thread_p, DWB_BLOCK *dwb_block)
{
assert (dwb_block != NULL);
/* There are blocked threads. Destroy the wait queue and release the blocked threads. */
dwb_signal_waiting_threads (&dwb_block->wait_queue, &dwb_block->mutex);
}
/*
* dwb_signal_structure_modificated () - Signal DWB structure changed.
*
* return : Nothing.
* thread_p (in): The thread entry.
*/
STATIC_INLINE void
dwb_signal_structure_modificated (THREAD_ENTRY *thread_p)
{
/* There are blocked threads. Destroy the wait queue and release the blocked threads. */
dwb_signal_waiting_threads (&dwb_Global.wait_queue, &dwb_Global.mutex);
}
/*
* dwb_wait_for_strucure_modification () - Wait for double write buffer structure modification.
*
* return : Error code.
* thread_p (in): The thread entry.
*/
STATIC_INLINE int
dwb_wait_for_strucure_modification (THREAD_ENTRY *thread_p)
{
#if defined (SERVER_MODE)
int error_code = NO_ERROR;
DWB_WAIT_QUEUE_ENTRY *double_write_queue_entry = NULL;
UINT64 current_position_with_flags;
int r;
struct timeval timeval_crt, timeval_timeout;
struct timespec to;
bool save_check_interrupt;
(void) pthread_mutex_lock (&dwb_Global.mutex);
/* Check the actual flags, to avoids unnecessary waits. */
current_position_with_flags = ATOMIC_INC_64 (&dwb_Global.position_with_flags, 0ULL);
if (!DWB_IS_MODIFYING_STRUCTURE (current_position_with_flags))
{
pthread_mutex_unlock (&dwb_Global.mutex);
return NO_ERROR;
}
thread_lock_entry (thread_p);
double_write_queue_entry = dwb_block_add_wait_queue_entry (&dwb_Global.wait_queue, thread_p);
if (double_write_queue_entry == NULL)
{
/* allocation error */
thread_unlock_entry (thread_p);
pthread_mutex_unlock (&dwb_Global.mutex);
ASSERT_ERROR_AND_SET (error_code);
return error_code;
}
pthread_mutex_unlock (&dwb_Global.mutex);
save_check_interrupt = logtb_set_check_interrupt (thread_p, false);
/* Waits for maximum 10 milliseconds. */
gettimeofday (&timeval_crt, NULL);
timeval_add_msec (&timeval_timeout, &timeval_crt, 10);
timeval_to_timespec (&to, &timeval_timeout);
r = thread_suspend_timeout_wakeup_and_unlock_entry (thread_p, &to, THREAD_DWB_QUEUE_SUSPENDED);
(void) logtb_set_check_interrupt (thread_p, save_check_interrupt);
if (r == ER_CSS_PTHREAD_COND_TIMEDOUT)
{
/* timeout, remove the entry from queue */
dwb_remove_wait_queue_entry (&dwb_Global.wait_queue, &dwb_Global.mutex, thread_p, NULL);
return r;
}
else if (thread_p->resume_status != THREAD_DWB_QUEUE_RESUMED)
{
/* interruption, remove the entry from queue */
assert (thread_p->resume_status == THREAD_RESUME_DUE_TO_SHUTDOWN);
dwb_remove_wait_queue_entry (&dwb_Global.wait_queue, &dwb_Global.mutex, thread_p, NULL);
er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_INTERRUPTED, 0);
return ER_INTERRUPTED;
}
else
{
assert (thread_p->resume_status == THREAD_DWB_QUEUE_RESUMED);
return NO_ERROR;
}
#else /* !SERVER_MODE */
return NO_ERROR;
#endif /* !SERVER_MODE */
}
/*
* dwb_compare_slots () - Compare DWB slots.
* return: arg1 - arg2.
* arg1(in): Slot 1.
* arg2(in): Slot 2.
*/
static int
dwb_compare_slots (const void *arg1, const void *arg2)
{
int diff;
INT64 diff2;
DWB_SLOT *slot1, *slot2;
assert (arg1 != NULL);
assert (arg2 != NULL);
slot1 = (DWB_SLOT *) arg1;
slot2 = (DWB_SLOT *) arg2;
diff = slot1->vpid.volid - slot2->vpid.volid;
if (diff > 0)
{
return 1;
}
else if (diff < 0)
{
return -1;
}
diff = slot1->vpid.pageid - slot2->vpid.pageid;
if (diff > 0)
{
return 1;
}
else if (diff < 0)
{
return -1;
}
diff2 = slot1->lsa.pageid - slot2->lsa.pageid;
if (diff2 > 0)
{
return 1;
}
else if (diff2 < 0)
{
return -1;
}
diff2 = slot1->lsa.offset - slot2->lsa.offset;
if (diff2 > 0)
{
return 1;
}
else if (diff2 < 0)
{
return -1;
}
return 0;
}
/*
* dwb_block_create_ordered_slots () - Create ordered slots from block slots.
*
* return : Error code.
* block(in): The block.
* p_dwb_ordered_slots(out): The ordered slots.
* p_ordered_slots_length(out): The ordered slots array length.
*/
STATIC_INLINE int
dwb_block_create_ordered_slots (DWB_BLOCK *block, DWB_SLOT **p_dwb_ordered_slots,
unsigned int *p_ordered_slots_length)
{
DWB_SLOT *p_local_dwb_ordered_slots = NULL;
assert (block != NULL && p_dwb_ordered_slots != NULL);
/* including sentinel */
p_local_dwb_ordered_slots = (DWB_SLOT *) malloc ((block->count_wb_pages + 1) * sizeof (DWB_SLOT));
if (p_local_dwb_ordered_slots == NULL)
{
er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_OUT_OF_VIRTUAL_MEMORY, 1,
(block->count_wb_pages + 1) * sizeof (DWB_SLOT));
return ER_OUT_OF_VIRTUAL_MEMORY;
}
memcpy (p_local_dwb_ordered_slots, block->slots, block->count_wb_pages * sizeof (DWB_SLOT));
/* init sentinel slot */
dwb_init_slot (&p_local_dwb_ordered_slots[block->count_wb_pages]);
/* Order pages by (VPID, LSA) */
qsort ((void *) p_local_dwb_ordered_slots, block->count_wb_pages, sizeof (DWB_SLOT), dwb_compare_slots);
*p_dwb_ordered_slots = p_local_dwb_ordered_slots;
*p_ordered_slots_length = block->count_wb_pages + 1;
return NO_ERROR;
}
/*
* dwb_slots_hash_delete () - Delete entry in slots hash.
*
* return : Error code.
* thread_p (in): The thread entry.
* slot(in): The DWB slot.
*/
STATIC_INLINE int
dwb_slots_hash_delete (THREAD_ENTRY *thread_p, DWB_SLOT *slot)
{
int error_code = NO_ERROR;
VPID *vpid;
DWB_SLOTS_HASH_ENTRY *slots_hash_entry = NULL;
vpid = &slot->vpid;
if (VPID_ISNULL (vpid))
{
/* Nothing to do, the slot is not in hash. */
return NO_ERROR;
}
/* Remove the old vpid from hash. */
slots_hash_entry = dwb_Global.slots_hashmap.find (thread_p, *vpid);
if (slots_hash_entry == NULL)
{
/* Already removed from hash by others, nothing to do. */
return NO_ERROR;
}
assert (VPID_ISNULL (& (slots_hash_entry->slot->vpid)) || VPID_EQ (& (slots_hash_entry->slot->vpid), vpid));
/* Check the slot. */
if (slots_hash_entry->slot == slot)
{
assert (VPID_EQ (& (slots_hash_entry->slot->vpid), vpid));
assert (slots_hash_entry->slot->io_page->prv.pageid == vpid->pageid
&& slots_hash_entry->slot->io_page->prv.volid == vpid->volid);
if (!dwb_Global.slots_hashmap.erase_locked (thread_p, *vpid, slots_hash_entry))
{
assert_release (false);
/* Should not happen. */
pthread_mutex_unlock (&slots_hash_entry->mutex);
dwb_log_error ("DWB hash delete key (%d, %d) with %d error: \n", vpid->volid, vpid->pageid, error_code);
return error_code;
}
}
else
{
pthread_mutex_unlock (&slots_hash_entry->mutex);
}
return NO_ERROR;
}
/*
* dwb_compare_vol_fd () - Compare two volume file descriptors.
*
* return : v1 - v2.
* v1(in) : Pointer to volume1.
* v2(in) : Pointer to volume2.
*/
static int
dwb_compare_vol_fd (const void *v1, const void *v2)
{
int *vol_fd1 = (int *) v1;
int *vol_fd2 = (int *) v2;
assert (vol_fd1 != NULL && vol_fd2 != NULL);
return ((*vol_fd1) - (*vol_fd2));
}
/*
* dwb_add_volume_to_block_flush_area () - Add a volume to block flush area.
*
* return :
* thread_p (in): The thread entry.
* block(in): The block where the flush area reside.
* vol_fd(in): The volume to add.
* ensure_metadata(in): Include metadata when syncing.
*
* Note: The volume is added if is not already in block flush area.
*/
STATIC_INLINE FLUSH_VOLUME_INFO *
dwb_add_volume_to_block_flush_area (THREAD_ENTRY *thread_p, DWB_BLOCK *block, int vol_fd, bool ensure_metadata)
{
FLUSH_VOLUME_INFO *flush_new_volume_info;
#if !defined (NDEBUG)
unsigned int old_count_flush_volumes_info;
#endif
assert (block != NULL);
assert (vol_fd != NULL_VOLDES);
#if !defined (NDEBUG)
old_count_flush_volumes_info = block->count_flush_volumes_info;
#endif
flush_new_volume_info = &block->flush_volumes_info[block->count_flush_volumes_info];
flush_new_volume_info->vdes = vol_fd;
flush_new_volume_info->num_pages = 0;
flush_new_volume_info->all_pages_written = false;
flush_new_volume_info->metadata = ensure_metadata;
flush_new_volume_info->flushed_status = VOLUME_NOT_FLUSHED;
/* There is only a writer and several (currently 2) readers on flush_new_volume_info array.
* I need to be sure that size is incremented after setting element. Uses atomic to prevent code reordering.
*/
ATOMIC_INC_32 (&block->count_flush_volumes_info, 1);
assert (((old_count_flush_volumes_info + 1) == block->count_flush_volumes_info)
&& (block->count_flush_volumes_info <= block->max_to_flush_vdes));
return flush_new_volume_info;
}
/*
* dwb_write_block () - Write block pages in specified order.
*
* return : Error code.
* thread_p (in): The thread entry.
* block(in): The block that is written.
* p_dwb_ordered_slots(in): The slots that gives the pages flush order.
* ordered_slots_length(in): The ordered slots array length.
* remove_from_hash(in): True, if needs to remove entries from hash.
* file_sync_helper_can_flush(in): True, if helper can flush.
*
* Note: This function fills to_flush_vdes array with the volumes that must be flushed.
*/
STATIC_INLINE int
dwb_write_block (THREAD_ENTRY *thread_p, DWB_BLOCK *block, DWB_SLOT *p_dwb_ordered_slots,
unsigned int ordered_slots_length, bool file_sync_helper_can_flush, bool remove_from_hash)
{
VOLID last_written_volid;
unsigned int i;
int last_written_vol_fd, vol_fd;
VPID *vpid;
int error_code = NO_ERROR;
int count_writes = 0, num_pages_to_sync;
FLUSH_VOLUME_INFO *current_flush_volume_info = NULL;
bool can_flush_volume = false;
bool ensure_metadata;
assert (block != NULL && p_dwb_ordered_slots != NULL);
/*
* Write the whole slots data first and then remove it from hash. Is better to do in this way. Thus, the fileio_write
* may be slow. While the current transaction has delays caused by fileio_write, the concurrent transaction still
* can access the data from memory instead disk
*/
assert (block->count_wb_pages < ordered_slots_length);
assert (block->count_flush_volumes_info == 0);
num_pages_to_sync = prm_get_integer_value (PRM_ID_PB_SYNC_ON_NFLUSH);
last_written_volid = NULL_VOLID;
last_written_vol_fd = NULL_VOLDES;
for (i = 0; i < block->count_wb_pages; i++)
{
vpid = &p_dwb_ordered_slots[i].vpid;
if (VPID_ISNULL (vpid))
{
continue;
}
assert (VPID_ISNULL (&p_dwb_ordered_slots[i + 1].vpid) || VPID_LT (vpid, &p_dwb_ordered_slots[i + 1].vpid));
ensure_metadata = p_dwb_ordered_slots[i].ensure_metadata;
if (last_written_volid != vpid->volid)
{
/* Get the volume descriptor. */
if (current_flush_volume_info != NULL)
{
assert_release (current_flush_volume_info->vdes == last_written_vol_fd);
current_flush_volume_info->all_pages_written = true;
can_flush_volume = true;
current_flush_volume_info = NULL; /* reset */
}
vol_fd = fileio_get_volume_descriptor (vpid->volid);
if (vol_fd == NULL_VOLDES)
{
/* probably it was removed meanwhile. skip it! */
continue;
}
last_written_volid = vpid->volid;
last_written_vol_fd = vol_fd;
current_flush_volume_info = dwb_add_volume_to_block_flush_area (thread_p, block, last_written_vol_fd, ensure_metadata);
}
else
{
assert (current_flush_volume_info != NULL);
current_flush_volume_info->metadata = current_flush_volume_info->metadata || ensure_metadata;
}
assert (last_written_vol_fd != NULL_VOLDES);
assert (p_dwb_ordered_slots[i].io_page->prv.p_reserve_2 == 0);
assert (p_dwb_ordered_slots[i].vpid.pageid == p_dwb_ordered_slots[i].io_page->prv.pageid
&& p_dwb_ordered_slots[i].vpid.volid == p_dwb_ordered_slots[i].io_page->prv.volid);
/* Write the data. */
if (fileio_write (thread_p, last_written_vol_fd, p_dwb_ordered_slots[i].io_page, vpid->pageid, IO_PAGESIZE,
FILEIO_WRITE_NO_COMPENSATE_WRITE) == NULL)
{
ASSERT_ERROR ();
dwb_log_error ("DWB write page VPID=(%d, %d) LSA=(%lld,%d) with %d error: \n",
vpid->volid, vpid->pageid, p_dwb_ordered_slots[i].io_page->prv.lsa.pageid,
(int) p_dwb_ordered_slots[i].io_page->prv.lsa.offset, er_errid ());
assert (false);
/* Something wrong happened. */
return ER_FAILED;
}
dwb_log ("dwb_write_block: written page = (%d,%d) LSA=(%lld,%d)\n",
vpid->volid, vpid->pageid, p_dwb_ordered_slots[i].io_page->prv.lsa.pageid,
(int) p_dwb_ordered_slots[i].io_page->prv.lsa.offset);
#if defined (SERVER_MODE)
ATOMIC_INC_32 (¤t_flush_volume_info->num_pages, 1);
count_writes++;
if (file_sync_helper_can_flush && (count_writes >= num_pages_to_sync || can_flush_volume == true)
&& dwb_is_file_sync_helper_daemon_available ())
{
if (ATOMIC_CAS_ADDR (&dwb_Global.file_sync_helper_block, (DWB_BLOCK *) NULL, block))
{
dwb_file_sync_helper_daemon->wakeup ();
}
/* Add statistics. */
perfmon_add_stat (thread_p, PSTAT_PB_NUM_IOWRITES, count_writes);
count_writes = 0;
can_flush_volume = false;
}
#endif
}
/* the last written volume */
if (current_flush_volume_info != NULL)
{
current_flush_volume_info->all_pages_written = true;
}
#if !defined (NDEBUG)
for (i = 0; i < block->count_flush_volumes_info; i++)
{
assert (block->flush_volumes_info[i].all_pages_written == true);
assert (block->flush_volumes_info[i].vdes != NULL_VOLDES);
}
#endif
#if defined (SERVER_MODE)
if (file_sync_helper_can_flush && (dwb_Global.file_sync_helper_block == NULL)
&& (block->count_flush_volumes_info > 0))
{
/* If file_sync_helper_block is NULL, it means that the file sync helper thread does not run and was not woken yet. */
if (dwb_is_file_sync_helper_daemon_available ()
&& ATOMIC_CAS_ADDR (&dwb_Global.file_sync_helper_block, (DWB_BLOCK *) NULL, block))
{
dwb_file_sync_helper_daemon->wakeup ();
}
}
#endif
/* Add statistics. */
perfmon_add_stat (thread_p, PSTAT_PB_NUM_IOWRITES, count_writes);
/* Remove the corresponding entries from hash. */
if (remove_from_hash)
{
PERF_UTIME_TRACKER time_track;
PERF_UTIME_TRACKER_START (thread_p, &time_track);
for (i = 0; i < block->count_wb_pages; i++)
{
vpid = &p_dwb_ordered_slots[i].vpid;
if (VPID_ISNULL (vpid))
{
continue;
}
assert (p_dwb_ordered_slots[i].position_in_block < DWB_BLOCK_NUM_PAGES);
error_code = dwb_slots_hash_delete (thread_p, &block->slots[p_dwb_ordered_slots[i].position_in_block]);
if (error_code != NO_ERROR)
{
return error_code;
}
}
PERF_UTIME_TRACKER_TIME (thread_p, &time_track, PSTAT_DWB_DECACHE_PAGES_AFTER_WRITE);
}
return NO_ERROR;
}
/*
* dwb_flush_block () - Flush pages from specified block.
*
* return : Error code.
* thread_p (in): Thread entry.
* block(in): The block that needs flush.
* file_sync_helper_can_flush(in): True, if file sync helper thread can flush.
* current_position_with_flags(out): Current position with flags.
*
* Note: The block pages can't be modified by others during flush.
*/
STATIC_INLINE int
dwb_flush_block (THREAD_ENTRY *thread_p, DWB_BLOCK *block, bool file_sync_helper_can_flush,
UINT64 *current_position_with_flags)
{
UINT64 local_current_position_with_flags, new_position_with_flags;
int error_code = NO_ERROR;
DWB_SLOT *p_dwb_ordered_slots = NULL;
unsigned int i, ordered_slots_length;
PERF_UTIME_TRACKER time_track;
int num_pages;
unsigned int current_block_to_flush, next_block_to_flush;
int max_pages_to_sync;
#if defined (SERVER_MODE)
bool flush = false;
PERF_UTIME_TRACKER time_track_file_sync_helper;
#endif
#if !defined (NDEBUG)
DWB_BLOCK *saved_file_sync_helper_block = NULL;
LOG_LSA nxio_lsa;
#endif
assert (block != NULL && block->count_wb_pages > 0 && dwb_is_created ());
PERF_UTIME_TRACKER_START (thread_p, &time_track);
/* Currently we allow only one block to be flushed. */
ATOMIC_INC_32 (&dwb_Global.blocks_flush_counter, 1);
assert (dwb_Global.blocks_flush_counter <= 1);
/* Order slots by VPID, to flush faster. */
error_code = dwb_block_create_ordered_slots (block, &p_dwb_ordered_slots, &ordered_slots_length);
if (error_code != NO_ERROR)
{
error_code = ER_FAILED;
goto end;
}
/* Remove duplicates */
for (i = 0; i < block->count_wb_pages - 1; i++)
{
DWB_SLOT *s1, *s2;
s1 = &p_dwb_ordered_slots[i];
s2 = &p_dwb_ordered_slots[i + 1];
assert (s1->io_page->prv.p_reserve_2 == 0);
if (!VPID_ISNULL (&s1->vpid) && VPID_EQ (&s1->vpid, &s2->vpid))
{
/* Next slot contains the same page, but that page is newer than the current one. Invalidate the VPID to
* avoid flushing the page twice. I'm sure that the current slot is not in hash.
*/
assert (LSA_LE (&s1->lsa, &s2->lsa));
VPID_SET_NULL (&s1->vpid);
assert (s1->position_in_block < DWB_BLOCK_NUM_PAGES);
VPID_SET_NULL (& (block->slots[s1->position_in_block].vpid));
fileio_initialize_res (thread_p, s1->io_page, IO_PAGESIZE);
s2->ensure_metadata = s1->ensure_metadata || s2->ensure_metadata;
}
/* Check for WAL protocol. */
#if !defined (NDEBUG)
if (s1->io_page->prv.pageid != NULL_PAGEID && logpb_need_wal (&s1->io_page->prv.lsa))
{
/* Need WAL. Check whether log buffer pool was destroyed. */
nxio_lsa = log_Gl.append.get_nxio_lsa ();
assert (LSA_ISNULL (&nxio_lsa));
}
#endif
}
PERF_UTIME_TRACKER_TIME (thread_p, &time_track, PSTAT_DWB_FLUSH_BLOCK_SORT_TIME_COUNTERS);
#if !defined (NDEBUG)
saved_file_sync_helper_block = (DWB_BLOCK *) dwb_Global.file_sync_helper_block;
#endif
#if defined (SERVER_MODE)
PERF_UTIME_TRACKER_START (thread_p, &time_track_file_sync_helper);
while (dwb_Global.file_sync_helper_block != NULL)
{
flush = true;
/* Be sure that the previous block was written on disk, before writing the current block. */
if (dwb_is_file_sync_helper_daemon_available ())
{
/* Wait for file sync helper. */
thread_sleep (1);
}
else
{
/* Helper not available, flush the volumes from previous block. */
for (i = 0; i < dwb_Global.file_sync_helper_block->count_flush_volumes_info; i++)
{
assert (dwb_Global.file_sync_helper_block->flush_volumes_info[i].vdes != NULL_VOLDES);
if (ATOMIC_INC_32 (& (dwb_Global.file_sync_helper_block->flush_volumes_info[i].num_pages), 0) >= 0)
{
(void) fileio_synchronize (thread_p,
dwb_Global.file_sync_helper_block->flush_volumes_info[i].vdes,
NULL,
dwb_Global.file_sync_helper_block->flush_volumes_info[i].metadata);
dwb_log ("dwb_flush_block: Synchronized volume %d\n",
dwb_Global.file_sync_helper_block->flush_volumes_info[i].vdes);
}
}
(void) ATOMIC_TAS_ADDR (&dwb_Global.file_sync_helper_block, (DWB_BLOCK *) NULL);
}
}
#if !defined (NDEBUG)
if (saved_file_sync_helper_block)
{
for (i = 0; i < saved_file_sync_helper_block->count_flush_volumes_info; i++)
{
assert (saved_file_sync_helper_block->flush_volumes_info[i].all_pages_written == true
&& saved_file_sync_helper_block->flush_volumes_info[i].num_pages == 0);
}
}
#endif
if (flush == true)
{
PERF_UTIME_TRACKER_TIME (thread_p, &time_track_file_sync_helper, PSTAT_DWB_WAIT_FILE_SYNC_HELPER_TIME_COUNTERS);
}
#endif /* SERVER_MODE */
ATOMIC_TAS_32 (&block->count_flush_volumes_info, 0);
block->all_pages_written = false;
/* First, write and flush the double write file buffer. */
if (fileio_write_pages (thread_p, dwb_Global.vdes, block->write_buffer, 0, block->count_wb_pages,
IO_PAGESIZE, FILEIO_WRITE_NO_COMPENSATE_WRITE) == NULL)
{
/* Something wrong happened. */
assert (false);
error_code = ER_FAILED;
goto end;
}
/* Increment statistics after writing in double write volume. */
perfmon_add_stat (thread_p, PSTAT_PB_NUM_IOWRITES, block->count_wb_pages);
if (fileio_synchronize (thread_p, dwb_Global.vdes, dwb_Volume_name, false) != dwb_Global.vdes)
{
assert (false);
/* Something wrong happened. */
error_code = ER_FAILED;
goto end;
}
dwb_log ("dwb_flush_block: DWB synchronized\n");
/* Now, write and flush the original location. */
error_code =
dwb_write_block (thread_p, block, p_dwb_ordered_slots, ordered_slots_length, file_sync_helper_can_flush, true);
if (error_code != NO_ERROR)
{
assert (false);
goto end;
}
max_pages_to_sync = prm_get_integer_value (PRM_ID_PB_SYNC_ON_NFLUSH) / 2;
/* Now, flush only the volumes having pages in current block. */
for (i = 0; i < block->count_flush_volumes_info; i++)
{
assert (block->flush_volumes_info[i].vdes != NULL_VOLDES);
num_pages = ATOMIC_INC_32 (&block->flush_volumes_info[i].num_pages, 0);
if (num_pages == 0)
{
/* Flushed by helper. */
continue;
}
#if defined (SERVER_MODE)
if (file_sync_helper_can_flush == true)
{
if ((num_pages > max_pages_to_sync) && dwb_is_file_sync_helper_daemon_available ())
{
/* Let the helper thread to flush volumes having many pages. */
assert (dwb_Global.file_sync_helper_block != NULL);
continue;
}
}
else
{
assert (dwb_Global.file_sync_helper_block == NULL);
}
#endif
if (!ATOMIC_CAS_32 (&block->flush_volumes_info[i].flushed_status, VOLUME_NOT_FLUSHED,
VOLUME_FLUSHED_BY_DWB_FLUSH_THREAD))
{
/* Flushed by helper. */
continue;
}
num_pages = ATOMIC_TAS_32 (&block->flush_volumes_info[i].num_pages, 0);
assert (num_pages != 0);
(void) fileio_synchronize (thread_p, block->flush_volumes_info[i].vdes, NULL, block->flush_volumes_info[i].metadata);
dwb_log ("dwb_flush_block: Synchronized volume %d\n", block->flush_volumes_info[i].vdes);
}
/* Allow to file sync helper thread to finish. */
block->all_pages_written = true;
if (perfmon_is_perf_tracking_and_active (PERFMON_ACTIVATION_FLAG_FLUSHED_BLOCK_VOLUMES))
{
perfmon_db_flushed_block_volumes (thread_p, block->count_flush_volumes_info);
}
/* The block is full or there is only one thread that access DWB. */
assert (block->count_wb_pages == DWB_BLOCK_NUM_PAGES
|| DWB_IS_MODIFYING_STRUCTURE (ATOMIC_INC_64 (&dwb_Global.position_with_flags, 0LL)));
ATOMIC_TAS_32 (&block->count_wb_pages, 0);
ATOMIC_INC_64 (&block->version, 1ULL);
/* Reset block bit, since the block was flushed. */
reset_bit_position:
local_current_position_with_flags = ATOMIC_INC_64 (&dwb_Global.position_with_flags, 0LL);
new_position_with_flags = DWB_ENDS_BLOCK_WRITING (local_current_position_with_flags, block->block_no);
if (!ATOMIC_CAS_64 (&dwb_Global.position_with_flags, local_current_position_with_flags, new_position_with_flags))
{
/* The position was changed by others, try again. */
goto reset_bit_position;
}
/* Advance flushing to next block. */
current_block_to_flush = dwb_Global.next_block_to_flush;
next_block_to_flush = DWB_GET_NEXT_BLOCK_NO (current_block_to_flush);
if (!ATOMIC_CAS_32 (&dwb_Global.next_block_to_flush, current_block_to_flush, next_block_to_flush))
{
/* I'm the only thread that can advance next block to flush. */
assert_release (false);
}
/* Release locked threads, if any. */
dwb_signal_block_completion (thread_p, block);
if (current_position_with_flags)
{
*current_position_with_flags = new_position_with_flags;
}
end:
ATOMIC_INC_32 (&dwb_Global.blocks_flush_counter, -1);
if (p_dwb_ordered_slots != NULL)
{
free_and_init (p_dwb_ordered_slots);
}
PERF_UTIME_TRACKER_TIME (thread_p, &time_track, PSTAT_DWB_FLUSH_BLOCK_TIME_COUNTERS);
return error_code;
}
/*
* dwb_acquire_next_slot () - Acquire the next slot in DWB.
*
* return : Error code.
* thread_p(in): The thread entry.
* can_wait(in): True, if can wait to get the next slot.
* p_dwb_slot(out): The pointer to the next slot in DWB.
*/
STATIC_INLINE int
dwb_acquire_next_slot (THREAD_ENTRY *thread_p, bool can_wait, DWB_SLOT **p_dwb_slot)
{
UINT64 current_position_with_flags, current_position_with_block_write_started, new_position_with_flags;
unsigned int current_block_no, position_in_current_block;
int error_code = NO_ERROR;
DWB_BLOCK *block;
assert (p_dwb_slot != NULL);
*p_dwb_slot = NULL;
start:
/* Get the current position in double write buffer. */
current_position_with_flags = ATOMIC_INC_64 (&dwb_Global.position_with_flags, 0ULL);
if (DWB_NOT_CREATED_OR_MODIFYING (current_position_with_flags))
{
/* Rarely happens. */
if (DWB_IS_MODIFYING_STRUCTURE (current_position_with_flags))
{
if (can_wait == false)
{
return NO_ERROR;
}
/* DWB structure change started, needs to wait. */
error_code = dwb_wait_for_strucure_modification (thread_p);
if (error_code != NO_ERROR)
{
if (error_code == ER_CSS_PTHREAD_COND_TIMEDOUT)
{
/* timeout, try again */
goto start;
}
return error_code;
}
/* Probably someone else advanced the position, try again. */
goto start;
}
else if (!DWB_IS_CREATED (current_position_with_flags))
{
if (DWB_IS_ANY_BLOCK_WRITE_STARTED (current_position_with_flags))
{
/* Someone deleted the DWB, before flushing the data. */
er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_DWB_DISABLED, 0);
return ER_DWB_DISABLED;
}
/* Someone deleted the DWB */
return NO_ERROR;
}
else
{
assert (false);
}
}
current_block_no = DWB_GET_BLOCK_NO_FROM_POSITION (current_position_with_flags);
position_in_current_block = DWB_GET_POSITION_IN_BLOCK (current_position_with_flags);
assert (current_block_no < DWB_NUM_TOTAL_BLOCKS && position_in_current_block < DWB_BLOCK_NUM_PAGES);
if (position_in_current_block == 0)
{
/* This is the first write on current block. Before writing, check whether the previous iteration finished. */
if (DWB_IS_BLOCK_WRITE_STARTED (current_position_with_flags, current_block_no))
{
if (can_wait == false)
{
return NO_ERROR;
}
dwb_log ("Waits for flushing block=%d having version=%lld) \n",
current_block_no, dwb_Global.blocks[current_block_no].version);
/*
* The previous iteration didn't finished, needs to wait, in order to avoid buffer overwriting.
* Should happens relative rarely, except the case when the buffer consist in only one block.
*/
error_code = dwb_wait_for_block_completion (thread_p, current_block_no);
if (error_code != NO_ERROR)
{
if (error_code == ER_CSS_PTHREAD_COND_TIMEDOUT)
{
/* timeout, try again */
goto start;
}
dwb_log_error ("Error %d while waiting for flushing block=%d having version %lld \n",
error_code, current_block_no, dwb_Global.blocks[current_block_no].version);
return error_code;
}
/* Probably someone else advanced the position, try again. */
goto start;
}
/* First write in the current block. */
assert (!DWB_IS_BLOCK_WRITE_STARTED (current_position_with_flags, current_block_no));
current_position_with_block_write_started =
DWB_STARTS_BLOCK_WRITING (current_position_with_flags, current_block_no);
new_position_with_flags = DWB_GET_NEXT_POSITION_WITH_FLAGS (current_position_with_block_write_started);
}
else
{
/* I'm sure that nobody else can delete the buffer */
assert (DWB_IS_CREATED (dwb_Global.position_with_flags));
assert (!DWB_IS_MODIFYING_STRUCTURE (dwb_Global.position_with_flags));
/* Compute the next position with flags */
new_position_with_flags = DWB_GET_NEXT_POSITION_WITH_FLAGS (current_position_with_flags);
}
/* Compute and advance the global position in double write buffer. */
if (!ATOMIC_CAS_64 (&dwb_Global.position_with_flags, current_position_with_flags, new_position_with_flags))
{
/* Someone else advanced the global position in double write buffer, try again. */
goto start;
}
block = dwb_Global.blocks + current_block_no;
*p_dwb_slot = block->slots + position_in_current_block;
/* Invalidate slot content. */
VPID_SET_NULL (& (*p_dwb_slot)->vpid);
assert ((*p_dwb_slot)->position_in_block == position_in_current_block);
return NO_ERROR;
}
/*
* dwb_set_slot_data () - Set DWB data at the location indicated by the slot.
*
* return : Error code.
* thread_p(in): Thread entry
* dwb_slot(in/out): DWB slot that contains the location where the data must be set.
* io_page_p(in): The data.
* ensure_metadata(in): Include metadata when syncing.
*/
STATIC_INLINE void
dwb_set_slot_data (THREAD_ENTRY *thread_p, DWB_SLOT *dwb_slot, FILEIO_PAGE *io_page_p, bool ensure_metadata)
{
assert (dwb_slot != NULL && io_page_p != NULL);
assert (io_page_p->prv.p_reserve_2 == 0);
if (io_page_p->prv.pageid != NULL_PAGEID)
{
memcpy (dwb_slot->io_page, (char *) io_page_p, IO_PAGESIZE);
}
else
{
/* Initialize page for consistency. */
fileio_initialize_res (thread_p, dwb_slot->io_page, IO_PAGESIZE);
}
assert (fileio_is_page_sane (io_page_p, IO_PAGESIZE));
LSA_COPY (&dwb_slot->lsa, &io_page_p->prv.lsa);
VPID_SET (&dwb_slot->vpid, io_page_p->prv.volid, io_page_p->prv.pageid);
dwb_slot->ensure_metadata = ensure_metadata;
}
/*
* dwb_init_slot () - Initialize DWB slot.
*
* return : Nothing.
* slot (in/out) : The DWB slot.
*/
STATIC_INLINE void
dwb_init_slot (DWB_SLOT *slot)
{
assert (slot != NULL);
slot->io_page = NULL;
VPID_SET_NULL (&slot->vpid);
LSA_SET_NULL (&slot->lsa);
}
/*
* dwb_get_next_block_for_flush(): Get next block for flush.
*
* returns: Nothing
* thread_p (in): The thread entry.
* block_no(out): The next block for flush if found, otherwise DWB_NUM_TOTAL_BLOCKS.
*/
STATIC_INLINE void
dwb_get_next_block_for_flush (THREAD_ENTRY *thread_p, unsigned int *block_no)
{
assert (block_no != NULL);
*block_no = DWB_NUM_TOTAL_BLOCKS;
/* check whether the next block can be flushed. */
if (dwb_Global.blocks[dwb_Global.next_block_to_flush].count_wb_pages != DWB_BLOCK_NUM_PAGES)
{
/* Next block is not full yet. */
return;
}
*block_no = dwb_Global.next_block_to_flush;
}
/*
* dwb_set_data_on_next_slot () - Sets data at the next DWB slot, if possible.
*
* return : Error code.
* thread_p(in): The thread entry.
* io_page_p(in): The data that will be set on next slot.
* can_wait(in): True, if waiting is allowed.
* ensure_metadata(in): Include metadata when syncing.
* p_dwb_slot(out): Pointer to the next free DWB slot.
*/
int
dwb_set_data_on_next_slot (THREAD_ENTRY *thread_p, FILEIO_PAGE *io_page_p, bool can_wait, bool ensure_metadata,
DWB_SLOT **p_dwb_slot)
{
int error_code;
assert (p_dwb_slot != NULL && io_page_p != NULL);
/* Acquire the slot before setting the data. */
error_code = dwb_acquire_next_slot (thread_p, can_wait, p_dwb_slot);
if (error_code != NO_ERROR)
{
return error_code;
}
assert (can_wait == false || *p_dwb_slot != NULL);
if (*p_dwb_slot == NULL)
{
/* Can't acquire next slot. */
return NO_ERROR;
}
/* Set data on slot. */
dwb_set_slot_data (thread_p, *p_dwb_slot, io_page_p, ensure_metadata);
return NO_ERROR;
}
/*
* dwb_add_page () - Add page content to DWB.
*
* return : Error code.
* thread_p (in): The thread entry.
* io_page_p(in): In-memory address where the current content of page resides.
* vpid(in): Page identifier.
* ensure_metadata(in): Include metadata when syncing.
* p_dwb_slot(in/out): DWB slot where the page content must be added.
*
* Note: thread may flush the block, if flush thread is not available or we are in stand alone.
*/
int
dwb_add_page (THREAD_ENTRY *thread_p, FILEIO_PAGE *io_page_p, VPID *vpid, bool ensure_metadata, DWB_SLOT **p_dwb_slot)
{
unsigned int count_wb_pages;
int error_code = NO_ERROR;
bool inserted = false;
DWB_BLOCK *block = NULL;
DWB_SLOT *dwb_slot = NULL;
bool needs_flush;
assert (p_dwb_slot != NULL && (io_page_p != NULL || (*p_dwb_slot)->io_page != NULL) && vpid != NULL);
if (thread_p == NULL)
{
thread_p = thread_get_thread_entry_info ();
}
if (*p_dwb_slot == NULL)
{
error_code = dwb_set_data_on_next_slot (thread_p, io_page_p, true, ensure_metadata, p_dwb_slot);
if (error_code != NO_ERROR)
{
return error_code;
}
if (*p_dwb_slot == NULL)
{
return NO_ERROR;
}
}
dwb_slot = *p_dwb_slot;
assert (VPID_EQ (vpid, &dwb_slot->vpid));
if (!VPID_ISNULL (vpid))
{
error_code = dwb_slots_hash_insert (thread_p, vpid, dwb_slot, &inserted);
if (error_code != NO_ERROR)
{
return error_code;
}
if (!inserted)
{
/* Invalidate the slot to avoid flushing the same data twice. */
VPID_SET_NULL (&dwb_slot->vpid);
fileio_initialize_res (thread_p, dwb_slot->io_page, IO_PAGESIZE);
}
}
dwb_log ("dwb_add_page: added page = (%d,%d) on block (%d) position (%d)\n", vpid->volid, vpid->pageid,
dwb_slot->block_no, dwb_slot->position_in_block);
block = &dwb_Global.blocks[dwb_slot->block_no];
count_wb_pages = ATOMIC_INC_32 (&block->count_wb_pages, 1);
assert_release (count_wb_pages <= DWB_BLOCK_NUM_PAGES);
if (count_wb_pages < DWB_BLOCK_NUM_PAGES)
{
needs_flush = false;
}
else
{
needs_flush = true;
}
if (needs_flush == false)
{
return NO_ERROR;
}
/*
* The blocks must be flushed in the order they are filled to have consistent data. The flush block thread knows
* how to flush the blocks in the order they are filled. So, we don't care anymore about the flushing order here.
* Initially, we waited here if the previous block was not flushed. That approach created delays.
*/
#if defined (SERVER_MODE)
/*
* Wake ups flush block thread to flush the current block. The current block will be flushed after flushing the
* previous block.
*/
if (dwb_is_flush_block_daemon_available ())
{
/* Wakeup the thread that will flush the block. */
dwb_flush_block_daemon->wakeup ();
return NO_ERROR;
}
#endif /* SERVER_MODE */
/* Flush all pages from current block */
error_code = dwb_flush_block (thread_p, block, false, NULL);
if (error_code != NO_ERROR)
{
dwb_log_error ("Can't flush block = %d having version %lld\n", block->block_no, block->version);
return error_code;
}
dwb_log ("Successfully flushed DWB block = %d having version %lld\n", block->block_no, block->version);
return NO_ERROR;
}
/*
* dwb_synchronize () - synchronize the DWB and volume.
*
* return : Error code.
* thread_p (in): The thread entry.
* vol_fd(in): Volume descriptor
* vlabel(in): Volume label
*
* NOTE: try to sync the DWB and the volume, but sync only
* the volume if failed to sync the DWB.
*/
int dwb_synchronize (THREAD_ENTRY *thread_p, int vol_fd, const char *vlabel)
{
#if defined (EnableThreadMonitoring)
TSC_TICKS start_tick, end_tick;
TSCTIMEVAL elapsed_time;
#endif
bool complete = false;
int error = NO_ERROR;
/* should fsync ? */
if (fileio_fsync_pending ())
{
return vol_fd;
}
#if defined (EnableThreadMonitoring)
if (0 < prm_get_integer_value (PRM_ID_MNT_WAITING_THREAD))
{
tsc_getticks (&start_tick);
}
#endif
#if !defined (CS_MODE)
if (fileio_is_permanent_volume_descriptor (thread_p, vol_fd))
{
error = dwb_flush_force (thread_p, &complete);
}
#endif
/* If complete is true, everything was synchronized. if not, directly sync the volume */
if (error == NO_ERROR && complete == false)
{
error = fsync (vol_fd);
}
#if defined (EnableThreadMonitoring)
if (0 < prm_get_integer_value (PRM_ID_MNT_WAITING_THREAD))
{
tsc_getticks (&end_tick);
tsc_elapsed_time_usec (&elapsed_time, end_tick, start_tick);
}
#endif
if (error != NO_ERROR)
{
/* sync error is not alwasy handled and I am not sure a proper safe handling is possible: raise as fatal error */
er_set_with_oserror (ER_FATAL_ERROR_SEVERITY, ARG_FILE_LINE, ER_IO_SYNC, 1, (vlabel ? vlabel : "Unknown"));
return NULL_VOLDES;
}
#if defined (EnableThreadMonitoring)
if (MONITOR_WAITING_THREAD (elapsed_time))
{
er_set (ER_WARNING_SEVERITY, ARG_FILE_LINE, ER_MNT_WAITING_THREAD, 3, __func__,
prm_get_integer_value (PRM_ID_MNT_WAITING_THREAD), TO_MSEC (elapsed_time));
}
#endif
perfmon_inc_stat (thread_p, PSTAT_FILE_NUM_IOSYNCHES);
return vol_fd;
}
/*
* dwb_is_created () - Checks whether double write buffer was created.
*
* return : True, if created.
*/
bool
dwb_is_created (void)
{
UINT64 position_with_flags = ATOMIC_INC_64 (&dwb_Global.position_with_flags, 0ULL);
return DWB_IS_CREATED (position_with_flags);
}
/*
* dwb_create () - Create DWB.
*
* return : Error code.
* thread_p (in): The thread entry.
* dwb_path_p (in) : The double write buffer volume path.
* db_name_p (in) : The database name.
*/
int
dwb_create (THREAD_ENTRY *thread_p, const char *dwb_path_p, const char *db_name_p)
{
UINT64 current_position_with_flags;
int error_code = NO_ERROR;
error_code = dwb_starts_structure_modification (thread_p, ¤t_position_with_flags);
if (error_code != NO_ERROR)
{
dwb_log_error ("Can't create DWB: error = %d\n", error_code);
return error_code;
}
/* DWB structure modification started, no other transaction can modify the global position with flags */
if (DWB_IS_CREATED (current_position_with_flags))
{
/* Already created, restore the modification flag. */
goto end;
}
fileio_make_dwb_name (dwb_Volume_name, dwb_path_p, db_name_p);
error_code = dwb_create_internal (thread_p, dwb_Volume_name, ¤t_position_with_flags);
if (error_code != NO_ERROR)
{
dwb_log_error ("Can't create DWB: error = %d\n", error_code);
goto end;
}
end:
/* Ends the modification, allowing to others to modify global position with flags. */
dwb_ends_structure_modification (thread_p, current_position_with_flags);
return error_code;
}
/*
* dwb_recreate () - Recreate double write buffer with new user parameter values.
*
* return : Error code.
* thread_p (in): The thread entry.
*/
int
dwb_recreate (THREAD_ENTRY *thread_p)
{
int error_code = NO_ERROR;
UINT64 current_position_with_flags;
if (thread_p == NULL)
{
thread_p = thread_get_thread_entry_info ();
}
error_code = dwb_starts_structure_modification (thread_p, ¤t_position_with_flags);
if (error_code != NO_ERROR)
{
return error_code;
}
/* DWB structure modification started, no other transaction can modify the global position with flags */
if (DWB_IS_CREATED (current_position_with_flags))
{
dwb_destroy_internal (thread_p, ¤t_position_with_flags);
}
error_code = dwb_create_internal (thread_p, dwb_Volume_name, ¤t_position_with_flags);
if (error_code != NO_ERROR)
{
goto end;
}
end:
/* Ends the modification, allowing to others to modify global position with flags. */
dwb_ends_structure_modification (thread_p, current_position_with_flags);
return error_code;
}
#if !defined (NDEBUG)
/*
* dwb_debug_check_dwb () - check sanity of ordered slots
*
* return : Error code
* thread_p (in): The thread entry.
* p_dwb_ordered_slots(in):
* num_dwb_pages(in):
*
*/
static int
dwb_debug_check_dwb (THREAD_ENTRY *thread_p, DWB_SLOT *p_dwb_ordered_slots, unsigned int num_dwb_pages)
{
char page_buf[IO_MAX_PAGE_SIZE + MAX_ALIGNMENT];
FILEIO_PAGE *iopage;
int error_code;
unsigned int i;
bool is_page_corrupted;
iopage = (FILEIO_PAGE *) PTR_ALIGN (page_buf, MAX_ALIGNMENT);
memset (iopage, 0, IO_PAGESIZE);
/* Check for duplicates in DWB. */
for (i = 1; i < num_dwb_pages; i++)
{
if (VPID_ISNULL (&p_dwb_ordered_slots[i].vpid))
{
continue;
}
if (VPID_EQ (&p_dwb_ordered_slots[i].vpid, &p_dwb_ordered_slots[i - 1].vpid))
{
/* Same VPID, at least one is corrupted. */
error_code = fileio_page_check_corruption (thread_p, p_dwb_ordered_slots[i - 1].io_page, &is_page_corrupted);
if (error_code != NO_ERROR)
{
return error_code;
}
if (is_page_corrupted)
{
continue;
}
error_code = fileio_page_check_corruption (thread_p, p_dwb_ordered_slots[i].io_page, &is_page_corrupted);
if (error_code != NO_ERROR)
{
return error_code;
}
if (is_page_corrupted)
{
continue;
}
if (memcmp (p_dwb_ordered_slots[i - 1].io_page, iopage, IO_PAGESIZE) == 0)
{
/* Skip not initialized pages. */
continue;
}
if (memcmp (p_dwb_ordered_slots[i].io_page, iopage, IO_PAGESIZE) == 0)
{
/* Skip not initialized pages. */
continue;
}
/* Found duplicates - something is wrong. We may still can check for same LSAs.
* But, duplicates occupies disk space so is better to avoid it.
*/
assert (false);
}
}
return NO_ERROR;
}
#endif // DEBUG
/*
* dwb_check_data_page_is_sane () - Check whether the data page is corrupted.
*
* return : Error code
* thread_p (in): The thread entry.
* block(in): DWB recovery block.
* p_dwb_ordered_slots(in): DWB ordered slots
* p_num_recoverable_pages(out): number of recoverable corrupted pages
*
*/
static int
dwb_check_data_page_is_sane (THREAD_ENTRY *thread_p, DWB_BLOCK *rcv_block, DWB_SLOT *p_dwb_ordered_slots,
int *p_num_recoverable_pages)
{
char page_buf[IO_MAX_PAGE_SIZE + MAX_ALIGNMENT];
FILEIO_PAGE *iopage;
VPID *vpid;
int vol_fd = NULL_VOLDES, temp_vol_fd = NULL_VOLDES, vol_pages = 0;
INT16 volid;
int error_code;
unsigned int i;
int num_recoverable_pages = 0;
bool is_page_corrupted;
assert (rcv_block != NULL && p_dwb_ordered_slots != NULL && p_num_recoverable_pages != NULL);
iopage = (FILEIO_PAGE *) PTR_ALIGN (page_buf, MAX_ALIGNMENT);
memset (iopage, 0, IO_PAGESIZE);
volid = NULL_VOLID;
/* Check whether the data page is corrupted. If true, replaced with the DWB page. */
for (i = 0; i < rcv_block->count_wb_pages; i++)
{
vpid = &p_dwb_ordered_slots[i].vpid;
if (VPID_ISNULL (vpid))
{
continue;
}
if (volid != vpid->volid)
{
/* Update the current VPID and get the volume descriptor. */
temp_vol_fd = fileio_get_volume_descriptor (vpid->volid);
if (temp_vol_fd == NULL_VOLDES)
{
continue;
}
vol_fd = temp_vol_fd;
volid = vpid->volid;
vol_pages = fileio_get_number_of_volume_pages (vol_fd, IO_PAGESIZE);
}
assert (vol_fd != NULL_VOLDES);
if (vpid->pageid >= vol_pages)
{
/* The page was written in DWB, not in data volume. */
continue;
}
/* Read the page from data volume. */
if (fileio_read (thread_p, vol_fd, iopage, vpid->pageid, IO_PAGESIZE) == NULL)
{
/* There was an error in reading the page. */
ASSERT_ERROR_AND_SET (error_code);
return error_code;
}
error_code = fileio_page_check_corruption (thread_p, iopage, &is_page_corrupted);
if (error_code != NO_ERROR)
{
return error_code;
}
if (!is_page_corrupted)
{
/* The page in data volume is not corrupted. Do not overwrite its content - reset slot VPID. */
VPID_SET_NULL (&p_dwb_ordered_slots[i].vpid);
fileio_initialize_res (thread_p, p_dwb_ordered_slots[i].io_page, IO_PAGESIZE);
continue;
}
/* Corrupted page in data volume. Check DWB. */
error_code = fileio_page_check_corruption (thread_p, p_dwb_ordered_slots[i].io_page, &is_page_corrupted);
if (error_code != NO_ERROR)
{
return error_code;
}
if (is_page_corrupted)
{
/* The page is corrupted in data volume and DWB. Something wrong happened. */
assert_release (false);
dwb_log_error ("Can't recover page = (%d,%d)\n", vpid->volid, vpid->pageid);
return ER_FAILED;
}
/* The page content in data volume will be replaced later with the DWB page content. */
dwb_log ("page = (%d,%d) is recovered with DWB.\n", vpid->volid, vpid->pageid);
num_recoverable_pages++;
}
*p_num_recoverable_pages = num_recoverable_pages;
return NO_ERROR;
}
/*
* dwb_load_and_recover_pages () - Load and recover pages from DWB.
*
* return : Error code.
* thread_p (in): The thread entry.
* dwb_path_p (in): The double write buffer path.
* db_name_p (in): The database name.
*
* Note: This function is called at recovery. The corrupted pages are recovered from double write volume buffer disk.
* Then, double write volume buffer disk is recreated according to user specifications.
* Currently we use a DWB block in memory to recover corrupted page.
*/
int
dwb_load_and_recover_pages (THREAD_ENTRY *thread_p, const char *dwb_path_p, const char *db_name_p)
{
int error_code = NO_ERROR, read_fd = NULL_VOLDES;
unsigned int num_dwb_pages, ordered_slots_length, i;
DWB_BLOCK *rcv_block = NULL;
DWB_SLOT *p_dwb_ordered_slots = NULL;
FILEIO_PAGE *iopage;
int num_recoverable_pages;
assert (dwb_Global.vdes == NULL_VOLDES);
dwb_check_logging ();
fileio_make_dwb_name (dwb_Volume_name, dwb_path_p, db_name_p);
if (fileio_is_volume_exist (dwb_Volume_name))
{
/* Open DWB volume first */
read_fd = fileio_mount (thread_p, boot_db_full_name (), dwb_Volume_name, LOG_DBDWB_VOLID, false, false);
if (read_fd == NULL_VOLDES)
{
return ER_IO_MOUNT_FAIL;
}
num_dwb_pages = fileio_get_number_of_volume_pages (read_fd, IO_PAGESIZE);
dwb_log ("dwb_load_and_recover_pages: The number of pages in DWB %d\n", num_dwb_pages);
/* We are in recovery phase. The system may be restarted with DWB size different than parameter value.
* There may be one of the following two reasons:
* - the user may intentionally modified the DWB size, before restarting.
* - DWB size didn't changed, but the previous DWB was created, partially flushed and the system crashed.
* We know that a valid DWB size must be a power of 2. In this case we recover from DWB. Otherwise, skip
* recovering from DWB - the modifications are not reflected in data pages.
* Another approach would be to recover, even if the DWB size is not a power of 2 (DWB partially flushed).
*/
if ((num_dwb_pages > 0) && IS_POWER_OF_2 (num_dwb_pages))
{
/* Create DWB block for recovery purpose. */
error_code = dwb_create_blocks (thread_p, 1, num_dwb_pages, &rcv_block);
if (error_code != NO_ERROR)
{
goto end;
}
/* Read pages in block write area. This means that slot pages are set. */
if (fileio_read_pages (thread_p, read_fd, rcv_block->write_buffer, 0, num_dwb_pages, IO_PAGESIZE) == NULL)
{
error_code = ER_FAILED;
goto end;
}
/* Set slots VPID and LSA from pages. */
for (i = 0; i < num_dwb_pages; i++)
{
iopage = rcv_block->slots[i].io_page;
VPID_SET (&rcv_block->slots[i].vpid, iopage->prv.volid, iopage->prv.pageid);
LSA_COPY (&rcv_block->slots[i].lsa, &iopage->prv.lsa);
}
rcv_block->count_wb_pages = num_dwb_pages;
/* Order slots by VPID, to flush faster. */
error_code = dwb_block_create_ordered_slots (rcv_block, &p_dwb_ordered_slots, &ordered_slots_length);
if (error_code != NO_ERROR)
{
error_code = ER_FAILED;
goto end;
}
/* Remove duplicates. Normally, we do not expect duplicates in DWB. However, this happens if
* the system crashes in the middle of flushing into double write file. In this case, some pages in DWB
* are from the last DWB flush and the other from the previous DWB flush.
*/
for (i = 0; i < rcv_block->count_wb_pages - 1; i++)
{
DWB_SLOT *s1, *s2;
s1 = &p_dwb_ordered_slots[i];
s2 = &p_dwb_ordered_slots[i + 1];
if (!VPID_ISNULL (&s1->vpid) && VPID_EQ (&s1->vpid, &s2->vpid))
{
/* Next slot contains the same page. Search for the oldest version. */
assert (LSA_LE (&s1->lsa, &s2->lsa));
dwb_log ("dwb_load_and_recover_pages: Found duplicates in DWB at positions = (%d,%d) %d\n",
s1->position_in_block, s2->position_in_block);
if (LSA_LT (&s1->lsa, &s2->lsa))
{
/* Invalidate the oldest page version. */
VPID_SET_NULL (&s1->vpid);
dwb_log ("dwb_load_and_recover_pages: Invalidated the page at position = (%d)\n",
s1->position_in_block);
}
else
{
/* Same LSA. This is the case when page was modified without setting LSA.
* The first appearance in DWB contains the oldest page modification - last flush in DWB!
*/
assert (s1->position_in_block != s2->position_in_block);
if (s1->position_in_block < s2->position_in_block)
{
/* Page of s1 is valid. */
VPID_SET_NULL (&s2->vpid);
dwb_log ("dwb_load_and_recover_pages: Invalidated the page at position = (%d)\n",
s2->position_in_block);
}
else
{
/* Page of s2 is valid. */
VPID_SET_NULL (&s1->vpid);
dwb_log ("dwb_load_and_recover_pages: Invalidated the page at position = (%d)\n",
s1->position_in_block);
}
}
}
}
#if !defined (NDEBUG)
// check sanity of ordered slots
error_code = dwb_debug_check_dwb (thread_p, p_dwb_ordered_slots, num_dwb_pages);
if (error_code != NO_ERROR)
{
goto end;
}
#endif // DEBUG
/* Check whether the data page is corrupted. If the case, it will be replaced with the DWB page. */
error_code = dwb_check_data_page_is_sane (thread_p, rcv_block, p_dwb_ordered_slots, &num_recoverable_pages);
if (error_code != NO_ERROR)
{
goto end;
}
if (0 < num_recoverable_pages)
{
/* Replace the corrupted pages in data volume with the DWB content. */
error_code =
dwb_write_block (thread_p, rcv_block, p_dwb_ordered_slots, ordered_slots_length, false, false);
if (error_code != NO_ERROR)
{
goto end;
}
/* Now, flush the volumes having pages in current block. */
for (i = 0; i < rcv_block->count_flush_volumes_info; i++)
{
/* rcv_block->flush_volumes_info[i].metadata is invalid at this point */
if (fileio_synchronize (thread_p, rcv_block->flush_volumes_info[i].vdes, NULL, true) == NULL_VOLDES)
{
error_code = ER_FAILED;
goto end;
}
dwb_log ("dwb_load_and_recover_pages: Synchronized volume %d\n",
rcv_block->flush_volumes_info[i].vdes);
}
rcv_block->count_flush_volumes_info = 0;
}
assert (rcv_block->count_flush_volumes_info == 0);
}
/* Dismount the file. */
fileio_dismount (thread_p, read_fd);
/* Destroy the old file, since data recovered. */
fileio_unformat (thread_p, dwb_Volume_name);
read_fd = NULL_VOLDES;
}
/* Since old file destroyed, now we can rebuild the new double write buffer with user specifications. */
error_code = dwb_create (thread_p, dwb_path_p, db_name_p);
if (error_code != NO_ERROR)
{
dwb_log_error ("Can't create DWB \n");
}
end:
/* Do not remove the old file if an error occurs. */
if (p_dwb_ordered_slots != NULL)
{
free_and_init (p_dwb_ordered_slots);
}
if (rcv_block != NULL)
{
dwb_finalize_block (rcv_block);
free_and_init (rcv_block);
}
return error_code;
}
/*
* dwb_destroy () - Destroy DWB.
*
* return : Error code.
* thread_p (in): The thread entry.
*/
int
dwb_destroy (THREAD_ENTRY *thread_p)
{
int error_code = NO_ERROR;
UINT64 current_position_with_flags;
error_code = dwb_starts_structure_modification (thread_p, ¤t_position_with_flags);
if (error_code != NO_ERROR)
{
return error_code;
}
/* DWB structure modification started, no other transaction can modify the global position with flags */
if (!DWB_IS_CREATED (current_position_with_flags))
{
/* Not created, nothing to destroy, restore the modification flag. */
goto end;
}
dwb_destroy_internal (thread_p, ¤t_position_with_flags);
end:
/* Ends the modification, allowing to others to modify global position with flags. */
dwb_ends_structure_modification (thread_p, current_position_with_flags);
/* DWB is destroyed, */
#if defined(SERVER_MODE)
dwb_daemons_destroy ();
#endif
return error_code;
}
/*
* dwb_get_volume_name - Get the double write volume name.
* return : DWB volume name.
*/
char *
dwb_get_volume_name (void)
{
if (dwb_is_created ())
{
return dwb_Volume_name;
}
else
{
return NULL;
}
}
/*
* dwb_flush_next_block(): Flush next block.
*
* returns: error code
* thread_p (in): The thread entry.
*/
static int
dwb_flush_next_block (THREAD_ENTRY *thread_p)
{
unsigned int block_no;
DWB_BLOCK *flush_block = NULL;
int error_code = NO_ERROR;
UINT64 position_with_flags;
start:
position_with_flags = ATOMIC_INC_64 (&dwb_Global.position_with_flags, 0ULL);
if (!DWB_IS_CREATED (position_with_flags) || DWB_IS_MODIFYING_STRUCTURE (position_with_flags))
{
return NO_ERROR;
}
dwb_get_next_block_for_flush (thread_p, &block_no);
if (block_no < DWB_NUM_TOTAL_BLOCKS)
{
flush_block = &dwb_Global.blocks[block_no];
/* Flush all pages from current block */
assert (flush_block != NULL && flush_block->count_wb_pages == DWB_BLOCK_NUM_PAGES);
assert ((DWB_GET_PREV_BLOCK (flush_block->block_no)->version > flush_block->version) ||
(flush_block->block_no == 0
&& (DWB_GET_PREV_BLOCK (flush_block->block_no)->version == flush_block->version))
|| (flush_block->version == UINT64_MAX
&& (DWB_GET_PREV_BLOCK (flush_block->block_no)->version < flush_block->version)));
error_code = dwb_flush_block (thread_p, flush_block, true, NULL);
if (error_code != NO_ERROR)
{
/* Something wrong happened. */
dwb_log_error ("Can't flush block = %d having version %lld\n", flush_block->block_no, flush_block->version);
return error_code;
}
dwb_log ("Successfully flushed DWB block = %d having version %lld\n",
flush_block->block_no, flush_block->version);
/* Check whether is another block available for flush. */
goto start;
}
return NO_ERROR;
}
/*
* dwb_flush_force () - Force flushing the current content of DWB.
*
* return : Error code.
* thread_p (in): The thread entry.
* all_sync (out): True, if everything synchronized.
*/
int
dwb_flush_force (THREAD_ENTRY *thread_p, bool *all_sync)
{
UINT64 initial_position_with_flags, current_position_with_flags, prev_position_with_flags;
UINT64 initial_block_version, current_block_version;
int initial_block_no, current_block_no = DWB_NUM_TOTAL_BLOCKS;
char page_buf[IO_MAX_PAGE_SIZE + MAX_ALIGNMENT];
FILEIO_PAGE *iopage = NULL;
VPID null_vpid = { NULL_VOLID, NULL_PAGEID };
int error_code = NO_ERROR;
DWB_SLOT *dwb_slot = NULL;
unsigned int count_added_pages = 0, max_pages_to_add = 0, initial_num_pages = 0;
DWB_BLOCK *initial_block;
PERF_UTIME_TRACKER time_track;
int block_no;
assert (all_sync != NULL);
PERF_UTIME_TRACKER_START (thread_p, &time_track);
*all_sync = false;
start:
initial_position_with_flags = ATOMIC_INC_64 (&dwb_Global.position_with_flags, 0ULL);
dwb_log ("dwb_flush_force: Started with initital position = %lld\n", initial_position_with_flags);
#if !defined (NDEBUG)
if (dwb_Global.blocks != NULL)
{
for (block_no = 0; block_no < (int) DWB_NUM_TOTAL_BLOCKS; block_no++)
{
dwb_log_error ("dwb_flush_force start: Block %d, Num pages = %d, version = %lld\n",
block_no, dwb_Global.blocks[block_no].count_wb_pages, dwb_Global.blocks[block_no].version);
}
}
#endif
if (DWB_NOT_CREATED_OR_MODIFYING (initial_position_with_flags))
{
if (!DWB_IS_CREATED (initial_position_with_flags))
{
/* Nothing to do. Everything flushed. */
assert (dwb_Global.file_sync_helper_block == NULL);
dwb_log ("dwb_flush_force: Everything flushed\n");
goto end;
}
if (DWB_IS_MODIFYING_STRUCTURE (initial_position_with_flags))
{
/* DWB structure change started, needs to wait for flush. */
error_code = dwb_wait_for_strucure_modification (thread_p);
if (error_code != NO_ERROR)
{
if (error_code == ER_CSS_PTHREAD_COND_TIMEDOUT)
{
/* timeout, try again */
goto start;
}
dwb_log_error ("dwb_flush_force : Error %d while waiting for structure modification=%lld\n",
error_code, ATOMIC_INC_64 (&dwb_Global.position_with_flags, 0ULL));
return error_code;
}
}
}
if (DWB_GET_BLOCK_STATUS (initial_position_with_flags) == 0)
{
/* Check helper flush block. */
initial_block_no = DWB_GET_BLOCK_NO_FROM_POSITION (initial_position_with_flags);
initial_block = dwb_Global.file_sync_helper_block;
if (initial_block == NULL)
{
/* Nothing to flush. */
goto end;
}
goto wait_for_file_sync_helper_block;
}
/* Search for latest not flushed block - not flushed yet, having highest version. */
initial_block_no = -1;
initial_block_version = 0;
for (block_no = 0; block_no < (int) DWB_NUM_TOTAL_BLOCKS; block_no++)
{
if (DWB_IS_BLOCK_WRITE_STARTED (initial_position_with_flags, block_no)
&& (dwb_Global.blocks[block_no].version >= initial_block_version))
{
initial_block_no = block_no;
initial_block_version = dwb_Global.blocks[initial_block_no].version;
}
}
/* At least one block was not flushed. */
assert (initial_block_no != -1);
initial_num_pages = dwb_Global.blocks[initial_block_no].count_wb_pages;
if (initial_position_with_flags != ATOMIC_INC_64 (&dwb_Global.position_with_flags, 0ULL))
{
/* The position_with_flags was modified meanwhile by concurrent threads. */
goto start;
}
prev_position_with_flags = initial_position_with_flags;
max_pages_to_add = DWB_BLOCK_NUM_PAGES - initial_num_pages;
iopage = (FILEIO_PAGE *) PTR_ALIGN (page_buf, MAX_ALIGNMENT);
memset (iopage, 0, IO_MAX_PAGE_SIZE);
fileio_initialize_res (thread_p, iopage, IO_PAGESIZE);
dwb_log ("dwb_flush_force: Waits for flushing the block %d having version %lld and %d pages\n",
initial_block_no, initial_block_version, initial_num_pages);
/* Check whether the initial block was flushed */
check_flushed_blocks:
assert (initial_block_no >= 0);
if ((ATOMIC_INC_32 (&dwb_Global.blocks_flush_counter, 0) > 0)
&& (ATOMIC_INC_32 (&dwb_Global.next_block_to_flush, 0) == (unsigned int) initial_block_no)
&& (ATOMIC_INC_32 (&dwb_Global.blocks[initial_block_no].count_wb_pages, 0) == DWB_BLOCK_NUM_PAGES))
{
/* The initial block is currently flushing, wait for it. */
error_code = dwb_wait_for_block_completion (thread_p, initial_block_no);
if (error_code != NO_ERROR)
{
if (error_code == ER_CSS_PTHREAD_COND_TIMEDOUT)
{
/* timeout, try again */
goto check_flushed_blocks;
}
dwb_log_error ("dwb_flush_force : Error %d while waiting for block completion = %lld\n",
error_code, ATOMIC_INC_64 (&dwb_Global.position_with_flags, 0ULL));
return error_code;
}
goto check_flushed_blocks;
}
/* Read again the current position and check whether initial block was flushed. */
current_position_with_flags = ATOMIC_INC_64 (&dwb_Global.position_with_flags, 0ULL);
if (DWB_NOT_CREATED_OR_MODIFYING (current_position_with_flags))
{
if (!DWB_IS_CREATED (current_position_with_flags))
{
/* Nothing to do. Everything flushed. */
assert (dwb_Global.file_sync_helper_block == NULL);
dwb_log ("dwb_flush_force: Everything flushed\n");
goto end;
}
if (DWB_IS_MODIFYING_STRUCTURE (current_position_with_flags))
{
/* DWB structure change started, needs to wait for flush. */
error_code = dwb_wait_for_strucure_modification (thread_p);
if (error_code != NO_ERROR)
{
if (error_code == ER_CSS_PTHREAD_COND_TIMEDOUT)
{
/* timeout, try again */
goto check_flushed_blocks;
}
dwb_log_error ("dwb_flush_force : Error %d while waiting for structure modification = %lld\n",
error_code, ATOMIC_INC_64 (&dwb_Global.position_with_flags, 0ULL));
return error_code;
}
}
}
if (!DWB_IS_BLOCK_WRITE_STARTED (current_position_with_flags, initial_block_no))
{
/* Check helper flush block. */
goto wait_for_file_sync_helper_block;
}
/* Check whether initial block content was overwritten. */
current_block_no = DWB_GET_BLOCK_NO_FROM_POSITION (current_position_with_flags);
current_block_version = dwb_Global.blocks[current_block_no].version;
if ((current_block_no == initial_block_no) && (current_block_version != initial_block_version))
{
assert (current_block_version > initial_block_version);
/* Check helper flush block. */
goto wait_for_file_sync_helper_block;
}
if (current_position_with_flags == prev_position_with_flags && count_added_pages < max_pages_to_add)
{
/* The system didn't advanced, add null pages to force flush block. */
dwb_slot = NULL;
error_code = dwb_add_page (thread_p, iopage, &null_vpid, false, &dwb_slot);
if (error_code != NO_ERROR)
{
dwb_log_error ("dwb_flush_force : Error %d while adding page = %lld\n",
error_code, ATOMIC_INC_64 (&dwb_Global.position_with_flags, 0ULL));
return error_code;
}
else if (dwb_slot == NULL)
{
/* DWB disabled meanwhile, everything flushed. */
assert (dwb_Global.file_sync_helper_block == NULL);
assert (!DWB_IS_CREATED (ATOMIC_INC_64 (&dwb_Global.position_with_flags, 0ULL)));
dwb_log ("dwb_flush_force: DWB disabled = %lld\n", ATOMIC_INC_64 (&dwb_Global.position_with_flags, 0ULL));
goto end;
}
count_added_pages++;
}
prev_position_with_flags = current_position_with_flags;
goto check_flushed_blocks;
wait_for_file_sync_helper_block:
dwb_log ("dwb_flush_force: Wait for helper flush = %lld\n", ATOMIC_INC_64 (&dwb_Global.position_with_flags, 0ULL));
initial_block = &dwb_Global.blocks[initial_block_no];
#if defined (SERVER_MODE)
while (dwb_Global.file_sync_helper_block == initial_block)
{
/* Wait for file sync helper thread to finish. */
thread_sleep (1);
}
#endif
end:
*all_sync = true;
dwb_log ("dwb_flush_force: Ended with position = %lld\n", ATOMIC_INC_64 (&dwb_Global.position_with_flags, 0ULL));
PERF_UTIME_TRACKER_TIME_AND_RESTART (thread_p, &time_track, PSTAT_DWB_FLUSH_FORCE_TIME_COUNTERS);
#if !defined (NDEBUG)
if (dwb_Global.blocks != NULL)
{
for (block_no = 0; block_no < (int) DWB_NUM_TOTAL_BLOCKS; block_no++)
{
dwb_log_error ("dwb_flush_force end: Block %d, Num pages = %d, version = %lld\n",
block_no, dwb_Global.blocks[block_no].count_wb_pages, dwb_Global.blocks[block_no].version);
}
}
#endif
return NO_ERROR;
}
/*
* dwb_file_sync_helper () - Helps file sync.
*
* return : Error code.
* thread_p (in): Thread entry.
*/
static int
dwb_file_sync_helper (THREAD_ENTRY *thread_p)
{
unsigned int i;
int num_pages, num_pages2, num_pages_to_sync;
DWB_BLOCK *block = NULL;
UINT64 position_with_flags;
FLUSH_VOLUME_INFO *current_flush_volume_info = NULL;
unsigned int count_flush_volumes_info = 0;
bool all_block_pages_written = false, need_wait = false, can_flush_volume = false;
unsigned int start_flush_volume = 0;
int first_partial_flushed_volume = -1;
PERF_UTIME_TRACKER time_track;
PERF_UTIME_TRACKER_START (thread_p, &time_track);
num_pages_to_sync = prm_get_integer_value (PRM_ID_PB_SYNC_ON_NFLUSH);
position_with_flags = ATOMIC_INC_64 (&dwb_Global.position_with_flags, 0ULL);
if (!DWB_IS_CREATED (position_with_flags) || DWB_IS_MODIFYING_STRUCTURE (position_with_flags))
{
/* Needs to modify structure. Stop flushing. */
return NO_ERROR;
}
block = (DWB_BLOCK *) dwb_Global.file_sync_helper_block;
if (block == NULL)
{
return NO_ERROR;
}
do
{
count_flush_volumes_info = block->count_flush_volumes_info;
all_block_pages_written = block->all_pages_written;
need_wait = false;
first_partial_flushed_volume = -1;
for (i = start_flush_volume; i < count_flush_volumes_info; i++)
{
current_flush_volume_info = &block->flush_volumes_info[i];
if (current_flush_volume_info->flushed_status != VOLUME_FLUSHED_BY_DWB_FILE_SYNC_HELPER_THREAD)
{
if (!ATOMIC_CAS_32 (¤t_flush_volume_info->flushed_status, VOLUME_NOT_FLUSHED,
VOLUME_FLUSHED_BY_DWB_FILE_SYNC_HELPER_THREAD))
{
/* Flushed by DWB flusher, skip it. */
assert_release (current_flush_volume_info->flushed_status == VOLUME_FLUSHED_BY_DWB_FLUSH_THREAD);
continue;
}
}
/* I'm the flusher of the volume. */
assert_release (current_flush_volume_info->flushed_status == VOLUME_FLUSHED_BY_DWB_FILE_SYNC_HELPER_THREAD);
num_pages = ATOMIC_INC_32 (¤t_flush_volume_info->num_pages, 0);
if (num_pages < num_pages_to_sync)
{
if (current_flush_volume_info->all_pages_written == true)
{
if (num_pages == 0)
{
/* Already flushed. */
continue;
}
/* Needs flushing. */
}
else
{
/* Not enough pages, check the other volumes and retry. */
assert (all_block_pages_written == false);
if (first_partial_flushed_volume == -1)
{
first_partial_flushed_volume = i;
}
need_wait = true;
break;
}
}
else if (current_flush_volume_info->all_pages_written == false)
{
if (first_partial_flushed_volume == -1)
{
first_partial_flushed_volume = i;
}
}
/* Reset the number of pages in volume. */
num_pages2 = ATOMIC_TAS_32 (¤t_flush_volume_info->num_pages, 0);
assert_release (num_pages2 >= num_pages);
/*
* Flush the volume. If not all volume pages are available now, continue with next volume, if any,
* and then resume the current one.
*/
(void) fileio_synchronize (thread_p, current_flush_volume_info->vdes, NULL, current_flush_volume_info->metadata);
dwb_log ("dwb_file_sync_helper: Synchronized volume %d\n", current_flush_volume_info->vdes);
}
/* Set next volume to flush. */
if (first_partial_flushed_volume != -1)
{
assert ((first_partial_flushed_volume >= 0)
&& ((unsigned int) first_partial_flushed_volume < count_flush_volumes_info));
/* Continue with partial flushed volume. */
start_flush_volume = first_partial_flushed_volume;
}
else
{
/* Continue with the next volume. */
start_flush_volume = count_flush_volumes_info;
}
can_flush_volume = false;
if (count_flush_volumes_info != block->count_flush_volumes_info)
{
/* Do not wait since a new volume arrived. */
can_flush_volume = true;
}
else if (all_block_pages_written == false)
{
/* Not all pages written at the beginning of the iteration, check whether new data arrived. */
if (first_partial_flushed_volume != -1)
{
current_flush_volume_info = &block->flush_volumes_info[first_partial_flushed_volume];
if ((ATOMIC_INC_32 (¤t_flush_volume_info->num_pages, 0) < num_pages_to_sync)
&& (current_flush_volume_info->all_pages_written == false))
{
/* Needs more data. */
need_wait = true;
}
else
{
/* New data arrived. */
can_flush_volume = true;
}
}
else if (block->all_pages_written == false)
{
/* Not all pages were written and no volume available for flush yet. */
need_wait = true;
}
else
{
can_flush_volume = true;
}
}
if (!can_flush_volume)
{
/* Can't flush a volume. Not enough data available or nothing to flush. */
if (need_wait == true)
{
/* Wait for new data. */
#if defined (SERVER_MODE)
thread_sleep (1);
#endif
/* Flush the new arrived data, if is the case. */
can_flush_volume = true;
}
}
}
while (can_flush_volume == true);
#if !defined (NDEBUG)
if (count_flush_volumes_info != 0)
{
assert (count_flush_volumes_info == block->count_flush_volumes_info);
for (i = 0; i < count_flush_volumes_info; i++)
{
current_flush_volume_info = &block->flush_volumes_info[i];
assert ((current_flush_volume_info->all_pages_written == true)
&& (current_flush_volume_info->flushed_status != VOLUME_NOT_FLUSHED));
}
}
#endif
/* Be sure that the helper flush block was not changed by other thread. */
assert (block == dwb_Global.file_sync_helper_block);
(void) ATOMIC_TAS_ADDR (&dwb_Global.file_sync_helper_block, (DWB_BLOCK *) NULL);
PERF_UTIME_TRACKER_TIME (thread_p, &time_track, PSTAT_DWB_FILE_SYNC_HELPER_TIME_COUNTERS);
return NO_ERROR;
}
/*
* dwb_read_page () - Reads page from DWB.
*
* return : Error code.
* thread_p (in): The thread entry.
* vpid(in): The page identifier.
* io_page(out): In-memory address where the content of the page will be copied.
* success(out): True, if found and read from DWB.
*/
int
dwb_read_page (THREAD_ENTRY *thread_p, const VPID *vpid, void *io_page, bool *success)
{
DWB_SLOTS_HASH_ENTRY *slots_hash_entry = NULL;
int error_code = NO_ERROR;
assert (vpid != NULL && io_page != NULL && success != NULL);
*success = false;
if (!dwb_is_created ())
{
return NO_ERROR;
}
VPID key_vpid = *vpid;
slots_hash_entry = dwb_Global.slots_hashmap.find (thread_p, key_vpid);
if (slots_hash_entry != NULL)
{
assert (slots_hash_entry->slot->io_page != NULL);
/* Check whether the slot data changed meanwhile. */
if (VPID_EQ (&slots_hash_entry->slot->vpid, vpid))
{
memcpy ((char *) io_page, (char *) slots_hash_entry->slot->io_page, IO_PAGESIZE);
/* Be sure that no other transaction has modified slot data meanwhile. */
assert (slots_hash_entry->slot->io_page->prv.pageid == vpid->pageid
&& slots_hash_entry->slot->io_page->prv.volid == vpid->volid);
*success = true;
}
pthread_mutex_unlock (&slots_hash_entry->mutex);
}
return NO_ERROR;
}
#if defined(SERVER_MODE)
// class dwb_flush_block_daemon_task
//
// description:
// dwb flush block daemon task
//
class dwb_flush_block_daemon_task: public cubthread::entry_task
{
private:
PERF_UTIME_TRACKER m_perf_track;
public:
dwb_flush_block_daemon_task ()
{
PERF_UTIME_TRACKER_START (NULL, &m_perf_track);
}
void execute (cubthread::entry &thread_ref) override
{
if (!BO_IS_FLUSH_DAEMON_AVAILABLE ())
{
return;
}
/* performance tracking */
PERF_UTIME_TRACKER_TIME (NULL, &m_perf_track, PSTAT_DWB_FLUSH_BLOCK_COND_WAIT);
/* flush pages as long as necessary */
if (prm_get_bool_value (PRM_ID_ENABLE_DWB_FLUSH_THREAD) == true)
{
if (dwb_flush_next_block (&thread_ref) != NO_ERROR)
{
assert_release (false);
}
}
PERF_UTIME_TRACKER_START (&thread_ref, &m_perf_track);
}
};
// class dwb_file_sync_helper_daemon_task
//
// description:
// dwb file sync helper daemon task
//
void
dwb_file_sync_helper_execute (cubthread::entry &thread_ref)
{
if (!BO_IS_FLUSH_DAEMON_AVAILABLE ())
{
return;
}
/* flush pages as long as necessary */
if (prm_get_bool_value (PRM_ID_ENABLE_DWB_FLUSH_THREAD) == true)
{
dwb_file_sync_helper (&thread_ref);
}
}
/*
* dwb_flush_block_daemon_init () - initialize DWB flush block daemon thread
*/
REGISTER_DAEMON (dwb_flush_block);
void
dwb_flush_block_daemon_init ()
{
cubthread::looper looper = cubthread::looper (std::chrono::milliseconds (1));
dwb_flush_block_daemon_task *daemon_task = new dwb_flush_block_daemon_task ();
dwb_flush_block_daemon = cubthread::get_manager ()->create_daemon (looper, daemon_task, "dwb-flush-block");
}
/*
* dwb_file_sync_helper_daemon_init () - initialize DWB file sync helper daemon thread
*/
REGISTER_DAEMON (dwb_file_sync_helper);
void
dwb_file_sync_helper_daemon_init ()
{
cubthread::looper looper = cubthread::looper (std::chrono::milliseconds (10));
cubthread::entry_callable_task *daemon_task = new cubthread::entry_callable_task (dwb_file_sync_helper_execute);
dwb_file_sync_helper_daemon = cubthread::get_manager ()->create_daemon (looper, daemon_task, "dwb-file-sync");
}
/*
* dwb_daemons_init () - initialize DWB daemon threads
*/
void
dwb_daemons_init ()
{
dwb_flush_block_daemon_init ();
dwb_file_sync_helper_daemon_init ();
}
/*
* dwb_daemons_destroy () - destroy DWB daemon threads
*/
void
dwb_daemons_destroy ()
{
cubthread::get_manager ()->destroy_daemon (dwb_flush_block_daemon);
cubthread::get_manager ()->destroy_daemon (dwb_file_sync_helper_daemon);
}
#endif /* SERVER_MODE */
/*
* dwb_is_flush_block_daemon_available () - Check if flush block daemon is available
* return: true if flush block daemon is available, false otherwise
*/
static bool
dwb_is_flush_block_daemon_available (void)
{
#if defined (SERVER_MODE)
return prm_get_bool_value (PRM_ID_ENABLE_DWB_FLUSH_THREAD) == true && dwb_flush_block_daemon != NULL;
#else
return false;
#endif
}
/*
* dwb_is_file_sync_helper_daemon_available () - Check if file sync helper daemon is available
* return: true if file sync helper daemon is available, false otherwise
*/
static bool
dwb_is_file_sync_helper_daemon_available (void)
{
#if defined (SERVER_MODE)
return prm_get_bool_value (PRM_ID_ENABLE_DWB_FLUSH_THREAD) == true && dwb_file_sync_helper_daemon != NULL;
#else
return false;
#endif
}
/*
* dwb_flush_block_daemon_is_running () - Check whether flush block daemon is running
*
* return: true, if flush block thread is running
*/
static bool
dwb_flush_block_daemon_is_running (void)
{
#if defined (SERVER_MODE)
return (prm_get_bool_value (PRM_ID_ENABLE_DWB_FLUSH_THREAD) == true && (dwb_flush_block_daemon != NULL)
&& (dwb_flush_block_daemon->is_running ()));
#else
return false;
#endif /* SERVER_MODE */
}
/*
* dwb_file_sync_helper_daemon_is_running () - Check whether file sync helper daemon is running
*
* return: true, if file sync helper thread is running
*/
static bool
dwb_file_sync_helper_daemon_is_running (void)
{
#if defined (SERVER_MODE)
return (prm_get_bool_value (PRM_ID_ENABLE_DWB_FLUSH_THREAD) == true && (dwb_file_sync_helper_daemon != NULL)
&& (dwb_file_sync_helper_daemon->is_running ()));
#else
return false;
#endif /* SERVER_MODE */
}