File btree_load.c¶
File List > cubrid > src > storage > btree_load.c
Go to the documentation of this file
/*
* Copyright 2008 Search Solution Corporation
* Copyright 2016 CUBRID Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
/*
* btree_load.c - B+-Tree Loader
*/
#ident "$Id$"
#include "config.h"
#include <stdlib.h>
#include <string.h>
#include <assert.h>
#include "btree_load.h"
#include "btree.h"
#include "deduplicate_key.h"
#include "external_sort.h"
#include "heap_file.h"
#include "log_append.hpp"
#include "log_manager.h"
#include "memory_alloc.h"
#include "memory_private_allocator.hpp"
#include "mvcc.h"
#include "object_primitive.h"
#include "object_representation.h"
#include "object_representation_sr.h"
#include "partition.h"
#include "partition_sr.h"
#include "query_executor.h"
#include "query_opfunc.h"
#include "server_support.h"
#include "stream_to_xasl.h"
#include "thread_manager.hpp"
#include "thread_entry_task.hpp"
#include "xserver_interface.h"
#include "xasl.h"
#include "xasl_unpack_info.hpp"
#include "px_ftab_set.hpp"
#include "bit.h"
#ifndef NDEBUG
#include "db_value_printer.hpp"
#endif
// XXX: SHOULD BE THE LAST INCLUDE HEADER
#include "memory_wrapper.hpp"
typedef struct btree_page BTREE_PAGE;
struct btree_page
{
VPID vpid;
PAGE_PTR pgptr;
BTREE_NODE_HEADER hdr;
};
typedef struct load_args LOAD_ARGS;
struct load_args
{ /* This structure is never written to disk; thus logical ordering of fields is ok. */
BTID_INT *btid;
const char *bt_name; /* index name */
RECDES *out_recdes; /* Pointer to current record descriptor collecting objects. */
RECDES leaf_nleaf_recdes; /* Record descriptor used for leaf and non-leaf records. */
RECDES ovf_recdes; /* Record descriptor used for overflow OID's records. */
char *new_pos; /* Current pointer in record being built. */
DB_VALUE current_key; /* Current key value */
int max_key_size; /* The maximum key size encountered so far; used for string types */
int cur_key_len; /* The length of the current key */
/* Linked list variables */
BTREE_NODE *push_list;
BTREE_NODE *pop_list;
/* Variables for managing non-leaf, leaf & overflow pages */
BTREE_PAGE nleaf;
BTREE_PAGE leaf;
BTREE_PAGE ovf;
bool overflowing; /* Currently, are we filling in an overflow page (then, true); or a regular leaf page
* (then, false) */
int n_keys; /* Number of keys - note that in the context of MVCC, only keys that have at least one
* non-deleted object are counted. */
int curr_non_del_obj_count; /* Number of objects that have not been deleted. Unique indexes must have only one such
* object. */
int curr_rec_max_obj_count; /* Maximum number of objects for current record. */
int curr_rec_obj_count; /* Current number of record objects. */
PGSLOTID last_leaf_insert_slotid; /* Slotid of last inserted leaf record. */
VPID vpid_first_leaf;
};
typedef struct btree_scan_partition_info BTREE_SCAN_PART;
struct btree_scan_partition_info
{
BTREE_SCAN bt_scan; /* Holds information regarding the scan of the current partition. */
BTREE_NODE_HEADER *header; /* Header info for current partition */
int key_cnt; /* Number of keys in current page in the current partition. */
PRUNING_CONTEXT pcontext; /* Pruning context for current partition. */
BTID btid; /* BTID of the current partition. */
};
// *INDENT-OFF*
class index_builder_loader_context : public cubthread::entry_manager
{
public:
std::atomic_bool m_has_error;
std::atomic<std::uint64_t> m_tasks_executed;
int m_error_code;
const TP_DOMAIN *m_key_type;
css_conn_entry *m_conn;
index_builder_loader_context () = default;
protected:
void on_create (context_type &context) override;
void on_retire (context_type &context) override;
void on_recycle (context_type &context) override;
};
class index_builder_loader_task: public cubthread::entry_task
{
private:
BTID m_btid;
OID m_class_oid;
int m_unique_pk;
index_builder_loader_context &m_load_context; // Loader context.
btree_insert_list m_insert_list;
size_t m_memsize;
std::atomic<int> &m_num_keys;
std::atomic<int> &m_num_oids;
std::atomic<int> &m_num_nulls;
public:
enum batch_key_status
{
BATCH_EMPTY,
BATCH_CONTINUE,
BATCH_FULL
};
index_builder_loader_task () = delete;
index_builder_loader_task (const BTID *btid, const OID *class_oid, int unique_pk,
index_builder_loader_context &load_context, std::atomic<int> &num_keys,
std::atomic<int> &num_oids, std::atomic<int> &num_nulls);
~index_builder_loader_task ();
// add key to key set and return true if task is ready for execution, false otherwise
batch_key_status add_key (const DB_VALUE *key, const OID &oid);
bool has_keys () const;
void execute (cubthread::entry &thread_ref);
private:
void clear_keys ();
};
// *INDENT-ON*
static int btree_save_last_leafrec (THREAD_ENTRY * thread_p, LOAD_ARGS * load_args);
static PAGE_PTR btree_connect_page (THREAD_ENTRY * thread_p, DB_VALUE * key, int max_key_len, VPID * pageid,
LOAD_ARGS * load_args, int node_level);
static int btree_build_nleafs (THREAD_ENTRY * thread_p, LOAD_ARGS * load_args, int n_nulls, int n_oids, int n_keys);
static void btree_log_page (THREAD_ENTRY * thread_p, VFID * vfid, PAGE_PTR page_ptr);
static int btree_load_new_page (THREAD_ENTRY * thread_p, const BTID * btid, BTREE_NODE_HEADER * header, int node_level,
VPID * vpid_new, PAGE_PTR * page_new);
static PAGE_PTR btree_proceed_leaf (THREAD_ENTRY * thread_p, LOAD_ARGS * load_args);
static int btree_first_oid (THREAD_ENTRY * thread_p, DB_VALUE * this_key, OID * class_oid, OID * first_oid,
MVCC_REC_HEADER * p_mvcc_rec_header, LOAD_ARGS * load_args);
static int btree_construct_leafs (THREAD_ENTRY * thread_p, const RECDES * in_recdes, void *arg);
static int btree_get_value_from_leaf_slot (THREAD_ENTRY * thread_p, BTID_INT * btid_int, PAGE_PTR leaf_ptr,
int slot_id, DB_VALUE * key, bool * clear_key);
#if defined(CUBRID_DEBUG)
static int btree_dump_sort_output (const RECDES * recdes, LOAD_ARGS * load_args);
#endif /* defined(CUBRID_DEBUG) */
static int btree_index_sort (THREAD_ENTRY * thread_p, SORT_ARGS * sort_args, SORT_PUT_FUNC * out_func, void *out_args);
static SORT_STATUS btree_sort_get_next (THREAD_ENTRY * thread_p, RECDES * temp_recdes, void *arg);
static int compare_driver (const void *first, const void *second, void *arg);
static int list_add (BTREE_NODE ** list, VPID * pageid);
static void list_remove_first (BTREE_NODE ** list);
static void list_clear (BTREE_NODE * list);
static int list_length (const BTREE_NODE * this_list);
#if defined(CUBRID_DEBUG)
static void list_print (const BTREE_NODE * this_list);
#endif /* defined(CUBRID_DEBUG) */
static int btree_pack_root_header (RECDES * Rec, BTREE_ROOT_HEADER * header, TP_DOMAIN * key_type);
static void btree_rv_save_root_head (long long null_delta, long long oid_delta, long long key_delta, RECDES * recdes);
static int btree_advance_to_next_slot_and_fix_page (THREAD_ENTRY * thread_p, BTID_INT * btid, VPID * vpid,
PAGE_PTR * pg_ptr, INT16 * slot_id, DB_VALUE * key,
bool * clear_key, bool is_desc, int *key_cnt,
BTREE_NODE_HEADER ** header, MVCC_SNAPSHOT * mvcc);
static int btree_load_check_fk (THREAD_ENTRY * thread_p, const LOAD_ARGS * load_args_local,
const SORT_ARGS * sort_args_local);
static int btree_is_slot_visible (THREAD_ENTRY * thread_p, BTID_INT * btid, PAGE_PTR pg_ptr,
MVCC_SNAPSHOT * mvcc_snapshot, int slot_id, bool * is_slot_visible);
static int online_index_builder (THREAD_ENTRY * thread_p, BTID_INT * btid_int, HFID * hfids, OID * class_oids,
int n_classes, int *attrids, int n_attrs, FUNCTION_INDEX_INFO func_idx_info,
PRED_EXPR_WITH_CONTEXT * filter_pred, int *attrs_prefix_length,
HEAP_CACHE_ATTRINFO * attr_info, HEAP_SCANCACHE * scancache, int unique_pk,
int ib_thread_count, TP_DOMAIN * key_type);
static bool btree_is_worker_pool_logging_true ();
typedef struct
{
OID class_oid; /* Class OID value in this sorted item */
OID rec_oid; /* OID value in this sorted item */
MVCC_REC_HEADER mvcc_header;
DB_VALUE this_key; /* Key value in this sorted item (specified with in_recdes) */
BTREE_MVCC_INFO mvcc_info;
bool is_btree_ops_log;
OID orig_oid;
OID orig_class_oid;
MVCC_REC_HEADER orig_mvcc_header;
} S_PARAM_ST;
static int bt_load_put_buf_to_record (RECDES * recdes, SORT_ARGS * sort_args, int value_has_null, OID * rec_oid,
MVCC_REC_HEADER * mvcc_header, DB_VALUE * dbvalue_ptr, int key_len,
int cur_class, bool is_btree_ops_log);
static int bt_load_get_buf_from_record (RECDES * recdes, LOAD_ARGS * load_args, S_PARAM_ST * pparam, bool copy);
static int bt_load_get_first_leaf_page_and_init_args (THREAD_ENTRY * thread_p, LOAD_ARGS * load_args,
S_PARAM_ST * pparam);
static int bt_load_make_new_record_on_leaf_page (THREAD_ENTRY * thread_p, LOAD_ARGS * load_args, S_PARAM_ST * pparam,
int *sp_success);
static int bt_load_invalidate_mvcc_del_id (THREAD_ENTRY * thread_p, LOAD_ARGS * load_args, S_PARAM_ST * pparam);
static int bt_load_nospace_for_new_oid (THREAD_ENTRY * thread_p, LOAD_ARGS * load_args, int *sp_success);
static int bt_load_add_same_key_to_record (THREAD_ENTRY * thread_p, LOAD_ARGS * load_args, S_PARAM_ST * pparam,
int *sp_success);
static int bt_load_notify_to_vacuum (THREAD_ENTRY * thread_p, LOAD_ARGS * load_args, S_PARAM_ST * pparam,
char **notify_vacuum_rv_data, char *notify_vacuum_rv_data_bufalign);
/*
* btree_get_node_header () -
*
* return:
* page_ptr(in):
*
*/
BTREE_NODE_HEADER *
btree_get_node_header (THREAD_ENTRY * thread_p, PAGE_PTR page_ptr)
{
RECDES header_record;
BTREE_NODE_HEADER *header = NULL;
assert (page_ptr != NULL);
#if !defined(NDEBUG)
(void) pgbuf_check_page_ptype (thread_p, page_ptr, PAGE_BTREE);
#endif
if (spage_get_record (thread_p, page_ptr, HEADER, &header_record, PEEK) != S_SUCCESS)
{
assert_release (false);
return NULL;
}
header = (BTREE_NODE_HEADER *) header_record.data;
if (header != NULL)
{
assert (header->max_key_len >= 0);
}
return header;
}
/*
* btree_get_root_header () -
*
* return:
* page_ptr(in):
*
*/
BTREE_ROOT_HEADER *
btree_get_root_header (THREAD_ENTRY * thread_p, PAGE_PTR page_ptr)
{
RECDES header_record;
BTREE_ROOT_HEADER *root_header = NULL;
assert (page_ptr != NULL);
#if !defined(NDEBUG)
(void) pgbuf_check_page_ptype (thread_p, page_ptr, PAGE_BTREE);
#endif
if (spage_get_record (thread_p, page_ptr, HEADER, &header_record, PEEK) != S_SUCCESS)
{
assert_release (false);
return NULL;
}
root_header = (BTREE_ROOT_HEADER *) header_record.data;
if (root_header != NULL)
{
assert (root_header->node.node_level > 0);
assert (root_header->node.max_key_len >= 0);
}
return root_header;
}
/*
* btree_get_overflow_header () -
*
* return:
* page_ptr(in):
*
*/
BTREE_OVERFLOW_HEADER *
btree_get_overflow_header (THREAD_ENTRY * thread_p, PAGE_PTR page_ptr)
{
RECDES header_record;
assert (page_ptr != NULL);
#if !defined(NDEBUG)
(void) pgbuf_check_page_ptype (thread_p, page_ptr, PAGE_BTREE);
#endif
if (spage_get_record (thread_p, page_ptr, HEADER, &header_record, PEEK) != S_SUCCESS)
{
assert_release (false);
return NULL;
}
return (BTREE_OVERFLOW_HEADER *) header_record.data;
}
/*
* btree_init_node_header () - insert btree node header
*
* return:
* vfid(in):
* page_ptr(in):
* header(in):
*
*/
int
btree_init_node_header (THREAD_ENTRY * thread_p, const VFID * vfid, PAGE_PTR page_ptr, BTREE_NODE_HEADER * header,
bool redo)
{
RECDES rec;
char rec_buf[IO_MAX_PAGE_SIZE + BTREE_MAX_ALIGN];
assert (header != NULL);
assert (header->node_level > 0);
assert (header->max_key_len >= 0);
/* create header record */
rec.area_size = DB_PAGESIZE;
rec.data = PTR_ALIGN (rec_buf, BTREE_MAX_ALIGN);
rec.type = REC_HOME;
rec.length = sizeof (BTREE_NODE_HEADER);
memcpy (rec.data, header, sizeof (BTREE_NODE_HEADER));
if (redo == true)
{
/* log the new header record for redo purposes */
log_append_redo_data2 (thread_p, RVBT_NDHEADER_INS, vfid, page_ptr, HEADER, rec.length, rec.data);
}
if (spage_insert_at (thread_p, page_ptr, HEADER, &rec) != SP_SUCCESS)
{
return ER_FAILED;
}
return NO_ERROR;
}
/*
* btree_node_header_undo_log () -
*
* return:
* vfid(in):
* page_ptr(in):
*
*/
int
btree_node_header_undo_log (THREAD_ENTRY * thread_p, VFID * vfid, PAGE_PTR page_ptr)
{
RECDES rec;
if (spage_get_record (thread_p, page_ptr, HEADER, &rec, PEEK) != S_SUCCESS)
{
return ER_FAILED;
}
log_append_undo_data2 (thread_p, RVBT_NDHEADER_UPD, vfid, page_ptr, HEADER, rec.length, rec.data);
return NO_ERROR;
}
/*
* btree_node_header_redo_log () -
*
* return:
* vfid(in):
* page_ptr(in):
*
*/
int
btree_node_header_redo_log (THREAD_ENTRY * thread_p, VFID * vfid, PAGE_PTR page_ptr)
{
RECDES rec;
if (spage_get_record (thread_p, page_ptr, HEADER, &rec, PEEK) != S_SUCCESS)
{
return ER_FAILED;
}
log_append_redo_data2 (thread_p, RVBT_NDHEADER_UPD, vfid, page_ptr, HEADER, rec.length, rec.data);
return NO_ERROR;
}
/*
* btree_change_root_header_delta () -
*
* return:
* vfid(in):
* page_ptr(in):
* null_delta(in):
* oid_delta(in):
* key_delta(in):
*
*/
int
btree_change_root_header_delta (THREAD_ENTRY * thread_p, VFID * vfid, PAGE_PTR page_ptr, long long null_delta,
long long oid_delta, long long key_delta)
{
RECDES rec, delta_rec;
char delta_rec_buf[(3 * OR_BIGINT_SIZE) + BTREE_MAX_ALIGN];
BTREE_ROOT_HEADER *root_header = NULL;
int old_version;
delta_rec.data = NULL;
delta_rec.area_size = 3 * OR_BIGINT_SIZE;
delta_rec.data = PTR_ALIGN (delta_rec_buf, BTREE_MAX_ALIGN);
if (spage_get_record (thread_p, page_ptr, HEADER, &rec, PEEK) != S_SUCCESS)
{
return ER_FAILED;
}
root_header = (BTREE_ROOT_HEADER *) rec.data;
root_header->num_nulls += null_delta;
root_header->num_oids += oid_delta;
root_header->num_keys += key_delta;
/* save root head for undo purposes */
btree_rv_save_root_head (-null_delta, -oid_delta, -key_delta, &delta_rec);
log_append_undoredo_data2 (thread_p, RVBT_ROOTHEADER_UPD, vfid, page_ptr, HEADER, delta_rec.length, rec.length,
delta_rec.data, rec.data);
return NO_ERROR;
}
/*
* btree_pack_root_header () -
* return:
* rec(out):
* root_header(in):
*
* Note: Writes the first record (header record) for a root page.
* rec must be long enough to hold the header record.
*/
static int
btree_pack_root_header (RECDES * rec, BTREE_ROOT_HEADER * root_header, TP_DOMAIN * key_type)
{
OR_BUF buf;
int rc = NO_ERROR;
int fixed_size = (int) offsetof (BTREE_ROOT_HEADER, packed_key_domain);
memcpy (rec->data, root_header, fixed_size);
or_init (&buf, rec->data + fixed_size, (rec->area_size == -1) ? -1 : (rec->area_size - fixed_size));
rc = or_put_domain (&buf, key_type, 0, 0);
rec->length = fixed_size + CAST_BUFLEN (buf.ptr - buf.buffer);
rec->type = REC_HOME;
if (rc != NO_ERROR && er_errid () == NO_ERROR)
{
/* if an error occurs then set a generic error so that at least an error to be send to client. */
if (er_errid () == NO_ERROR)
{
assert (false);
er_set (ER_WARNING_SEVERITY, ARG_FILE_LINE, ER_GENERIC_ERROR, 0);
}
}
return rc;
}
/*
* btree_init_root_header () - insert btree node header
*
* return:
* vfid(in):
* page_ptr(in):
* root_header(in):
*
*/
int
btree_init_root_header (THREAD_ENTRY * thread_p, VFID * vfid, PAGE_PTR page_ptr, BTREE_ROOT_HEADER * root_header,
TP_DOMAIN * key_type)
{
RECDES rec;
char copy_rec_buf[IO_MAX_PAGE_SIZE + BTREE_MAX_ALIGN];
assert (root_header != NULL);
assert (root_header->node.node_level > 0);
assert (root_header->node.max_key_len >= 0);
rec.area_size = DB_PAGESIZE;
rec.data = PTR_ALIGN (copy_rec_buf, BTREE_MAX_ALIGN);
if (btree_pack_root_header (&rec, root_header, key_type) != NO_ERROR)
{
return ER_FAILED;
}
/* insert the root header information into the root page */
if (spage_insert_at (thread_p, page_ptr, HEADER, &rec) != SP_SUCCESS)
{
return ER_FAILED;
}
/* log the new header record for redo purposes */
log_append_redo_data2 (thread_p, RVBT_NDHEADER_INS, vfid, page_ptr, HEADER, rec.length, rec.data);
return NO_ERROR;
}
/*
* btree_init_overflow_header () - insert btree overflow node header
*
* return:
* page_ptr(in):
* ovf_header(in):
*
*/
int
btree_init_overflow_header (THREAD_ENTRY * thread_p, PAGE_PTR page_ptr, BTREE_OVERFLOW_HEADER * ovf_header)
{
RECDES rec;
char copy_rec_buf[IO_MAX_PAGE_SIZE + BTREE_MAX_ALIGN];
rec.area_size = DB_PAGESIZE;
rec.data = PTR_ALIGN (copy_rec_buf, BTREE_MAX_ALIGN);
memcpy (rec.data, ovf_header, sizeof (BTREE_OVERFLOW_HEADER));
rec.length = sizeof (BTREE_OVERFLOW_HEADER);
rec.type = REC_HOME;
/* insert the root header information into the root page */
if (spage_insert_at (thread_p, page_ptr, HEADER, &rec) != SP_SUCCESS)
{
return ER_FAILED;
}
return NO_ERROR;
}
/*
* btree_rv_save_root_head () - Save root head stats FOR LOGICAL LOG PURPOSES
* return:
* null_delta(in):
* oid_delta(in):
* key_delta(in):
* recdes(out):
*
* Note: Copy the root header statistics to the data area provided.
*
* Note: This is a UTILITY routine, but not an actual recovery routine.
*/
static void
btree_rv_save_root_head (long long null_delta, long long oid_delta, long long key_delta, RECDES * recdes)
{
char *datap;
assert (recdes->area_size >= 3 * OR_BIGINT_SIZE);
recdes->length = 0;
datap = (char *) recdes->data;
OR_PUT_BIGINT (datap, &null_delta);
datap += OR_BIGINT_SIZE;
OR_PUT_BIGINT (datap, &oid_delta);
datap += OR_BIGINT_SIZE;
OR_PUT_BIGINT (datap, &key_delta);
datap += OR_BIGINT_SIZE;
recdes->length = CAST_STRLEN (datap - recdes->data);
}
/*
* btree_rv_mvcc_save_increments () - Save unique_stats
* return:
* btid(in):
* max_key_len(in):
* null_delta(in):
* oid_delta(in):
* key_delta(in):
* recdes(in):
*
* Note: Copy the unique statistics to the data area provided.
*
* Note: This is a UTILITY routine, but not an actual recovery routine.
*/
void
btree_rv_mvcc_save_increments (const BTID * btid, long long key_delta, long long oid_delta, long long null_delta,
RECDES * recdes)
{
char *datap;
assert (recdes != NULL && (recdes->area_size >= ((3 * OR_BIGINT_SIZE) + OR_BTID_ALIGNED_SIZE)));
recdes->length = (3 * OR_BIGINT_SIZE) + OR_BTID_ALIGNED_SIZE;
datap = (char *) recdes->data;
OR_PUT_BTID (datap, btid);
datap += OR_BTID_ALIGNED_SIZE;
OR_PUT_BIGINT (datap, &key_delta);
datap += OR_BIGINT_SIZE;
OR_PUT_BIGINT (datap, &oid_delta);
datap += OR_BIGINT_SIZE;
OR_PUT_BIGINT (datap, &null_delta);
datap += OR_BIGINT_SIZE;
recdes->length = CAST_STRLEN (datap - recdes->data);
}
/*
* btree_get_next_overflow_vpid () -
*
* return:
* page_ptr(in):
*
*/
int
btree_get_next_overflow_vpid (THREAD_ENTRY * thread_p, PAGE_PTR page_ptr, VPID * vpid)
{
BTREE_OVERFLOW_HEADER *ovf_header = NULL;
ovf_header = btree_get_overflow_header (thread_p, page_ptr);
if (ovf_header == NULL)
{
return ER_FAILED;
}
*vpid = ovf_header->next_vpid;
return NO_ERROR;
}
int
bt_load_heap_scancache_start_for_attrinfo (THREAD_ENTRY * thread_p, SORT_ARGS * args, HEAP_SCANCACHE * scan_cache,
HEAP_CACHE_ATTRINFO * attr_info, int save_cache_last_fix_page)
{
int attr_offset = args->cur_class * args->n_attrs;
if (scan_cache == NULL)
{
scan_cache = &args->hfscan_cache;
}
if (attr_info == NULL)
{
attr_info = &args->attr_info;
}
/* Start scancache */
if (heap_scancache_start (thread_p, scan_cache, &args->hfids[args->cur_class],
&args->class_ids[args->cur_class], save_cache_last_fix_page, NULL) != NO_ERROR)
{
return ER_FAILED;
}
args->scancache_inited = true;
if (heap_attrinfo_start (thread_p, &args->class_ids[args->cur_class], args->n_attrs,
&args->attr_ids[attr_offset], attr_info) != NO_ERROR)
{
return ER_FAILED;
}
args->attrinfo_inited = true;
if (args->filter != NULL)
{
if (heap_attrinfo_start (thread_p, &args->class_ids[args->cur_class], args->filter->num_attrs_pred,
args->filter->attrids_pred, args->filter->cache_pred) != NO_ERROR)
{
return ER_FAILED;
}
}
if (args->func_index_info && args->func_index_info->expr)
{
if (heap_attrinfo_start (thread_p, &args->class_ids[args->cur_class], args->n_attrs,
&args->attr_ids[attr_offset], args->func_index_info->expr->cache_attrinfo) != NO_ERROR)
{
return ER_FAILED;
}
}
return NO_ERROR;
}
void
bt_load_heap_scancache_end_for_attrinfo (THREAD_ENTRY * thread_p, SORT_ARGS * args, HEAP_SCANCACHE * scan_cache,
HEAP_CACHE_ATTRINFO * attr_info)
{
if (scan_cache == NULL)
{
scan_cache = &args->hfscan_cache;
}
if (attr_info == NULL)
{
attr_info = &args->attr_info;
}
if (args->attrinfo_inited)
{
heap_attrinfo_end (thread_p, attr_info);
if (args->filter)
{
heap_attrinfo_end (thread_p, args->filter->cache_pred);
}
if (args->func_index_info && args->func_index_info->expr)
{
heap_attrinfo_end (thread_p, args->func_index_info->expr->cache_attrinfo);
}
args->attrinfo_inited = false;
}
if (args->scancache_inited)
{
(void) heap_scancache_end (thread_p, scan_cache);
args->scancache_inited = false;
}
}
void
bt_load_clear_pred_and_unpack (THREAD_ENTRY * thread_p, SORT_ARGS * args, XASL_UNPACK_INFO * func_unpack_info)
{
if (args->filter != NULL)
{
/* to clear db values from dbvalue regu variable */
qexec_clear_pred_context (thread_p, args->filter, true);
if (args->filter->unpack_info != NULL)
{
free_xasl_unpack_info (thread_p, args->filter->unpack_info);
}
db_private_free_and_init (thread_p, args->filter);
}
if (args->func_index_info && args->func_index_info->expr != NULL)
{
(void) qexec_clear_func_pred (thread_p, args->func_index_info->expr);
args->func_index_info->expr = NULL;
}
if (func_unpack_info != NULL)
{
free_xasl_unpack_info (thread_p, func_unpack_info);
}
}
int
btree_load_filter_pred_function_info (THREAD_ENTRY * thread_p, SORT_ARGS * sort_args,
PRED_EXPR_WITH_CONTEXT ** filter_pred_p, FILTER_INDEX_INFO * filter_index_info_p,
FUNCTION_INDEX_INFO * func_index_info_p, XASL_UNPACK_INFO ** func_unpack_info_p,
DB_TYPE * single_node_type)
{
if (filter_index_info_p && filter_index_info_p->pred_stream && filter_index_info_p->pred_stream_size > 0)
{
if (stx_map_stream_to_filter_pred (thread_p, filter_pred_p, filter_index_info_p->pred_stream,
filter_index_info_p->pred_stream_size) != NO_ERROR)
{
return ER_FAILED;
}
sort_args->filter = *filter_pred_p;
sort_args->filter_eval_func = eval_fnc (thread_p, (*filter_pred_p)->pred, single_node_type);
}
if (func_index_info_p && func_index_info_p->expr_stream && func_index_info_p->expr_stream_size > 0)
{
if (stx_map_stream_to_func_pred (thread_p, &func_index_info_p->expr, func_index_info_p->expr_stream,
func_index_info_p->expr_stream_size, func_unpack_info_p) != NO_ERROR)
{
return ER_FAILED;
}
sort_args->func_index_info = func_index_info_p;
}
return NO_ERROR;
}
/*
* xbtree_load_index () - create & load b+tree index
* return: BTID * (btid on success and NULL on failure)
* btid is set as a side effect.
* btid(out):
* btid: Set to the created B+tree index identifier
* (Note: btid->vfid.volid should be set by the caller)
* bt_name(in): index name
* key_type(in): Key type corresponding to the attribute.
* class_oids(in): OID of the class for which the index will be created
* n_classes(in):
* n_attrs(in):
* attr_ids(in): Identifier of the attribute of the class for which the index
* will be created.
* hfids(in): Identifier of the heap file containing the instances of the
* class
* unique_pk(in):
* not_null_flag(in):
* fk_refcls_oid(in):
* fk_refcls_pk_btid(in):
* fk_name(in):
*
*/
BTID *
xbtree_load_index (THREAD_ENTRY * thread_p, BTID * btid, const char *bt_name, TP_DOMAIN * key_type, OID * class_oids,
int n_classes, int n_attrs, int *attr_ids, int *attrs_prefix_length, HFID * hfids, int unique_pk,
int not_null_flag, OID * fk_refcls_oid, BTID * fk_refcls_pk_btid, const char *fk_name,
char *pred_stream, int pred_stream_size, char *func_pred_stream, int func_pred_stream_size,
int func_col_id, int func_attr_index_start)
{
LOG_TDES *tdes = NULL;
SORT_ARGS sort_args_info, *sort_args;
LOAD_ARGS load_args_info, *load_args;
BTID_INT btid_int;
PRED_EXPR_WITH_CONTEXT *filter_pred = NULL;
#if defined (SERVER_MODE)
FILTER_INDEX_INFO filter_index_info;
#endif /* SERVER_MODE */
FUNCTION_INDEX_INFO func_index_info;
DB_TYPE single_node_type = DB_TYPE_NULL;
XASL_UNPACK_INFO *func_unpack_info = NULL;
bool has_fk;
BTID btid_global_stats = BTID_INITIALIZER;
OID *notification_class_oid;
bool is_sysop_started = false;
/* Check for robustness */
if (!btid || !hfids || !class_oids || !attr_ids || !key_type)
{
er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_BTREE_LOAD_FAILED, 0);
return NULL;
}
sort_args = &sort_args_info;
load_args = &load_args_info;
/* Initialize pointers which might be used in case of error */
load_args->nleaf.pgptr = NULL;
load_args->leaf.pgptr = NULL;
load_args->ovf.pgptr = NULL;
load_args->leaf_nleaf_recdes.data = NULL;
load_args->ovf_recdes.data = NULL;
load_args->out_recdes = NULL;
load_args->push_list = NULL;
load_args->pop_list = NULL;
/*
* Start a TOP SYSTEM OPERATION.
* This top system operation will be either ABORTED (case of failure) or
* COMMITTED, so that the new file becomes kind of permanent. This allows
* us to make use of un-used pages in the case of a bad init_pgcnt guess.
*/
#if !defined (SERVER_MODE)
log_sysop_start (thread_p);
is_sysop_started = true;
#endif /* !SERVER_MODE */
thread_p->push_resource_tracks ();
btid_int.sys_btid = btid;
btid_int.unique_pk = unique_pk;
#if !defined(NDEBUG)
if (unique_pk)
{
assert (BTREE_IS_UNIQUE (btid_int.unique_pk));
assert (BTREE_IS_PRIMARY_KEY (btid_int.unique_pk) || !BTREE_IS_PRIMARY_KEY (btid_int.unique_pk));
}
#endif
btid_int.key_type = key_type;
VFID_SET_NULL (&btid_int.ovfid);
btid_int.rev_level = BTREE_CURRENT_REV_LEVEL;
/* support for SUPPORT_DEDUPLICATE_KEY_MODE */
btid_int.deduplicate_key_idx = dk_get_deduplicate_key_position (n_attrs, attr_ids, func_attr_index_start);
COPY_OID (&btid_int.topclass_oid, &class_oids[0]);
/*
* for btree_range_search, part_key_desc is re-set at btree_initialize_bts
*/
btid_int.part_key_desc = 0;
/* init index key copy_buf info */
btid_int.copy_buf = NULL;
btid_int.copy_buf_len = 0;
btid_int.nonleaf_key_type = btree_generate_prefix_domain (&btid_int);
/* Initialize the fields of sorting argument structures */
sort_args->oldest_visible_mvccid = log_Gl.mvcc_table.get_global_oldest_visible ();
sort_args->unique_pk = unique_pk;
sort_args->not_null_flag = not_null_flag;
sort_args->hfids = hfids;
sort_args->class_ids = class_oids;
sort_args->attr_ids = attr_ids;
sort_args->n_attrs = n_attrs;
sort_args->attrs_prefix_length = attrs_prefix_length;
sort_args->n_classes = n_classes;
sort_args->key_type = key_type;
OID_SET_NULL (&sort_args->cur_oid);
sort_args->n_nulls = 0;
sort_args->n_oids = 0;
sort_args->cur_class = 0;
sort_args->scancache_inited = false;
sort_args->attrinfo_inited = false;
sort_args->btid = &btid_int;
sort_args->fk_refcls_oid = fk_refcls_oid;
sort_args->fk_refcls_pk_btid = fk_refcls_pk_btid;
sort_args->fk_name = fk_name;
sort_args->filter_index_info = NULL;
if (pred_stream && pred_stream_size > 0)
{
if (stx_map_stream_to_filter_pred (thread_p, &filter_pred, pred_stream, pred_stream_size) != NO_ERROR)
{
goto error;
}
#if defined (SERVER_MODE)
filter_index_info.pred_stream = pred_stream;
filter_index_info.pred_stream_size = pred_stream_size;
sort_args->filter_index_info = &filter_index_info;
#endif /* SERVER_MODE */
}
sort_args->filter = filter_pred;
sort_args->filter_eval_func = (filter_pred) ? eval_fnc (thread_p, filter_pred->pred, &single_node_type) : NULL;
sort_args->func_index_info = NULL;
if (func_pred_stream && func_pred_stream_size > 0)
{
func_index_info.expr_stream = func_pred_stream;
func_index_info.expr_stream_size = func_pred_stream_size;
func_index_info.col_id = func_col_id;
func_index_info.attr_index_start = func_attr_index_start;
func_index_info.expr = NULL;
if (stx_map_stream_to_func_pred (thread_p, &func_index_info.expr, func_pred_stream,
func_pred_stream_size, &func_unpack_info))
{
goto error;
}
sort_args->func_index_info = &func_index_info;
}
/*
* Start a heap scan cache for reading objects using the first nun-null heap
* We are guaranteed that such a heap exists, otherwise btree_load_index
* would not have been called.
*/
while (sort_args->cur_class < sort_args->n_classes && HFID_IS_NULL (&sort_args->hfids[sort_args->cur_class]))
{
sort_args->cur_class++;
}
/* After building index acquire lock on table, the transaction has deadlock priority */
tdes = LOG_FIND_CURRENT_TDES (thread_p);
if (tdes)
{
tdes->has_deadlock_priority = true;
}
/* Start scancache */
has_fk = (fk_refcls_oid != NULL && !OID_ISNULL (fk_refcls_oid));
#if !defined (SERVER_MODE)
if (bt_load_heap_scancache_start_for_attrinfo (thread_p, sort_args, NULL, NULL, !has_fk) != NO_ERROR)
{
goto error;
}
if (btree_create_file (thread_p, &class_oids[0], attr_ids[0], btid) != NO_ERROR)
{
ASSERT_ERROR ();
goto error;
}
/* if loading is aborted or if transaction is aborted, vacuum must be notified before file is destoyed. */
vacuum_log_add_dropped_file (thread_p, &btid->vfid, NULL, VACUUM_LOG_ADD_DROPPED_FILE_UNDO);
#endif /* !SERVER_MODE */
load_args->btid = &btid_int;
load_args->bt_name = bt_name;
db_make_null (&load_args->current_key);
VPID_SET_NULL (&load_args->nleaf.vpid);
load_args->nleaf.pgptr = NULL;
VPID_SET_NULL (&load_args->leaf.vpid);
load_args->leaf.pgptr = NULL;
VPID_SET_NULL (&load_args->ovf.vpid);
load_args->ovf.pgptr = NULL;
load_args->n_keys = 0;
load_args->curr_non_del_obj_count = 0;
load_args->leaf_nleaf_recdes.area_size = BTREE_MAX_KEYLEN_INPAGE + BTREE_MAX_OIDLEN_INPAGE;
load_args->leaf_nleaf_recdes.length = 0;
load_args->leaf_nleaf_recdes.type = REC_HOME;
load_args->leaf_nleaf_recdes.data = (char *) os_malloc (load_args->leaf_nleaf_recdes.area_size);
if (load_args->leaf_nleaf_recdes.data == NULL)
{
er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_OUT_OF_VIRTUAL_MEMORY, 1, load_args->leaf_nleaf_recdes.area_size);
goto error;
}
load_args->ovf_recdes.area_size = DB_PAGESIZE;
load_args->ovf_recdes.length = 0;
load_args->ovf_recdes.type = REC_HOME;
load_args->ovf_recdes.data = (char *) os_malloc (load_args->ovf_recdes.area_size);
if (load_args->ovf_recdes.data == NULL)
{
er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_OUT_OF_VIRTUAL_MEMORY, 1, load_args->ovf_recdes.area_size);
goto error;
}
load_args->out_recdes = NULL;
/* Allocate a root page and save the page_id */
*load_args->btid->sys_btid = *btid;
btid_global_stats = *btid;
if (prm_get_bool_value (PRM_ID_LOG_BTREE_OPS))
{
_er_log_debug (ARG_FILE_LINE, "DEBUG_BTREE: load start on class(%d, %d, %d), btid(%d, (%d, %d)).",
sort_args->class_ids[sort_args->cur_class].volid,
sort_args->class_ids[sort_args->cur_class].pageid,
sort_args->class_ids[sort_args->cur_class].slotid, sort_args->btid->sys_btid->root_pageid,
sort_args->btid->sys_btid->vfid.volid, sort_args->btid->sys_btid->vfid.fileid);
}
/* Build the leaf pages of the btree as the output of the sort. We do not estimate the number of pages required. */
if (btree_index_sort (thread_p, sort_args, btree_construct_leafs, load_args) != NO_ERROR)
{
goto error;
}
#if !defined (SERVER_MODE)
bt_load_heap_scancache_end_for_attrinfo (thread_p, sort_args, NULL, NULL);
#endif /* !SERVER_MODE */
#if defined (SERVER_MODE)
is_sysop_started = true;
#endif /* SERVER_MODE */
if (prm_get_bool_value (PRM_ID_LOG_BTREE_OPS))
{
_er_log_debug (ARG_FILE_LINE,
"DEBUG_BTREE: load finished all. %d classes loaded, found %d nulls and %d oids, "
"load %d keys.", sort_args->n_classes, sort_args->n_nulls, sort_args->n_oids, load_args->n_keys);
}
/* Just to make sure that there were entries to put into the tree */
if (load_args->leaf.pgptr != NULL)
{
/* Save the last leaf record */
if (btree_save_last_leafrec (thread_p, load_args) != NO_ERROR)
{
er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_BTREE_LOAD_FAILED, 0);
goto error;
}
/* No need to deal with overflow pages anymore */
load_args->ovf.pgptr = NULL;
/* Check the correctness of the foreign key, if any. */
if (has_fk)
{
if (btree_load_check_fk (thread_p, load_args, sort_args) != NO_ERROR)
{
goto error;
}
}
/* Build the non leaf nodes of the btree; Root page id will be assigned here */
if (btree_build_nleafs (thread_p, load_args, sort_args->n_nulls, sort_args->n_oids, load_args->n_keys) !=
NO_ERROR)
{
er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_BTREE_LOAD_FAILED, 0);
goto error;
}
/* There is at least one leaf page */
/* Release the memory area */
os_free_and_init (load_args->leaf_nleaf_recdes.data);
os_free_and_init (load_args->ovf_recdes.data);
pr_clear_value (&load_args->current_key);
if (prm_get_bool_value (PRM_ID_LOG_BTREE_OPS))
{
_er_log_debug (ARG_FILE_LINE, "DEBUG_BTREE: load built index btid (%d, (%d, %d)).",
btid_int.sys_btid->root_pageid, btid_int.sys_btid->vfid.volid, btid_int.sys_btid->vfid.fileid);
}
#if !defined(NDEBUG)
(void) btree_verify_tree (thread_p, &class_oids[0], &btid_int, bt_name);
#endif
}
else
{
if (prm_get_bool_value (PRM_ID_LOG_BTREE_OPS))
{
_er_log_debug (ARG_FILE_LINE, "DEBUG_BTREE: load didn't build any leaves btid (%d, (%d, %d)).",
btid_int.sys_btid->root_pageid, btid_int.sys_btid->vfid.volid, btid_int.sys_btid->vfid.fileid);
}
/* redo an empty index, but first destroy the one we created. the safest way is to abort changes so far. */
log_sysop_abort (thread_p);
is_sysop_started = false;
os_free_and_init (load_args->leaf_nleaf_recdes.data);
os_free_and_init (load_args->ovf_recdes.data);
pr_clear_value (&load_args->current_key);
BTID_SET_NULL (btid);
if (xbtree_add_index (thread_p, btid, key_type, &class_oids[0], attr_ids[0], unique_pk, sort_args->n_oids,
sort_args->n_nulls, load_args->n_keys, btid_int.deduplicate_key_idx) == NULL)
{
goto error;
}
}
if (!VFID_ISNULL (&load_args->btid->ovfid))
{
/* notification */
if (!OID_ISNULL (&class_oids[0]))
{
notification_class_oid = &class_oids[0];
}
else
{
notification_class_oid = &btid_int.topclass_oid;
}
BTREE_SET_CREATED_OVERFLOW_KEY_NOTIFICATION (thread_p, NULL, NULL, notification_class_oid, btid, bt_name);
}
bt_load_clear_pred_and_unpack (thread_p, sort_args, func_unpack_info);
thread_p->pop_resource_tracks ();
if (is_sysop_started)
{
/* todo: we have the option to commit & undo here. on undo, we can destroy the file directly. */
log_sysop_attach_to_outer (thread_p);
if (unique_pk)
{
/* drop statistics if aborted */
log_append_undo_data2 (thread_p, RVBT_REMOVE_UNIQUE_STATS, NULL, NULL, NULL_OFFSET, sizeof (BTID), btid);
}
}
else
{
/* index was not loaded and xbtree_add_index was called instead. we have nothing to log here. */
}
logpb_force_flush_pages (thread_p);
if (prm_get_bool_value (PRM_ID_LOG_BTREE_OPS))
{
_er_log_debug (ARG_FILE_LINE, "BTREE_DEBUG: load finished successful index btid(%d, (%d, %d)).",
btid_int.sys_btid->root_pageid, btid_int.sys_btid->vfid.volid, btid_int.sys_btid->vfid.fileid);
}
return btid;
error:
if (prm_get_bool_value (PRM_ID_LOG_BTREE_OPS))
{
_er_log_debug (ARG_FILE_LINE, "BTREE_DEBUG: load aborted index btid(%d, (%d, %d)).",
btid_int.sys_btid->root_pageid, btid_int.sys_btid->vfid.volid, btid_int.sys_btid->vfid.fileid);
}
if (!BTID_IS_NULL (&btid_global_stats))
{
logtb_delete_global_unique_stats (thread_p, &btid_global_stats);
}
bt_load_heap_scancache_end_for_attrinfo (thread_p, sort_args, NULL, NULL);
VFID_SET_NULL (&btid->vfid);
btid->root_pageid = NULL_PAGEID;
if (load_args->leaf_nleaf_recdes.data)
{
os_free_and_init (load_args->leaf_nleaf_recdes.data);
}
if (load_args->ovf_recdes.data)
{
os_free_and_init (load_args->ovf_recdes.data);
}
pr_clear_value (&load_args->current_key);
pgbuf_unfix_and_init_after_check (thread_p, load_args->leaf.pgptr);
pgbuf_unfix_and_init_after_check (thread_p, load_args->ovf.pgptr);
pgbuf_unfix_and_init_after_check (thread_p, load_args->nleaf.pgptr);
if (load_args->push_list != NULL)
{
list_clear (load_args->push_list);
load_args->push_list = NULL;
}
if (load_args->pop_list != NULL)
{
list_clear (load_args->pop_list);
load_args->pop_list = NULL;
}
bt_load_clear_pred_and_unpack (thread_p, sort_args, func_unpack_info);
thread_p->pop_resource_tracks ();
if (is_sysop_started)
{
log_sysop_abort (thread_p);
}
return NULL;
}
/*
* btree_save_last_leafrec () - save the last leaf record
* return: NO_ERROR
* load_args(in): Collection of info. for btree load operation
*
* Note: Stores the last leaf record left in the load_args->out_recdes
* area to the last leaf page (or to the last overflow page).
* Then it saves the last leaf page (and the last overflow page,
* if there is one) to the disk.
*/
static int
btree_save_last_leafrec (THREAD_ENTRY * thread_p, LOAD_ARGS * load_args)
{
INT16 slotid;
int sp_success;
int cur_maxspace;
int ret = NO_ERROR;
BTREE_NODE_HEADER *header = NULL;
if (load_args->overflowing == true)
{
/* Store the record in current overflow page */
sp_success = spage_insert (thread_p, load_args->ovf.pgptr, &load_args->ovf_recdes, &slotid);
if (sp_success != SP_SUCCESS)
{
goto exit_on_error;
}
assert (slotid > 0);
/* Save the current overflow page */
btree_log_page (thread_p, &load_args->btid->sys_btid->vfid, load_args->ovf.pgptr);
load_args->ovf.pgptr = NULL;
}
/* Insert leaf record too */
cur_maxspace = spage_max_space_for_new_record (thread_p, load_args->leaf.pgptr);
if (((cur_maxspace - load_args->leaf_nleaf_recdes.length) < LOAD_FIXED_EMPTY_FOR_LEAF)
&& (spage_number_of_records (load_args->leaf.pgptr) > 1))
{
/* New record does not fit into the current leaf page (within the threshold value); so allocate a new leaf page
* and dump the current leaf page. */
if (btree_proceed_leaf (thread_p, load_args) == NULL)
{
goto exit_on_error;
}
}
/* Insert the record to the current leaf page */
sp_success = spage_insert (thread_p, load_args->leaf.pgptr, &load_args->leaf_nleaf_recdes, &slotid);
if (sp_success != SP_SUCCESS)
{
goto exit_on_error;
}
assert (slotid > 0);
/* Update the node header information for this record */
if (load_args->cur_key_len >= BTREE_MAX_KEYLEN_INPAGE)
{
if (load_args->leaf.hdr.max_key_len < DISK_VPID_SIZE)
{
load_args->leaf.hdr.max_key_len = DISK_VPID_SIZE;
}
}
else
{
if (load_args->leaf.hdr.max_key_len < load_args->cur_key_len)
{
load_args->leaf.hdr.max_key_len = load_args->cur_key_len;
}
}
/* Update the leaf header of the current leaf page */
header = btree_get_node_header (thread_p, load_args->leaf.pgptr);
if (header == NULL)
{
goto exit_on_error;
}
*header = load_args->leaf.hdr;
/* Save the current leaf page */
btree_log_page (thread_p, &load_args->btid->sys_btid->vfid, load_args->leaf.pgptr);
load_args->leaf.pgptr = NULL;
return ret;
exit_on_error:
pgbuf_unfix_and_init_after_check (thread_p, load_args->leaf.pgptr);
pgbuf_unfix_and_init_after_check (thread_p, load_args->ovf.pgptr);
return (ret == NO_ERROR && (ret = er_errid ()) == NO_ERROR) ? ER_FAILED : ret;
}
/*
* btree_connect_page () - Connect child page to the new parent page
* return: PAGE_PTR (pointer to the parent page if finished successfully,
* or NULL in case of an error)
* key(in): Biggest key value (or, separator value in case of string
* keys) of the child page to be connected.
* max_key_len(in): size of the biggest key in this leaf page
* pageid(in): identifier of this leaf page
* load_args(in):
*
* Note: This function connects a page to its parent page. In the
* parent level a new record is prepared with the given & pageid
* parameters and inserted into current non-leaf page being
* filled up. If the record does not fit into this page within
* the given threshold boundaries (LOAD_FIXED_EMPTY_FOR_NONLEAF bytes of each
* page are reserved for future insertions) then this page is
* flushed to disk and a new page is allocated to become the
* current non-leaf page.
*/
static PAGE_PTR
btree_connect_page (THREAD_ENTRY * thread_p, DB_VALUE * key, int max_key_len, VPID * pageid, LOAD_ARGS * load_args,
int node_level)
{
NON_LEAF_REC nleaf_rec;
INT16 slotid;
int sp_success;
int cur_maxspace;
int key_len;
int key_type = BTREE_NORMAL_KEY;
BTREE_NODE_HEADER *header = NULL;
/* form the leaf record (create the header & insert the key) */
cur_maxspace = spage_max_space_for_new_record (thread_p, load_args->nleaf.pgptr);
nleaf_rec.pnt = *pageid;
key_len = btree_get_disk_size_of_key (key);
if (key_len < BTREE_MAX_KEYLEN_INPAGE)
{
nleaf_rec.key_len = key_len;
key_type = BTREE_NORMAL_KEY;
}
else
{
nleaf_rec.key_len = -1;
key_type = BTREE_OVERFLOW_KEY;
if (VFID_ISNULL (&load_args->btid->ovfid))
{
if (btree_create_overflow_key_file (thread_p, load_args->btid) != NO_ERROR)
{
return NULL;
}
}
}
if (btree_write_record (thread_p, load_args->btid, &nleaf_rec, key, BTREE_NON_LEAF_NODE, key_type, key_len, true,
NULL, NULL, NULL, &load_args->leaf_nleaf_recdes) != NO_ERROR)
{
return NULL;
}
if ((cur_maxspace - load_args->leaf_nleaf_recdes.length) < LOAD_FIXED_EMPTY_FOR_NONLEAF)
{
/* New record does not fit into the current non-leaf page (within the threshold value); so allocate a new
* non-leaf page and dump the current non-leaf page. */
/* Update the non-leaf page header */
header = btree_get_node_header (thread_p, load_args->nleaf.pgptr);
if (header == NULL)
{
return NULL;
}
*header = load_args->nleaf.hdr;
/* Flush the current non-leaf page */
btree_log_page (thread_p, &load_args->btid->sys_btid->vfid, load_args->nleaf.pgptr);
load_args->nleaf.pgptr = NULL;
/* Insert the pageid to the linked list */
if (list_add (&load_args->push_list, &load_args->nleaf.vpid) != NO_ERROR)
{
return NULL;
}
/* get a new non-leaf page */
if (btree_load_new_page (thread_p, load_args->btid->sys_btid, &load_args->nleaf.hdr, node_level,
&load_args->nleaf.vpid, &load_args->nleaf.pgptr) != NO_ERROR)
{
ASSERT_ERROR ();
return NULL;
}
if (load_args->nleaf.pgptr == NULL)
{
assert_release (false);
return NULL;
}
} /* get a new leaf */
/* Insert the record to the current leaf page */
sp_success = spage_insert (thread_p, load_args->nleaf.pgptr, &load_args->leaf_nleaf_recdes, &slotid);
if (sp_success != SP_SUCCESS)
{
return NULL;
}
assert (slotid > 0);
/* Update the node header information for this record */
if (load_args->nleaf.hdr.max_key_len < max_key_len)
{
load_args->nleaf.hdr.max_key_len = max_key_len;
}
return load_args->nleaf.pgptr;
}
/*
* btree_build_nleafs () -
* return: NO_ERROR
* load_args(in): Collection of information on how to build B+tree.
* n_nulls(in):
* n_oids(in):
* n_keys(in):
*
* Note: This function builds the non-leaf level nodes of the B+Tree
* index in three phases. In the first phase, the first non-leaf
* level nodes of the tree are constructed. Then, the upper
* levels are built on top of this level, and finally, the last
* non-leaf page (the single page of the top level) is updated
* to become the root page.
*/
static int
btree_build_nleafs (THREAD_ENTRY * thread_p, LOAD_ARGS * load_args, int n_nulls, int n_oids, int n_keys)
{
RECDES temp_recdes; /* Temporary record descriptor; */
char *temp_data = NULL;
VPID next_vpid;
PAGE_PTR next_pageptr = NULL;
/* Variables used in the second phase to go over lower level non-leaf pages */
VPID cur_nleafpgid;
PAGE_PTR cur_nleafpgptr = NULL;
BTREE_NODE *temp;
BTREE_ROOT_HEADER root_header_info, *root_header = NULL;
BTREE_NODE_HEADER *header = NULL;
int key_cnt;
int max_key_len, new_max;
LEAF_REC leaf_pnt;
NON_LEAF_REC nleaf_pnt;
/* Variables for keeping keys */
DB_VALUE prefix_key; /* Prefix key; prefix of last & first keys; used only if type is one of the string
* types */
int last_key_offset, first_key_offset;
bool clear_last_key = false, clear_first_key = false;
int ret = NO_ERROR;
int node_level = 2; /* leaf level = 1, lowest non-leaf level = 2 */
RECDES rec;
char rec_buf[IO_MAX_PAGE_SIZE + BTREE_MAX_ALIGN];
DB_VALUE last_key; /* Last key of the current page */
DB_VALUE first_key; /* First key of the next page; used only if key_type is one of the string types */
root_header = &root_header_info;
rec.area_size = DB_PAGESIZE;
rec.data = PTR_ALIGN (rec_buf, BTREE_MAX_ALIGN);
btree_init_temp_key_value (&clear_last_key, &last_key);
btree_init_temp_key_value (&clear_first_key, &first_key);
db_make_null (&prefix_key);
temp_data = (char *) os_malloc (DB_PAGESIZE);
if (temp_data == NULL)
{
er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_OUT_OF_VIRTUAL_MEMORY, 1, DB_PAGESIZE);
ret = ER_OUT_OF_VIRTUAL_MEMORY;
goto end;
}
/*****************************************************
PHASE I : Build the first non_leaf level nodes
****************************************************/
assert (node_level == 2);
/* Initialize the first non-leaf page */
ret =
btree_load_new_page (thread_p, load_args->btid->sys_btid, &load_args->nleaf.hdr, node_level, &load_args->nleaf.vpid,
&load_args->nleaf.pgptr);
if (ret != NO_ERROR)
{
ASSERT_ERROR ();
goto end;
}
if (load_args->nleaf.pgptr == NULL)
{
assert_release (false);
ret = ER_FAILED;
goto end;
}
load_args->push_list = NULL;
load_args->pop_list = NULL;
db_make_null (&last_key);
/* While there are some leaf pages do */
load_args->leaf.vpid = load_args->vpid_first_leaf;
while (!VPID_ISNULL (&(load_args->leaf.vpid)))
{
load_args->leaf.pgptr =
pgbuf_fix (thread_p, &load_args->leaf.vpid, OLD_PAGE, PGBUF_LATCH_WRITE, PGBUF_UNCONDITIONAL_LATCH);
if (load_args->leaf.pgptr == NULL)
{
ASSERT_ERROR_AND_SET (ret);
goto end;
}
#if !defined (NDEBUG)
(void) pgbuf_check_page_ptype (thread_p, load_args->leaf.pgptr, PAGE_BTREE);
#endif /* !NDEBUG */
key_cnt = btree_node_number_of_keys (thread_p, load_args->leaf.pgptr);
assert (key_cnt > 0);
/* obtain the header information for the leaf page */
header = btree_get_node_header (thread_p, load_args->leaf.pgptr);
if (header == NULL)
{
assert_release (false);
ret = ER_FAILED;
goto end;
}
/* get the maximum key length on this leaf page */
max_key_len = header->max_key_len;
next_vpid = header->next_vpid;
/* set level 2 to first non-leaf page */
load_args->nleaf.hdr.node_level = node_level;
/* Learn the first key of the leaf page */
if (spage_get_record (thread_p, load_args->leaf.pgptr, 1, &temp_recdes, PEEK) != S_SUCCESS)
{
assert_release (false);
ret = ER_FAILED;
goto end;
}
ret =
btree_read_record (thread_p, load_args->btid, load_args->leaf.pgptr, &temp_recdes, &first_key, &leaf_pnt,
BTREE_LEAF_NODE, &clear_first_key, &first_key_offset, PEEK_KEY_VALUE, NULL);
if (ret != NO_ERROR)
{
ASSERT_ERROR ();
goto end;
}
if (pr_is_prefix_key_type (TP_DOMAIN_TYPE (load_args->btid->key_type)))
{
/*
* Key type is string or midxkey.
* Should insert the prefix key to the parent level
*/
if (DB_IS_NULL (&last_key))
{
/* is the first leaf When the types of leaf node are char, bit, the type that is saved on non-leaf
* node is different. non-leaf spec (char -> varchar, bit -> varbit) hence it should
* be configured by using setval of nonleaf_key_type. */
ret = load_args->btid->nonleaf_key_type->type->setval (&prefix_key, &first_key, true);
if (ret != NO_ERROR)
{
assert (!"setval error");
goto end;
}
}
else
{
/* Insert the prefix key to the parent level */
ret = btree_get_prefix_separator (&last_key, &first_key, &prefix_key, load_args->btid->key_type);
if (ret != NO_ERROR)
{
ASSERT_ERROR ();
goto end;
}
}
/*
* We may need to update the max_key length if the mid key is
* larger than the max key length. This will only happen when
* the varying key length is larger than the fixed key length
* in pathological cases like char(4)
*/
new_max = btree_get_disk_size_of_key (&prefix_key);
new_max = BTREE_GET_KEY_LEN_IN_PAGE (new_max);
max_key_len = MAX (new_max, max_key_len);
assert (node_level == 2);
if (btree_connect_page (thread_p, &prefix_key, max_key_len, &load_args->leaf.vpid, load_args, node_level) ==
NULL)
{
ASSERT_ERROR_AND_SET (ret);
pr_clear_value (&prefix_key);
goto end;
}
/* We always need to clear the prefix key. It is always a copy. */
pr_clear_value (&prefix_key);
btree_clear_key_value (&clear_last_key, &last_key);
/* Learn the last key of the leaf page */
assert (key_cnt > 0);
if (spage_get_record (thread_p, load_args->leaf.pgptr, key_cnt, &temp_recdes, PEEK) != S_SUCCESS)
{
assert_release (false);
ret = ER_FAILED;
goto end;
}
ret =
btree_read_record (thread_p, load_args->btid, load_args->leaf.pgptr, &temp_recdes, &last_key, &leaf_pnt,
BTREE_LEAF_NODE, &clear_last_key, &last_key_offset, PEEK_KEY_VALUE, NULL);
if (ret != NO_ERROR)
{
ASSERT_ERROR ();
goto end;
}
}
else
{
max_key_len = BTREE_GET_KEY_LEN_IN_PAGE (max_key_len);
/* Insert this key to the parent level */
assert (node_level == 2);
if (btree_connect_page (thread_p, &first_key, max_key_len, &load_args->leaf.vpid, load_args, node_level) ==
NULL)
{
ASSERT_ERROR_AND_SET (ret);
goto end;
}
}
btree_clear_key_value (&clear_first_key, &first_key);
/* Proceed the current leaf page pointer to the next leaf page */
pgbuf_unfix_and_init (thread_p, load_args->leaf.pgptr);
load_args->leaf.vpid = next_vpid;
load_args->leaf.pgptr = next_pageptr;
next_pageptr = NULL;
} /* while there are some more leaf pages */
/* Update the non-leaf page header */
header = btree_get_node_header (thread_p, load_args->nleaf.pgptr);
if (header == NULL)
{
assert_release (false);
ret = ER_FAILED;
goto end;
}
*header = load_args->nleaf.hdr;
/* Flush the last non-leaf page */
btree_log_page (thread_p, &load_args->btid->sys_btid->vfid, load_args->nleaf.pgptr);
load_args->nleaf.pgptr = NULL;
/* Insert the pageid to the linked list */
ret = list_add (&load_args->push_list, &load_args->nleaf.vpid);
if (ret != NO_ERROR)
{
ASSERT_ERROR ();
goto end;
}
btree_clear_key_value (&clear_last_key, &last_key);
/*****************************************************
PHASE II: Build the upper levels of tree
****************************************************/
assert (node_level == 2);
/* Switch the push and pop lists */
load_args->pop_list = load_args->push_list;
load_args->push_list = NULL;
while (list_length (load_args->pop_list) > 1)
{
node_level++;
/* while there are more than one page at the previous level do construct the next level */
/* Initialize the first non-leaf page of current level */
ret =
btree_load_new_page (thread_p, load_args->btid->sys_btid, &load_args->nleaf.hdr, node_level,
&load_args->nleaf.vpid, &load_args->nleaf.pgptr);
if (ret != NO_ERROR)
{
ASSERT_ERROR ();
goto end;
}
if (load_args->nleaf.pgptr == NULL)
{
assert_release (false);
ret = ER_FAILED;
goto end;
}
do
{ /* while pop_list is not empty */
/* Get current pageid from the poplist */
cur_nleafpgid = load_args->pop_list->pageid;
/* Fetch the current page */
cur_nleafpgptr = pgbuf_fix (thread_p, &cur_nleafpgid, OLD_PAGE, PGBUF_LATCH_WRITE, PGBUF_UNCONDITIONAL_LATCH);
if (cur_nleafpgptr == NULL)
{
ASSERT_ERROR_AND_SET (ret);
goto end;
}
#if !defined (NDEBUG)
(void) pgbuf_check_page_ptype (thread_p, cur_nleafpgptr, PAGE_BTREE);
#endif /* !NDEBUG */
/* obtain the header information for the current non-leaf page */
header = btree_get_node_header (thread_p, cur_nleafpgptr);
if (header == NULL)
{
assert_release (false);
ret = ER_FAILED;
goto end;
}
/* get the maximum key length on this leaf page */
max_key_len = header->max_key_len;
/* Learn the first key of the current page */
/* Notice that since this is a non-leaf node */
if (spage_get_record (thread_p, cur_nleafpgptr, 1, &temp_recdes, PEEK) != S_SUCCESS)
{
assert_release (false);
ret = ER_FAILED;
goto end;
}
ret =
btree_read_record (thread_p, load_args->btid, cur_nleafpgptr, &temp_recdes, &first_key, &nleaf_pnt,
BTREE_NON_LEAF_NODE, &clear_first_key, &first_key_offset, PEEK_KEY_VALUE, NULL);
if (ret != NO_ERROR)
{
ASSERT_ERROR ();
goto end;
}
max_key_len = BTREE_GET_KEY_LEN_IN_PAGE (max_key_len);
/* set level to non-leaf page nleaf page could be changed in btree_connect_page */
assert (node_level > 2);
load_args->nleaf.hdr.node_level = node_level;
/* Insert this key to the parent level */
if (btree_connect_page (thread_p, &first_key, max_key_len, &cur_nleafpgid, load_args, node_level) == NULL)
{
ASSERT_ERROR_AND_SET (ret);
goto end;
}
pgbuf_unfix_and_init (thread_p, cur_nleafpgptr);
/* Remove this pageid from the pop list */
list_remove_first (&load_args->pop_list);
btree_clear_key_value (&clear_first_key, &first_key);
}
while (list_length (load_args->pop_list) > 0);
/* FLUSH LAST NON-LEAF PAGE */
/* Update the non-leaf page header */
header = btree_get_node_header (thread_p, load_args->nleaf.pgptr);
if (header == NULL)
{
assert_release (false);
ret = ER_FAILED;
goto end;
}
*header = load_args->nleaf.hdr;
/* Flush the last non-leaf page */
btree_log_page (thread_p, &load_args->btid->sys_btid->vfid, load_args->nleaf.pgptr);
load_args->nleaf.pgptr = NULL;
/* Insert the pageid to the linked list */
ret = list_add (&load_args->push_list, &load_args->nleaf.vpid);
if (ret != NO_ERROR)
{
ASSERT_ERROR ();
goto end;
}
/* Switch push and pop lists */
temp = load_args->pop_list;
load_args->pop_list = load_args->push_list;
load_args->push_list = temp;
}
/* Deallocate the last node (only one exists) */
list_remove_first (&load_args->pop_list);
/******************************************
PHASE III: Update the root page
*****************************************/
/* Retrieve the last non-leaf page (the current one); guaranteed to exist */
/* Fetch the current page */
load_args->nleaf.pgptr =
pgbuf_fix (thread_p, &load_args->nleaf.vpid, OLD_PAGE, PGBUF_LATCH_WRITE, PGBUF_UNCONDITIONAL_LATCH);
if (load_args->nleaf.pgptr == NULL)
{
ASSERT_ERROR_AND_SET (ret);
goto end;
}
#if !defined (NDEBUG)
pgbuf_check_page_ptype (thread_p, load_args->nleaf.pgptr, PAGE_BTREE);
#endif /* !NDEBUG */
/* Prepare the root header by using the last leaf node header */
root_header->node.max_key_len = load_args->nleaf.hdr.max_key_len;
root_header->node.node_level = node_level;
VPID_SET_NULL (&(root_header->node.next_vpid));
VPID_SET_NULL (&(root_header->node.prev_vpid));
root_header->node.split_info.pivot = 0.0f;
root_header->node.split_info.index = 0;
if (load_args->btid->unique_pk)
{
root_header->num_nulls = n_nulls;
root_header->num_oids = n_oids;
root_header->num_keys = n_keys;
root_header->unique_pk = load_args->btid->unique_pk;
assert (BTREE_IS_UNIQUE (root_header->unique_pk));
assert (BTREE_IS_PRIMARY_KEY (root_header->unique_pk) || !BTREE_IS_PRIMARY_KEY (root_header->unique_pk));
}
else
{
root_header->num_nulls = -1;
root_header->num_oids = -1;
root_header->num_keys = -1;
root_header->unique_pk = 0;
}
COPY_OID (&(root_header->topclass_oid), &load_args->btid->topclass_oid);
root_header->ovfid = load_args->btid->ovfid; /* structure copy */
root_header->_32.rev_level = BTREE_CURRENT_REV_LEVEL;
SET_DECOMPRESS_IDX_HEADER (root_header, load_args->btid->deduplicate_key_idx);
#if defined (SERVER_MODE)
root_header->creator_mvccid = logtb_get_current_mvccid (thread_p);
#else /* !SERVER_MODE */ /* SA_MODE */
root_header->creator_mvccid = MVCCID_NULL;
#endif /* SA_MODE */
/* change node header as root header */
ret = btree_pack_root_header (&rec, root_header, load_args->btid->key_type);
if (ret != NO_ERROR)
{
ASSERT_ERROR ();
goto end;
}
if (spage_update (thread_p, load_args->nleaf.pgptr, HEADER, &rec) != SP_SUCCESS)
{
assert_release (false);
ret = ER_FAILED;
goto end;
}
/* move current ROOT page content to the first page allocated */
btree_get_root_vpid_from_btid (thread_p, load_args->btid->sys_btid, &cur_nleafpgid);
next_pageptr = pgbuf_fix (thread_p, &cur_nleafpgid, OLD_PAGE, PGBUF_LATCH_WRITE, PGBUF_UNCONDITIONAL_LATCH);
if (next_pageptr == NULL)
{
ASSERT_ERROR_AND_SET (ret);
goto end;
}
assert (pgbuf_get_page_ptype (thread_p, next_pageptr) == PAGE_BTREE);
memcpy (next_pageptr, load_args->nleaf.pgptr, DB_PAGESIZE);
pgbuf_unfix_and_init (thread_p, load_args->nleaf.pgptr);
load_args->nleaf.pgptr = next_pageptr;
next_pageptr = NULL;
load_args->nleaf.vpid = cur_nleafpgid;
/* The root page must be logged, otherwise, in the event of a crash. The index may be gone. */
btree_log_page (thread_p, &load_args->btid->sys_btid->vfid, load_args->nleaf.pgptr);
load_args->nleaf.pgptr = NULL;
assert (ret == NO_ERROR);
end:
/* cleanup */
pgbuf_unfix_and_init_after_check (thread_p, next_pageptr);
pgbuf_unfix_and_init_after_check (thread_p, cur_nleafpgptr);
pgbuf_unfix_and_init_after_check (thread_p, load_args->leaf.pgptr);
pgbuf_unfix_and_init_after_check (thread_p, load_args->nleaf.pgptr);
if (temp_data)
{
os_free_and_init (temp_data);
}
btree_clear_key_value (&clear_last_key, &last_key);
btree_clear_key_value (&clear_first_key, &first_key);
return ret;
}
/*
* btree_log_page () - Save the contents of the buffer
* return: nothing
* vfid(in):
* page_ptr(in): pointer to the page buffer to be saved
*
* Note: This function logs the contents of the given page and frees
* the page after setting on the dirty bit.
*/
static void
btree_log_page (THREAD_ENTRY * thread_p, VFID * vfid, PAGE_PTR page_ptr)
{
LOG_DATA_ADDR addr; /* For recovery purposes */
/* log the whole page for redo purposes. */
addr.vfid = vfid;
addr.pgptr = page_ptr;
addr.offset = -1; /* irrelevant */
log_append_redo_data (thread_p, RVBT_COPYPAGE, &addr, DB_PAGESIZE, page_ptr);
pgbuf_set_dirty (thread_p, page_ptr, FREE);
page_ptr = NULL;
}
/*
* btree_load_new_page () - load a new b-tree page.
*
* return : Error code
* thread_p (in) : Thread entry
* btid (in) : B-tree identifier
* header (in) : Node header for leaf/non-leaf nodes or NULL for overflow OID nodes
* node_level (in) : Node level for leaf/non-leaf nodes or -1 for overflow OID nodes
* vpid_new (out) : Output new page VPID
* page_new (out) : Output new page
*/
static int
btree_load_new_page (THREAD_ENTRY * thread_p, const BTID * btid, BTREE_NODE_HEADER * header, int node_level,
VPID * vpid_new, PAGE_PTR * page_new)
{
int error_code = NO_ERROR;
assert ((header != NULL && node_level >= 1) /* leaf, non-leaf */
|| (header == NULL && node_level == -1)); /* overflow */
assert (log_check_system_op_is_started (thread_p)); /* need system operation */
/* we need to commit page allocations. if loading index is aborted, the entire file is destroyed. */
log_sysop_start (thread_p);
/* allocate new page */
error_code = file_alloc (thread_p, &btid->vfid, btree_initialize_new_page, NULL, vpid_new, page_new);
if (error_code != NO_ERROR)
{
ASSERT_ERROR ();
goto end;
}
if (*page_new == NULL)
{
assert_release (false);
error_code = ER_FAILED;
goto end;
}
#if !defined (NDEBUG)
(void) pgbuf_check_page_ptype (thread_p, *page_new, PAGE_BTREE);
#endif /* !NDEBUG */
if (header)
{ /* This is going to be a leaf or non-leaf page */
/* Insert the node header (with initial values) to the leaf node */
header->node_level = node_level;
header->max_key_len = 0;
VPID_SET_NULL (&header->next_vpid);
VPID_SET_NULL (&header->prev_vpid);
header->split_info.pivot = 0.0f;
header->split_info.index = 0;
header->common_prefix = 0;
error_code = btree_init_node_header (thread_p, &btid->vfid, *page_new, header, false);
if (error_code != NO_ERROR)
{
ASSERT_ERROR ();
pgbuf_unfix_and_init (thread_p, *page_new);
goto end;
}
}
else
{ /* This is going to be an overflow page */
BTREE_OVERFLOW_HEADER ovf_header_info;
assert (node_level == -1);
VPID_SET_NULL (&ovf_header_info.next_vpid);
error_code = btree_init_overflow_header (thread_p, *page_new, &ovf_header_info);
if (error_code != NO_ERROR)
{
ASSERT_ERROR ();
pgbuf_unfix_and_init (thread_p, *page_new);
goto end;
}
}
assert (error_code == NO_ERROR);
end:
if (error_code != NO_ERROR)
{
log_sysop_abort (thread_p);
}
else
{
log_sysop_commit (thread_p);
}
return error_code;
}
/*
* btree_proceed_leaf () - Proceed the current leaf page to a new one
* return: PAGE_PTR (pointer to the new leaf page, or NULL in
* case of an error).
* load_args(in): Collection of information on how to build B+tree.
*
* Note: This function proceeds the current leaf page pointer from a
* full leaf page to a new (completely empty) one. It first
* allocates a new leaf page and connects it to the current
* leaf page; then, it dumps the current one; and finally makes
* the new leaf page "current".
*/
static PAGE_PTR
btree_proceed_leaf (THREAD_ENTRY * thread_p, LOAD_ARGS * load_args)
{
/* Local variables for managing leaf & overflow pages */
VPID new_leafpgid;
PAGE_PTR new_leafpgptr = NULL;
BTREE_NODE_HEADER new_leafhdr, *header = NULL;
RECDES temp_recdes; /* Temporary record descriptor; */
OR_ALIGNED_BUF (sizeof (BTREE_NODE_HEADER)) a_temp_data;
temp_recdes.data = OR_ALIGNED_BUF_START (a_temp_data);
temp_recdes.area_size = sizeof (BTREE_NODE_HEADER);
/* Allocate the new leaf page */
if (btree_load_new_page (thread_p, load_args->btid->sys_btid, &new_leafhdr, 1, &new_leafpgid, &new_leafpgptr)
!= NO_ERROR)
{
ASSERT_ERROR ();
return NULL;
}
if (new_leafpgptr == NULL)
{
assert_release (false);
return NULL;
}
/* new leaf update header */
new_leafhdr.prev_vpid = load_args->leaf.vpid;
header = btree_get_node_header (thread_p, new_leafpgptr);
if (header == NULL)
{
pgbuf_unfix_and_init (thread_p, new_leafpgptr);
return NULL;
}
*header = new_leafhdr; /* update header */
/* current leaf update header */
/* make the current leaf page point to the new one */
load_args->leaf.hdr.next_vpid = new_leafpgid;
/* and the new leaf point to the current one */
header = btree_get_node_header (thread_p, load_args->leaf.pgptr);
if (header == NULL)
{
pgbuf_unfix_and_init (thread_p, new_leafpgptr);
return NULL;
}
*header = load_args->leaf.hdr;
/* Flush the current leaf page */
btree_log_page (thread_p, &load_args->btid->sys_btid->vfid, load_args->leaf.pgptr);
load_args->leaf.pgptr = NULL;
/* Make the new leaf page become the current one */
load_args->leaf.vpid = new_leafpgid;
load_args->leaf.pgptr = new_leafpgptr;
load_args->leaf.hdr = new_leafhdr;
return load_args->leaf.pgptr;
}
/*
* btree_first_oid () - Prepare record for the key and its first oid
* return: int
* this_key(in): Key
* class_oid(in):
* first_oid(in): First OID associated with this key; inserted into the
* record.
* load_args(in): Contains fields specifying where & how to create the record
*
* Note: This function prepares the leaf record for the given key and
* its first OID at the storage location specified by
* load_args->out_recdes field.
*/
static int
btree_first_oid (THREAD_ENTRY * thread_p, DB_VALUE * this_key, OID * class_oid, OID * first_oid,
MVCC_REC_HEADER * p_mvcc_rec_header, LOAD_ARGS * load_args)
{
int key_len;
int key_type;
int error;
BTREE_MVCC_INFO mvcc_info;
assert (load_args->out_recdes == &load_args->leaf_nleaf_recdes);
/* form the leaf record (create the header & insert the key) */
key_len = load_args->cur_key_len = btree_get_disk_size_of_key (this_key);
if (key_len < BTREE_MAX_KEYLEN_INPAGE)
{
key_type = BTREE_NORMAL_KEY;
}
else
{
key_type = BTREE_OVERFLOW_KEY;
if (VFID_ISNULL (&load_args->btid->ovfid))
{
error = btree_create_overflow_key_file (thread_p, load_args->btid);
if (error != NO_ERROR)
{
return error;
}
}
}
btree_mvcc_info_from_heap_mvcc_header (p_mvcc_rec_header, &mvcc_info);
error =
btree_write_record (thread_p, load_args->btid, NULL, this_key, BTREE_LEAF_NODE, key_type, key_len, true, class_oid,
first_oid, &mvcc_info, load_args->out_recdes);
if (error != NO_ERROR)
{
/* this must be an overflow key insertion failure, we assume the overflow manager has logged an error. */
return error;
}
/* Set the location where the new oid should be inserted */
load_args->new_pos = (load_args->out_recdes->data + load_args->out_recdes->length);
assert (load_args->out_recdes->length <= load_args->out_recdes->area_size);
/* Set the current key value to this_key */
pr_clear_value (&load_args->current_key); /* clear previous value */
load_args->cur_key_len = key_len;
error = pr_clone_value (this_key, &load_args->current_key);
if (error != NO_ERROR)
{
return error;
}
if (MVCC_IS_HEADER_DELID_VALID (p_mvcc_rec_header))
{
/* Object is deleted, initialize curr_non_del_obj_count as 0 */
load_args->curr_non_del_obj_count = 0;
}
else
{
/* Object was not deleted, increment curr_non_del_obj_count */
load_args->curr_non_del_obj_count = 1;
/* Increment the key counter if object is not deleted */
(load_args->n_keys)++;
}
load_args->curr_rec_max_obj_count = BTREE_MAX_OIDCOUNT_IN_LEAF_RECORD (load_args->btid);
load_args->curr_rec_obj_count = 1;
#if !defined (NDEBUG)
btree_check_valid_record (thread_p, load_args->btid, load_args->out_recdes, BTREE_LEAF_NODE, NULL);
#endif
return NO_ERROR;
}
/*
* bt_load_put_buf_to_record () - Generating temporary records for Index
* return: int
* recdes(out):
* load_args(in): Contains fields specifying where & how to create the record
* value_has_null(in):
* rec_oid(in): Object identifier of current record.
* mvcc_header(in):
* dbvalue_ptr(in): Key value
* key_len(in): get_disk_size_of_value(dbvalue_ptr)
* cur_class(in):
* is_btree_ops_log(in)
* Note:
*/
static int
bt_load_put_buf_to_record (RECDES * recdes, SORT_ARGS * sort_args, int value_has_null, OID * rec_oid,
MVCC_REC_HEADER * mvcc_header, DB_VALUE * dbvalue_ptr, int key_len, int cur_class,
bool is_btree_ops_log)
{
int next_size;
int record_size;
OR_BUF buf;
int oid_size;
if (BTREE_IS_UNIQUE (sort_args->unique_pk))
{
oid_size = 2 * OR_OID_SIZE;
}
else
{
oid_size = OR_OID_SIZE;
}
next_size = sizeof (char *);
record_size = (next_size /* Pointer to next */
+ OR_INT_SIZE /* Has null */
+ oid_size /* OID, Class OID */
+ 2 * OR_MVCCID_SIZE /* Insert and delete MVCCID */
+ key_len /* Key length */
+ (int) MAX_ALIGNMENT /* Alignment */ );
if (recdes->area_size < record_size)
{
/*
* Record is too big to fit into recdes area; so
* backtrack this iteration
*/
sort_args->cur_oid = *rec_oid;
recdes->length = record_size;
return ER_FAILED;
}
assert (PTR_ALIGN (recdes->data, MAX_ALIGNMENT) == recdes->data);
or_init (&buf, recdes->data, 0);
or_pad (&buf, next_size); /* init as NULL */
/* save has_null */
if (or_put_byte (&buf, value_has_null) != NO_ERROR)
{
return ER_FAILED;
}
or_advance (&buf, (OR_INT_SIZE - OR_BYTE_SIZE));
assert (buf.ptr == PTR_ALIGN (buf.ptr, INT_ALIGNMENT));
if (or_put_oid (&buf, &sort_args->cur_oid) != NO_ERROR)
{
return ER_FAILED;
}
if (BTREE_IS_UNIQUE (sort_args->unique_pk))
{
if (or_put_oid (&buf, &sort_args->class_ids[cur_class]) != NO_ERROR)
{
return ER_FAILED;
}
}
/* Pack insert and delete MVCCID's */
if (MVCC_IS_HEADER_INSID_NOT_ALL_VISIBLE (mvcc_header))
{
if (or_put_mvccid (&buf, MVCC_GET_INSID (mvcc_header)) != NO_ERROR)
{
return ER_FAILED;
}
}
else
{
if (or_put_mvccid (&buf, MVCCID_ALL_VISIBLE) != NO_ERROR)
{
return ER_FAILED;
}
}
if (MVCC_IS_HEADER_DELID_VALID (mvcc_header))
{
if (or_put_mvccid (&buf, MVCC_GET_DELID (mvcc_header)) != NO_ERROR)
{
return ER_FAILED;
}
}
else
{
if (or_put_mvccid (&buf, MVCCID_NULL) != NO_ERROR)
{
return ER_FAILED;
}
}
if (is_btree_ops_log)
{
_er_log_debug (ARG_FILE_LINE,
"DEBUG_BTREE: load sort found oid(%d, %d, %d)"
", class_oid(%d, %d, %d), btid(%d, (%d, %d), mvcc_info=%llu | %llu.",
sort_args->cur_oid.volid, sort_args->cur_oid.pageid, sort_args->cur_oid.slotid,
sort_args->class_ids[sort_args->cur_class].volid,
sort_args->class_ids[sort_args->cur_class].pageid,
sort_args->class_ids[sort_args->cur_class].slotid,
sort_args->btid->sys_btid->root_pageid,
sort_args->btid->sys_btid->vfid.volid, sort_args->btid->sys_btid->vfid.fileid,
MVCC_IS_FLAG_SET (mvcc_header, OR_MVCC_FLAG_VALID_INSID)
? MVCC_GET_INSID (mvcc_header) : MVCCID_ALL_VISIBLE,
MVCC_IS_FLAG_SET (mvcc_header, OR_MVCC_FLAG_VALID_DELID)
? MVCC_GET_DELID (mvcc_header) : MVCCID_NULL);
}
assert (buf.ptr == PTR_ALIGN (buf.ptr, INT_ALIGNMENT));
if (sort_args->key_type->type->data_writeval (&buf, dbvalue_ptr) != NO_ERROR)
{
return ER_FAILED;
}
recdes->length = CAST_STRLEN (buf.ptr - buf.buffer);
return NO_ERROR;
}
/*
* bt_load_get_buf_from_record () - Reads records generated by bt_load_put_buf_to_record().
* return: int
* recdes(in):
* load_args(in): Contains fields specifying where & how to create the record
* pparam(out): a bundle of record-related information
* copy(in):
* Note:
*/
static int
bt_load_get_buf_from_record (RECDES * recdes, LOAD_ARGS * load_args, S_PARAM_ST * pparam, bool copy)
{
OR_BUF buf;
int ret;
int next_size = sizeof (char *);
/* First decompose the input record into the key and oid components */
or_init (&buf, recdes->data, recdes->length);
assert (buf.ptr == PTR_ALIGN (buf.ptr, MAX_ALIGNMENT));
/* Skip forward link, value_has_null */
or_advance (&buf, next_size + OR_INT_SIZE);
assert (buf.ptr == PTR_ALIGN (buf.ptr, INT_ALIGNMENT));
/* Get OID */
ret = or_get_oid (&buf, &pparam->rec_oid);
if (ret != NO_ERROR)
{
return ret;
}
/* Instance level uniqueness checking */
if (BTREE_IS_UNIQUE (load_args->btid->unique_pk))
{ /* unique index */
/* extract class OID */
ret = or_get_oid (&buf, &pparam->class_oid);
if (ret != NO_ERROR)
{
return ret;
}
}
/* Create MVCC header */
BTREE_INIT_MVCC_HEADER (&pparam->mvcc_header);
ret = or_get_mvccid (&buf, &MVCC_GET_INSID (&pparam->mvcc_header));
if (ret != NO_ERROR)
{
return ret;
}
ret = or_get_mvccid (&buf, &MVCC_GET_DELID (&pparam->mvcc_header));
if (ret != NO_ERROR)
{
return ret;
}
#if defined(SERVER_MODE)
if (MVCC_GET_INSID (&pparam->mvcc_header) != MVCCID_ALL_VISIBLE)
{
/* Set valid insert MVCCID flag */
MVCC_SET_FLAG_BITS (&pparam->mvcc_header, OR_MVCC_FLAG_VALID_INSID);
}
if (MVCC_GET_DELID (&pparam->mvcc_header) != MVCCID_NULL)
{
/* Set valid delete MVCCID */
MVCC_SET_FLAG_BITS (&pparam->mvcc_header, OR_MVCC_FLAG_VALID_DELID);
}
#else
/* all inserted OIDs are created as visible in stand-alone */
MVCC_SET_INSID (&pparam->mvcc_header, MVCCID_ALL_VISIBLE);
if (MVCC_GET_DELID (&pparam->mvcc_header) != MVCCID_NULL)
{
assert (0);
}
#endif
assert (buf.ptr == PTR_ALIGN (buf.ptr, INT_ALIGNMENT));
if (pparam->is_btree_ops_log)
{
_er_log_debug (ARG_FILE_LINE,
"DEBUG_BTREE: load new object(%d, %d, %d) class(%d, %d, %d) and btid(%d, (%d, %d)) with "
"mvccinfo=%llu| %llu", pparam->rec_oid.volid, pparam->rec_oid.pageid, pparam->rec_oid.slotid,
pparam->class_oid.volid, pparam->class_oid.pageid, pparam->class_oid.slotid,
load_args->btid->sys_btid->root_pageid, load_args->btid->sys_btid->vfid.volid,
load_args->btid->sys_btid->vfid.fileid, MVCC_GET_INSID (&pparam->mvcc_header),
MVCC_GET_DELID (&pparam->mvcc_header));
}
int key_size = -1;
/* Do not copy the string--just use the pointer. The pr_ routines for strings and sets have different semantics
* for length. */
if (TP_DOMAIN_TYPE (load_args->btid->key_type) == DB_TYPE_MIDXKEY)
{
key_size = CAST_STRLEN (buf.endptr - buf.ptr);
}
ret =
load_args->btid->key_type->type->data_readval (&buf, &pparam->this_key, load_args->btid->key_type, key_size, copy,
NULL, 0);
if (ret != NO_ERROR)
{
er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_TF_CORRUPTED, 0);
return ret;
}
/* Save OID, class OID and MVCC header since they may be replaced. */
COPY_OID (&pparam->orig_oid, &pparam->rec_oid);
COPY_OID (&pparam->orig_class_oid, &pparam->class_oid);
pparam->orig_mvcc_header = pparam->mvcc_header;
return NO_ERROR;
}
/*
* bt_load_get_first_leaf_page_and_init_args () - Create a leaf page and insert the first record
* return: int
* thread_p(in):
* load_args(out): Contains fields specifying where & how to create the record
* pparam(in): a bundle of record-related information
*
* Note: This is the first call to btree_construct_leafs(). so, initialize some fields in the LOAD_ARGS structure
*/
static int
bt_load_get_first_leaf_page_and_init_args (THREAD_ENTRY * thread_p, LOAD_ARGS * load_args, S_PARAM_ST * pparam)
{
/* Allocate the first page for the index */
int ret = btree_load_new_page (thread_p, load_args->btid->sys_btid, &load_args->leaf.hdr, 1, &load_args->leaf.vpid,
&load_args->leaf.pgptr);
if (ret != NO_ERROR)
{
ASSERT_ERROR ();
return ret;
}
if (load_args->leaf.pgptr == NULL || VPID_ISNULL (&load_args->leaf.vpid))
{
assert_release (false);
if ((ret = er_errid ()) == NO_ERROR)
{
return ER_FAILED;
}
return ret;
}
/* Save first leaf VPID. */
load_args->vpid_first_leaf = load_args->leaf.vpid;
load_args->overflowing = false;
assert (load_args->ovf.pgptr == NULL);
/* Create the first record of the current page in main memory */
load_args->out_recdes = &load_args->leaf_nleaf_recdes;
return btree_first_oid (thread_p, &pparam->this_key, &pparam->class_oid, &pparam->rec_oid, &pparam->mvcc_header,
load_args);
}
/*
* bt_load_make_new_record_on_leaf_page () - Create a new record from the leaf page
* return: int
* thread_p(in):
* load_args(in): Contains fields specifying where & how to create the record
* pparam(in): a bundle of record-related information
* sp_success(out): When this value is set to a value other than SP_SUCCESS,
* error processing is performed without changing the error code.
*
* Note:
*/
static int
bt_load_make_new_record_on_leaf_page (THREAD_ENTRY * thread_p, LOAD_ARGS * load_args, S_PARAM_ST * pparam,
int *sp_success)
{
int cur_maxspace;
int ret = NO_ERROR;
/* Current key is finished; dump this output record to the disk page */
/* Insert current leaf record */
cur_maxspace = spage_max_space_for_new_record (thread_p, load_args->leaf.pgptr);
if (((cur_maxspace - load_args->leaf_nleaf_recdes.length) < LOAD_FIXED_EMPTY_FOR_LEAF)
&& (spage_number_of_records (load_args->leaf.pgptr) > 1))
{
/* New record does not fit into the current leaf page (within the threshold value); so allocate a new
* leaf page and dump the current leaf page. */
if (btree_proceed_leaf (thread_p, load_args) == NULL)
{
*sp_success = ER_FAILED;
return ret;
}
} /* get a new leaf */
/* Insert the record to the current leaf page */
*sp_success =
spage_insert (thread_p, load_args->leaf.pgptr, &load_args->leaf_nleaf_recdes, &load_args->last_leaf_insert_slotid);
if (*sp_success != SP_SUCCESS)
{
return ret;
}
assert (load_args->last_leaf_insert_slotid > 0);
/* Update the node header information for this record */
if (load_args->cur_key_len >= BTREE_MAX_KEYLEN_INPAGE)
{
if (load_args->leaf.hdr.max_key_len < DISK_VPID_SIZE)
{
load_args->leaf.hdr.max_key_len = DISK_VPID_SIZE;
}
}
else if (load_args->leaf.hdr.max_key_len < load_args->cur_key_len)
{
load_args->leaf.hdr.max_key_len = load_args->cur_key_len;
}
if (load_args->overflowing)
{
/* Insert the new record to the current overflow page and flush this page */
INT16 slotid;
assert (load_args->out_recdes == &load_args->ovf_recdes);
/* Store the record in current overflow page */
*sp_success = spage_insert (thread_p, load_args->ovf.pgptr, load_args->out_recdes, &slotid);
if (*sp_success != SP_SUCCESS)
{
return ret;
}
assert (slotid > 0);
/* Save the current overflow page */
btree_log_page (thread_p, &load_args->btid->sys_btid->vfid, load_args->ovf.pgptr);
load_args->ovf.pgptr = NULL;
/* Turn off the overflowing mode */
load_args->overflowing = false;
} /* Current page is an overflow page */
else
{
assert (load_args->out_recdes == &load_args->leaf_nleaf_recdes);
}
/* Create the first part of the next record in main memory */
load_args->out_recdes = &load_args->leaf_nleaf_recdes;
return btree_first_oid (thread_p, &pparam->this_key, &pparam->class_oid, &pparam->rec_oid, &pparam->mvcc_header,
load_args);
}
/*
* bt_load_invalidate_mvcc_del_id () -
* return: int
* thread_p(in):
* load_args(in): Contains fields specifying where & how to create the record
* pparam(in): a bundle of record-related information
* Note:
*/
static int
bt_load_invalidate_mvcc_del_id (THREAD_ENTRY * thread_p, LOAD_ARGS * load_args, S_PARAM_ST * pparam)
{
int ret;
/* TODO: Rewrite btree_construct_leafs. It's almost impossible to follow. */
load_args->curr_non_del_obj_count++;
if (load_args->curr_non_del_obj_count == 1)
{
/* When first non-deleted object is found, we must increment that number of keys for statistics. */
(load_args->n_keys)++;
if (BTREE_IS_UNIQUE (load_args->btid->unique_pk))
{
/* this is the first non-deleted OID of the key; it must be placed as the first OID */
BTREE_MVCC_INFO first_mvcc_info;
OID first_oid, first_class_oid;
int offset = 0;
/* Retrieve the first OID from leaf record */
ret =
btree_leaf_get_first_object (load_args->btid, &load_args->leaf_nleaf_recdes, &first_oid,
&first_class_oid, &first_mvcc_info);
if (ret != NO_ERROR)
{
return ret;
}
/* replace with current OID (might move memory in record) */
btree_mvcc_info_from_heap_mvcc_header (&pparam->mvcc_header, &pparam->mvcc_info);
btree_leaf_change_first_object (thread_p, &load_args->leaf_nleaf_recdes, load_args->btid,
&pparam->rec_oid, &pparam->class_oid, &pparam->mvcc_info, &offset, NULL,
NULL);
if (ret != NO_ERROR)
{
return ret;
}
if (!load_args->overflowing)
{
/* Update load_args->new_pos in case record has grown after replace */
load_args->new_pos += offset;
}
assert (load_args->leaf_nleaf_recdes.length <= load_args->leaf_nleaf_recdes.area_size);
#if !defined (NDEBUG)
btree_check_valid_record (thread_p, load_args->btid, &load_args->leaf_nleaf_recdes, BTREE_LEAF_NODE, NULL);
#endif
/* save first OID as current OID, to be written */
COPY_OID (&pparam->rec_oid, &first_oid);
COPY_OID (&pparam->class_oid, &first_class_oid);
btree_mvcc_info_to_heap_mvcc_header (&first_mvcc_info, &pparam->mvcc_header);
}
}
else if (load_args->curr_non_del_obj_count > 1 && BTREE_IS_UNIQUE (load_args->btid->unique_pk))
{
/* Unique constrain violation - more than one visible records for this key. */
BTREE_SET_UNIQUE_VIOLATION_ERROR (thread_p, &pparam->this_key, &pparam->rec_oid, &pparam->class_oid,
load_args->btid->sys_btid, load_args->bt_name);
return ER_BTREE_UNIQUE_FAILED;
}
return NO_ERROR;
}
/*
* bt_load_nospace_for_new_oid () - Create and save the Overflow OID page.
* return: int
* thread_p(in):
* load_args(in): Contains fields specifying where & how to create the record
* sp_success(out): When this value is set to a value other than SP_SUCCESS,
* error processing is performed without changing the error code.
* Note:
*/
static int
bt_load_nospace_for_new_oid (THREAD_ENTRY * thread_p, LOAD_ARGS * load_args, int *sp_success)
{
int ret;
PAGE_PTR new_ovfpgptr = NULL;
assert (*sp_success == SP_SUCCESS);
/* There is no space for the new oid */
if (load_args->overflowing == true)
{
INT16 slotid;
BTREE_OVERFLOW_HEADER *ovf_header = NULL;
VPID new_ovfpgid;
/* Store the record in current overflow page */
assert (load_args->out_recdes == &load_args->ovf_recdes);
*sp_success = spage_insert (thread_p, load_args->ovf.pgptr, &load_args->ovf_recdes, &slotid);
if (*sp_success != SP_SUCCESS)
{
return NO_ERROR;
}
assert (slotid > 0);
/* Allocate the new overflow page */
ret = btree_load_new_page (thread_p, load_args->btid->sys_btid, NULL, -1, &new_ovfpgid, &new_ovfpgptr);
if (ret != NO_ERROR)
{
pgbuf_unfix_and_init_after_check (thread_p, new_ovfpgptr);
ASSERT_ERROR ();
return ret;
}
if (new_ovfpgptr == NULL)
{
pgbuf_unfix_and_init_after_check (thread_p, new_ovfpgptr);
assert_release (false);
return ER_FAILED;
}
/* make the current overflow page point to the new one */
ovf_header = btree_get_overflow_header (thread_p, load_args->ovf.pgptr);
if (ovf_header == NULL)
{
pgbuf_unfix_and_init_after_check (thread_p, new_ovfpgptr);
*sp_success = ER_FAILED;
return ret;
}
ovf_header->next_vpid = new_ovfpgid;
/* Save the current overflow page */
btree_log_page (thread_p, &load_args->btid->sys_btid->vfid, load_args->ovf.pgptr);
load_args->ovf.pgptr = NULL;
/* Make the new overflow page become the current one */
load_args->ovf.vpid = new_ovfpgid;
load_args->ovf.pgptr = new_ovfpgptr;
new_ovfpgptr = NULL;
}
else
{ /* Current page is a leaf page */
assert (load_args->out_recdes == &load_args->leaf_nleaf_recdes);
/* Allocate the new overflow page */
ret =
btree_load_new_page (thread_p, load_args->btid->sys_btid, NULL, -1, &load_args->ovf.vpid,
&load_args->ovf.pgptr);
if (ret != NO_ERROR)
{
ASSERT_ERROR ();
return ret;
}
if (load_args->ovf.pgptr == NULL)
{
assert_release (false);
*sp_success = ER_FAILED;
return NO_ERROR;
}
/* Connect the new overflow page to the leaf page */
btree_leaf_record_change_overflow_link (thread_p, load_args->btid, &load_args->leaf_nleaf_recdes,
&load_args->ovf.vpid, NULL, NULL);
load_args->overflowing = true;
/* Set out_recdes to ovf_recdes. */
load_args->out_recdes = &load_args->ovf_recdes;
} /* Current page is a leaf page */
/* Initialize the memory area for the next record */
assert (load_args->out_recdes == &load_args->ovf_recdes);
load_args->out_recdes->length = 0;
load_args->new_pos = load_args->out_recdes->data;
load_args->curr_rec_max_obj_count =
BTREE_MAX_OIDCOUNT_IN_SIZE (load_args->btid, spage_max_space_for_new_record (thread_p, load_args->ovf.pgptr));
load_args->curr_rec_obj_count = 1;
return NO_ERROR;
}
/*
* bt_load_add_same_key_to_record () - Insert the same key into an existing record
* return: int
* thread_p(in):
* load_args(in): Contains fields specifying where & how to create the record
* pparam(in): a bundle of record-related information
* sp_success(out): When this value is set to a value other than SP_SUCCESS,
* error processing is performed without changing the error code.
*
* Note:
*/
static int
bt_load_add_same_key_to_record (THREAD_ENTRY * thread_p, LOAD_ARGS * load_args, S_PARAM_ST * pparam, int *sp_success)
{
int ret;
assert (*sp_success == SP_SUCCESS);
/* This key (retrieved key) is the same with the current one. */
load_args->curr_rec_obj_count++;
if (!MVCC_IS_HEADER_DELID_VALID (&pparam->mvcc_header))
{
ret = bt_load_invalidate_mvcc_del_id (thread_p, load_args, pparam);
if (ret != NO_ERROR)
{
return ret;
}
}
/* Check if there is space in the memory record for the new OID. If not dump the current record and
* create a new record. */
if (load_args->curr_rec_obj_count > load_args->curr_rec_max_obj_count)
{ /* no space for the new OID */
ret = bt_load_nospace_for_new_oid (thread_p, load_args, sp_success);
if (ret != NO_ERROR || *sp_success != SP_SUCCESS)
{
return ret;
}
}
if (load_args->overflowing || BTREE_IS_UNIQUE (load_args->btid->unique_pk))
{
/* all overflow OIDs have fixed header size; also, all OIDs of a unique index key (except the first
* one) have fixed size */
BTREE_MVCC_SET_HEADER_FIXED_SIZE (&pparam->mvcc_header);
}
/* Insert new OID, class OID (for unique), and MVCCID's. */
/* Insert OID (and MVCC flags) */
btree_set_mvcc_flags_into_oid (&pparam->mvcc_header, &pparam->rec_oid);
OR_PUT_OID (load_args->new_pos, &pparam->rec_oid);
load_args->out_recdes->length += OR_OID_SIZE;
load_args->new_pos += OR_OID_SIZE;
if (BTREE_IS_UNIQUE (load_args->btid->unique_pk))
{ /* Insert class OID */
OR_PUT_OID (load_args->new_pos, &pparam->class_oid);
load_args->out_recdes->length += OR_OID_SIZE;
load_args->new_pos += OR_OID_SIZE;
}
btree_mvcc_info_from_heap_mvcc_header (&pparam->mvcc_header, &pparam->mvcc_info);
/* Insert MVCCID's */
load_args->out_recdes->length += btree_packed_mvccinfo_size (&pparam->mvcc_info);
load_args->new_pos = btree_pack_mvccinfo (load_args->new_pos, &pparam->mvcc_info);
assert (load_args->out_recdes->length <= load_args->out_recdes->area_size);
#if !defined (NDEBUG)
btree_check_valid_record (thread_p, load_args->btid, load_args->out_recdes,
(load_args->overflowing ? BTREE_OVERFLOW_NODE : BTREE_LEAF_NODE), NULL);
#endif
return NO_ERROR;
}
/*
* bt_load_notify_to_vacuum () -
* return: int
* thread_p(in):
* load_args(in): Contains fields specifying where & how to create the record
* pparam(in): a bundle of record-related information
* notify_vacuum_rv_data(in):
* notify_vacuum_rv_data_bufalign(in):
*
* Note:
*/
static int
bt_load_notify_to_vacuum (THREAD_ENTRY * thread_p, LOAD_ARGS * load_args, S_PARAM_ST * pparam,
char **notify_vacuum_rv_data, char *notify_vacuum_rv_data_bufalign)
{
int ret;
if (pparam->is_btree_ops_log)
{
_er_log_debug (ARG_FILE_LINE,
"DEBUG_BTREE: load added object(%d, %d, %d) "
"class(%d, %d, %d) and btid(%d, (%d, %d)) with mvccinfo=%llu | %llu", pparam->rec_oid.volid,
pparam->rec_oid.pageid, pparam->rec_oid.slotid, pparam->class_oid.volid,
pparam->class_oid.pageid, pparam->class_oid.slotid,
load_args->btid->sys_btid->root_pageid, load_args->btid->sys_btid->vfid.volid,
load_args->btid->sys_btid->vfid.fileid, MVCC_GET_INSID (&pparam->mvcc_header),
MVCC_GET_DELID (&pparam->mvcc_header));
}
/* Some objects have been recently deleted and couldn't be filtered (because there may be running transaction
* that can still see them. However, they must be vacuumed later. Vacuum can only find them by parsing log,
* therefore some log records are required. These are dummy records with the sole purpose of notifying vacuum.
* Since rec_oid, class_oid and mvcc_header could be replaced in case first object of unique index had to
* be swapped, we will use orig_oid, orig_class_oid and orig_mvcc_header to log object load for
* vacuum. */
if (MVCC_IS_HEADER_DELID_VALID (&pparam->orig_mvcc_header)
|| MVCC_IS_HEADER_INSID_NOT_ALL_VISIBLE (&pparam->orig_mvcc_header))
{
/* There is something to vacuum for this object */
int notify_vacuum_rv_data_capacity = IO_MAX_PAGE_SIZE;
int notify_vacuum_rv_data_length = 0;
char *pgptr = (load_args->overflowing ? load_args->ovf.pgptr : load_args->leaf.pgptr);
/* clear flags for logging */
btree_clear_mvcc_flags_from_oid (&pparam->orig_oid);
if (pparam->is_btree_ops_log)
{
_er_log_debug (ARG_FILE_LINE,
"DEBUG_BTREE: load notify vacuum object(%d, %d, %d) "
"class(%d, %d, %d) and btid(%d, (%d, %d)) with mvccinfo=%llu | %llu",
pparam->orig_oid.volid, pparam->orig_oid.pageid, pparam->orig_oid.slotid,
pparam->orig_class_oid.volid, pparam->orig_class_oid.pageid, pparam->orig_class_oid.slotid,
load_args->btid->sys_btid->root_pageid, load_args->btid->sys_btid->vfid.volid,
load_args->btid->sys_btid->vfid.fileid, MVCC_GET_INSID (&pparam->orig_mvcc_header),
MVCC_GET_DELID (&pparam->orig_mvcc_header));
}
/* append log data */
btree_mvcc_info_from_heap_mvcc_header (&pparam->orig_mvcc_header, &pparam->mvcc_info);
ret =
btree_rv_save_keyval_for_undo (load_args->btid, &pparam->this_key, &pparam->orig_class_oid, &pparam->orig_oid,
&pparam->mvcc_info, BTREE_OP_NOTIFY_VACUUM, notify_vacuum_rv_data_bufalign,
notify_vacuum_rv_data, ¬ify_vacuum_rv_data_capacity,
¬ify_vacuum_rv_data_length);
if (ret != NO_ERROR)
{
return ret;
}
log_append_undo_data2 (thread_p, RVBT_MVCC_NOTIFY_VACUUM, &load_args->btid->sys_btid->vfid, NULL, -1,
notify_vacuum_rv_data_length, *notify_vacuum_rv_data);
pgbuf_set_dirty (thread_p, pgptr, DONT_FREE);
}
return NO_ERROR;
}
/*
* btree_construct_leafs () - Output function for index sorting;constructs leaf
* nodes
* return: int
* in_recdes(in): specifies the next sort item in the sorting order.
* arg(in): output arguments; provides information about how to use the
* next item in the sorted list to construct the leaf nodes of
* the Btree.
*
* Note: This function creates the btree leaf nodes. It is passed
* by the "btree_index_sort" function to the "sort_listfile" function
* to use the sort items to build the records and pages of the btree.
*/
static int
btree_construct_leafs (THREAD_ENTRY * thread_p, const RECDES * in_recdes, void *arg)
{
int ret = NO_ERROR;
bool copy = false;
RECDES sort_key_recdes, *recdes;
char *next;
char notify_vacuum_rv_data_buffer[IO_MAX_PAGE_SIZE + BTREE_MAX_ALIGN];
char *notify_vacuum_rv_data_bufalign = PTR_ALIGN (notify_vacuum_rv_data_buffer, BTREE_MAX_ALIGN);
char *notify_vacuum_rv_data = notify_vacuum_rv_data_bufalign;
LOAD_ARGS *load_args = (LOAD_ARGS *) arg;
S_PARAM_ST sparam;
sparam.class_oid = oid_Null_oid;
sparam.rec_oid = oid_Null_oid;
sparam.is_btree_ops_log = prm_get_bool_value (PRM_ID_LOG_BTREE_OPS);
sparam.orig_oid = oid_Null_oid;
sparam.orig_class_oid = oid_Null_oid;
#if defined (SERVER_MODE)
/* Make sure MVCCID for current transaction is generated. */
(void) logtb_get_current_mvccid (thread_p);
#endif /* SERVER_MODE */
sort_key_recdes = *in_recdes;
recdes = &sort_key_recdes;
for (;;)
{ /* Infinite loop; will exit with break statement */
next = *(char **) recdes->data; /* save forward link */
if ((ret = bt_load_get_buf_from_record (recdes, load_args, &sparam, copy)) != NO_ERROR)
{
goto error;
}
if (VPID_ISNULL (&(load_args->leaf.vpid))) /* Find out if this is the first call to this function */
{
/* This is the first call to this function; so, initialize some fields in the LOAD_ARGS structure */
if ((ret = bt_load_get_first_leaf_page_and_init_args (thread_p, load_args, &sparam)) != NO_ERROR)
{
goto error;
}
}
else
{ /* This is not the first call to this function */
/*
* Compare the received key with the current one.
* If different, then dump the current record and create a new record.
*/
int sp_success = SP_SUCCESS;
int c = btree_compare_key (&sparam.this_key, &load_args->current_key, load_args->btid->key_type, 0, 1, NULL);
/* EQUALITY test only - doesn't care the reverse index */
if (c == DB_GT)
{ /* Current key is finished; dump this output record to the disk page */
ret = bt_load_make_new_record_on_leaf_page (thread_p, load_args, &sparam, &sp_success);
}
else if (c == DB_EQ)
{ /* This key (retrieved key) is the same with the current one. */
ret = bt_load_add_same_key_to_record (thread_p, load_args, &sparam, &sp_success);
}
else
{
assert_release (false);
goto error;
}
if (ret != NO_ERROR || sp_success != SP_SUCCESS)
{
goto error;
}
}
ret =
bt_load_notify_to_vacuum (thread_p, load_args, &sparam, ¬ify_vacuum_rv_data, notify_vacuum_rv_data_bufalign);
if (ret != NO_ERROR)
{
goto error;
}
/* set level 1 to leaf */
load_args->leaf.hdr.node_level = 1;
if (sparam.this_key.need_clear)
{
copy = true;
}
btree_clear_key_value (©, &sparam.this_key);
if (!next)
{
break; /* exit infinite loop */
}
/* move to next link */
recdes->data = next;
recdes->length = SORT_RECORD_LENGTH (next);
} // for (;;)
if (notify_vacuum_rv_data != NULL && notify_vacuum_rv_data != notify_vacuum_rv_data_bufalign)
{
db_private_free (thread_p, notify_vacuum_rv_data);
}
assert (ret == NO_ERROR);
return ret;
error:
pgbuf_unfix_and_init_after_check (thread_p, load_args->leaf.pgptr);
pgbuf_unfix_and_init_after_check (thread_p, load_args->ovf.pgptr);
btree_clear_key_value (©, &sparam.this_key);
if (notify_vacuum_rv_data != NULL && notify_vacuum_rv_data != notify_vacuum_rv_data_bufalign)
{
db_private_free (thread_p, notify_vacuum_rv_data);
}
assert (er_errid () != NO_ERROR);
return (ret == NO_ERROR && (ret = er_errid ()) == NO_ERROR) ? ER_FAILED : ret;
}
#if defined(CUBRID_DEBUG)
/*
* btree_dump_sort_output () - Sample output function for index sorting
* return: NO_ERROR
* recdes(in):
* load_args(in):
*
* Note: This function is a debugging function. It is passed by the
* btree_index_sort function to the sort_listfile function to print
* out the contents of the sort items once they are obtained in
* the requested sorting order.
*
*/
static int
btree_dump_sort_output (const RECDES * recdes, LOAD_ARGS * load_args)
{
OID this_oid;
DB_VALUE this_key;
OR_BUF buf;
bool copy = false;
int key_size = -1;
int ret = NO_ERROR;
/* First decompose the input record into the key and oid components */
or_init (&buf, recdes->data, recdes->length);
if (or_get_oid (&buf, &this_oid) != NO_ERROR)
{
goto exit_on_error;
}
buf.ptr = PTR_ALIGN (buf.ptr, MAX_ALIGNMENT);
/* Do not copy the string--just use the pointer. The pr_ routines for strings and sets have different semantics for
* length. */
if (TP_DOMAIN_TYPE (load_args->btid->key_type) == DB_TYPE_MIDXKEY)
{
key_size = buf.endptr - buf.ptr;
}
if ((*(load_args->btid->key_type->type->readval)) (&buf, &this_key, load_args->btid->key_type, key_size, copy, NULL,
0) != NO_ERROR)
{
goto exit_on_error;
}
printf ("Attribute: ");
btree_dump_key (stdout, &this_key);
printf (" Volid: %d", this_oid.volid);
printf (" Pageid: %d", this_oid.pageid);
printf (" Slotid: %d\n", this_oid.slotid);
end:
copy = btree_clear_key_value (copy, &this_key);
return ret;
exit_on_error:
return (ret == NO_ERROR && (ret = er_errid ()) == NO_ERROR) ? ER_FAILED : ret;
}
#endif /* CUBRID_DEBUG */
/*
* btree_index_sort () - Sort for the index file creation
* return: int
* sort_args(in): sort arguments; specifies the sort-attribute as well as
* the structure of the input objects
* out_func(in): output function to utilize the sorted items as they are
* produced
* out_args(in): arguments to the out_func.
*
* Note: This function supports the initial loading phase of B+tree
* indices by providing an ordered list of (index-attribute
* value, object address) pairs. It uses the general sorting
* facility provided in the "sr" module.
*/
static int
btree_index_sort (THREAD_ENTRY * thread_p, SORT_ARGS * sort_args, SORT_PUT_FUNC * out_func, void *out_args)
{
int i;
bool includes_tde_class = false;
TDE_ALGORITHM tde_algo = TDE_ALGORITHM_NONE;
for (i = 0; i < sort_args->n_classes; i++)
{
if (heap_get_class_tde_algorithm (thread_p, &sort_args->class_ids[i], &tde_algo) != NO_ERROR)
{
return ER_FAILED;
}
if (tde_algo != TDE_ALGORITHM_NONE)
{
includes_tde_class = true;
break;
}
}
return sort_listfile (thread_p, sort_args->hfids[0].vfid.volid, 0 /* TODO - support parallelism */ ,
&btree_sort_get_next, sort_args, out_func, out_args, compare_driver, sort_args, SORT_DUP,
NO_SORT_LIMIT, includes_tde_class, SORT_INDEX_LEAF);
}
static SCAN_CODE
get_next_vpid (THREAD_ENTRY * thread_p, parallel_query::ftab_set & ftab, FILE_PARTIAL_SECTOR * fsector, int *offset,
VPID * vpid_out, HEAP_SCANCACHE * scan_cache, PGBUF_WATCHER * old_pgwatcher, HFID * hfid)
{
int error = NO_ERROR;
assert (*offset >= 0);
while (true)
{
if (fsector == NULL || VSID_IS_NULL (&fsector->vsid))
{
/* try to get new partial sector */
FILE_PARTIAL_SECTOR new_fsector = ftab.get_next ();
if (VSID_IS_NULL (&new_fsector.vsid))
{
/* end */
if (old_pgwatcher->pgptr != NULL)
{
pgbuf_ordered_unfix (thread_p, old_pgwatcher);
}
return S_END;
}
*fsector = new_fsector;
*offset = 0;
vpid_out->volid = new_fsector.vsid.volid;
vpid_out->pageid = SECTOR_FIRST_PAGEID (new_fsector.vsid.sectid);
}
else
{
(*offset)++;
vpid_out->volid = fsector->vsid.volid;
vpid_out->pageid = SECTOR_FIRST_PAGEID (fsector->vsid.sectid) + (*offset);
}
/* fsector exists */
for (; *offset < DISK_SECTOR_NPAGES; (*offset)++, (*vpid_out).pageid++)
{
if (vpid_out->pageid == hfid->vfid.fileid && vpid_out->volid == hfid->vfid.volid)
{
/* ftab page, skip it. */
continue;
}
if (bit64_is_set (fsector->page_bitmap, *offset))
{
if (scan_cache->page_watcher.pgptr != NULL)
{
pgbuf_replace_watcher (thread_p, &scan_cache->page_watcher, old_pgwatcher);
}
error = pgbuf_ordered_fix (thread_p, vpid_out, OLD_PAGE_MAYBE_DEALLOCATED, PGBUF_LATCH_READ,
&scan_cache->page_watcher);
er_log_debug (ARG_FILE_LINE, "parallel btree: thread %d fix page %d|%d for scanning", thread_p->index,
vpid_out->volid, vpid_out->pageid);
if (scan_cache->page_watcher.pgptr == NULL)
{
if (error != NO_ERROR && error != ER_PB_BAD_PAGEID)
{
/* non-dealloc error (e.g. ER_INTERRUPTED): propagate */
if (old_pgwatcher->pgptr != NULL)
{
pgbuf_ordered_unfix (thread_p, old_pgwatcher);
}
return S_ERROR;
}
/* when bitmap is built, that page was valid.
* but now, it's deallocated in some reasons.
* this is not error, it can be ignored */
er_clear ();
continue;
}
if (old_pgwatcher->pgptr != NULL)
{
pgbuf_ordered_unfix (thread_p, old_pgwatcher);
}
if (error != NO_ERROR)
{
return S_ERROR;
}
return S_SUCCESS;
}
}
assert (fsector);
VSID_SET_NULL (&fsector->vsid);
}
assert (false);
return S_ERROR;
}
SORT_STATUS
btree_sort_get_next_parallel (THREAD_ENTRY * thread_p, RECDES * temp_recdes, void *arg)
{
SCAN_CODE page_iter_scan_result, slot_iter_scan_result;
DB_VALUE dbvalue;
DB_VALUE *dbvalue_ptr;
int key_len;
OID prev_oid;
SORT_ARGS *sort_args;
int value_has_null;
char midxkey_buf[DBVAL_BUFSIZE + MAX_ALIGNMENT], *aligned_midxkey_buf;
int *prefix_lengthp;
int result;
MVCC_REC_HEADER mvcc_header = MVCC_REC_HEADER_INITIALIZER;
MVCC_SNAPSHOT mvcc_snapshot_dirty;
MVCC_SATISFIES_SNAPSHOT_RESULT snapshot_dirty_satisfied;
bool is_btree_ops_log = prm_get_bool_value (PRM_ID_LOG_BTREE_OPS);
PGBUF_WATCHER old_pgwatcher;
HFID hfid;
VPID vpid = vpid_Null_vpid;
db_make_null (&dbvalue);
aligned_midxkey_buf = PTR_ALIGN (midxkey_buf, MAX_ALIGNMENT);
sort_args = (SORT_ARGS *) arg;
prev_oid = sort_args->cur_oid;
mvcc_snapshot_dirty.snapshot_fnc = mvcc_satisfies_dirty;
VPID_SET_NULL (&vpid);
hfid = sort_args->hfids[0];
PGBUF_INIT_WATCHER (&old_pgwatcher, PGBUF_ORDERED_HEAP_NORMAL, &hfid);
if (OID_ISNULL (&sort_args->cur_oid))
{
page_iter_scan_result =
get_next_vpid (thread_p, (*sort_args->ftab_sets)[sort_args->cur_class], &sort_args->curr_sec,
&sort_args->curr_pgoffset, &vpid, &sort_args->hfscan_cache, &old_pgwatcher,
&sort_args->hfids[sort_args->cur_class]);
if (page_iter_scan_result == S_END)
{
return SORT_NOMORE_RECS;
}
else if (page_iter_scan_result == S_SUCCESS)
{
;
}
else
{
return SORT_ERROR_OCCURRED;
}
}
else
{
vpid.pageid = sort_args->cur_oid.pageid;
vpid.volid = sort_args->cur_oid.volid;
}
do
{ /* Infinite loop */
int cur_class, attr_offset;
bool save_cache_last_fix_page;
/*
* This infinite loop will be exited when a satisfactory next value is
* found (i.e., when an object belonging to this class with a non-null
* attribute value is found), or when there are no more objects in the
* heap files.
*/
/*
* RETRIEVE THE NEXT OBJECT
*/
cur_class = sort_args->cur_class;
attr_offset = cur_class * sort_args->n_attrs;
sort_args->in_recdes.data = NULL;
if (vpid.pageid == sort_args->hfids[cur_class].vfid.fileid
&& vpid.volid == sort_args->hfids[cur_class].vfid.volid)
{
/* ftab page */
slot_iter_scan_result = S_END;
}
else
{
slot_iter_scan_result =
heap_next_1page (thread_p, &sort_args->hfids[cur_class], &vpid, &sort_args->class_ids[cur_class],
&sort_args->cur_oid, &sort_args->in_recdes, &sort_args->hfscan_cache,
sort_args->hfscan_cache.cache_last_fix_page ? PEEK : COPY);
}
switch (slot_iter_scan_result)
{
case S_SUCCESS:
break;
case S_END:
{
page_iter_scan_result =
get_next_vpid (thread_p, (*sort_args->ftab_sets)[sort_args->cur_class], &sort_args->curr_sec,
&sort_args->curr_pgoffset, &vpid, &sort_args->hfscan_cache, &old_pgwatcher,
&sort_args->hfids[sort_args->cur_class]);
if (page_iter_scan_result == S_END)
{
/* fall through */
}
else if (page_iter_scan_result == S_SUCCESS)
{
continue;
}
else
{
return SORT_ERROR_OCCURRED;
}
/* No more objects in this heap, finish the current scan */
save_cache_last_fix_page = sort_args->hfscan_cache.cache_last_fix_page;
bt_load_heap_scancache_end_for_attrinfo (thread_p, sort_args, NULL, NULL);
/* Are we through with all the non-null heaps? */
sort_args->cur_class++;
while ((sort_args->cur_class < sort_args->n_classes)
&& HFID_IS_NULL (&sort_args->hfids[sort_args->cur_class]))
{
sort_args->cur_class++;
}
if (sort_args->cur_class == sort_args->n_classes)
{
return SORT_NOMORE_RECS;
}
else
{
/* When there is a class inheritance relationship, n_classes may be 2 or more
* only if it is a reverse unique, unique, or primary index.
* In addition, filter and func_index_info cannot exist in this case.
*/
/* start up the next scan */
cur_class = sort_args->cur_class;
attr_offset = cur_class * sort_args->n_attrs;
if (bt_load_heap_scancache_start_for_attrinfo
(thread_p, sort_args, NULL, NULL, save_cache_last_fix_page) != NO_ERROR)
{
return SORT_ERROR_OCCURRED;
}
/* set the scan to the initial state for this new heap */
OID_SET_NULL (&sort_args->cur_oid);
if (is_btree_ops_log)
{
_er_log_debug (ARG_FILE_LINE, "DEBUG_BTREE: load start on class(%d, %d, %d), btid(%d, (%d, %d)).",
sort_args->class_ids[sort_args->cur_class].volid,
sort_args->class_ids[sort_args->cur_class].pageid,
sort_args->class_ids[sort_args->cur_class].slotid,
sort_args->btid->sys_btid->root_pageid, sort_args->btid->sys_btid->vfid.volid,
sort_args->btid->sys_btid->vfid.fileid);
}
hfid = sort_args->hfids[sort_args->cur_class];
PGBUF_INIT_WATCHER (&old_pgwatcher, PGBUF_ORDERED_HEAP_NORMAL, &hfid);
}
continue;
}
/*
case S_ERROR:
case S_DOESNT_EXIST:
case S_DOESNT_FIT:
case S_SUCCESS_CHN_UPTODATE:
case S_SNAPSHOT_NOT_SATISFIED:
*/
default:
return SORT_ERROR_OCCURRED;
}
/*
* Produce the sort item for this object
*/
/* filter out dead records before any more checks */
if (or_mvcc_get_header (&sort_args->in_recdes, &mvcc_header) != NO_ERROR)
{
return SORT_ERROR_OCCURRED;
}
if (MVCC_IS_HEADER_DELID_VALID (&mvcc_header) && MVCC_GET_DELID (&mvcc_header) < sort_args->oldest_visible_mvccid)
{
continue;
}
if (MVCC_IS_HEADER_INSID_NOT_ALL_VISIBLE (&mvcc_header)
&& MVCC_GET_INSID (&mvcc_header) < sort_args->oldest_visible_mvccid)
{
/* Insert MVCCID is now visible to everyone. Clear it to avoid unnecessary vacuuming. */
MVCC_CLEAR_FLAG_BITS (&mvcc_header, OR_MVCC_FLAG_VALID_INSID);
}
snapshot_dirty_satisfied = mvcc_snapshot_dirty.snapshot_fnc (thread_p, &mvcc_header, &mvcc_snapshot_dirty);
if (sort_args->filter)
{
if (heap_attrinfo_read_dbvalues
(thread_p, &sort_args->cur_oid, &sort_args->in_recdes, sort_args->filter->cache_pred) != NO_ERROR)
{
return SORT_ERROR_OCCURRED;
}
result = (*sort_args->filter_eval_func) (thread_p, sort_args->filter->pred, NULL, &sort_args->cur_oid);
if (result == V_ERROR)
{
return SORT_ERROR_OCCURRED;
}
else if (result != V_TRUE)
{
continue;
}
}
if (sort_args->func_index_info && sort_args->func_index_info->expr)
{
if (snapshot_dirty_satisfied != SNAPSHOT_SATISFIED)
{
/* Check snapshot before key generation. Key generation may leads to errors when a function is involved. */
continue;
}
}
prefix_lengthp = (sort_args->attrs_prefix_length) ? &(sort_args->attrs_prefix_length[0]) : NULL;
dbvalue_ptr =
heap_attrinfo_generate_key (thread_p, sort_args->n_attrs, &sort_args->attr_ids[attr_offset], prefix_lengthp,
&sort_args->attr_info, &sort_args->in_recdes, &dbvalue, aligned_midxkey_buf,
sort_args->func_index_info, NULL, &sort_args->cur_oid);
if (dbvalue_ptr == NULL)
{
return SORT_ERROR_OCCURRED;
}
value_has_null = 0; /* init */
if (DB_IS_NULL (dbvalue_ptr) || btree_multicol_key_has_null (dbvalue_ptr))
{
if (sort_args->not_null_flag && snapshot_dirty_satisfied == SNAPSHOT_SATISFIED)
{
if (dbvalue_ptr == &dbvalue || dbvalue_ptr->need_clear == true)
{
pr_clear_value (dbvalue_ptr);
}
er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_NOT_NULL_DOES_NOT_ALLOW_NULL_VALUE, 0);
return SORT_ERROR_OCCURRED;
}
value_has_null = 1; /* found null columns */
}
if (DB_IS_NULL (dbvalue_ptr) || btree_multicol_key_is_null (dbvalue_ptr))
{
if (snapshot_dirty_satisfied == SNAPSHOT_SATISFIED)
{
/* All objects that were not candidates for vacuum are loaded, but statistics should only care for
* objects that have not been deleted and committed at the time of load. */
sort_args->n_oids++; /* Increment the OID counter */
sort_args->n_nulls++; /* Increment the NULL counter */
}
if (dbvalue_ptr == &dbvalue || dbvalue_ptr->need_clear == true)
{
pr_clear_value (dbvalue_ptr);
}
if (is_btree_ops_log)
{
_er_log_debug (ARG_FILE_LINE,
"DEBUG_BTREE: load sort found null at oid(%d, %d, %d)"
", class_oid(%d, %d, %d), btid(%d, (%d, %d).", sort_args->cur_oid.volid,
sort_args->cur_oid.pageid, sort_args->cur_oid.slotid,
sort_args->class_ids[sort_args->cur_class].volid,
sort_args->class_ids[sort_args->cur_class].pageid,
sort_args->class_ids[sort_args->cur_class].slotid, sort_args->btid->sys_btid->root_pageid,
sort_args->btid->sys_btid->vfid.volid, sort_args->btid->sys_btid->vfid.fileid);
}
continue;
}
key_len = sort_args->key_type->type->get_disk_size_of_value (dbvalue_ptr);
if (key_len > 0)
{
result = bt_load_put_buf_to_record (temp_recdes, sort_args, value_has_null, &prev_oid, &mvcc_header,
dbvalue_ptr, key_len, cur_class, is_btree_ops_log);
if (result != NO_ERROR)
{
goto nofit;
}
if (dbvalue_ptr == &dbvalue || dbvalue_ptr->need_clear == true)
{
pr_clear_value (dbvalue_ptr);
}
}
if (snapshot_dirty_satisfied == SNAPSHOT_SATISFIED)
{
/* All objects that were not candidates for vacuum are loaded, but statistics should only care for objects
* that have not been deleted and committed at the time of load. */
sort_args->n_oids++; /* Increment the OID counter */
}
if (key_len > 0)
{
return SORT_SUCCESS;
}
}
while (true);
nofit:
if (dbvalue_ptr == &dbvalue || dbvalue_ptr->need_clear == true)
{
pr_clear_value (dbvalue_ptr);
}
return SORT_REC_DOESNT_FIT;
}
/*
* btree_sort_get_next () - Get_key function for index sorting
* return: SORT_STATUS
* temp_recdes(in): temporary record descriptor; specifies where to put the
* next sort item.
* arg(in): sort arguments; provides information about how to produce
* the next sort item.
*
* Note: This function is passed by the "btree_index_sort" function to
* the "sort_listfile" function to obtain the value of the attribute
* (on which the B+tree index for the class is to be created)
* of each object successively.
*/
static SORT_STATUS
btree_sort_get_next (THREAD_ENTRY * thread_p, RECDES * temp_recdes, void *arg)
{
SCAN_CODE scan_result;
DB_VALUE dbvalue;
DB_VALUE *dbvalue_ptr;
int key_len;
OID prev_oid;
SORT_ARGS *sort_args;
int value_has_null;
char midxkey_buf[DBVAL_BUFSIZE + MAX_ALIGNMENT], *aligned_midxkey_buf;
int *prefix_lengthp;
int result;
MVCC_REC_HEADER mvcc_header = MVCC_REC_HEADER_INITIALIZER;
MVCC_SNAPSHOT mvcc_snapshot_dirty;
MVCC_SATISFIES_SNAPSHOT_RESULT snapshot_dirty_satisfied;
bool is_btree_ops_log = prm_get_bool_value (PRM_ID_LOG_BTREE_OPS);
db_make_null (&dbvalue);
aligned_midxkey_buf = PTR_ALIGN (midxkey_buf, MAX_ALIGNMENT);
sort_args = (SORT_ARGS *) arg;
prev_oid = sort_args->cur_oid;
mvcc_snapshot_dirty.snapshot_fnc = mvcc_satisfies_dirty;
do
{ /* Infinite loop */
int cur_class, attr_offset;
bool save_cache_last_fix_page;
/*
* This infinite loop will be exited when a satisfactory next value is
* found (i.e., when an object belonging to this class with a non-null
* attribute value is found), or when there are no more objects in the
* heap files.
*/
/*
* RETRIEVE THE NEXT OBJECT
*/
cur_class = sort_args->cur_class;
attr_offset = cur_class * sort_args->n_attrs;
sort_args->in_recdes.data = NULL;
scan_result =
heap_next (thread_p, &sort_args->hfids[cur_class], &sort_args->class_ids[cur_class], &sort_args->cur_oid,
&sort_args->in_recdes, &sort_args->hfscan_cache,
sort_args->hfscan_cache.cache_last_fix_page ? PEEK : COPY);
switch (scan_result)
{
case S_SUCCESS:
break;
case S_END:
/* No more objects in this heap, finish the current scan */
save_cache_last_fix_page = sort_args->hfscan_cache.cache_last_fix_page;
bt_load_heap_scancache_end_for_attrinfo (thread_p, sort_args, NULL, NULL);
/* Are we through with all the non-null heaps? */
sort_args->cur_class++;
while ((sort_args->cur_class < sort_args->n_classes)
&& HFID_IS_NULL (&sort_args->hfids[sort_args->cur_class]))
{
sort_args->cur_class++;
}
if (sort_args->cur_class == sort_args->n_classes)
{
return SORT_NOMORE_RECS;
}
else
{
/* When there is a class inheritance relationship, n_classes may be 2 or more
* only if it is a reverse unique, unique, or primary index.
* In addition, filter and func_index_info cannot exist in this case.
*/
/* start up the next scan */
cur_class = sort_args->cur_class;
attr_offset = cur_class * sort_args->n_attrs;
if (bt_load_heap_scancache_start_for_attrinfo (thread_p, sort_args, NULL, NULL, save_cache_last_fix_page)
!= NO_ERROR)
{
return SORT_ERROR_OCCURRED;
}
/* set the scan to the initial state for this new heap */
OID_SET_NULL (&sort_args->cur_oid);
if (is_btree_ops_log)
{
_er_log_debug (ARG_FILE_LINE, "DEBUG_BTREE: load start on class(%d, %d, %d), btid(%d, (%d, %d)).",
sort_args->class_ids[sort_args->cur_class].volid,
sort_args->class_ids[sort_args->cur_class].pageid,
sort_args->class_ids[sort_args->cur_class].slotid,
sort_args->btid->sys_btid->root_pageid, sort_args->btid->sys_btid->vfid.volid,
sort_args->btid->sys_btid->vfid.fileid);
}
}
continue;
/*
case S_ERROR:
case S_DOESNT_EXIST:
case S_DOESNT_FIT:
case S_SUCCESS_CHN_UPTODATE:
case S_SNAPSHOT_NOT_SATISFIED:
*/
default:
return SORT_ERROR_OCCURRED;
}
/*
* Produce the sort item for this object
*/
/* filter out dead records before any more checks */
if (or_mvcc_get_header (&sort_args->in_recdes, &mvcc_header) != NO_ERROR)
{
return SORT_ERROR_OCCURRED;
}
if (MVCC_IS_HEADER_DELID_VALID (&mvcc_header) && MVCC_GET_DELID (&mvcc_header) < sort_args->oldest_visible_mvccid)
{
continue;
}
if (MVCC_IS_HEADER_INSID_NOT_ALL_VISIBLE (&mvcc_header)
&& MVCC_GET_INSID (&mvcc_header) < sort_args->oldest_visible_mvccid)
{
/* Insert MVCCID is now visible to everyone. Clear it to avoid unnecessary vacuuming. */
MVCC_CLEAR_FLAG_BITS (&mvcc_header, OR_MVCC_FLAG_VALID_INSID);
}
snapshot_dirty_satisfied = mvcc_snapshot_dirty.snapshot_fnc (thread_p, &mvcc_header, &mvcc_snapshot_dirty);
if (sort_args->filter)
{
if (heap_attrinfo_read_dbvalues
(thread_p, &sort_args->cur_oid, &sort_args->in_recdes, sort_args->filter->cache_pred) != NO_ERROR)
{
return SORT_ERROR_OCCURRED;
}
result = (*sort_args->filter_eval_func) (thread_p, sort_args->filter->pred, NULL, &sort_args->cur_oid);
if (result == V_ERROR)
{
return SORT_ERROR_OCCURRED;
}
else if (result != V_TRUE)
{
continue;
}
}
if (sort_args->func_index_info && sort_args->func_index_info->expr)
{
if (snapshot_dirty_satisfied != SNAPSHOT_SATISFIED)
{
/* Check snapshot before key generation. Key generation may leads to errors when a function is involved. */
continue;
}
}
prefix_lengthp = (sort_args->attrs_prefix_length) ? &(sort_args->attrs_prefix_length[0]) : NULL;
dbvalue_ptr =
heap_attrinfo_generate_key (thread_p, sort_args->n_attrs, &sort_args->attr_ids[attr_offset], prefix_lengthp,
&sort_args->attr_info, &sort_args->in_recdes, &dbvalue, aligned_midxkey_buf,
sort_args->func_index_info, NULL, &sort_args->cur_oid);
if (dbvalue_ptr == NULL)
{
return SORT_ERROR_OCCURRED;
}
value_has_null = 0; /* init */
if (DB_IS_NULL (dbvalue_ptr) || btree_multicol_key_has_null (dbvalue_ptr))
{
if (sort_args->not_null_flag && snapshot_dirty_satisfied == SNAPSHOT_SATISFIED)
{
if (dbvalue_ptr == &dbvalue || dbvalue_ptr->need_clear == true)
{
pr_clear_value (dbvalue_ptr);
}
er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_NOT_NULL_DOES_NOT_ALLOW_NULL_VALUE, 0);
return SORT_ERROR_OCCURRED;
}
value_has_null = 1; /* found null columns */
}
if (DB_IS_NULL (dbvalue_ptr) || btree_multicol_key_is_null (dbvalue_ptr))
{
if (snapshot_dirty_satisfied == SNAPSHOT_SATISFIED)
{
/* All objects that were not candidates for vacuum are loaded, but statistics should only care for
* objects that have not been deleted and committed at the time of load. */
sort_args->n_oids++; /* Increment the OID counter */
sort_args->n_nulls++; /* Increment the NULL counter */
}
if (dbvalue_ptr == &dbvalue || dbvalue_ptr->need_clear == true)
{
pr_clear_value (dbvalue_ptr);
}
if (is_btree_ops_log)
{
_er_log_debug (ARG_FILE_LINE,
"DEBUG_BTREE: load sort found null at oid(%d, %d, %d)"
", class_oid(%d, %d, %d), btid(%d, (%d, %d).", sort_args->cur_oid.volid,
sort_args->cur_oid.pageid, sort_args->cur_oid.slotid,
sort_args->class_ids[sort_args->cur_class].volid,
sort_args->class_ids[sort_args->cur_class].pageid,
sort_args->class_ids[sort_args->cur_class].slotid, sort_args->btid->sys_btid->root_pageid,
sort_args->btid->sys_btid->vfid.volid, sort_args->btid->sys_btid->vfid.fileid);
}
continue;
}
key_len = sort_args->key_type->type->get_disk_size_of_value (dbvalue_ptr);
if (key_len > 0)
{
result = bt_load_put_buf_to_record (temp_recdes, sort_args, value_has_null, &prev_oid, &mvcc_header,
dbvalue_ptr, key_len, cur_class, is_btree_ops_log);
if (result != NO_ERROR)
{
goto nofit;
}
if (dbvalue_ptr == &dbvalue || dbvalue_ptr->need_clear == true)
{
pr_clear_value (dbvalue_ptr);
}
}
if (snapshot_dirty_satisfied == SNAPSHOT_SATISFIED)
{
/* All objects that were not candidates for vacuum are loaded, but statistics should only care for objects
* that have not been deleted and committed at the time of load. */
sort_args->n_oids++; /* Increment the OID counter */
}
if (key_len > 0)
{
return SORT_SUCCESS;
}
}
while (true);
nofit:
if (dbvalue_ptr == &dbvalue || dbvalue_ptr->need_clear == true)
{
pr_clear_value (dbvalue_ptr);
}
return SORT_REC_DOESNT_FIT;
}
/*
* compare_driver () -
* return:
* first(in):
* second(in):
* arg(in):
*/
static int
compare_driver (const void *first, const void *second, void *arg)
{
char *mem1 = *(char **) first;
char *mem2 = *(char **) second;
int has_null;
SORT_ARGS *sort_args;
TP_DOMAIN *key_type;
char *oidptr1, *oidptr2;
int c = DB_UNK;
sort_args = (SORT_ARGS *) arg;
key_type = sort_args->key_type;
assert (PTR_ALIGN (mem1, MAX_ALIGNMENT) == mem1);
assert (PTR_ALIGN (mem2, MAX_ALIGNMENT) == mem2);
/* Skip next link */
mem1 += sizeof (char *);
mem2 += sizeof (char *);
/* Read value_has_null */
assert (OR_GET_BYTE (mem1) == 0 || OR_GET_BYTE (mem1) == 1);
assert (OR_GET_BYTE (mem2) == 0 || OR_GET_BYTE (mem2) == 1);
has_null = (OR_GET_BYTE (mem1) || OR_GET_BYTE (mem2)) ? 1 : 0;
mem1 += OR_INT_SIZE;
mem2 += OR_INT_SIZE;
assert (PTR_ALIGN (mem1, INT_ALIGNMENT) == mem1);
assert (PTR_ALIGN (mem2, INT_ALIGNMENT) == mem2);
oidptr1 = mem1;
oidptr2 = mem2;
/* Skip the oids */
if (BTREE_IS_UNIQUE (sort_args->unique_pk))
{ /* unique index */
mem1 += (2 * OR_OID_SIZE);
mem2 += (2 * OR_OID_SIZE);
}
else
{ /* non-unique index */
mem1 += OR_OID_SIZE;
mem2 += OR_OID_SIZE;
}
assert (PTR_ALIGN (mem1, INT_ALIGNMENT) == mem1);
assert (PTR_ALIGN (mem2, INT_ALIGNMENT) == mem2);
/* Skip the MVCCID's */
mem1 += 2 * OR_MVCCID_SIZE;
mem2 += 2 * OR_MVCCID_SIZE;
assert (PTR_ALIGN (mem1, INT_ALIGNMENT) == mem1);
assert (PTR_ALIGN (mem2, INT_ALIGNMENT) == mem2);
if (TP_DOMAIN_TYPE (key_type) == DB_TYPE_MIDXKEY)
{
int i;
char *nullmap_ptr1, *nullmap_ptr2;
int header_size;
TP_DOMAIN *dom;
/* fast implementation of pr_midxkey_compare (). do not use DB_VALUE container for speed-up */
nullmap_ptr1 = mem1;
nullmap_ptr2 = mem2;
header_size = or_multi_header_size (key_type->precision);
mem1 += header_size;
mem2 += header_size;
#if !defined(NDEBUG)
for (i = 0, dom = key_type->setdomain; dom; dom = dom->next, i++);
assert (i == key_type->precision);
if (sort_args->func_index_info != NULL)
{
/* In the following cases, the precision may be smaller than n_attrs.
* create index idx on tbl(left(s2, v1),v3);
* So, remove the assert(). */
//assert (sort_args->n_attrs <= key_type->precision);
}
else
{
assert (sort_args->n_attrs == key_type->precision);
}
assert (key_type->setdomain != NULL);
#endif
for (i = 0, dom = key_type->setdomain; i < key_type->precision && dom; i++, dom = dom->next)
{
/* val1 or val2 is NULL */
if (has_null)
{
if (or_multi_is_null (nullmap_ptr1, i))
{ /* element val is null? */
if (or_multi_is_null (nullmap_ptr2, i))
{
continue;
}
c = DB_LT;
break; /* exit for-loop */
}
else if (or_multi_is_null (nullmap_ptr2, i))
{
c = DB_GT;
break; /* exit for-loop */
}
}
/* check for val1 and val2 same domain */
c = dom->type->index_cmpdisk (mem1, mem2, dom, 0, 1, NULL);
assert (c == DB_LT || c == DB_EQ || c == DB_GT);
if (c != DB_EQ)
{
break; /* exit for-loop */
}
mem1 += pr_midxkey_element_disk_size (mem1, dom);
mem2 += pr_midxkey_element_disk_size (mem2, dom);
} /* for (i = 0; ... ) */
assert (c == DB_LT || c == DB_EQ || c == DB_GT);
if (dom && dom->is_desc)
{
c = ((c == DB_GT) ? DB_LT : (c == DB_LT) ? DB_GT : c);
}
}
else
{
OR_BUF buf_val1, buf_val2;
DB_VALUE val1, val2;
or_init (&buf_val1, mem1, -1);
or_init (&buf_val2, mem2, -1);
if (key_type->type->data_readval (&buf_val1, &val1, key_type, -1, false, NULL, 0) != NO_ERROR)
{
assert (false);
return DB_UNK;
}
if (key_type->type->data_readval (&buf_val2, &val2, key_type, -1, false, NULL, 0) != NO_ERROR)
{
assert (false);
return DB_UNK;
}
c = btree_compare_key (&val1, &val2, key_type, 0, 1, NULL);
/* Clear the values if it is required */
if (DB_NEED_CLEAR (&val1))
{
pr_clear_value (&val1);
}
if (DB_NEED_CLEAR (&val2))
{
pr_clear_value (&val2);
}
}
assert (c == DB_LT || c == DB_EQ || c == DB_GT);
/* compare OID for non-unique index */
if (c == DB_EQ)
{
OID first_oid, second_oid;
OR_GET_OID (oidptr1, &first_oid);
OR_GET_OID (oidptr2, &second_oid);
assert_release (!OID_EQ (&first_oid, &second_oid));
return (OID_LT (&first_oid, &second_oid) ? DB_LT : DB_GT);
}
return c;
}
/*
* Linked list implementation
*/
/*
* list_add () -
* return: NO_ERROR
* list(in): which list to add
* pageid(in): what value to put to the new node
*
* Note: This function adds a new node to the end of the given list.
*/
static int
list_add (BTREE_NODE ** list, VPID * pageid)
{
BTREE_NODE *new_node;
BTREE_NODE *next_node;
int ret = NO_ERROR;
new_node = (BTREE_NODE *) os_malloc (sizeof (BTREE_NODE));
if (new_node == NULL)
{
goto exit_on_error;
}
new_node->pageid = *pageid;
new_node->next = NULL;
if (*list == NULL)
{
*list = new_node;
}
else
{
next_node = *list;
while (next_node->next != NULL)
{
next_node = next_node->next;
}
next_node->next = new_node;
}
return ret;
exit_on_error:
return (ret == NO_ERROR && (ret = er_errid ()) == NO_ERROR) ? ER_FAILED : ret;
}
/*
* list_remove_first () -
* return: nothing
* list(in):
*
* Note: This function removes the first node of the given list (if it has one).
*/
static void
list_remove_first (BTREE_NODE ** list)
{
BTREE_NODE *temp;
if (*list != NULL)
{
temp = *list;
*list = (*list)->next;
os_free_and_init (temp);
}
}
/*
* list_clear () -
* return: nothing
* list(in):
*/
static void
list_clear (BTREE_NODE * list)
{
BTREE_NODE *p, *next;
for (p = list; p != NULL; p = next)
{
next = p->next;
os_free_and_init (p);
}
}
/*
* list_length () -
* return: int
* this_list(in): which list
*
* Note: This function returns the number of elements kept in the
* given linked list.
*/
static int
list_length (const BTREE_NODE * this_list)
{
int length = 0;
while (this_list != NULL)
{
length++;
this_list = this_list->next;
}
return length;
}
#if defined(CUBRID_DEBUG)
/*
* list_print () -
* return: this_list
* this_list(in): which list to print
*
* Note: This function prints the elements of the given linked list;
* It is used for debugging purposes.
*/
static void
list_print (const BTREE_NODE * this_list)
{
while (this_list != NULL)
{
(void) printf ("{%d, %d}n", this_list->pageid.volid, this_list->pageid.pageid);
this_list = this_list->next;
}
}
/*
* btree_load_foo_debug () -
* return:
*
* Note: To avoid warning during development
*/
void
btree_load_foo_debug (void)
{
(void) btree_dump_sort_output (NULL, NULL);
list_print (NULL);
}
#endif /* CUBRID_DEBUG */
/*
* btree_rv_nodehdr_dump () - Dump node header recovery information
* return: int
* length(in): Length of Recovery Data
* data(in): The data being logged
*
* Note: Dump node header recovery information
*/
void
btree_rv_nodehdr_dump (FILE * fp, int length, void *data)
{
BTREE_NODE_HEADER *header = NULL;
header = (BTREE_NODE_HEADER *) data;
assert (header != NULL);
fprintf (fp, "\nNODE_TYPE: %s MAX_KEY_LEN: %4d PREV_PAGEID: {%4d , %4d} NEXT_PAGEID: {%4d , %4d} \n\n",
header->node_level > 1 ? "NON_LEAF" : "LEAF", header->max_key_len, header->prev_vpid.volid,
header->prev_vpid.pageid, header->next_vpid.volid, header->next_vpid.pageid);
}
/*
* btree_node_number_of_keys () -
* return: int
*
*/
int
btree_node_number_of_keys (THREAD_ENTRY * thread_p, PAGE_PTR page_ptr)
{
int key_cnt;
assert (page_ptr != NULL);
#if !defined(NDEBUG)
(void) pgbuf_check_page_ptype (thread_p, page_ptr, PAGE_BTREE);
#endif
key_cnt = spage_number_of_records (page_ptr) - 1;
#if !defined(NDEBUG)
{
BTREE_NODE_HEADER *header = NULL;
BTREE_NODE_TYPE node_type;
header = btree_get_node_header (thread_p, page_ptr);
if (header == NULL)
{
assert (false);
}
node_type = (header->node_level > 1) ? BTREE_NON_LEAF_NODE : BTREE_LEAF_NODE;
if ((node_type == BTREE_NON_LEAF_NODE && key_cnt <= 0) || (node_type == BTREE_LEAF_NODE && key_cnt < 0))
{
er_log_debug (ARG_FILE_LINE, "btree_node_number_of_keys: node key count underflow: %d\n", key_cnt);
assert (false);
}
}
#endif
assert_release (key_cnt >= 0);
return key_cnt;
}
/*
* btree_load_check_fk () - Checks if the current foreign key that needs to be loaded is passing all the requirements.
*
* return: NO_ERROR or error code.
* load_args(in): Context for the loaded index (Includes leaf level of the foreign key)
* sort_args(in): Context for sorting (Includes info on primary key)
*
*/
int
btree_load_check_fk (THREAD_ENTRY * thread_p, const LOAD_ARGS * load_args, const SORT_ARGS * sort_args)
{
DB_VALUE fk_key, pk_key;
bool clear_fk_key, clear_pk_key;
int fk_node_key_cnt = -1, pk_node_key_cnt = -1;
BTREE_NODE_HEADER *fk_node_header = NULL, *pk_node_header = NULL;
VPID vpid;
int ret = NO_ERROR, i;
PAGE_PTR curr_fk_pageptr = NULL, old_page = NULL;
INDX_SCAN_ID pk_isid;
BTREE_SCAN pk_bt_scan;
INT16 fk_slot_id = -1;
bool found = false, pk_has_slot_visible = false, fk_has_visible = false;
char *val_print = NULL;
bool is_fk_scan_desc = false;
MVCC_SNAPSHOT mvcc_snapshot_dirty;
int lock_ret = LK_GRANTED;
DB_VALUE_COMPARE_RESULT compare_ret;
OR_CLASSREP *classrepr = NULL;
int classrepr_cacheindex = -1, part_count = -1, pos = -1;
bool clear_pcontext = false;
PRUNING_CONTEXT pcontext;
BTID pk_btid;
OID pk_clsoid;
HFID pk_dummy_hfid;
BTREE_SCAN_PART *partitions = NULL;
bool has_nulls = false;
bool has_deduplicate_key_col = false;
DB_VALUE new_fk_key[2];
DB_VALUE *fk_key_ptr = &fk_key;
db_make_null (&(new_fk_key[0]));
db_make_null (&(new_fk_key[1]));
if (sort_args->n_attrs > 1)
{
// We cannot make a PK with a function. Therefore, only the last member is checked.
has_deduplicate_key_col = IS_DEDUPLICATE_KEY_ATTR_ID (sort_args->attr_ids[sort_args->n_attrs - 1]);
if (has_deduplicate_key_col)
{
fk_key_ptr = &(new_fk_key[0]);
}
}
btree_init_temp_key_value (&clear_fk_key, &fk_key);
btree_init_temp_key_value (&clear_pk_key, &pk_key);
mvcc_snapshot_dirty.snapshot_fnc = mvcc_satisfies_dirty;
/* Initialize index scan on primary key btid. */
scan_init_index_scan (&pk_isid, NULL, NULL);
BTREE_INIT_SCAN (&pk_bt_scan);
/* Lock the primary key class. */
lock_ret = lock_object (thread_p, sort_args->fk_refcls_oid, oid_Root_class_oid, SIX_LOCK, LK_UNCOND_LOCK);
if (lock_ret != LK_GRANTED)
{
ASSERT_ERROR_AND_SET (ret);
goto end;
}
/* Get class info for the primary key */
classrepr = heap_classrepr_get (thread_p, sort_args->fk_refcls_oid, NULL, NULL_REPRID, &classrepr_cacheindex);
if (classrepr == NULL)
{
ret = ER_FK_INVALID;
goto end;
}
/* Primary key index search prepare */
ret = btree_prepare_bts (thread_p, &pk_bt_scan, sort_args->fk_refcls_pk_btid, &pk_isid, NULL, NULL, NULL, NULL,
NULL, false, NULL);
if (ret != NO_ERROR)
{
ASSERT_ERROR ();
goto end;
}
/* Set the order. */
is_fk_scan_desc = (sort_args->key_type->is_desc != pk_bt_scan.btid_int.key_type->is_desc);
/* Get the corresponding leaf of the foreign key. */
if (!is_fk_scan_desc)
{
/* Get first leaf vpid. */
vpid = load_args->vpid_first_leaf;
}
else
{
/* Get the last leaf. Current leaf is the last one. */
vpid = load_args->leaf.vpid;
}
/* Init slot id */
pk_bt_scan.slot_id = 0;
/* Check if there are any partitions on the primary key. */
if (classrepr->has_partition_info > 0)
{
(void) partition_init_pruning_context (&pcontext);
clear_pcontext = true;
ret = partition_load_pruning_context (thread_p, sort_args->fk_refcls_oid, DB_PARTITIONED_CLASS, &pcontext);
if (ret != NO_ERROR)
{
goto end;
}
/* Get number of partitions. */
part_count = pcontext.count - 1; /* exclude partitioned table */
assert (part_count <= MAX_PARTITIONS);
partitions = (BTREE_SCAN_PART *) malloc (part_count * sizeof (BTREE_SCAN_PART));
if (partitions == NULL)
{
ASSERT_ERROR_AND_SET (ret);
goto end;
}
/* Init context of each partition using the root context. */
for (i = 0; i < part_count; i++)
{
memcpy (&partitions[i].pcontext, &pcontext, sizeof (PRUNING_CONTEXT));
memcpy (&partitions[i].bt_scan, &pk_bt_scan, sizeof (BTREE_SCAN));
partitions[i].bt_scan.btid_int.sys_btid = &partitions[i].btid;
BTID_SET_NULL (&partitions[i].btid);
partitions[i].header = NULL;
partitions[i].key_cnt = -1;
}
}
while (true)
{
ret = btree_advance_to_next_slot_and_fix_page (thread_p, sort_args->btid, &vpid, &curr_fk_pageptr, &fk_slot_id,
&fk_key, &clear_fk_key, is_fk_scan_desc, &fk_node_key_cnt,
&fk_node_header, &mvcc_snapshot_dirty);
if (ret != NO_ERROR)
{
ASSERT_ERROR ();
break;
}
has_nulls = false;
if (curr_fk_pageptr == NULL)
{
/* Search has ended. */
break;
}
if (DB_IS_NULL (&fk_key))
{
/* Only way to get this is by having no visible objects in the foreign key. */
/* Must be checked!! */
break;
}
/* Check for multi col nulls. */
if (sort_args->n_attrs > 1)
{
has_nulls = btree_multicol_key_has_null (&fk_key);
}
else
{
/* TODO: unreachable case */
has_nulls = DB_IS_NULL (&fk_key);
}
if (has_nulls)
{
/* ANSI SQL says
*
* The choices for <match type> are MATCH SIMPLE, MATCH PARTIAL, and MATCH FULL; MATCH SIMPLE is the default.
* There is no semantic difference between these choices if there is only one referencing column (and, hence,
* only one referenced column). There is also no semantic difference if all referencing columns are not
* nullable. If there is more than one referencing column, at least one of which is nullable, and
* if no <referencing period specification> is specified, then the various <match type>s have the following
* semantics:
*
* MATCH SIMPLE: if at least one referencing column is null, then the row of the referencing table passes
* the constraint check. If all referencing columns are not null, then the row passes the constraint check
* if and only if there is a row of the referenced table that matches all the referencing columns.
* MATCH PARTIAL: if all referencing columns are null, then the row of the referencing table passes
* the constraint check. If at least one referencing columns is not null, then the row passes the constraint
* check if and only if there is a row of the referenced table that matches all the non-null referencing
* columns.
* MATCH FULL: if all referencing columns are null, then the row of the referencing table passes
* the constraint check. If all referencing columns are not null, then the row passes the constraint check
* if and only if there is a row of the referenced table that matches all the referencing columns. If some
* referencing column is null and another referencing column is non-null, then the row of the referencing
* table violates the constraint check.
*
* In short, we don't provide options for <match type> and our behavior is <MATCH SIMPLE> which is
* the default behavior of ANSI SQL and the other (commercial) products.
*/
/* Skip current key. */
continue;
}
if (has_deduplicate_key_col)
{
assert (!DB_IS_NULL (&fk_key));
assert (DB_VALUE_DOMAIN_TYPE (&fk_key) == DB_TYPE_MIDXKEY);
DB_VALUE *new_ptr = (fk_key_ptr == &(new_fk_key[0])) ? &(new_fk_key[1]) : &(new_fk_key[0]);
pr_clear_value (new_ptr);
ret = btree_remake_reference_key_with_FK (thread_p, pk_bt_scan.btid_int.key_type, &fk_key, new_ptr);
if (ret != NO_ERROR)
{
ASSERT_ERROR ();
break;
}
if (btree_compare_key (fk_key_ptr, new_ptr, pk_bt_scan.btid_int.key_type, 1, 1, NULL) == DB_EQ)
{ /* Remove the added deduplicate_key_attr and it can be the same key. */
continue;
}
fk_key_ptr = new_ptr;
}
/* We got the value from the foreign key, now search through the primary key index. */
found = false;
if (partitions)
{
COPY_OID (&pk_clsoid, sort_args->fk_refcls_oid);
BTID_COPY (&pk_btid, sort_args->fk_refcls_pk_btid);
/* Get the correct oid, btid and partition of the key we are looking for. */
ret = partition_prune_partition_index (&pcontext, fk_key_ptr, &pk_clsoid, &pk_btid, &pos);
if (ret != NO_ERROR)
{
break;
}
if (BTID_IS_NULL (&partitions[pos].btid))
{
/* No need to lock individual partitions here, since the partitioned table is already locked */
ret = partition_prune_unique_btid (&pcontext, fk_key_ptr, &pk_clsoid, &pk_dummy_hfid, &pk_btid);
if (ret != NO_ERROR)
{
break;
}
/* Update the partition BTID. */
BTID_COPY (&partitions[pos].btid, &pk_btid);
}
/* Save the old page, if any. */
if (pk_bt_scan.C_page != NULL)
{
old_page = pk_bt_scan.C_page;
}
/* Update references. */
pk_bt_scan = partitions[pos].bt_scan;
pk_node_key_cnt = partitions[pos].key_cnt;
pk_node_header = partitions[pos].header;
}
/* Search through the primary key index. */
if (pk_bt_scan.C_page == NULL)
{
/* No search has been initiated yet, we start from root. */
ret = btree_locate_key (thread_p, &pk_bt_scan.btid_int, fk_key_ptr, &pk_bt_scan.C_vpid, &pk_bt_scan.slot_id,
&pk_bt_scan.C_page, &found);
if (ret != NO_ERROR)
{
ASSERT_ERROR ();
break;
}
else if (!found)
{
/* Value was not found at all, it means the foreign key is invalid. */
val_print = pr_valstring (fk_key_ptr);
er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_FK_INVALID, 2, sort_args->fk_name,
(val_print ? val_print : "unknown value"));
ret = ER_FK_INVALID;
db_private_free (thread_p, val_print);
break;
}
else
{
/* First unfix the old page if applicable. The new page was fixed in btree_locate_key. */
pgbuf_unfix_and_init_after_check (thread_p, old_page);
/* Make sure there is at least one visible object. */
ret = btree_is_slot_visible (thread_p, &pk_bt_scan.btid_int, pk_bt_scan.C_page, &mvcc_snapshot_dirty,
pk_bt_scan.slot_id, &pk_has_slot_visible);
if (ret != NO_ERROR)
{
break;
}
if (!pk_has_slot_visible)
{
/* No visible object in current page, but the key was located here. Should not happen often. */
val_print = pr_valstring (fk_key_ptr);
er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_FK_INVALID, 2, sort_args->fk_name,
(val_print ? val_print : "unknown value"));
ret = ER_FK_INVALID;
db_private_free (thread_p, val_print);
break;
}
assert (pk_bt_scan.C_page != NULL);
}
/* Unfix old page, if any. */
pgbuf_unfix_and_init_after_check (thread_p, old_page);
}
else
{
/* We try to resume the search in the current leaf. */
while (true)
{
ret = btree_advance_to_next_slot_and_fix_page (thread_p, &pk_bt_scan.btid_int, &pk_bt_scan.C_vpid,
&pk_bt_scan.C_page, &pk_bt_scan.slot_id, &pk_key,
&clear_pk_key, false, &pk_node_key_cnt, &pk_node_header,
&mvcc_snapshot_dirty);
if (ret != NO_ERROR)
{
goto end;
}
if (pk_bt_scan.C_page == NULL)
{
/* The primary key has ended, but the value from foreign key was not found. */
/* Foreign key is invalid. Set error. */
val_print = pr_valstring (fk_key_ptr);
er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_FK_INVALID, 2, sort_args->fk_name,
(val_print ? val_print : "unknown value"));
ret = ER_FK_INVALID;
db_private_free (thread_p, val_print);
goto end;
}
/* We need to compare the current value with the new value from the primary key. */
compare_ret = btree_compare_key (&pk_key, fk_key_ptr, pk_bt_scan.btid_int.key_type, 1, 1, NULL);
if (compare_ret == DB_EQ)
{
/* Found value, stop searching in pk. */
break;
}
else if (compare_ret == DB_LT)
{
/* No match yet. Advance in pk. */
continue;
}
else
{
/* Fk is invalid. Set error. */
val_print = pr_valstring (fk_key_ptr);
er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_FK_INVALID, 2, sort_args->fk_name,
(val_print ? val_print : "unknown value"));
ret = ER_FK_INVALID;
db_private_free (thread_p, val_print);
goto end;
}
}
if (pk_bt_scan.slot_id > pk_node_key_cnt)
{
old_page = pk_bt_scan.C_page;
pk_bt_scan.C_page = NULL;
}
}
if (partitions)
{
/* Update references. */
partitions[pos].key_cnt = pk_node_key_cnt;
partitions[pos].header = pk_node_header;
}
if (found == true)
{
btree_clear_key_value (&clear_fk_key, &fk_key);
}
btree_clear_key_value (&clear_pk_key, &pk_key);
}
end:
if (partitions)
{
for (i = 0; i < part_count; i++)
{
pgbuf_unfix_and_init_after_check (thread_p, partitions[i].bt_scan.C_page);
}
free (partitions);
}
pgbuf_unfix_and_init_after_check (thread_p, old_page);
pgbuf_unfix_and_init_after_check (thread_p, curr_fk_pageptr);
pgbuf_unfix_and_init_after_check (thread_p, pk_bt_scan.C_page);
btree_clear_key_value (&clear_fk_key, &fk_key);
btree_clear_key_value (&clear_pk_key, &pk_key);
pr_clear_value (&(new_fk_key[0]));
pr_clear_value (&(new_fk_key[1]));
if (clear_pcontext == true)
{
partition_clear_pruning_context (&pcontext);
}
if (classrepr != NULL)
{
heap_classrepr_free_and_init (classrepr, &classrepr_cacheindex);
}
return ret;
}
/*
* btree_get_value_from_leaf_slot () -
*
* return: NO_ERROR or error code.
* btid_int(in): The structure of the B-tree where the leaf resides.
* leaf_ptr(in): The leaf where the value needs to be extracted from.
* slot_id(in): The slot from where the value must be pulled.
* key(out): The value requested.
* clear_key(out): needs to clear key if set
*
*/
static int
btree_get_value_from_leaf_slot (THREAD_ENTRY * thread_p, BTID_INT * btid_int, PAGE_PTR leaf_ptr, int slot_id,
DB_VALUE * key, bool * clear_key)
{
LEAF_REC leaf;
int first_key_offset = 0;
RECDES record;
int ret = NO_ERROR;
if (spage_get_record (thread_p, leaf_ptr, slot_id, &record, PEEK) != S_SUCCESS)
{
assert_release (false);
ret = ER_FAILED;
return ret;
}
ret = btree_read_record (thread_p, btid_int, leaf_ptr, &record, key, &leaf, BTREE_LEAF_NODE, clear_key,
&first_key_offset, PEEK_KEY_VALUE, NULL);
if (ret != NO_ERROR)
{
ASSERT_ERROR ();
return ret;
}
return ret;
}
/*
* btree_advance_to_next_slot_and_fix_page () -
*
* return: NO_ERROR or error code
* btid(in): B-tree structure
* vpid(in/out): VPID of the current page
* pg_ptr(in/out): Page pointer for the current page.
* slot_id(in/out): Slot id of the current/next value.
* key(out): Requested key.
* clear_key(out): needs to clear key if set.
* key_cnt(in/out): Number of keys in current page.
* header(in/out): The header of the current page.
* mvcc(in): Needed for visibility check.
*
* Note:
* This function will advance to a next page without using conditional latches.
* Therefore, if this is used for a descending scan, it might not work properly
* unless concurrency is guaranteed.
*/
static int
btree_advance_to_next_slot_and_fix_page (THREAD_ENTRY * thread_p, BTID_INT * btid, VPID * vpid, PAGE_PTR * pg_ptr,
INT16 * slot_id, DB_VALUE * key, bool * clear_key, bool is_desc, int *key_cnt,
BTREE_NODE_HEADER ** header, MVCC_SNAPSHOT * mvcc)
{
int ret = NO_ERROR;
VPID next_vpid;
PAGE_PTR page = *pg_ptr;
BTREE_NODE_HEADER *local_header = *header;
bool is_slot_visible = false;
PAGE_PTR old_page = NULL;
/* Clear current key, if any. */
if (!DB_IS_NULL (key))
{
pr_clear_value (key);
}
if (page == NULL)
{
page = pgbuf_fix (thread_p, vpid, OLD_PAGE, PGBUF_LATCH_READ, PGBUF_UNCONDITIONAL_LATCH);
if (page == NULL)
{
ASSERT_ERROR_AND_SET (ret);
return ret;
}
*slot_id = -1;
}
assert (page != NULL);
/* Check page. */
(void) pgbuf_check_page_ptype (thread_p, page, PAGE_BTREE);
if (local_header == NULL)
{
/* Get the header of the page. */
local_header = btree_get_node_header (thread_p, page);
}
if (*key_cnt == -1)
{
/* Get number of keys in page. */
*key_cnt = btree_node_number_of_keys (thread_p, page);
}
/* If it is the first search. */
if (*slot_id == -1)
{
*slot_id = is_desc ? (*key_cnt + 1) : 0;
}
/* Advance to next key. */
int incr_decr_val = (is_desc ? -1 : 1);
while (true)
{
*slot_id += incr_decr_val;
assert (0 <= *slot_id);
if (*slot_id == 0 || *slot_id >= *key_cnt + 1)
{
next_vpid = is_desc ? local_header->prev_vpid : local_header->next_vpid;
if (VPID_ISNULL (&next_vpid))
{
/* No next page. unfix current one. */
pgbuf_unfix_and_init (thread_p, page);
break;
}
else
{
old_page = page;
page = pgbuf_fix (thread_p, &next_vpid, OLD_PAGE, PGBUF_LATCH_READ, PGBUF_UNCONDITIONAL_LATCH);
if (page == NULL)
{
ASSERT_ERROR_AND_SET (ret);
return ret;
}
/* unfix old page */
pgbuf_unfix_and_init (thread_p, old_page);
*slot_id = is_desc ? *key_cnt : 1;
/* Get the new header. */
local_header = btree_get_node_header (thread_p, page);
/* Get number of keys in new page. */
*key_cnt = btree_node_number_of_keys (thread_p, page);
}
}
if (mvcc != NULL)
{
ret = btree_is_slot_visible (thread_p, btid, page, mvcc, *slot_id, &is_slot_visible);
if (ret != NO_ERROR)
{
return ret;
}
if (!is_slot_visible)
{
continue;
}
}
/* The slot is visible. Fall through and get the key. */
break;
}
if (page != NULL)
{
ret = btree_get_value_from_leaf_slot (thread_p, btid, page, *slot_id, key, clear_key);
}
*header = local_header;
*pg_ptr = page;
return ret;
}
/*
* btree_is_slot_visible(): States if current slot is visible or not.
*
* thread_p(in): Thread entry.
* btid(in): B-tree info.
* pg_ptr(in): Page pointer.
* mvcc_snapshot(in): The MVCC snapshot.
* slot_id(in) : Slot id to be looked for.
* is_visible(out): True or False
*
* return: error code if any error occurs.
*/
static int
btree_is_slot_visible (THREAD_ENTRY * thread_p, BTID_INT * btid, PAGE_PTR pg_ptr, MVCC_SNAPSHOT * mvcc_snapshot,
int slot_id, bool * is_visible)
{
RECDES record;
LEAF_REC leaf;
int num_visible = 0;
int key_offset = 0;
int ret = NO_ERROR;
bool dummy_clear_key;
*is_visible = false;
if (mvcc_snapshot == NULL)
{
/* Early out. */
*is_visible = true;
return ret;
}
/* Get the record. */
if (spage_get_record (thread_p, pg_ptr, slot_id, &record, PEEK) != S_SUCCESS)
{
assert_release (false);
ret = ER_FAILED;
return ret;
}
/* Read the record. - no need of actual key value */
ret = btree_read_record (thread_p, btid, pg_ptr, &record, NULL, &leaf, BTREE_LEAF_NODE, &dummy_clear_key,
&key_offset, PEEK_KEY_VALUE, NULL);
if (ret != NO_ERROR)
{
ASSERT_ERROR ();
return ret;
}
/* Get the number of visible items. */
ret = btree_get_num_visible_from_leaf_and_ovf (thread_p, btid, &record, key_offset, &leaf, NULL, mvcc_snapshot,
&num_visible);
if (ret != NO_ERROR)
{
return ret;
}
if (num_visible > 0)
{
*is_visible = true;
}
return ret;
}
BTID *
xbtree_load_online_index (THREAD_ENTRY * thread_p, BTID * btid, const char *bt_name, TP_DOMAIN * key_type,
OID * class_oids, int n_classes, int n_attrs, int *attr_ids, int *attrs_prefix_length,
HFID * hfids, int unique_pk, int not_null_flag, OID * fk_refcls_oid, BTID * fk_refcls_pk_btid,
const char *fk_name, char *pred_stream, int pred_stream_size, char *func_pred_stream,
int func_pred_stream_size, int func_col_id, int func_attr_index_start, int ib_thread_count)
{
int cur_class;
BTID_INT btid_int;
PRED_EXPR_WITH_CONTEXT *filter_pred = NULL;
FUNCTION_INDEX_INFO func_index_info;
DB_TYPE single_node_type = DB_TYPE_NULL;
XASL_UNPACK_INFO *func_unpack_info = NULL;
bool is_sysop_started = false;
MVCC_SNAPSHOT *builder_snapshot = NULL;
HEAP_SCANCACHE scan_cache;
HEAP_CACHE_ATTRINFO attr_info;
int ret = NO_ERROR;
LOCK old_lock = SCH_M_LOCK;
LOCK new_lock = IX_LOCK;
LOG_TDES *tdes;
int lock_ret;
BTID *list_btid = NULL;
int old_wait_msec;
bool old_check_intr;
SORT_ARGS tmp_args;
memset (&tmp_args, 0x00, sizeof (SORT_ARGS));
tmp_args.n_attrs = n_attrs;
tmp_args.class_ids = class_oids;
tmp_args.attr_ids = attr_ids;
tmp_args.hfids = hfids;
tmp_args.func_index_info = &func_index_info;
func_index_info.expr = NULL;
/* Check for robustness */
if (!btid || !hfids || !class_oids || !attr_ids || !key_type)
{
er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_BTREE_LOAD_FAILED, 0);
return NULL;
}
btid_int.sys_btid = btid;
btid_int.unique_pk = unique_pk;
#if !defined(NDEBUG)
if (unique_pk)
{
assert (BTREE_IS_UNIQUE (btid_int.unique_pk));
assert (BTREE_IS_PRIMARY_KEY (btid_int.unique_pk) || !BTREE_IS_PRIMARY_KEY (btid_int.unique_pk));
}
#endif
btid_int.key_type = key_type;
VFID_SET_NULL (&btid_int.ovfid);
btid_int.rev_level = BTREE_CURRENT_REV_LEVEL;
btid_int.deduplicate_key_idx = dk_get_deduplicate_key_position (n_attrs, attr_ids, func_attr_index_start);
COPY_OID (&btid_int.topclass_oid, &class_oids[0]);
/*
* for btree_range_search, part_key_desc is re-set at btree_initialize_bts
*/
btid_int.part_key_desc = 0;
/* init index key copy_buf info */
btid_int.copy_buf = NULL;
btid_int.copy_buf_len = 0;
btid_int.nonleaf_key_type = btree_generate_prefix_domain (&btid_int);
/* After building index acquire lock on table, the transaction has deadlock priority */
tdes = LOG_FIND_CURRENT_TDES (thread_p);
if (tdes)
{
tdes->has_deadlock_priority = true;
}
/* Acquire snapshot!! */
builder_snapshot = logtb_get_mvcc_snapshot (thread_p);
if (builder_snapshot == NULL)
{
goto error;
}
/* Alloc memory for btid list for unique indexes. */
if (BTREE_IS_UNIQUE (unique_pk))
{
list_btid = (BTID *) malloc (n_classes * sizeof (BTID));
if (list_btid == NULL)
{
er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_OUT_OF_VIRTUAL_MEMORY, 1, n_classes * sizeof (BTID));
ret = ER_OUT_OF_VIRTUAL_MEMORY;
goto error;
}
}
/* Demote the locks for classes on which we want to load indices. */
for (cur_class = 0; cur_class < n_classes; cur_class++)
{
ret = lock_demote_class_lock (thread_p, &class_oids[cur_class], new_lock, &old_lock);
if (ret != NO_ERROR)
{
goto error;
}
}
for (cur_class = 0; cur_class < n_classes; cur_class++)
{
/* Reinitialize filter and function for each class, if is the case. This is needed in order to bring the regu
* variables in same state like initial state. Clearing XASL does not bring the regu variables in initial state.
* A issue is clearing the cache (see cache_dbvalp and cache_attrinfo).
* We may have the option to try to correct clearing XASL (to have initial state) or destroy it and reinitalize it.
* For now, we choose reinitialization, since is much simply, and is used also in non-online case.
*/
if (pred_stream && pred_stream_size > 0)
{
if (stx_map_stream_to_filter_pred (thread_p, &filter_pred, pred_stream, pred_stream_size) != NO_ERROR)
{
goto error;
}
}
tmp_args.filter = filter_pred;
if (func_pred_stream && func_pred_stream_size > 0)
{
func_index_info.expr_stream = func_pred_stream;
func_index_info.expr_stream_size = func_pred_stream_size;
func_index_info.col_id = func_col_id;
func_index_info.attr_index_start = func_attr_index_start;
func_index_info.expr = NULL;
if (stx_map_stream_to_func_pred (thread_p, &func_index_info.expr, func_pred_stream,
func_pred_stream_size, &func_unpack_info))
{
goto error;
}
}
/* Start scancache */
tmp_args.cur_class = cur_class;
ret = bt_load_heap_scancache_start_for_attrinfo (thread_p, &tmp_args, &scan_cache, &attr_info, true);
if (ret != NO_ERROR)
{
goto error;
}
/* Assign the snapshot to the scan_cache. */
scan_cache.mvcc_snapshot = builder_snapshot;
ret = heap_get_btid_from_index_name (thread_p, &class_oids[cur_class], bt_name, btid_int.sys_btid);
if (ret != NO_ERROR)
{
ASSERT_ERROR ();
break;
}
/* For unique indices add to the list of btids for unique constraint checks. */
if (BTREE_IS_UNIQUE (unique_pk))
{
list_btid[cur_class].root_pageid = btid_int.sys_btid->root_pageid;
list_btid[cur_class].vfid.fileid = btid_int.sys_btid->vfid.fileid;
list_btid[cur_class].vfid.volid = btid_int.sys_btid->vfid.volid;
}
if (prm_get_bool_value (PRM_ID_LOG_BTREE_OPS))
{
_er_log_debug (ARG_FILE_LINE, "DEBUG_BTREE: load start on class(%d, %d, %d), btid(%d, (%d, %d)).",
OID_AS_ARGS (&class_oids[cur_class]), BTID_AS_ARGS (btid_int.sys_btid));
}
/* Start the online index builder. */
ret = online_index_builder (thread_p, &btid_int, &hfids[cur_class], &class_oids[cur_class], n_classes, attr_ids,
n_attrs, func_index_info, filter_pred, attrs_prefix_length, &attr_info, &scan_cache,
unique_pk, ib_thread_count, key_type);
if (ret != NO_ERROR)
{
break;
}
bt_load_heap_scancache_end_for_attrinfo (thread_p, &tmp_args, &scan_cache, &attr_info);
bt_load_clear_pred_and_unpack (thread_p, &tmp_args, func_unpack_info);
tmp_args.filter = filter_pred = NULL;
}
// We should recover the lock regardless of return code from online_index_builder.
// Otherwise, we might be doomed to failure to abort the transaction.
// We are going to do best to avoid lock promotion errors such as timeout and deadlocked.
// never give up
old_wait_msec = xlogtb_reset_wait_msecs (thread_p, LK_INFINITE_WAIT);
old_check_intr = logtb_set_check_interrupt (thread_p, false);
for (cur_class = 0; cur_class < n_classes; cur_class++)
{
/* Promote the lock to SCH_M_LOCK */
/* we need to do this in a loop to retry in case of interruption */
while (true)
{
lock_ret = lock_object (thread_p, &class_oids[cur_class], oid_Root_class_oid, SCH_M_LOCK, LK_UNCOND_LOCK);
if (lock_ret == LK_GRANTED)
{
break;
}
#if defined (SERVER_MODE)
else if (lock_ret == LK_NOTGRANTED_DUE_ERROR)
{
if (er_errid () == ER_INTERRUPTED)
{
// interruptions cannot be allowed here; lock must be promoted to either commit or rollback changes
er_clear ();
// make sure the transaction interrupt flag is cleared
logtb_set_tran_index_interrupt (thread_p, thread_p->tran_index, false);
// and retry
continue;
}
}
else if (lock_ret == LK_NOTGRANTED_DUE_TIMEOUT && css_is_shutdowning_server ())
{
// server shutdown forced timeout; but consistency requires that we get the lock upgrade no matter what
er_clear ();
continue;
}
#endif // SERVER_MODE
// it is neither expected nor acceptable.
assert (0);
}
}
// reset back
(void) xlogtb_reset_wait_msecs (thread_p, old_wait_msec);
(void) logtb_set_check_interrupt (thread_p, old_check_intr);
if (ret != NO_ERROR)
{
ASSERT_ERROR ();
goto error;
}
if (BTREE_IS_UNIQUE (unique_pk))
{
for (cur_class = 0; cur_class < n_classes; cur_class++)
{
/* Check if we have a unique constraint violation for unique indexes. */
ret =
btree_online_index_check_unique_constraint (thread_p, &list_btid[cur_class], bt_name,
&class_oids[cur_class]);
if (ret != NO_ERROR)
{
ASSERT_ERROR ();
btid = NULL;
goto error;
}
}
}
assert (tmp_args.scancache_inited == false && tmp_args.attrinfo_inited == false);
if (list_btid != NULL)
{
free (list_btid);
list_btid = NULL;
}
logpb_force_flush_pages (thread_p);
/* TODO: Is this all right? */
/* Invalidate snapshot. */
if (builder_snapshot != NULL)
{
logtb_invalidate_snapshot_data (thread_p);
}
return btid;
error:
bt_load_heap_scancache_end_for_attrinfo (thread_p, &tmp_args, &scan_cache, &attr_info);
bt_load_clear_pred_and_unpack (thread_p, &tmp_args, func_unpack_info);
if (list_btid != NULL)
{
free (list_btid);
}
/* Invalidate snapshot. */
if (builder_snapshot != NULL)
{
logtb_invalidate_snapshot_data (thread_p);
}
return NULL;
}
// *INDENT-OFF*
// Refer to src/parser/csql_grammar.y:19872
REGISTER_WORKERPOOL (online_index, []() { return 16; });
static int
online_index_builder (THREAD_ENTRY * thread_p, BTID_INT * btid_int, HFID * hfids, OID * class_oids, int n_classes,
int *attrids, int n_attrs, FUNCTION_INDEX_INFO func_idx_info,
PRED_EXPR_WITH_CONTEXT * filter_pred, int *attrs_prefix_length, HEAP_CACHE_ATTRINFO * attr_info,
HEAP_SCANCACHE * scancache, int unique_pk, int ib_thread_count, TP_DOMAIN * key_type)
{
int ret = NO_ERROR, eval_res;
OID cur_oid;
RECDES cur_record;
int cur_class;
SCAN_CODE sc;
FUNCTION_INDEX_INFO *p_func_idx_info;
PR_EVAL_FNC filter_eval_fnc;
DB_TYPE single_node_type = DB_TYPE_NULL;
int attr_offset;
DB_VALUE *p_dbvalue;
int *p_prefix_length;
uint64_t tasks_started = 0;
char midxkey_buf[DBVAL_BUFSIZE + MAX_ALIGNMENT], *aligned_midxkey_buf;
index_builder_loader_context load_context;
bool is_parallel = ib_thread_count > 0;
std::atomic<int> num_keys = {0}, num_oids = {0}, num_nulls = {0};
std::unique_ptr<index_builder_loader_task> load_task = NULL;
assert (ib_thread_count <= 16);
// a worker pool is built only of loading is done in parallel
cubthread::worker_pool_type *ib_workpool =
is_parallel ? thread_create_worker_pool (ib_thread_count, 1, "online-index", load_context) : NULL;
// m_log = btree_is_worker_pool_logging_true ()
aligned_midxkey_buf = PTR_ALIGN (midxkey_buf, MAX_ALIGNMENT);
p_func_idx_info = func_idx_info.expr ? &func_idx_info : NULL;
filter_eval_fnc = (filter_pred != NULL) ? eval_fnc (thread_p, filter_pred->pred, &single_node_type) : NULL;
/* Get the first entry from heap. */
cur_class = 0;
OID_SET_NULL (&cur_oid);
cur_oid.volid = hfids[cur_class].vfid.volid;
/* Do not let the page fixed after an extract. */
scancache->cache_last_fix_page = false;
load_context.m_has_error = false;
load_context.m_error_code = NO_ERROR;
load_context.m_tasks_executed = 0UL;
load_context.m_key_type = key_type;
load_context.m_conn = thread_p->conn_entry;
PERF_UTIME_TRACKER time_online_index = PERF_UTIME_TRACKER_INITIALIZER;
PERF_UTIME_TRACKER_START (thread_p, &time_online_index);
p_prefix_length = (attrs_prefix_length) ? &(attrs_prefix_length[0]) : NULL;
/* Start extracting from heap. */
for (;;)
{
DB_VALUE dbvalue;
db_make_null (&dbvalue);
/* Scan from heap and insert into the index. */
attr_offset = cur_class * n_attrs;
cur_record.data = NULL;
sc = heap_next (thread_p, &hfids[cur_class], &class_oids[cur_class], &cur_oid, &cur_record, scancache, COPY);
if (sc != S_SUCCESS)
{
if (sc != S_END)
{
if (sc == S_ERROR)
{
ASSERT_ERROR_AND_SET (ret);
break;
}
assert (false);
}
break;
}
/* Make sure the scan was a success. */
assert (!OID_ISNULL (&cur_oid));
if (filter_pred)
{
ret = heap_attrinfo_read_dbvalues (thread_p, &cur_oid, &cur_record, filter_pred->cache_pred);
if (ret != NO_ERROR)
{
break;
}
eval_res = (*filter_eval_fnc) (thread_p, filter_pred->pred, NULL, &cur_oid);
if (eval_res == V_ERROR)
{
ret = ER_FAILED;
break;
}
else if (eval_res != V_TRUE)
{
continue;
}
}
/* Generate the key : provide key_type domain - needed for compares during sort */
p_dbvalue = heap_attrinfo_generate_key (thread_p, n_attrs, &attrids[attr_offset], p_prefix_length, attr_info,
&cur_record, &dbvalue, aligned_midxkey_buf, p_func_idx_info, key_type, &cur_oid);
if (p_dbvalue == NULL)
{
ret = ER_FAILED;
break;
}
/* Dispatch the insert operation */
if (load_task == NULL)
{
// create a new task
load_task.reset (new index_builder_loader_task (btid_int->sys_btid, &class_oids[cur_class], unique_pk,
load_context, num_keys, num_oids, num_nulls));
}
if (load_task->add_key (p_dbvalue, cur_oid) == index_builder_loader_task::BATCH_FULL)
{
// send task to worker pool for execution
thread_get_manager ()->push_task (ib_workpool, load_task.release ());
/* Increment tasks started. */
tasks_started++;
}
/* Clear index key. */
pr_clear_value (p_dbvalue);
/* Check for possible errors. */
if (load_context.m_has_error)
{
/* Also stop all threads. */
er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_IB_ERROR_ABORT, 0);
ret = load_context.m_error_code;
break;
}
}
/* Check if the worker pool is empty */
if (ret == NO_ERROR)
{
if (load_task != NULL && load_task->has_keys ())
{
// one last task
thread_get_manager ()->push_task (ib_workpool, load_task.release ());
/* Increment tasks started. */
tasks_started++;
}
do
{
bool dummy_continue_checking = true;
if (load_context.m_has_error != NO_ERROR)
{
/* Also stop all threads. */
er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_IB_ERROR_ABORT, 0);
ret = load_context.m_error_code;
break;
}
/* Wait for threads to finish. */
thread_sleep (10);
/* Check for interrupts. */
if (logtb_is_interrupted (thread_p, true, &dummy_continue_checking))
{
er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_INTERRUPTED, 0);
ret = ER_INTERRUPTED;
break;
}
}
while (load_context.m_tasks_executed != tasks_started);
}
PERF_UTIME_TRACKER_TIME (thread_p, &time_online_index, PSTAT_BT_ONLINE_LOAD);
thread_get_manager ()->destroy_worker_pool (ib_workpool);
if (BTREE_IS_UNIQUE (btid_int->unique_pk))
{
logtb_tran_update_btid_unique_stats (thread_p, btid_int->sys_btid, num_keys, num_oids, num_nulls);
}
return ret;
}
static bool
btree_is_worker_pool_logging_true ()
{
return cubthread::is_logging_configured (cubthread::LOG_WORKER_POOL_INDEX_BUILDER);
}
void
index_builder_loader_context::on_create (context_type &context)
{
context.claim_system_worker ();
context.conn_entry = m_conn;
}
void
index_builder_loader_context::on_retire (context_type &context)
{
context.retire_system_worker ();
context.conn_entry = NULL;
}
void
index_builder_loader_context::on_recycle (context_type &context)
{
context.tran_index = LOG_SYSTEM_TRAN_INDEX;
}
index_builder_loader_task::index_builder_loader_task (const BTID *btid, const OID *class_oid, int unique_pk,
index_builder_loader_context &load_context,
std::atomic<int> &num_keys, std::atomic<int> &num_oids,
std::atomic<int> &num_nulls)
: m_load_context (load_context)
, m_insert_list (load_context.m_key_type)
, m_num_keys (num_keys)
, m_num_oids (num_oids)
, m_num_nulls (num_nulls)
{
BTID_COPY (&m_btid, btid);
COPY_OID (&m_class_oid, class_oid);
m_unique_pk = unique_pk;
m_load_context.m_has_error = false;
m_memsize = 0;
}
index_builder_loader_task::~index_builder_loader_task ()
{
cubmem::switch_to_global_allocator_and_call ([this] { clear_keys (); });
}
index_builder_loader_task::batch_key_status
index_builder_loader_task::add_key (const DB_VALUE *key, const OID &oid)
{
if (DB_IS_NULL (key) || btree_multicol_key_is_null (const_cast<DB_VALUE *>(key)))
{
/* We do not store NULL keys, but we track them for unique indexes;
* for non-unique, just skip row */
if (BTREE_IS_UNIQUE (m_unique_pk))
{
++m_insert_list.m_ignored_nulls_cnt;
}
return BATCH_CONTINUE;
}
size_t entry_size = m_insert_list.add_key (key, oid);
m_memsize += entry_size;
return (m_memsize > (size_t) prm_get_bigint_value (PRM_ID_IB_TASK_MEMSIZE)) ? BATCH_FULL : BATCH_CONTINUE;
}
bool
index_builder_loader_task::has_keys () const
{
return !m_insert_list.m_keys_oids.empty ();
}
void
index_builder_loader_task::clear_keys ()
{
for (auto &key_oid : m_insert_list.m_keys_oids)
{
pr_clear_value (&key_oid.m_key);
}
}
void
index_builder_loader_task::execute (cubthread::entry &thread_ref)
{
int ret = NO_ERROR;
size_t key_count = 0;
LOG_TRAN_BTID_UNIQUE_STATS *p_unique_stats;
/* Check for possible errors set by the other threads. */
if (m_load_context.m_has_error)
{
return;
}
PERF_UTIME_TRACKER time_insert_task = PERF_UTIME_TRACKER_INITIALIZER;
PERF_UTIME_TRACKER_START (&thread_ref, &time_insert_task);
PERF_UTIME_TRACKER time_prepare_task = PERF_UTIME_TRACKER_INITIALIZER;
PERF_UTIME_TRACKER_START (&thread_ref, &time_prepare_task);
m_insert_list.prepare_list ();
PERF_UTIME_TRACKER_TIME (&thread_ref, &time_prepare_task, PSTAT_BT_ONLINE_PREPARE_TASK);
while (m_insert_list.m_curr_pos < (int) m_insert_list.m_sorted_keys_oids.size ())
{
ret = btree_online_index_list_dispatcher (&thread_ref, &m_btid, &m_class_oid, &m_insert_list,
m_unique_pk, BTREE_OP_ONLINE_INDEX_IB_INSERT, NULL);
if (ret != NO_ERROR)
{
if (!m_load_context.m_has_error.exchange (true))
{
m_load_context.m_error_code = ret;
// TODO: We need a mechanism to also copy the error message!!
}
break;
}
key_count++;
}
p_unique_stats = logtb_tran_find_btid_stats (&thread_ref, &m_btid, false);
if (p_unique_stats != NULL)
{
/* Cumulates and resets statistics */
m_num_keys += p_unique_stats->tran_stats.num_keys;
m_num_oids += p_unique_stats->tran_stats.num_oids + m_insert_list.m_ignored_nulls_cnt;
m_num_nulls += p_unique_stats->tran_stats.num_nulls + m_insert_list.m_ignored_nulls_cnt;
p_unique_stats->tran_stats.num_keys = 0;
p_unique_stats->tran_stats.num_oids = 0;
p_unique_stats->tran_stats.num_nulls = 0;
}
PERF_UTIME_TRACKER_TIME (&thread_ref, &time_insert_task, PSTAT_BT_ONLINE_INSERT_TASK);
/* Increment tasks executed. */
if (prm_get_bool_value (PRM_ID_LOG_BTREE_OPS))
{
_er_log_debug (ARG_FILE_LINE, "Finished task; loaded %zu keys\n", key_count);
}
m_load_context.m_tasks_executed++;
}
// *INDENT-ON*