File query_hash_join.c¶
File List > cubrid > src > query > query_hash_join.c
Go to the documentation of this file
/*
* Copyright 2008 Search Solution Corporation
* Copyright 2016 CUBRID Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
/*
* query_hash_join.c
*/
#include "query_hash_join.h"
#include "dbtype.h" /* db_make_null */
#include "error_manager.h" /* er_errid, NO_ERROR, assert_release_error */
#include "fetch.h" /* fetch_val_list */
#include "list_file.h" /* qfile_open_list, qfile_close_list */
#include "memory_alloc.h" /* CEIL_PTVDIV */
#include "object_representation.h" /* TP_DOMAIN */
#include "perf_monitor.h" /* perfmon_get_from_statistic, PSTAT_... */
#include "px_hash_join.hpp" /* parallel_query::hash_join::... */
#include "px_parallel.hpp" /* parallel_query::compute_parallel_degree */
#include "px_worker_manager.hpp" /* parallel_query::worker_manager */
#include "query_list.h" /* JOIN_TYPE */
#include "query_manager.h" /* QMGR_TEMP_FILE */
#include "system_parameter.h" /* prm_get_bigint_value, PRM_ID_... */
#include "thread_entry.hpp" /* THREAD_ENTRY */
#include "xasl.h" /* XASL_NODE, HASHJOIN_PROC_NODE */
// XXX: SHOULD BE THE LAST INCLUDE HEADER
#include "memory_wrapper.hpp"
/*
* Debug Macros
*/
#define PARTITION_FILL_FACTOR 0.8
#define DUMP_HASH_TABLE_LIMIT 100
#define DUMP_PROBE_LIMIT 20
/*
* Enum & Typedef Definitions
*/
typedef enum hashjoin_print_step
{
HASHJOIN_PRINT_NONE = 0,
HASHJOIN_PRINT_READ_KEY,
HASHJOIN_PRINT_NOT_MATCHED_KEY,
HASHJOIN_PRINT_NOT_QUALIFIED_KEY,
HASHJOIN_PRINT_QUALIFIED_KEY,
HASHJOIN_PRINT_FILL_EMPTY_KEY
} HASHJOIN_PRINT_STEP;
/*
* Macro Function Declarations
*/
#if HASHJOIN_DUMP_HASH_TABLE
#define HJOIN_DUMP_HASH_TABLE(thread_p, hash_scan_p, list_id_p) \
hjoin_dump_hash_table ((thread_p), (hash_scan_p), (list_id_p))
#else
#define HJOIN_DUMP_HASH_TABLE(thread_p, hash_scan_p, list_id_p) ((void) 0)
#endif /* HASHJOIN_DUMP_HASH_TABLE */
#if !defined(NDEBUG) && HASHJOIN_DUMP_PROBE
#define HJOIN_PRINT_TUPLE(list_scan_id, tuple, step) \
hjoin_print_tuple ((list_scan_id), (tuple), (step))
#else
#define HJOIN_PRINT_TUPLE(list_scan_id, tuple, step) ((void) 0)
#endif /* !NDEBUG && HASHJOIN_DUMP_PROBE */
/*
* Function Declarations
*/
/* Hash Join Execution */
static int hjoin_execute_partitions (THREAD_ENTRY * thread_p, HASHJOIN_MANAGER * manager);
static int hjoin_outer_fill_null_values (THREAD_ENTRY * thread_p, HASHJOIN_MANAGER * manager,
HASHJOIN_CONTEXT * context);
static int hjoin_execute_internal (THREAD_ENTRY * thread_p, HASHJOIN_MANAGER * manager, HASHJOIN_CONTEXT * context);
/* Hash Join Manager */
static int hjoin_init_manager (THREAD_ENTRY * thread_p, HASHJOIN_MANAGER * manager, XASL_NODE * xasl,
QUERY_ID query_id, VAL_DESCR * val_descr);
static void hjoin_clear_manager (THREAD_ENTRY * thread_p, HASHJOIN_MANAGER * manager);
/* Hash Join Domain Info */
static int hjoin_init_domain_info (THREAD_ENTRY * thread_p, HASHJOIN_MANAGER * manager,
HASHJOIN_DOMAIN_INFO * domain_info);
/* Hash Join Partitioning */
static HASHJOIN_STATUS hjoin_try_partition (THREAD_ENTRY * thread_p, HASHJOIN_MANAGER * manager,
HASHJOIN_CONTEXT * single_context);
static HASHJOIN_STATUS hjoin_check_partition (THREAD_ENTRY * thread_p, HASHJOIN_MANAGER * manager,
HASHJOIN_CONTEXT * single_context);
static int hjoin_prepare_partition (THREAD_ENTRY * thread_p, HASHJOIN_MANAGER * manager,
HASHJOIN_SPLIT_INFO * split_info);
static int hjoin_build_partitions (THREAD_ENTRY * thread_p, HASHJOIN_MANAGER * manager,
HASHJOIN_SPLIT_INFO * split_info);
static int hjoin_split_qlist (THREAD_ENTRY * thread_p, HASHJOIN_MANAGER * manager,
HASHJOIN_INPUT_SPLIT_INFO * split_info, QFILE_LIST_ID ** temp_part_list_id,
HASH_SCAN_KEY * temp_key);
/* Hash Join Parallel */
static HASHJOIN_STATUS hjoin_try_parallel (THREAD_ENTRY * thread_p, HASHJOIN_MANAGER * manager,
HASHJOIN_CONTEXT * single_context);
/* Hash Join Split Info */
static int hjoin_init_split_info (THREAD_ENTRY * thread_p, HASHJOIN_MANAGER * manager,
HASHJOIN_SPLIT_INFO * split_info);
static void hjoin_clear_split_info (THREAD_ENTRY * thread_p, HASHJOIN_MANAGER * manager,
HASHJOIN_SPLIT_INFO * split_info, bool clear_all);
/* Hash Join Context */
static int hjoin_init_context (THREAD_ENTRY * thread_p, HASHJOIN_MANAGER * manager, HASHJOIN_CONTEXT * context);
static void hjoin_clear_context (THREAD_ENTRY * thread_p, HASHJOIN_CONTEXT * context);
static void hjoin_destroy_qlist (THREAD_ENTRY * thread_p, HASHJOIN_CONTEXT * context);
/* Hash List Scan */
static int hjoin_scan_init (THREAD_ENTRY * thread_p, HASH_LIST_SCAN * hash_scan, int key_cnt, QFILE_LIST_ID * list_id);
static void hjoin_scan_clear (THREAD_ENTRY * thread_p, HASH_LIST_SCAN * hash_scan);
/* Hash Join Processing */
static HASHJOIN_STATUS hjoin_check_empty_inputs (HASHJOIN_MANAGER * manager, HASHJOIN_CONTEXT * context);
/* Build Phase */
static int hjoin_build (THREAD_ENTRY * thread_p, HASHJOIN_MANAGER * manager, HASHJOIN_CONTEXT * context);
static int hjoin_build_key (THREAD_ENTRY * thread_p, HASH_LIST_SCAN * hash_scan,
QFILE_LIST_SCAN_ID * list_scan_id, QFILE_TUPLE_RECORD * tuple_record);
/* Probe Phase */
static int hjoin_probe (THREAD_ENTRY * thread_p, HASHJOIN_MANAGER * manager, HASHJOIN_CONTEXT * context,
QFILE_LIST_ID * list_id);
static int hjoin_outer_probe (THREAD_ENTRY * thread_p, HASHJOIN_MANAGER * manager, HASHJOIN_CONTEXT * context,
QFILE_LIST_ID * list_id);
static int hjoin_probe_key (THREAD_ENTRY * thread_p, HASH_LIST_SCAN * hash_scan, QFILE_LIST_SCAN_ID * list_scan_id,
QFILE_TUPLE_RECORD * tuple_record);
/* Merge QFILE_LIST_ID */
static int hjoin_merge_tuple_to_list_id (THREAD_ENTRY * thread_p, QFILE_LIST_ID * list_id,
QFILE_TUPLE_RECORD * outer_record, QFILE_TUPLE_RECORD * inner_record,
QFILE_LIST_MERGE_INFO * merge_info, QFILE_TUPLE_RECORD * overflow_record);
static int hjoin_merge_tuple (THREAD_ENTRY * thread_p, QFILE_TUPLE_RECORD * outer_record,
QFILE_TUPLE_RECORD * inner_record, QFILE_LIST_MERGE_INFO * merge_info,
QFILE_TUPLE_RECORD * overflow_record);
/* Dump */
#if HASHJOIN_DUMP_HASH_TABLE
static void hjoin_dump_hash_table (THREAD_ENTRY * thread_p, HASH_LIST_SCAN * hash_scan, QFILE_LIST_ID * list_id);
#endif /* HASHJOIN_DUMP_HASH_TABLE */
#if !defined(NDEBUG) && HASHJOIN_DUMP_PROBE
static void hjoin_print_tuple (QFILE_LIST_SCAN_ID * list_scan_id, QFILE_TUPLE tuple, HASHJOIN_PRINT_STEP step);
#endif /* !NDEBUG && HASHJOIN_DUMP_PROBE */
/*
* Function Definitions
*/
/*
* qexec_hash_join() -
* return: Error code (NO_ERROR if successful, error code otherwise).
* thread_p(in): Thread entry.
* xasl(in): XASL node for hash join execution.
* query_id(in): Query identifier.
* val_descr(in): Value descriptor for positional values.
*/
int
qexec_hash_join (THREAD_ENTRY * thread_p, XASL_NODE * xasl, QUERY_ID query_id, VAL_DESCR * val_descr)
{
HASHJOIN_MANAGER manager;
HASHJOIN_CONTEXT *single_context;
HASHJOIN_STATUS status, part_status;
int error = NO_ERROR;
assert (thread_p != NULL);
assert (xasl != NULL);
assert (query_id != NULL_QUERY_ID);
error = hjoin_init_manager (thread_p, &manager, xasl, query_id, val_descr);
if (error != NO_ERROR)
{
goto error_exit;
}
single_context = &manager.single_context;
status = hjoin_check_empty_inputs (&manager, single_context);
single_context->status = status;
switch (status)
{
case HASHJOIN_STATUS_FILL_NULL_VALUES:
error = hjoin_outer_fill_null_values (thread_p, &manager, single_context);
break;
case HASHJOIN_STATUS_TRY:
part_status = hjoin_try_partition (thread_p, &manager, single_context);
single_context->status = part_status;
switch (part_status)
{
case HASHJOIN_STATUS_SINGLE:
/* monitor */
perfmon_inc_stat (thread_p, PSTAT_QM_NUM_HASHJOINS);
error = hjoin_execute (thread_p, &manager, single_context);
break;
case HASHJOIN_STATUS_PARTITION:
/* monitor */
perfmon_inc_stat (thread_p, PSTAT_QM_NUM_HASHJOINS_PARTITIONED);
error = hjoin_execute_partitions (thread_p, &manager);
break;
#if defined (SERVER_MODE)
case HASHJOIN_STATUS_PARALLEL:
/* monitor */
perfmon_inc_stat (thread_p, PSTAT_QM_NUM_HASHJOINS_PARALLEL);
if (thread_is_on_trace (thread_p))
{
xasl->executed_parallelism = manager.num_parallel_threads;
}
// *INDENT-OFF*
error = parallel_query::hash_join::execute_partitions (*thread_p, &manager);
// *INDENT-ON*
break;
#endif /* defined (SERVER_MODE) */
case HASHJOIN_STATUS_END:
/* impossible case */
/* hjoin_check_empty_inputs guarantees HASHJOIN_STATUS_END cannot occur here */
assert_release_error (false);
goto error_exit;
case HASHJOIN_STATUS_ERROR:
/* hjoin_try_partition always retries as HASHJOIN_STATUS_SINGLE;
* except for ER_INTERRUPTED, never returns HASHJOIN_STATUS_ERROR */
error = er_errid ();
assert_release_error (error == ER_INTERRUPTED);
goto error_exit;
default:
/* impossible case */
assert_release_error (false);
goto error_exit;
}
break;
case HASHJOIN_STATUS_END:
/* Nothing to do */
assert (single_context->list_id == NULL);
break;
case HASHJOIN_STATUS_ERROR:
[[fallthrough]];
default:
/* impossible case */
assert_release_error (false);
goto error_exit;
}
if (error != NO_ERROR)
{
goto error_exit;
}
if (single_context->list_id != NULL)
{
/* Check if qfile_close_list was called */
assert (single_context->list_id->last_pgptr == NULL);
qfile_destroy_list (thread_p, xasl->list_id); /* may be unnecessary */
qfile_copy_list_id (xasl->list_id, single_context->list_id, false, QFILE_MOVE_DEPENDENT);
QFILE_FREE_AND_INIT_LIST_ID (single_context->list_id);
ASSERT_NO_ERROR_OR_INTERRUPTED ();
}
else if (status == HASHJOIN_STATUS_END)
{
ASSERT_NO_ERROR_OR_INTERRUPTED ();
}
else
{
/* list_id can be NULL when the join result is empty.
* In this case, it is NO_ERROR.
* Otherwise, an error may be set. */
error = er_errid ();
}
cleanup:
hjoin_clear_manager (thread_p, &manager);
return error;
error_exit:
if (error == NO_ERROR || er_errid () == NO_ERROR)
{
assert_release_error (er_errid () != NO_ERROR);
error = er_errid ();
}
goto cleanup;
}
/*
* hjoin_execute_partitions() -
* return: Error code (NO_ERROR if successful, error code otherwise).
* thread_p(in): Thread entry.
* manager(in): Hash join manager containing shared state.
*/
static int
hjoin_execute_partitions (THREAD_ENTRY * thread_p, HASHJOIN_MANAGER * manager)
{
HASHJOIN_CONTEXT *current_context;
UINT32 context_cnt, context_index;
int error = NO_ERROR;
assert (thread_p != NULL);
assert (manager != NULL);
HASHJOIN_STATS *stats = manager->single_context.stats;
#if HASHJOIN_PROFILE_TIME
HASHJOIN_START_STATS profile_start_stats = HASHJOIN_START_STATS_INITIALIZER;
#endif /* HASHJOIN_PROFILE_TIME */
assert (!thread_is_on_trace (thread_p) || stats != NULL);
context_cnt = manager->context_cnt;
for (context_index = 0; context_index < context_cnt; context_index++)
{
current_context = &manager->contexts[context_index];
error = hjoin_execute (thread_p, manager, current_context);
if (error != NO_ERROR)
{
goto error_exit;
}
if (thread_is_on_trace (thread_p))
{
hjoin_trace_merge_stats (stats, current_context->stats);
}
if (current_context->list_id == NULL)
{
error = er_errid ();
if (error != NO_ERROR)
{
goto error_exit;
}
else
{
/* list_id can be NULL when the join result is empty.
* In this case, it is NO_ERROR. */
continue;
}
}
HJOIN_PROFILE_START (thread_p, &profile_start_stats, HASHJOIN_PROFILE_MERGE);
error = hjoin_merge_qlist (thread_p, manager, current_context);
HJOIN_PROFILE_MERGE_END (thread_p, &stats->profile, &profile_start_stats, HASHJOIN_PROFILE_MERGE,
manager->single_context.list_id->tuple_cnt);
if (error != NO_ERROR)
{
goto error_exit;
}
}
ASSERT_NO_ERROR_OR_INTERRUPTED ();
return NO_ERROR;
error_exit:
if (error == NO_ERROR || er_errid () == NO_ERROR)
{
assert_release_error (er_errid () != NO_ERROR);
error = er_errid ();
}
return error;
}
/*
* hjoin_execute() -
* return: Error code (NO_ERROR if successful, error code otherwise).
* thread_p(in): Thread entry.
* manager(in): Hash join manager containing shared state.
* context(in): Hash join context containing per-partition state.
*/
int
hjoin_execute (THREAD_ENTRY * thread_p, HASHJOIN_MANAGER * manager, HASHJOIN_CONTEXT * context)
{
HASHJOIN_STATUS status;
int error = NO_ERROR;
assert (thread_p != NULL);
assert (manager != NULL);
assert (context != NULL);
status = hjoin_check_empty_inputs (manager, context);
/* In outer joins, tuples with NULL in any join column are placed in the last partition.
* HASHJOIN_STATUS_FILL_NULL_VALUES is triggered for all tuples in that partition. */
if (IS_OUTER_JOIN_TYPE (manager->join_type) && context == &manager->contexts[manager->context_cnt - 1])
{
status = (status == HASHJOIN_STATUS_TRY) ? HASHJOIN_STATUS_FILL_NULL_VALUES : status;
}
context->status = status;
switch (status)
{
case HASHJOIN_STATUS_FILL_NULL_VALUES:
assert (context != &manager->single_context);
error = hjoin_outer_fill_null_values (thread_p, manager, context);
break;
case HASHJOIN_STATUS_TRY:
error = hjoin_execute_internal (thread_p, manager, context);
break;
case HASHJOIN_STATUS_END:
/* Nothing to do */
break;
case HASHJOIN_STATUS_ERROR:
default:
assert_release_error (er_errid () != NO_ERROR);
error = er_errid ();
break;
}
/* Check if qfile_close_list was called */
assert (context->list_id == NULL || context->list_id->last_pgptr == NULL);
return error;
}
/*
* hjoin_outer_fill_null_values() -
* return: Error code (NO_ERROR if successful, error code otherwise).
* thread_p(in): Thread entry.
* manager(in): Hash join manager containing shared state.
* context(in): Hash join context containing per-partition state.
*/
static int
hjoin_outer_fill_null_values (THREAD_ENTRY * thread_p, HASHJOIN_MANAGER * manager, HASHJOIN_CONTEXT * context)
{
QFILE_LIST_ID *list_id = NULL;
QFILE_TUPLE_RECORD overflow_record = { NULL, 0 };
SCAN_CODE scan_code;
HASHJOIN_FETCH_INFO *outer, *inner;
HASHJOIN_FETCH_INFO *build, *probe;
int error = NO_ERROR;
assert (thread_p != NULL);
assert (manager != NULL);
assert (context != NULL);
HASHJOIN_STATS *stats = context->stats;
HASHJOIN_START_STATS start_stats = HASHJOIN_START_STATS_INITIALIZER;
assert (!thread_is_on_trace (thread_p) || stats != NULL);
outer = &context->outer;
inner = &context->inner;
switch (manager->join_type)
{
case JOIN_LEFT:
context->build = inner;
context->probe = outer;
break;
case JOIN_RIGHT:
context->build = outer;
context->probe = inner;
break;
default:
/* impossible case */
assert_release_error (false);
goto error_exit;
}
build = context->build;
probe = context->probe;
/* Prevent faults when qfile_close_scan is called */
probe->list_scan_id.status = S_CLOSED;
// *INDENT-OFF*
probe->tuple_record = { NULL, 0 };
// *INDENT-ON*
build->fill_record = NULL;
probe->fill_record = &probe->tuple_record;
list_id = qfile_open_list (thread_p, &manager->type_list, NULL, manager->query_id, manager->qlist_flag, NULL);
if (list_id == NULL)
{
goto error_exit;
}
error = qfile_open_list_scan (probe->list_id, &probe->list_scan_id);
if (error != NO_ERROR)
{
goto error_exit;
}
if (thread_is_on_trace (thread_p))
{
hjoin_trace_start (thread_p, &start_stats);
assert (stats->build.read_rows == 0);
assert (stats->build.read_keys == 0);
stats->build.qualified_rows = build->list_id->tuple_cnt;
}
while ((scan_code = qfile_scan_list_next (thread_p, &probe->list_scan_id, &probe->tuple_record, PEEK)) == S_SUCCESS)
{
error =
hjoin_merge_tuple_to_list_id (thread_p, list_id, outer->fill_record, inner->fill_record, manager->merge_info,
&overflow_record);
if (error != NO_ERROR)
{
break;
}
}
if (thread_is_on_trace (thread_p))
{
hjoin_trace_end (thread_p, &stats->probe, &start_stats);
stats->probe.read_rows = probe->list_id->tuple_cnt;
assert (stats->probe.read_keys == 0);
stats->probe.qualified_rows = list_id->tuple_cnt;
}
/* After qfile_open_list_scan, if an error occurs,
* ensure qfile_close_scan runs here
* before jumping to error_exit. */
qfile_close_scan (thread_p, &probe->list_scan_id);
if (scan_code == S_ERROR || error != NO_ERROR)
{
goto error_exit;
}
qfile_close_list (thread_p, list_id);
context->list_id = list_id;
ASSERT_NO_ERROR_OR_INTERRUPTED ();
cleanup:
if (overflow_record.tpl != NULL)
{
db_private_free_and_init (thread_p, overflow_record.tpl);
}
hjoin_destroy_qlist (thread_p, context);
/* Check if qfile_close_list was called */
assert (list_id == NULL || list_id->last_pgptr == NULL);
return error;
error_exit:
if (list_id != NULL)
{
qfile_close_list (thread_p, list_id);
qfile_destroy_list (thread_p, list_id);
QFILE_FREE_AND_INIT_LIST_ID (list_id);
}
if (error == NO_ERROR || er_errid () == NO_ERROR)
{
assert_release_error (er_errid () != NO_ERROR);
error = er_errid ();
}
goto cleanup;
}
/*
* hjoin_execute_internal() -
* return: Error code (NO_ERROR if successful, error code otherwise).
* thread_p(in): Thread entry.
* manager(in): Hash join manager containing shared state.
* context(in): Hash join context containing per-partition state.
*/
static int
hjoin_execute_internal (THREAD_ENTRY * thread_p, HASHJOIN_MANAGER * manager, HASHJOIN_CONTEXT * context)
{
QFILE_LIST_ID *list_id = NULL;
HASHJOIN_FETCH_INFO *outer, *inner;
HASHJOIN_FETCH_INFO *build = NULL, *probe = NULL;
int error = NO_ERROR;
assert (thread_p != NULL);
assert (manager != NULL);
assert (context != NULL);
assert (context->list_id == NULL);
outer = &context->outer;
inner = &context->inner;
/* Prevent faults when qfile_close_scan is called */
outer->list_scan_id.status = S_CLOSED;
inner->list_scan_id.status = S_CLOSED;
list_id = qfile_open_list (thread_p, &manager->type_list, NULL, manager->query_id, manager->qlist_flag, NULL);
if (list_id == NULL)
{
goto error_exit;
}
error = hjoin_init_context (thread_p, manager, context);
if (error != NO_ERROR)
{
goto error_exit;
}
build = context->build;
probe = context->probe;
assert (build != NULL);
assert (probe != NULL);
error = qfile_open_list_scan (build->list_id, &build->list_scan_id);
if (error != NO_ERROR)
{
goto error_exit;
}
error = hjoin_build (thread_p, manager, context);
if (error != NO_ERROR)
{
goto error_exit;
}
error = qfile_open_list_scan (probe->list_id, &probe->list_scan_id);
if (error != NO_ERROR)
{
goto error_exit;
}
if (IS_OUTER_JOIN_TYPE (manager->join_type))
{
error = hjoin_outer_probe (thread_p, manager, context, list_id);
}
else
{
error = hjoin_probe (thread_p, manager, context, list_id);
}
if (error != NO_ERROR)
{
goto error_exit;
}
qfile_close_list (thread_p, list_id);
context->list_id = list_id;
ASSERT_NO_ERROR_OR_INTERRUPTED ();
cleanup:
qfile_close_scan (thread_p, &build->list_scan_id);
qfile_close_scan (thread_p, &probe->list_scan_id);
hjoin_destroy_qlist (thread_p, context);
hjoin_scan_clear (thread_p, &context->hash_scan);
/* Check if qfile_close_list was called */
assert (list_id == NULL || list_id->last_pgptr == NULL);
return error;
error_exit:
if (list_id != NULL)
{
qfile_close_list (thread_p, list_id);
qfile_destroy_list (thread_p, list_id);
QFILE_FREE_AND_INIT_LIST_ID (list_id);
}
if (error == NO_ERROR || er_errid () == NO_ERROR)
{
assert_release_error (er_errid () != NO_ERROR);
error = er_errid ();
}
goto cleanup;
}
/*
* hjoin_init_manager() -
* return: Error code (NO_ERROR if successful, error code otherwise).
* thread_p(in): Thread entry.
* manager(in/out): Hash join manager to initialize.
* xasl(in): XASL node for hash join execution.
* query_id(in): Query identifier.
* val_descr(in): Value descriptor for positional values.
*/
static int
hjoin_init_manager (THREAD_ENTRY * thread_p, HASHJOIN_MANAGER * manager, XASL_NODE * xasl, QUERY_ID query_id,
VAL_DESCR * val_descr)
{
HASHJOIN_PROC_NODE *proc;
QFILE_LIST_MERGE_INFO *merge_info;
QFILE_LIST_ID *outer_list_id, *inner_list_id;
HASHJOIN_DOMAIN_INFO *domain_info;
HASHJOIN_CONTEXT *context;
QFILE_TUPLE_VALUE_TYPE_LIST *type_list = NULL;
int type_cnt, type_index;
int error = NO_ERROR;
assert (thread_p != NULL);
assert (manager != NULL);
assert (xasl != NULL);
memset (manager, 0, sizeof (HASHJOIN_MANAGER));
proc = &xasl->proc.hashjoin;
merge_info = &proc->merge_info;
assert (merge_info->ls_pos_cnt > 0);
assert (merge_info->ls_pos_list != NULL);
manager->merge_info = merge_info;
manager->outer = &proc->outer;
manager->inner = &proc->inner;
assert (manager->outer->xasl != NULL);
assert (manager->inner->xasl != NULL);
outer_list_id = manager->outer->xasl->list_id;
inner_list_id = manager->inner->xasl->list_id;
assert (outer_list_id != NULL);
assert (inner_list_id != NULL);
/* When aptr_list is executed in qexec_execute_mainblock_internal,
* it checks the results from outer_xasl and inner_xasl in merge_info.
* If either has no result, the other is skipped,
* and the skipped node can have a type count of 0 in list_id.type_list. */
if (outer_list_id->type_list.type_cnt == 0 || inner_list_id->type_list.type_cnt == 0)
{
return NO_ERROR;
}
assert (outer_list_id->type_list.domp != NULL);
assert (inner_list_id->type_list.domp != NULL);
manager->join_type = merge_info->join_type;
manager->key_cnt = merge_info->ls_column_cnt;
manager->during_join_pred = xasl->during_join_pred;
manager->num_parallel_threads = xasl->parallelism;
manager->query_id = query_id;
manager->val_descr = val_descr;
domain_info = &proc->domain_info;
error = hjoin_init_domain_info (thread_p, manager, domain_info);
if (error != NO_ERROR)
{
assert_release_error (er_errid () != NO_ERROR);
return er_errid ();
}
/* single_context */
context = &manager->single_context;
assert (context->list_id == NULL);
context->outer.list_id = outer_list_id;
context->outer.input = &domain_info->outer;
context->outer.coerce_domains = domain_info->coerce_domains;
context->outer.need_coerce_domains = domain_info->need_coerce_domains;
context->outer.regu_list_pred = proc->outer.regu_list_pred;
context->inner.list_id = inner_list_id;
context->inner.input = &domain_info->inner;
context->inner.coerce_domains = domain_info->coerce_domains;
context->inner.need_coerce_domains = domain_info->need_coerce_domains;
context->inner.regu_list_pred = proc->inner.regu_list_pred;
/* Set in hjoin_init_context or hjoin_outer_fill_null_values. */
assert (context->build == NULL);
assert (context->probe == NULL);
context->during_join_pred = manager->during_join_pred;
context->val_descr = manager->val_descr;
assert (context->status == HASHJOIN_STATUS_NONE);
/* contexts */
assert (manager->contexts == NULL);
assert (manager->context_cnt == 0);
/* type_list */
type_list = &manager->type_list;
assert (type_list->domp == NULL);
assert (type_list->type_cnt == 0);
type_cnt = merge_info->ls_pos_cnt;
type_list->domp = (TP_DOMAIN **) db_private_alloc (thread_p, type_cnt * sizeof (TP_DOMAIN *));
if (type_list->domp == NULL)
{
assert_release_error (er_errid () != NO_ERROR);
return er_errid ();
}
type_list->type_cnt = type_cnt;
for (type_index = 0; type_index < type_cnt; type_index++)
{
if (merge_info->ls_outer_inner_list[type_index] == QFILE_OUTER_LIST)
{
type_list->domp[type_index] = outer_list_id->type_list.domp[merge_info->ls_pos_list[type_index]];
}
else
{
type_list->domp[type_index] = inner_list_id->type_list.domp[merge_info->ls_pos_list[type_index]];
}
}
manager->qlist_merge_method = HASHJOIN_MERGE_CONNECT;
manager->qlist_flag =
(manager->qlist_merge_method == HASHJOIN_MERGE_CONNECT) ? QFILE_FLAG_ALL | QFILE_NOT_USE_MEMBUF : QFILE_FLAG_ALL;
assert (manager->px_worker_manager == NULL);
/* stats_group */
if (thread_is_on_trace (thread_p))
{
manager->stats_group = &proc->stats_group;
memset (manager->stats_group, 0, sizeof (HASHJOIN_STATS_GROUP));
context->stats = &manager->stats_group->stats;
}
else
{
assert (manager->stats_group == NULL);
assert (context->stats == NULL);
}
#if defined (SERVER_MODE) && HASHJOIN_DUMP_HASH_TABLE
pthread_mutex_init (&manager->dump_hash_table_mutex, NULL);
#endif /* defined (SERVER_MODE) && HASHJOIN_DUMP_HASH_TABLE */
ASSERT_NO_ERROR_OR_INTERRUPTED ();
return NO_ERROR;
}
/*
* hjoin_clear_manager() -
* return: None.
* thread_p(in): Thread entry.
* manager(in): Hash join manager to clear.
*/
static void
hjoin_clear_manager (THREAD_ENTRY * thread_p, HASHJOIN_MANAGER * manager)
{
HASHJOIN_CONTEXT *single_context;
HASHJOIN_CONTEXT *contexts = NULL;
UINT32 context_cnt, context_index;
assert (thread_p != NULL);
assert (manager != NULL);
single_context = &manager->single_context;
if (single_context->list_id != NULL)
{
qfile_close_list (thread_p, single_context->list_id);
qfile_destroy_list (thread_p, single_context->list_id);
QFILE_FREE_AND_INIT_LIST_ID (single_context->list_id);
}
hjoin_destroy_qlist (thread_p, single_context);
if (manager->contexts != NULL)
{
contexts = manager->contexts;
context_cnt = manager->context_cnt;
assert (context_cnt > 1);
for (context_index = 0; context_index < context_cnt; context_index++)
{
hjoin_clear_context (thread_p, &contexts[context_index]);
}
db_private_free_and_init (thread_p, contexts);
manager->contexts = NULL;
manager->context_cnt = 0;
}
else
{
assert (manager->context_cnt == 0);
}
if (manager->type_list.domp != NULL)
{
db_private_free_and_init (thread_p, manager->type_list.domp);
}
#if defined (SERVER_MODE)
if (manager->px_worker_manager != NULL)
{
manager->px_worker_manager->release_workers ();
manager->px_worker_manager = NULL;
}
if (manager->px_worker_stats != NULL)
{
db_private_free_and_init (thread_p, manager->px_worker_stats);
}
THREAD_ENTRY *main_thread_p = thread_get_main_thread (thread_p);
/* only top-level parent */
if (main_thread_p == thread_p)
{
if (thread_p->m_px_stats != NULL && !thread_p->m_uses_px_stats)
{
perfmon_merge_parallel_stats_to_tran_stats (thread_p);
free_and_init (thread_p->m_px_stats);
}
}
#else
assert (manager->px_worker_manager == NULL);
assert (manager->px_worker_stats == NULL);
assert (thread_p->m_px_stats == NULL);
#endif /* defined (SERVER_MODE) */
#if defined (SERVER_MODE) && HASHJOIN_DUMP_HASH_TABLE
pthread_mutex_destroy (&manager->dump_hash_table_mutex);
#endif /* defined (SERVER_MODE) && HASHJOIN_DUMP_HASH_TABLE */
}
/*
* hjoin_init_domain_info() -
* return: Error code (NO_ERROR if successful, error code otherwise).
* thread_p(in): Thread entry.
* manager(in): Hash join manager containing shared state.
* domain_info(in/out): Domain information for join columns.
*/
static int
hjoin_init_domain_info (THREAD_ENTRY * thread_p, HASHJOIN_MANAGER * manager, HASHJOIN_DOMAIN_INFO * domain_info)
{
QFILE_LIST_MERGE_INFO *merge_info;
QFILE_LIST_ID *outer_list_id, *inner_list_id;
TP_DOMAIN **outer_domains, **inner_domains, **coerce_domains;
int *outer_value_indexes, *inner_value_indexes;
int outer_value_index, inner_value_index;
int domain_cnt, domain_index;
bool need_coerce_domains;
DB_TYPE outer_type, inner_type, common_type;
int outer_precision, inner_precision;
int outer_scale, inner_scale;
int outer_integral, inner_integral;
int common_precision, common_scale;
assert (thread_p != NULL);
assert (manager != NULL);
assert (domain_info != NULL);
merge_info = manager->merge_info;
assert (merge_info != NULL);
assert (manager->outer != NULL);
assert (manager->outer->xasl != NULL);
assert (manager->outer->xasl->list_id != NULL);
outer_list_id = manager->outer->xasl->list_id;
assert (manager->inner != NULL);
assert (manager->inner->xasl != NULL);
assert (manager->inner->xasl->list_id != NULL);
inner_list_id = manager->inner->xasl->list_id;
/* domain_info */
domain_cnt = merge_info->ls_column_cnt;
outer_domains = domain_info->outer.domains;
outer_value_indexes = domain_info->outer.value_indexes;
assert (outer_domains != NULL);
assert (outer_value_indexes != NULL);
inner_domains = domain_info->inner.domains;
inner_value_indexes = domain_info->inner.value_indexes;
assert (inner_domains != NULL);
assert (inner_value_indexes != NULL);
coerce_domains = domain_info->coerce_domains;
need_coerce_domains = domain_info->need_coerce_domains = false;
memset (coerce_domains, 0, domain_cnt * sizeof (TP_DOMAIN *));
/* This code references tp_infer_common_domain but reduces unnecessary calls to tp_domain_new. */
for (domain_index = 0; domain_index < domain_cnt; domain_index++)
{
outer_value_index = outer_value_indexes[domain_index];
inner_value_index = inner_value_indexes[domain_index];
outer_domains[domain_index] = outer_list_id->type_list.domp[outer_value_index];
inner_domains[domain_index] = inner_list_id->type_list.domp[inner_value_index];
assert (outer_domains[domain_index] != NULL);
assert (inner_domains[domain_index] != NULL);
outer_type = TP_DOMAIN_TYPE (outer_domains[domain_index]);
inner_type = TP_DOMAIN_TYPE (inner_domains[domain_index]);
/* common_type */
if (outer_type == inner_type)
{
common_type = outer_type;
}
else
{
need_coerce_domains = true;
if (outer_type == DB_TYPE_NULL)
{
assert (false);
coerce_domains[domain_index] = inner_domains[domain_index];
continue;
}
else if (inner_type == DB_TYPE_NULL)
{
assert (false);
coerce_domains[domain_index] = outer_domains[domain_index];
continue;
}
else
{
common_type = (tp_more_general_type (outer_type, inner_type) > 0) ? outer_type : inner_type;
}
}
/* common_precision, common_scale */
outer_precision = outer_domains[domain_index]->precision;
outer_scale = outer_domains[domain_index]->scale;
inner_precision = inner_domains[domain_index]->precision;
inner_scale = inner_domains[domain_index]->scale;
if (outer_precision == inner_precision && outer_scale == inner_scale)
{
common_precision = inner_precision;
common_scale = inner_scale;
}
else
{
need_coerce_domains = true;
if (outer_precision == TP_FLOATING_PRECISION_VALUE || inner_precision == TP_FLOATING_PRECISION_VALUE)
{
common_precision = TP_FLOATING_PRECISION_VALUE;
common_scale = 0;
}
else if (common_type == DB_TYPE_NUMERIC)
{
common_scale = MAX (outer_scale, inner_scale);
outer_integral = outer_precision - outer_scale;
inner_integral = inner_precision - inner_scale;
common_precision = MAX (outer_integral, inner_integral) + common_scale;
common_precision = MIN (common_precision, DB_MAX_NUMERIC_PRECISION);
}
else
{
common_precision = MAX (outer_precision, inner_precision);
common_scale = 0;
}
}
/* need_coerce_domains, coerce_domains */
if (need_coerce_domains)
{
if (common_type == outer_type && common_precision == outer_precision && common_scale == outer_scale)
{
coerce_domains[domain_index] = outer_domains[domain_index];
}
else if (common_type == inner_type && common_precision == inner_precision && common_scale == inner_scale)
{
coerce_domains[domain_index] = inner_domains[domain_index];
}
else
{
coerce_domains[domain_index] =
tp_domain_copy ((common_type == outer_type) ? outer_domains[domain_index] : inner_domains[domain_index],
false);
if (coerce_domains[domain_index] == NULL)
{
assert_release_error (er_errid () != NO_ERROR);
return er_errid ();
}
coerce_domains[domain_index]->precision = common_precision;
coerce_domains[domain_index]->scale = common_scale;
coerce_domains[domain_index] = tp_domain_cache (coerce_domains[domain_index]);
}
}
} /* for (domain_index < domain_cnt) */
#if !defined (NDEBUG)
if (!need_coerce_domains)
{
for (domain_index = 0; domain_index < domain_cnt; domain_index++)
{
assert (coerce_domains[domain_index] == NULL);
}
}
#endif /* !NDEBUG */
/* If join predicates compare different types, need_coerce_domains is set to true;
* otherwise, it is false.
*
* When need_coerce_domains is true, coerce_domains is set to inner_domains,
* outer_domains, or a common domain for comparison.
*
* If either inner_domains or outer_domains matches coerce_domains,
* no coercion is needed. Otherwise, values are coerced to the common domain. */
domain_info->need_coerce_domains = need_coerce_domains;
ASSERT_NO_ERROR_OR_INTERRUPTED ();
return NO_ERROR;
}
/*
* hjoin_try_partition() -
* return: One of the following HASHJOIN_STATUS values:
* - HASHJOIN_STATUS_SINGLE: Partitioning is not needed or falls back on error.
* - HASHJOIN_STATUS_PARTITION: Partitioning is applied.
* - HASHJOIN_STATUS_PARALLEL: Parallel execution is applied.
* thread_p(in): Thread entry.
* manager(in): Hash join manager containing shared state.
* single_context(in): Hash join context for single-threaded execution.
*/
static HASHJOIN_STATUS
hjoin_try_partition (THREAD_ENTRY * thread_p, HASHJOIN_MANAGER * manager, HASHJOIN_CONTEXT * single_context)
{
HASHJOIN_STATUS status;
HASHJOIN_SPLIT_INFO split_info;
UINT32 context_index;
int error = NO_ERROR;
assert (thread_p != NULL);
assert (manager != NULL);
assert (single_context != NULL);
assert (single_context == &manager->single_context);
status = hjoin_check_partition (thread_p, manager, single_context);
if (status == HASHJOIN_STATUS_SINGLE)
{
return status;
}
assert (status == HASHJOIN_STATUS_PARTITION);
error = hjoin_prepare_partition (thread_p, manager, &split_info);
if (error != NO_ERROR)
{
goto error_exit;
}
#if defined (SERVER_MODE) && !defined (WINDOWS)
status = hjoin_try_parallel (thread_p, manager, single_context);
single_context->status = status;
if (status == HASHJOIN_STATUS_ERROR)
{
goto error_exit;
}
#endif /* defined (SERVER_MODE) && !defined (WINDOWS) */
switch (status)
{
case HASHJOIN_STATUS_PARTITION:
if (thread_is_on_trace (thread_p))
{
assert (single_context->stats != NULL);
assert (single_context->stats->num_parallel_threads == 0);
}
error = hjoin_build_partitions (thread_p, manager, &split_info);
break;
#if defined (SERVER_MODE)
case HASHJOIN_STATUS_PARALLEL:
if (thread_is_on_trace (thread_p))
{
assert (single_context->stats != NULL);
single_context->stats->num_parallel_threads = manager->num_parallel_threads;
}
// *INDENT-OFF*
error = parallel_query::hash_join::build_partitions (*thread_p, manager, &split_info);
// *INDENT-ON*
break;
#endif /* defined (SERVER_MODE) */
case HASHJOIN_STATUS_ERROR:
goto error_exit;
default:
/* impossible case */
assert_release_error (false);
goto error_exit;
}
if (error != NO_ERROR)
{
goto error_exit;
}
/* call hjoin_destroy_qlist after build_partitions is done,
* since HASHJOIN_STATUS_SINGLE may retry on error*/
hjoin_destroy_qlist (thread_p, single_context);
assert (status == HASHJOIN_STATUS_PARTITION || status == HASHJOIN_STATUS_PARALLEL);
ASSERT_NO_ERROR_OR_INTERRUPTED ();
cleanup:
hjoin_clear_split_info (thread_p, manager, &split_info, false);
return status;
error_exit:
#if defined (SERVER_MODE)
if (manager->px_worker_manager != NULL)
{
manager->px_worker_manager->release_workers ();
manager->px_worker_manager = NULL;
}
#else
assert (manager->px_worker_manager == NULL);
#endif /* defined (SERVER_MODE) */
hjoin_clear_split_info (thread_p, manager, &split_info, true);
if (manager->contexts != NULL)
{
assert (manager->context_cnt > 1);
for (context_index = 0; context_index < manager->context_cnt; context_index++)
{
hjoin_clear_context (thread_p, &manager->contexts[context_index]);
}
db_private_free_and_init (thread_p, manager->contexts);
manager->context_cnt = 0;
}
if (thread_is_on_trace (thread_p))
{
assert (manager->stats_group != NULL);
if (manager->stats_group->context_stats != NULL)
{
free_and_init (manager->stats_group->context_stats);
}
manager->stats_group->context_cnt = 0;
}
else
{
assert (manager->stats_group == NULL);
}
if (error == ER_INTERRUPTED || er_errid () == ER_INTERRUPTED)
{
status = HASHJOIN_STATUS_ERROR;
}
else
{
/* fallback to HASHJOIN_STATUS_SINGLE */
er_clear ();
status = HASHJOIN_STATUS_SINGLE;
}
goto cleanup;
}
/*
* hjoin_check_partition() -
* return: One of the following HASHJOIN_STATUS values:
* - HASHJOIN_STATUS_SINGLE: Partitioning is not needed.
* - HASHJOIN_STATUS_PARTITION: Partitioning is applied.
* thread_p(in): Thread entry.
* manager(in): Hash join manager containing shared state.
* single_context(in): Hash join context for single-threaded execution.
*/
static HASHJOIN_STATUS
hjoin_check_partition (THREAD_ENTRY * thread_p, HASHJOIN_MANAGER * manager, HASHJOIN_CONTEXT * single_context)
{
QFILE_LIST_ID *outer_list_id, *inner_list_id;
UINT64 mem_limit;
INT64 min_tuple_cnt;
UINT32 part_cnt;
assert (thread_p != NULL);
assert (manager != NULL);
assert (single_context != NULL);
assert (single_context == &manager->single_context);
outer_list_id = single_context->outer.list_id;
inner_list_id = single_context->inner.list_id;
assert (outer_list_id != NULL);
assert (inner_list_id != NULL);
mem_limit = prm_get_bigint_value (PRM_ID_MAX_HASH_LIST_SCAN_SIZE);
assert (mem_limit > 0);
min_tuple_cnt =
(outer_list_id->tuple_cnt < inner_list_id->tuple_cnt) ? outer_list_id->tuple_cnt : inner_list_id->tuple_cnt;
assert (min_tuple_cnt >= 0);
part_cnt =
CEIL_PTVDIV ((sizeof (HENTRY_HLS) + sizeof (QFILE_TUPLE_SIMPLE_POS)) * min_tuple_cnt,
mem_limit * PARTITION_FILL_FACTOR);
if (part_cnt > 1)
{
if (IS_OUTER_JOIN_TYPE (manager->join_type))
{
/* In outer joins, tuples with NULL in any join column are placed in the last partition.
* HASHJOIN_STATUS_FILL_NULL_VALUES is triggered for all tuples in this partition. */
part_cnt += 1;
}
manager->context_cnt = part_cnt;
return HASHJOIN_STATUS_PARTITION;
}
else
{
assert (manager->context_cnt == 0);
return HASHJOIN_STATUS_SINGLE;
}
}
/*
* hjoin_prepare_partition() -
* return: Error code (NO_ERROR if successful, error code otherwise).
* thread_p(in): Thread entry.
* manager(in): Hash join manager containing shared state.
* split_info(in): Split information.
*/
static int
hjoin_prepare_partition (THREAD_ENTRY * thread_p, HASHJOIN_MANAGER * manager, HASHJOIN_SPLIT_INFO * split_info)
{
QFILE_LIST_ID *outer_list_id, *inner_list_id;
QFILE_LIST_ID **outer_part_list_id = NULL, **inner_part_list_id = NULL;
HASHJOIN_CONTEXT *single_context;
HASHJOIN_CONTEXT *contexts = NULL, *current_context;
HASHJOIN_STATS *context_stats = NULL;
UINT32 part_cnt, part_index;
int error = NO_ERROR;
assert (thread_p != NULL);
assert (manager != NULL);
assert (split_info != NULL);
single_context = &manager->single_context;
outer_list_id = single_context->outer.list_id;
inner_list_id = single_context->inner.list_id;
assert (outer_list_id != NULL);
assert (inner_list_id != NULL);
error = hjoin_init_split_info (thread_p, manager, split_info);
if (error != NO_ERROR)
{
goto error_exit;
}
outer_part_list_id = split_info->outer.part_list_id;
inner_part_list_id = split_info->inner.part_list_id;
assert (outer_part_list_id != NULL);
assert (inner_part_list_id != NULL);
part_cnt = manager->context_cnt;
assert (part_cnt > 1);
contexts = (HASHJOIN_CONTEXT *) db_private_alloc (thread_p, part_cnt * sizeof (HASHJOIN_CONTEXT));
if (contexts == NULL)
{
goto error_exit;
}
memset (contexts, 0, part_cnt * sizeof (HASHJOIN_CONTEXT));
for (part_index = 0; part_index < part_cnt; part_index++)
{
current_context = &contexts[part_index];
outer_part_list_id[part_index] =
qfile_open_list (thread_p, &outer_list_id->type_list, NULL, outer_list_id->query_id, QFILE_FLAG_ALL, NULL);
if (outer_part_list_id[part_index] == NULL)
{
goto error_exit;
}
inner_part_list_id[part_index] =
qfile_open_list (thread_p, &inner_list_id->type_list, NULL, inner_list_id->query_id, QFILE_FLAG_ALL, NULL);
if (inner_part_list_id[part_index] == NULL)
{
goto error_exit;
}
assert (current_context->list_id == NULL);
current_context->outer.list_id = outer_part_list_id[part_index];
current_context->outer.input = single_context->outer.input;
current_context->outer.coerce_domains = single_context->outer.coerce_domains;
current_context->outer.need_coerce_domains = single_context->outer.need_coerce_domains;
current_context->outer.regu_list_pred = single_context->outer.regu_list_pred;
current_context->inner.list_id = inner_part_list_id[part_index];
current_context->inner.input = single_context->inner.input;
current_context->inner.coerce_domains = single_context->inner.coerce_domains;
current_context->inner.need_coerce_domains = single_context->inner.need_coerce_domains;
current_context->inner.regu_list_pred = single_context->inner.regu_list_pred;
assert (current_context->build == NULL);
assert (current_context->probe == NULL);
current_context->during_join_pred = single_context->during_join_pred;
current_context->val_descr = single_context->val_descr;
}
manager->contexts = contexts;
if (thread_is_on_trace (thread_p))
{
context_stats = (HASHJOIN_STATS *) malloc (part_cnt * sizeof (HASHJOIN_STATS));
if (context_stats == NULL)
{
error = ER_OUT_OF_VIRTUAL_MEMORY;
er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, error, 1, part_cnt * sizeof (HASHJOIN_STATS));
goto error_exit;
}
memset (context_stats, 0, part_cnt * sizeof (HASHJOIN_STATS));
for (part_index = 0; part_index < part_cnt; part_index++)
{
contexts[part_index].stats = &context_stats[part_index];
}
assert (manager->stats_group != NULL);
manager->stats_group->context_stats = context_stats;
manager->stats_group->context_cnt = part_cnt;
}
else
{
assert (manager->stats_group == NULL);
}
ASSERT_NO_ERROR_OR_INTERRUPTED ();
return NO_ERROR;
error_exit:
if (contexts != NULL)
{
db_private_free_and_init (thread_p, contexts);
}
hjoin_clear_split_info (thread_p, manager, split_info, true);
if (thread_is_on_trace (thread_p))
{
if (context_stats != NULL)
{
free_and_init (context_stats);
}
assert (manager->stats_group != NULL);
manager->stats_group->context_stats = NULL;
manager->stats_group->context_cnt = 0;
}
else
{
assert (context_stats == NULL);
assert (manager->stats_group == NULL);
}
manager->contexts = NULL;
if (error == NO_ERROR || er_errid () == NO_ERROR)
{
assert_release_error (er_errid () != NO_ERROR);
error = er_errid ();
}
return error;
}
/*
* hjoin_build_partitions() -
* return: Error code (NO_ERROR if successful, error code otherwise).
* thread_p(in): Thread entry.
* manager(in): Hash join manager containing shared state.
* split_info(in): Split information.
*/
static int
hjoin_build_partitions (THREAD_ENTRY * thread_p, HASHJOIN_MANAGER * manager, HASHJOIN_SPLIT_INFO * split_info)
{
QFILE_LIST_ID **temp_part_list_id = NULL;
HASH_SCAN_KEY *temp_key = NULL;
UINT32 part_cnt, part_index;
int error = NO_ERROR;
assert (thread_p != NULL);
assert (manager != NULL);
assert (split_info != NULL);
HASHJOIN_STATS *stats = manager->single_context.stats;
HASHJOIN_START_STATS start_stats = HASHJOIN_START_STATS_INITIALIZER;
assert (!thread_is_on_trace (thread_p) || stats != NULL);
if (thread_is_on_trace (thread_p))
{
hjoin_trace_start (thread_p, &start_stats);
}
part_cnt = manager->context_cnt;
temp_part_list_id = (QFILE_LIST_ID **) db_private_alloc (thread_p, part_cnt * sizeof (QFILE_LIST_ID *));
if (temp_part_list_id == NULL)
{
goto error_exit;
}
memset (temp_part_list_id, 0, part_cnt * sizeof (QFILE_LIST_ID *));
temp_key = qdata_alloc_hscan_key (thread_p, manager->key_cnt, true);
if (temp_key == NULL)
{
goto error_exit;
}
error = hjoin_split_qlist (thread_p, manager, &split_info->outer, temp_part_list_id, temp_key);
if (error != NO_ERROR)
{
goto error_exit;
}
error = hjoin_split_qlist (thread_p, manager, &split_info->inner, temp_part_list_id, temp_key);
if (error != NO_ERROR)
{
goto error_exit;
}
ASSERT_NO_ERROR_OR_INTERRUPTED ();
cleanup:
if (thread_is_on_trace (thread_p))
{
hjoin_trace_end (thread_p, &stats->split, &start_stats);
}
if (temp_part_list_id != NULL)
{
for (part_index = 0; part_index < part_cnt; part_index++)
{
if (temp_part_list_id[part_index] != NULL)
{
qfile_close_list (thread_p, temp_part_list_id[part_index]);
qfile_destroy_list (thread_p, temp_part_list_id[part_index]);
QFILE_FREE_AND_INIT_LIST_ID (temp_part_list_id[part_index]);
}
}
db_private_free_and_init (thread_p, temp_part_list_id);
}
if (temp_key != NULL)
{
qdata_free_hscan_key (thread_p, temp_key, manager->key_cnt);
}
return error;
error_exit:
if (error == NO_ERROR || er_errid () == NO_ERROR)
{
assert_release_error (er_errid () != NO_ERROR);
error = er_errid ();
}
goto cleanup;
}
/*
* hjoin_split_qlist() -
* return: Error code (NO_ERROR if successful, error code otherwise)
* thread_p(in): Thread entry.
* manager(in): Hash join manager containing shared state.
* split_info(in): Split information.
* key(in/out): Space for reading join column values.
*/
static int
hjoin_split_qlist (THREAD_ENTRY * thread_p, HASHJOIN_MANAGER * manager, HASHJOIN_INPUT_SPLIT_INFO * split_info,
QFILE_LIST_ID ** temp_part_list_id, HASH_SCAN_KEY * temp_key)
{
QFILE_LIST_ID *list_id;
QFILE_LIST_ID **part_list_id;
QFILE_LIST_SCAN_ID list_scan_id;
QFILE_TUPLE_RECORD tuple_record = { NULL, 0 };
SCAN_CODE scan_code;
unsigned int hash_key;
UINT32 part_cnt, part_index, part_id;
bool is_outer_join = false;
bool need_skip_next = false;
int error = NO_ERROR;
assert (thread_p != NULL);
assert (manager != NULL);
assert (split_info != NULL);
assert (split_info->fetch_info != NULL);
assert (temp_part_list_id != NULL);
assert (temp_key != NULL);
list_id = split_info->fetch_info->list_id;
part_list_id = split_info->part_list_id;
part_cnt = manager->context_cnt;
assert (list_id != NULL);
assert (part_list_id != NULL);
assert (part_cnt > 1);
/* Prevent faults when qfile_close_scan is called */
list_scan_id.status = S_CLOSED;
is_outer_join = IS_OUTER_JOIN_TYPE (manager->join_type);
error = qfile_open_list_scan (list_id, &list_scan_id);
if (error != NO_ERROR)
{
goto error_exit;
}
while ((scan_code = qfile_scan_list_next (thread_p, &list_scan_id, &tuple_record, PEEK)) == S_SUCCESS)
{
error = hjoin_fetch_key (thread_p, split_info->fetch_info, &tuple_record, temp_key, NULL /* compare_key */ ,
&need_skip_next);
if (error != NO_ERROR)
{
break; /* error_exit */
}
else if (need_skip_next)
{
need_skip_next = false; /* init */
if (is_outer_join)
{
/* In outer joins, tuples with NULL in any join column are placed in the last partition.
* HASHJOIN_STATUS_FILL_NULL_VALUES is triggered for all tuples in that partition. */
part_id = part_cnt - 1;
}
else
{
/* next tuple */
continue;
}
} /* else if (need_skip_next) */
else
{
hash_key = qdata_hash_scan_key (temp_key, UINT_MAX, HASH_METH_IN_MEM);
part_id = (is_outer_join) ? hash_key % (part_cnt - 1) : hash_key % (part_cnt);
hjoin_update_tuple_hash_key (thread_p, &tuple_record, hash_key);
}
/* overflow page */
if (QFILE_GET_OVERFLOW_PAGE_ID (list_scan_id.curr_pgptr) != NULL_PAGEID)
{
assert (part_list_id[part_id]->last_pgptr == NULL);
if (qfile_reopen_list_as_append_mode (thread_p, part_list_id[part_id]) != NO_ERROR)
{
break; /* error_exit */
}
error = qfile_add_tuple_to_list (thread_p, part_list_id[part_id], tuple_record.tpl);
if (error != NO_ERROR)
{
break; /* error_exit */
}
qfile_close_list (thread_p, part_list_id[part_id]);
/* next tuple */
continue;
}
if (temp_part_list_id[part_id] != NULL &&
(temp_part_list_id[part_id]->tfile_vfid->membuf_last ==
temp_part_list_id[part_id]->tfile_vfid->membuf_npages - 1) &&
(temp_part_list_id[part_id]->last_offset + QFILE_GET_TUPLE_LENGTH (tuple_record.tpl)) > DB_PAGESIZE)
{
qfile_close_list (thread_p, temp_part_list_id[part_id]); /* may be meaningless since only memory buffer is used */
assert (part_list_id[part_id]->last_pgptr == NULL);
if (part_list_id[part_id]->tuple_cnt > 0)
{
error = qfile_append_list (thread_p, part_list_id[part_id], temp_part_list_id[part_id]);
if (error != NO_ERROR)
{
break; /* error_exit */
}
error = qfile_truncate_list (thread_p, temp_part_list_id[part_id]);
if (error != NO_ERROR)
{
break; /* error_exit */
}
}
else
{
qfile_destroy_list (thread_p, part_list_id[part_id]);
qfile_copy_list_id (part_list_id[part_id], temp_part_list_id[part_id], false, QFILE_PROHIBIT_DEPENDENT);
QFILE_FREE_AND_INIT_LIST_ID (temp_part_list_id[part_id]);
}
}
if (temp_part_list_id[part_id] == NULL)
{
temp_part_list_id[part_id] =
qfile_open_list (thread_p, &list_id->type_list, NULL, list_id->query_id, QFILE_FLAG_ALL, NULL);
if (temp_part_list_id[part_id] == NULL)
{
break; /* error_exit */
}
}
error = qfile_add_tuple_to_list (thread_p, temp_part_list_id[part_id], tuple_record.tpl);
if (error != NO_ERROR)
{
break; /* error_exit */
}
assert (VFID_ISNULL (&temp_part_list_id[part_id]->tfile_vfid->temp_vfid));
} /* while (qfile_scan_list_next (list_scan_id)) */
/* After qfile_open_list_scan, if an error occurs,
* ensure qfile_close_scan runs here
* before jumping to error_exit. */
qfile_close_scan (thread_p, &list_scan_id);
for (part_index = 0; part_index < part_cnt; part_index++)
{
if (temp_part_list_id[part_index] != NULL)
{
qfile_close_list (thread_p, temp_part_list_id[part_index]); /* may be meaningless since only memory buffer is used */
if (temp_part_list_id[part_index]->tuple_cnt > 0)
{
assert (part_list_id[part_index]->last_pgptr == NULL);
if (part_list_id[part_index]->tuple_cnt > 0)
{
error = qfile_append_list (thread_p, part_list_id[part_index], temp_part_list_id[part_index]);
if (error != NO_ERROR)
{
break; /* error_exit */
}
qfile_destroy_list (thread_p, temp_part_list_id[part_index]);
}
else
{
qfile_destroy_list (thread_p, part_list_id[part_index]);
qfile_copy_list_id (part_list_id[part_index], temp_part_list_id[part_index], false,
QFILE_PROHIBIT_DEPENDENT);
}
}
else
{
qfile_destroy_list (thread_p, temp_part_list_id[part_index]);
}
QFILE_FREE_AND_INIT_LIST_ID (temp_part_list_id[part_index]);
}
}
if (scan_code == S_ERROR || error != NO_ERROR)
{
goto error_exit;
}
ASSERT_NO_ERROR_OR_INTERRUPTED ();
return NO_ERROR;
error_exit:
if (error == NO_ERROR || er_errid () == NO_ERROR)
{
assert_release_error (er_errid () != NO_ERROR);
error = er_errid ();
}
return error;
}
/*
* hjoin_merge_qlist() -
* return: Error code (NO_ERROR if successful, error code otherwise).
* thread_p(in): Thread entry.
* manager(in): Hash join manager containing shared state.
* context(in): Hash join context containing per-partition state.
*/
int
hjoin_merge_qlist (THREAD_ENTRY * thread_p, HASHJOIN_MANAGER * manager, HASHJOIN_CONTEXT * context)
{
QFILE_LIST_ID *new_list_id = NULL;
HASHJOIN_CONTEXT *single_context;
int error = NO_ERROR;
assert (thread_p != NULL);
assert (manager != NULL);
assert (context != NULL);
/* Check if qfile_close_list was called */
assert (context->list_id != NULL);
assert (context->list_id->last_pgptr == NULL);
single_context = &manager->single_context;
assert (single_context != context);
if (single_context->list_id == NULL)
{
single_context->list_id = context->list_id;
context->list_id = NULL;
return NO_ERROR;
}
else if (single_context->list_id->tuple_cnt == 0)
{
qfile_destroy_list (thread_p, single_context->list_id);
QFILE_FREE_AND_INIT_LIST_ID (single_context->list_id);
single_context->list_id = context->list_id;
context->list_id = NULL;
return NO_ERROR;
}
else if (context->list_id->tuple_cnt == 0)
{
qfile_destroy_list (thread_p, context->list_id);
QFILE_FREE_AND_INIT_LIST_ID (context->list_id);
return NO_ERROR;
}
/* Check if qfile_close_list was called */
assert (single_context->list_id->last_pgptr == NULL);
switch (manager->qlist_merge_method)
{
case HASHJOIN_MERGE_COMBINE:
{
new_list_id =
qfile_combine_two_list (thread_p, single_context->list_id, context->list_id,
QFILE_FLAG_ALL | QFILE_FLAG_UNION);
if (new_list_id == NULL)
{
goto error_exit;
}
qfile_destroy_list (thread_p, single_context->list_id);
QFILE_FREE_AND_INIT_LIST_ID (single_context->list_id);
qfile_destroy_list (thread_p, context->list_id);
QFILE_FREE_AND_INIT_LIST_ID (context->list_id);
single_context->list_id = new_list_id;
new_list_id = NULL;
break;
}
case HASHJOIN_MERGE_APPEND:
{
error = qfile_append_list (thread_p, single_context->list_id, context->list_id);
if (error != NO_ERROR)
{
goto error_exit;
}
qfile_destroy_list (thread_p, context->list_id);
QFILE_FREE_AND_INIT_LIST_ID (context->list_id);
break;
}
case HASHJOIN_MERGE_CONNECT:
{
error = qfile_connect_list (thread_p, single_context->list_id, context->list_id);
if (error != NO_ERROR)
{
goto error_exit;
}
/* Do not call QFILE_FREE_AND_INIT_LIST_ID; it must be called through single_context->list_id. */
context->list_id = NULL;
break;
}
default:
/* impossible case */
assert_release_error (false);
goto error_exit;
}
ASSERT_NO_ERROR_OR_INTERRUPTED ();
return NO_ERROR;
error_exit:
if (error == NO_ERROR || er_errid () == NO_ERROR)
{
assert_release_error (er_errid () != NO_ERROR);
error = er_errid ();
}
return error;
}
/*
* hjoin_try_parallel() -
* return: One of the following HASHJOIN_STATUS values:
* - HASHJOIN_STATUS_PARTITION: Parallel execution is not applied or falls back on error.
* - HASHJOIN_STATUS_PARALLEL: Parallel execution is applied.
* thread_p(in): Thread entry.
* manager(in): Hash join manager containing shared state.
* single_context(in): Hash join context for single-threaded execution.
*/
static HASHJOIN_STATUS
hjoin_try_parallel (THREAD_ENTRY * thread_p, HASHJOIN_MANAGER * manager, HASHJOIN_CONTEXT * single_context)
{
QFILE_LIST_ID *outer_list_id, *inner_list_id;
INT64 min_page_cnt;
parallel_query::worker_manager * px_worker_manager = NULL;
UINT64 *px_worker_stats = NULL;
assert (thread_p != NULL);
assert (manager != NULL);
assert (single_context != NULL);
assert (single_context == &manager->single_context);
#if !defined (SERVER_MODE)
assert (false);
#endif /* defined (SERVER_MODE) */
outer_list_id = single_context->outer.list_id;
inner_list_id = single_context->inner.list_id;
assert (outer_list_id != NULL);
assert (inner_list_id != NULL);
/* immutable */
static const size_t stats_size = perfmon_get_number_of_statistic_values () * sizeof (UINT64);
/* check if pages are enough for parallel-thread hash join */
min_page_cnt =
(outer_list_id->page_cnt < inner_list_id->page_cnt) ? outer_list_id->page_cnt : inner_list_id->page_cnt;
assert (min_page_cnt >= 0);
manager->num_parallel_threads =
parallel_query::compute_parallel_degree (parallel_query::parallel_type::HASH_JOIN, min_page_cnt,
manager->num_parallel_threads);
if (manager->num_parallel_threads < 2)
{
/* try single-thread hash join */
assert (manager->num_parallel_threads == 0);
assert (manager->px_worker_manager == NULL);
return HASHJOIN_STATUS_PARTITION;
}
manager->num_parallel_threads = MIN (manager->num_parallel_threads, manager->context_cnt /* part_cnt */ );
px_worker_manager = parallel_query::worker_manager::try_reserve_workers (manager->num_parallel_threads);
if (px_worker_manager == NULL)
{
goto error_exit;
}
/* update to actual reserved workers */
manager->num_parallel_threads = px_worker_manager->get_reserved_workers ();
if (thread_is_on_trace (thread_p))
{
px_worker_stats = (UINT64 *) db_private_alloc (thread_p, manager->num_parallel_threads * stats_size);
if (px_worker_stats == NULL)
{
assert_release_error (er_errid () != NO_ERROR);
goto error_exit;
}
memset (px_worker_stats, 0, manager->num_parallel_threads * stats_size);
/* only top-level parent */
if (thread_p->m_px_stats == NULL)
{
thread_p->m_px_stats = perfmon_allocate_values ();
if (thread_p->m_px_stats == NULL)
{
assert_release_error (er_errid () != NO_ERROR);
goto error_exit;
}
memset (thread_p->m_px_stats, 0, stats_size);
}
manager->px_worker_stats = px_worker_stats;
}
else
{
assert (manager->px_worker_stats == NULL);
}
manager->px_worker_manager = px_worker_manager;
return HASHJOIN_STATUS_PARALLEL;
error_exit:
manager->num_parallel_threads = 0;
if (px_worker_manager != NULL)
{
px_worker_manager->release_workers ();
}
if (px_worker_stats != NULL)
{
db_private_free_and_init (thread_p, px_worker_stats);
}
if (er_errid () == ER_INTERRUPTED)
{
return HASHJOIN_STATUS_ERROR;
}
else
{
/* fallback to HASHJOIN_STATUS_PARTITION */
er_clear ();
return HASHJOIN_STATUS_PARTITION;
}
}
/*
* hjoin_init_split_info() -
* return: Error code (NO_ERROR if successful, error code otherwise).
* thread_p(in): Thread entry.
* manager(in): Hash join manager containing shared state.
* split_info(in/out): Split information.
*/
static int
hjoin_init_split_info (THREAD_ENTRY * thread_p, HASHJOIN_MANAGER * manager, HASHJOIN_SPLIT_INFO * split_info)
{
HASHJOIN_CONTEXT *single_context;
HASHJOIN_INPUT_SPLIT_INFO *outer, *inner;
UINT32 part_cnt;
assert (thread_p != NULL);
assert (manager != NULL);
assert (split_info != NULL);
single_context = &manager->single_context;
outer = &split_info->outer;
inner = &split_info->inner;
outer->fetch_info = &single_context->outer;
outer->part_list_id = NULL;
inner->fetch_info = &single_context->inner;
inner->part_list_id = NULL;
part_cnt = manager->context_cnt;
assert (part_cnt > 1);
outer->part_list_id = (QFILE_LIST_ID **) db_private_alloc (thread_p, part_cnt * sizeof (QFILE_LIST_ID *));
if (outer->part_list_id == NULL)
{
goto error_exit;
}
memset (outer->part_list_id, 0, part_cnt * sizeof (QFILE_LIST_ID *));
inner->part_list_id = (QFILE_LIST_ID **) db_private_alloc (thread_p, part_cnt * sizeof (QFILE_LIST_ID *));
if (inner->part_list_id == NULL)
{
goto error_exit;
}
memset (inner->part_list_id, 0, part_cnt * sizeof (QFILE_LIST_ID *));
ASSERT_NO_ERROR_OR_INTERRUPTED ();
return NO_ERROR;
error_exit:
hjoin_clear_split_info (thread_p, manager, split_info, true);
assert_release_error (er_errid () != NO_ERROR);
return er_errid ();
}
/*
* hjoin_clear_split_info() -
* return: None.
* thread_p(in): Thread entry.
* manager(in): Hash join manager containing shared state.
* split_info(in): Split information.
* clear_all(in): True to destroy list identifiers; false otherwise.
*/
static void
hjoin_clear_split_info (THREAD_ENTRY * thread_p, HASHJOIN_MANAGER * manager, HASHJOIN_SPLIT_INFO * split_info,
bool clear_all)
{
HASHJOIN_INPUT_SPLIT_INFO *outer, *inner;
UINT32 part_cnt, part_index;
assert (thread_p != NULL);
assert (manager != NULL);
assert (split_info != NULL);
outer = &split_info->outer;
inner = &split_info->inner;
part_cnt = manager->context_cnt;
if (part_cnt <= 1)
{
assert (outer->part_list_id == NULL);
assert (inner->part_list_id == NULL);
return; /* nothing to do */
}
/* The list identifier is still used through contexts[].outer/inner.list_id,
* even though part_list_id is freed. */
if (outer->part_list_id != NULL)
{
if (clear_all)
{
for (part_index = 0; part_index < part_cnt; part_index++)
{
qfile_close_list (thread_p, outer->part_list_id[part_index]);
qfile_destroy_list (thread_p, outer->part_list_id[part_index]);
QFILE_FREE_AND_INIT_LIST_ID (outer->part_list_id[part_index]);
if (manager->contexts != NULL)
{
manager->contexts[part_index].outer.list_id = NULL;
}
}
}
db_private_free_and_init (thread_p, outer->part_list_id);
}
if (inner->part_list_id != NULL)
{
if (clear_all)
{
for (part_index = 0; part_index < part_cnt; part_index++)
{
qfile_close_list (thread_p, inner->part_list_id[part_index]);
qfile_destroy_list (thread_p, inner->part_list_id[part_index]);
QFILE_FREE_AND_INIT_LIST_ID (inner->part_list_id[part_index]);
if (manager->contexts != NULL)
{
manager->contexts[part_index].inner.list_id = NULL;
}
}
}
db_private_free_and_init (thread_p, inner->part_list_id);
}
}
/*
* hjoin_init_shared_split_info() -
* return: Error code (NO_ERROR if successful, error code otherwise).
* thread_p(in): Thread entry.
* manager(in): Hash join manager containing shared state.
* shared_info(in/out): Shared split information.
*/
int
hjoin_init_shared_split_info (THREAD_ENTRY * thread_p, HASHJOIN_MANAGER * manager,
HASHJOIN_SHARED_SPLIT_INFO * shared_info)
{
UINT32 part_cnt, part_index;
UINT32 init_cnt = 0;
assert (thread_p != NULL);
assert (manager != NULL);
assert (shared_info != NULL);
part_cnt = manager->context_cnt;
assert (part_cnt > 1);
if (manager->px_worker_manager != NULL)
{
assert (shared_info->part_mutexes == NULL);
shared_info->part_mutexes = (std::mutex *) db_private_alloc (thread_p, part_cnt * sizeof (std::mutex));
if (shared_info->part_mutexes == NULL)
{
goto error_exit;
}
try
{
for (part_index = 0; part_index < part_cnt; part_index++)
{
placement_new < std::mutex > (&shared_info->part_mutexes[part_index]);
++init_cnt;
}
}
catch ( ...)
{
goto error_exit;
}
}
assert (shared_info->membuf_claimed.load () == false);
assert (shared_info->next_sector_index.load () == 0);
ASSERT_NO_ERROR_OR_INTERRUPTED ();
return NO_ERROR;
error_exit:
if (shared_info->part_mutexes != NULL)
{
for (part_index = 0; part_index < init_cnt; part_index++)
{
// *INDENT-OFF*
shared_info->part_mutexes[part_index].~mutex ();
// *INDENT-ON*
}
db_private_free_and_init (thread_p, shared_info->part_mutexes);
}
assert_release_error (er_errid () != NO_ERROR);
return er_errid ();
}
/*
* hjoin_clear_shared_split_info() -
* return: None.
* thread_p(in): Thread entry.
* manager(in): Hash join manager containing shared state.
* shared_info(in): Shared split information.
*/
void
hjoin_clear_shared_split_info (THREAD_ENTRY * thread_p, HASHJOIN_MANAGER * manager,
HASHJOIN_SHARED_SPLIT_INFO * shared_info)
{
UINT32 part_cnt, part_index;
assert (thread_p != NULL);
assert (manager != NULL);
assert (shared_info != NULL);
/* NOTE: sector_info must be freed BEFORE the early-return below.
* Do not move this call into the (part_cnt > 1) branch — doing so would leak
* sectors/tfiles arrays when part_cnt <= 1. */
qfile_free_list_sector_info (thread_p, &shared_info->sector_info);
part_cnt = manager->context_cnt;
if (part_cnt <= 1)
{
assert (shared_info->part_mutexes == NULL);
return; /* nothing more to do */
}
if (shared_info->part_mutexes != NULL)
{
for (part_index = 0; part_index < part_cnt; part_index++)
{
// *INDENT-OFF*
shared_info->part_mutexes[part_index].~mutex ();
// *INDENT-ON*
}
db_private_free_and_init (thread_p, shared_info->part_mutexes);
}
}
/*
* hjoin_init_context() -
* return: Error code (NO_ERROR if successful, error code otherwise).
* thread_p(in): Thread entry.
* manager(in): Hash join manager containing shared state.
* context(in/out): Hash join context to initialize.
*/
static int
hjoin_init_context (THREAD_ENTRY * thread_p, HASHJOIN_MANAGER * manager, HASHJOIN_CONTEXT * context)
{
HASHJOIN_FETCH_INFO *outer, *inner;
HASHJOIN_FETCH_INFO *build = NULL;
int error = NO_ERROR;
assert (thread_p != NULL);
assert (manager != NULL);
assert (context != NULL);
assert (!thread_is_on_trace (thread_p) || context->stats != NULL);
outer = &context->outer;
inner = &context->inner;
assert (outer->list_id != NULL && outer->list_id->tuple_cnt > 0);
assert (inner->list_id != NULL && inner->list_id->tuple_cnt > 0);
switch (manager->join_type)
{
case JOIN_INNER:
if (outer->list_id->tuple_cnt < inner->list_id->tuple_cnt)
{
context->build = outer;
context->probe = inner;
}
else if (outer->list_id->tuple_cnt == inner->list_id->tuple_cnt
&& outer->list_id->page_cnt < inner->list_id->page_cnt)
{
context->build = outer;
context->probe = inner;
}
else
{
context->build = inner;
context->probe = outer;
}
break;
case JOIN_LEFT:
outer->fill_record = &outer->tuple_record;
inner->fill_record = NULL;
context->build = inner;
context->probe = outer;
break;
case JOIN_RIGHT:
outer->fill_record = NULL;
inner->fill_record = &inner->tuple_record;
context->build = outer;
context->probe = inner;
break;
default:
/* impossible case */
assert_release_error (false);
goto error_exit;
}
build = context->build;
assert (build != NULL);
error = hjoin_scan_init (thread_p, &context->hash_scan, manager->key_cnt, build->list_id);
if (error != NO_ERROR)
{
goto error_exit;
}
if (thread_is_on_trace (thread_p))
{
context->stats->hash_method = context->hash_scan.hash_list_scan_type;
context->stats->swap_join_inputs = (context->build == outer) ? true : false;
}
ASSERT_NO_ERROR_OR_INTERRUPTED ();
return NO_ERROR;
error_exit:
hjoin_scan_clear (thread_p, &context->hash_scan);
if (error == NO_ERROR || er_errid () == NO_ERROR)
{
assert_release_error (er_errid () != NO_ERROR);
error = er_errid ();
}
return error;
}
/*
* hjoin_clear_context() -
* return: None.
* thread_p(in): Thread entry.
* context(in): Hash join context to clear.
*/
static void
hjoin_clear_context (THREAD_ENTRY * thread_p, HASHJOIN_CONTEXT * context)
{
HASHJOIN_FETCH_INFO *outer, *inner;
assert (thread_p != NULL);
assert (context != NULL);
outer = &context->outer;
inner = &context->inner;
if (context->list_id != NULL)
{
qfile_close_list (thread_p, context->list_id);
qfile_destroy_list (thread_p, context->list_id);
QFILE_FREE_AND_INIT_LIST_ID (context->list_id);
}
if (outer->list_id != NULL)
{
qfile_close_list (thread_p, outer->list_id);
qfile_destroy_list (thread_p, outer->list_id);
QFILE_FREE_AND_INIT_LIST_ID (outer->list_id);
}
if (inner->list_id != NULL)
{
qfile_close_list (thread_p, inner->list_id);
qfile_destroy_list (thread_p, inner->list_id);
QFILE_FREE_AND_INIT_LIST_ID (inner->list_id);
}
}
/*
* hjoin_destroy_qlist() -
* return: None.
* thread_p(in): Thread entry.
* context(in): Hash join context to destroy.
*/
static void
hjoin_destroy_qlist (THREAD_ENTRY * thread_p, HASHJOIN_CONTEXT * context)
{
HASHJOIN_FETCH_INFO *outer, *inner;
assert (thread_p != NULL);
assert (context != NULL);
outer = &context->outer;
inner = &context->inner;
if (outer->list_id != NULL)
{
qfile_close_list (thread_p, outer->list_id);
qfile_destroy_list (thread_p, outer->list_id);
}
if (inner->list_id != NULL)
{
qfile_close_list (thread_p, inner->list_id);
qfile_destroy_list (thread_p, inner->list_id);
}
}
/*
* hjoin_scan_init() -
* return: Error code (NO_ERROR if successful, error code otherwise).
* thread_p(in): Thread entry.
* hash_scan(in/out): Hash scan structure to initialize.
* key_cnt(in): Number of join columns.
* list_id(in): List identifier to be used as build input.
*/
static int
hjoin_scan_init (THREAD_ENTRY * thread_p, HASH_LIST_SCAN * hash_scan, int key_cnt, QFILE_LIST_ID * list_id)
{
UINT64 mem_limit;
int error = NO_ERROR;
assert (thread_p != NULL);
assert (hash_scan != NULL);
assert (list_id != NULL && list_id->tuple_cnt > 0);
assert (key_cnt > 0);
mem_limit = prm_get_bigint_value (PRM_ID_MAX_HASH_LIST_SCAN_SIZE);
assert (mem_limit > 0);
assert (hash_scan->build_regu_list == NULL); /* Unused */
assert (hash_scan->probe_regu_list == NULL); /* Unused */
hash_scan->temp_key = qdata_alloc_hscan_key (thread_p, key_cnt, true);
if (hash_scan->temp_key == NULL)
{
goto error_exit;
}
hash_scan->temp_new_key = qdata_alloc_hscan_key (thread_p, key_cnt, true);
if (hash_scan->temp_new_key == NULL)
{
goto error_exit;
}
if ((UINT64) list_id->page_cnt * DB_PAGESIZE <= mem_limit)
{
#if HASHJOIN_DUMP_BUILD
fprintf (stdout, "\nHash Join Method: In Memory\n");
fprintf (stdout, " - Page Count: %d <= %lu\n", list_id->page_cnt, mem_limit / 16344);
#endif /* HASHJOIN_DUMP_BUILD */
hash_scan->hash_list_scan_type = HASH_METH_IN_MEM;
hash_scan->memory.hash_table = mht_create_hls ("Hash Join", list_id->tuple_cnt, NULL, NULL);
if (hash_scan->memory.hash_table == NULL)
{
goto error_exit;
}
hash_scan->memory.curr_hash_entry = NULL;
}
else if ((UINT64) list_id->tuple_cnt * (sizeof (HENTRY_HLS) + sizeof (QFILE_TUPLE_SIMPLE_POS)) <= mem_limit)
{
#if HASHJOIN_DUMP_BUILD
fprintf (stdout, "\nHash Join Method: Hybrid\n");
fprintf (stdout, " - Page Count: %d > %lu\n", list_id->page_cnt, mem_limit / 16344);
fprintf (stdout, " - Tuple Count: %ld <= %lu\n", list_id->tuple_cnt,
mem_limit / (sizeof (HENTRY_HLS) + sizeof (QFILE_TUPLE_SIMPLE_POS)));
#endif /* HASHJOIN_DUMP_BUILD */
hash_scan->hash_list_scan_type = HASH_METH_HYBRID;
hash_scan->memory.hash_table = mht_create_hls ("Hash Join", list_id->tuple_cnt, NULL, NULL);
if (hash_scan->memory.hash_table == NULL)
{
goto error_exit;
}
hash_scan->memory.curr_hash_entry = NULL;
}
else
{
#if HASHJOIN_DUMP_BUILD
fprintf (stdout, "\nHash Join Method: File\n");
fprintf (stdout, " - Page Count: %d > %lu\n", list_id->page_cnt, mem_limit / 16344);
fprintf (stdout, " - Tuple Count: %ld > %lu\n", list_id->tuple_cnt,
mem_limit / (sizeof (HENTRY_HLS) + sizeof (QFILE_TUPLE_SIMPLE_POS)));
#endif /* HASHJOIN_DUMP_BUILD */
hash_scan->hash_list_scan_type = HASH_METH_HASH_FILE;
hash_scan->file.hash_table = (FHSID *) db_private_alloc (thread_p, sizeof (FHSID));
if (hash_scan->file.hash_table == NULL)
{
goto error_exit;
}
if (fhs_create (thread_p, hash_scan->file.hash_table, list_id->tuple_cnt) == NULL)
{
goto error_exit;
}
hash_scan->file.curr_oid = OID_INITIALIZER;
hash_scan->file.is_dk_bucket = false;
}
hash_scan->curr_hash_key = 0;
hash_scan->need_coerce_type = false;
ASSERT_NO_ERROR_OR_INTERRUPTED ();
return NO_ERROR;
error_exit:
hjoin_scan_clear (thread_p, hash_scan);
if (error == NO_ERROR || er_errid () == NO_ERROR)
{
assert_release_error (er_errid () != NO_ERROR);
error = er_errid ();
}
return error;
}
/*
* hjoin_scan_clear() -
* return: None.
* thread_p(in): Thread entry.
* hash_scan(in): Hash scan structure to clear.
*/
static void
hjoin_scan_clear (THREAD_ENTRY * thread_p, HASH_LIST_SCAN * hash_scan)
{
assert (thread_p != NULL);
assert (hash_scan != NULL);
if (hash_scan->temp_key != NULL)
{
qdata_free_hscan_key (thread_p, hash_scan->temp_key, hash_scan->temp_key->val_count);
hash_scan->temp_key = NULL;
}
if (hash_scan->temp_new_key != NULL)
{
qdata_free_hscan_key (thread_p, hash_scan->temp_new_key, hash_scan->temp_new_key->val_count);
hash_scan->temp_new_key = NULL;
}
switch (hash_scan->hash_list_scan_type)
{
case HASH_METH_IN_MEM:
case HASH_METH_HYBRID:
if (hash_scan->memory.hash_table != NULL)
{
mht_clear_hls (hash_scan->memory.hash_table, qdata_free_hscan_entry, (void *) thread_p);
mht_destroy_hls (hash_scan->memory.hash_table);
hash_scan->memory.hash_table = NULL;
}
break;
case HASH_METH_HASH_FILE:
if (hash_scan->file.hash_table != NULL)
{
fhs_destroy (thread_p, hash_scan->file.hash_table);
db_private_free_and_init (thread_p, hash_scan->file.hash_table);
}
break;
case HASH_METH_NOT_USE:
default:
/* Nothing to do */
break;
}
hash_scan->hash_list_scan_type = HASH_METH_NOT_USE;
}
/*
* hjoin_check_empty_inputs() -
* return: One of the following HASHJOIN_STATUS values:
* - HASHJOIN_STATUS_END: Inner join with one empty input, or outer join with empty preserved side.
* - HASHJOIN_STATUS_FILL_NULL_VALUES: Outer join with empty null-supplying side.
* - HASHJOIN_STATUS_TRY: Both inputs are non-empty; proceed with the join.
* manager(in): Hash join manager containing shared state.
* context(in): Hash join context containing per-partition state.
*/
static HASHJOIN_STATUS
hjoin_check_empty_inputs (HASHJOIN_MANAGER * manager, HASHJOIN_CONTEXT * context)
{
HASHJOIN_FETCH_INFO *outer, *inner;
INT64 outer_tuple_cnt, inner_tuple_cnt;
HASHJOIN_STATUS status;
assert (manager != NULL);
assert (context != NULL);
outer = &context->outer;
inner = &context->inner;
/* When aptr_list is executed in qexec_execute_mainblock_internal,
* it checks the results from outer_xasl and inner_xasl in merge_info.
* If either has no result, the other is skipped,
* and the skipped node can have a type count of 0 in list_id.type_list. */
if (outer->list_id == NULL || inner->list_id == NULL)
{
return HASHJOIN_STATUS_END;
}
outer_tuple_cnt = outer->list_id->tuple_cnt;
inner_tuple_cnt = inner->list_id->tuple_cnt;
/* HASHJOIN_STATUS_END must be checked first. */
switch (manager->join_type)
{
case JOIN_INNER:
status = (outer_tuple_cnt == 0 || inner_tuple_cnt == 0) ? HASHJOIN_STATUS_END : HASHJOIN_STATUS_TRY;
break;
case JOIN_LEFT:
status =
(outer_tuple_cnt == 0) ? HASHJOIN_STATUS_END : (inner_tuple_cnt ==
0) ? HASHJOIN_STATUS_FILL_NULL_VALUES : HASHJOIN_STATUS_TRY;
break;
case JOIN_RIGHT:
status =
(inner_tuple_cnt == 0) ? HASHJOIN_STATUS_END : (outer_tuple_cnt ==
0) ? HASHJOIN_STATUS_FILL_NULL_VALUES : HASHJOIN_STATUS_TRY;
break;
default:
/* impossible case */
assert_release_error (false);
status = HASHJOIN_STATUS_ERROR;
}
return status;
}
/*
* hjoin_fetch_key() -
* return: Error code (NO_ERROR if successful, error code otherwise).
* thread_p(in): Thread entry.
* fetch_info(in): Information for reading join column values.
* tuple_record(in): Tuple to read values from.
* key(in/out): Space for reading join column values.
* compare_key(in): Key for comparison with the read key. (can be NULL).
* need_skip_next(in/out): Set to true if the current tuple should be skipped.
*/
int
hjoin_fetch_key (THREAD_ENTRY * thread_p, HASHJOIN_FETCH_INFO * fetch_info, QFILE_TUPLE_RECORD * tuple_record,
HASH_SCAN_KEY * key, HASH_SCAN_KEY * compare_key, bool * need_skip_next)
{
TP_DOMAIN **domains, **coerce_domains;
int *value_indexes;
bool need_coerce_domains;
QFILE_TUPLE tuple_record_end;
QFILE_TUPLE tuple_value;
OR_BUF buf;
int value_size, value_index, key_index;
TP_DOMAIN_STATUS domain_status = DOMAIN_COMPATIBLE;
DB_VALUE pre_coerce_value;
DB_VALUE_COMPARE_RESULT compare_result = DB_EQ;
int error = NO_ERROR;
assert (thread_p != NULL);
assert (fetch_info != NULL);
assert (fetch_info->input != NULL);
assert (tuple_record != NULL);
assert (tuple_record->tpl != NULL);
assert (key != NULL);
assert (need_skip_next != NULL);
assert (*need_skip_next == false);
domains = fetch_info->input->domains;
value_indexes = fetch_info->input->value_indexes;
coerce_domains = fetch_info->coerce_domains;
need_coerce_domains = fetch_info->need_coerce_domains;
assert (domains != NULL);
assert (value_indexes != NULL);
assert (coerce_domains != NULL);
db_make_null (&pre_coerce_value);
tuple_record_end = tuple_record->tpl + QFILE_GET_TUPLE_LENGTH (tuple_record->tpl);
/* Skip the tuple header */
tuple_value = tuple_record->tpl + QFILE_TUPLE_LENGTH_SIZE;
for (value_index = 0; tuple_value < tuple_record_end; value_index++)
{
for (key_index = 0; key_index < key->val_count; key_index++)
{
/*
* The same tuple value can be referenced by multiple keys.
*
* e.g. value_indexes[0] = 0
* value_indexes[1] = 1
* value_indexes[2] = 1
* value_indexes[3] = 3
*/
if (value_indexes[key_index] != value_index)
{
continue;
}
/* Skip the tuple if any value is NULL */
if (QFILE_GET_TUPLE_VALUE_FLAG (tuple_value) == V_UNBOUND)
{
goto skip_next;
}
value_size = QFILE_GET_TUPLE_VALUE_LENGTH (tuple_value);
assert (value_size > 0);
/* Skip the tuple value header */
or_init (&buf, tuple_value + QFILE_TUPLE_VALUE_HEADER_SIZE, value_size);
pr_clear_value (key->values[key_index]);
if (need_coerce_domains && coerce_domains[key_index] != NULL
&& coerce_domains[key_index] != domains[key_index])
{
error =
domains[key_index]->type->data_readval (&buf, &pre_coerce_value, domains[key_index], -1, false, NULL,
0);
if (error != NO_ERROR)
{
goto error_exit;
}
domain_status = tp_value_coerce (&pre_coerce_value, key->values[key_index], coerce_domains[key_index]);
if (domain_status != DOMAIN_COMPATIBLE)
{
tp_domain_status_er_set (domain_status, ARG_FILE_LINE, &pre_coerce_value, coerce_domains[key_index]);
pr_clear_value (&pre_coerce_value);
goto error_exit;
}
pr_clear_value (&pre_coerce_value);
}
else
{
error =
domains[key_index]->type->data_readval (&buf, key->values[key_index], domains[key_index], -1, false,
NULL, 0);
if (error != NO_ERROR)
{
goto error_exit;
}
}
if (compare_key != NULL)
{
/* Skip the tuple if any value does not match */
compare_result = tp_value_compare (key->values[key_index], compare_key->values[key_index], 0, 0);
if (compare_result != DB_EQ)
{
goto skip_next;
}
}
}
/* Skip the current tuple value */
tuple_value += QFILE_TUPLE_VALUE_HEADER_SIZE + QFILE_GET_TUPLE_VALUE_LENGTH (tuple_value);
}
ASSERT_NO_ERROR_OR_INTERRUPTED ();
return NO_ERROR;
skip_next:
*need_skip_next = true;
ASSERT_NO_ERROR_OR_INTERRUPTED ();
return NO_ERROR;
error_exit:
if (error == NO_ERROR || er_errid () == NO_ERROR)
{
assert_release_error (er_errid () != NO_ERROR);
error = er_errid ();
}
return error;
}
/*
* hjoin_update_tuple_hash_key() -
* return: None
* thread_p(in): Thread entry.
* tuple_record(in): Tuple containing the hash key tuple value.
* hash_key(in): Hash key to store in the tuple value.
*/
void
hjoin_update_tuple_hash_key (THREAD_ENTRY * thread_p, QFILE_TUPLE_RECORD * tuple_record, UINT32 hash_key)
{
QFILE_TUPLE tuple_value;
assert (thread_p != NULL);
assert (tuple_record != NULL);
tuple_value = tuple_record->tpl + QFILE_TUPLE_LENGTH_SIZE;
assert (QFILE_GET_TUPLE_VALUE_FLAG (tuple_value) == V_BOUND);
assert (QFILE_GET_TUPLE_VALUE_LENGTH (tuple_value) == MAX_ALIGNMENT);
tuple_value += QFILE_TUPLE_VALUE_HEADER_SIZE;
assert (OR_GET_INT (tuple_value) == -1);
OR_PUT_INT (tuple_value, hash_key);
}
/*
* hjoin_build() -
* return: Error code (NO_ERROR if successful, error code otherwise)
* thread_p(in): Thread entry.
* manager(in): Hash join manager containing shared state.
* context(in): Hash join context containing per-partition state.
*/
static int
hjoin_build (THREAD_ENTRY * thread_p, HASHJOIN_MANAGER * manager, HASHJOIN_CONTEXT * context)
{
SCAN_CODE scan_code;
bool need_skip_next = false;
HASHJOIN_FETCH_INFO *build = NULL;
QFILE_TUPLE tuple_value;
HASH_LIST_SCAN *hash_scan;
HASH_METHOD hash_method;
HASH_SCAN_KEY *key;
int error = NO_ERROR;
assert (thread_p != NULL);
assert (manager != NULL);
assert (context != NULL);
HASHJOIN_STATS *stats = context->stats;
HASHJOIN_START_STATS start_stats = HASHJOIN_START_STATS_INITIALIZER;
#if HASHJOIN_PROFILE_TIME
HASHJOIN_START_STATS profile_start_stats = HASHJOIN_START_STATS_INITIALIZER;
#endif /* HASHJOIN_PROFILE_TIME */
assert (!thread_is_on_trace (thread_p) || stats != NULL);
build = context->build;
assert (build != NULL);
assert (build->list_scan_id.status != S_CLOSED);
// *INDENT-OFF*
build->tuple_record = { NULL, 0 };
// *INDENT-ON*
hash_scan = &context->hash_scan;
hash_method = hash_scan->hash_list_scan_type;
assert (hash_method != HASH_METH_NOT_USE);
key = hash_scan->temp_key;
assert (key != NULL);
if (thread_is_on_trace (thread_p))
{
hjoin_trace_start (thread_p, &start_stats);
}
if (manager->context_cnt == 0) /* HASHJOIN_STATUS_SINGLE */
{
while ((scan_code =
qfile_scan_list_next (thread_p, &build->list_scan_id, &build->tuple_record, PEEK)) == S_SUCCESS)
{
HJOIN_PROFILE_START (thread_p, &profile_start_stats, HASHJOIN_PROFILE_BUILD_FETCH);
error =
hjoin_fetch_key (thread_p, build, &build->tuple_record, key, NULL /* compare_key */ , &need_skip_next);
HJOIN_PROFILE_END (thread_p, &stats->profile, &profile_start_stats, HASHJOIN_PROFILE_BUILD_FETCH);
if (error != NO_ERROR)
{
break; /* error_exit */
}
else if (need_skip_next)
{
need_skip_next = false; /* init */
continue;
}
else
{
HJOIN_PROFILE_START (thread_p, &profile_start_stats, HASHJOIN_PROFILE_BUILD_HASH);
hash_scan->curr_hash_key = qdata_hash_scan_key (key, UINT_MAX, hash_method);
HJOIN_PROFILE_END (thread_p, &stats->profile, &profile_start_stats, HASHJOIN_PROFILE_BUILD_HASH);
HJOIN_PROFILE_START (thread_p, &profile_start_stats, HASHJOIN_PROFILE_BUILD_INSERT);
error = hjoin_build_key (thread_p, hash_scan, &build->list_scan_id, &build->tuple_record);
HJOIN_PROFILE_END (thread_p, &stats->profile, &profile_start_stats, HASHJOIN_PROFILE_BUILD_INSERT);
if (error != NO_ERROR)
{
break; /* error_exit */
}
}
} /* while (qfile_scan_list_next (list_scan_id)) */
}
else
{
while ((scan_code =
qfile_scan_list_next (thread_p, &build->list_scan_id, &build->tuple_record, PEEK)) == S_SUCCESS)
{
HJOIN_PROFILE_START (thread_p, &profile_start_stats, HASHJOIN_PROFILE_BUILD_FETCH);
tuple_value = build->tuple_record.tpl + QFILE_TUPLE_LENGTH_SIZE;
assert (QFILE_GET_TUPLE_VALUE_FLAG (tuple_value) == V_BOUND);
assert (QFILE_GET_TUPLE_VALUE_LENGTH (tuple_value) == MAX_ALIGNMENT);
tuple_value += QFILE_TUPLE_VALUE_HEADER_LENGTH;
HJOIN_PROFILE_END (thread_p, &stats->profile, &profile_start_stats, HASHJOIN_PROFILE_BUILD_FETCH);
HJOIN_PROFILE_START (thread_p, &profile_start_stats, HASHJOIN_PROFILE_BUILD_HASH);
hash_scan->curr_hash_key = (UINT32) OR_GET_INT (tuple_value);
HJOIN_PROFILE_END (thread_p, &stats->profile, &profile_start_stats, HASHJOIN_PROFILE_BUILD_HASH);
HJOIN_PROFILE_START (thread_p, &profile_start_stats, HASHJOIN_PROFILE_BUILD_INSERT);
error = hjoin_build_key (thread_p, hash_scan, &build->list_scan_id, &build->tuple_record);
HJOIN_PROFILE_END (thread_p, &stats->profile, &profile_start_stats, HASHJOIN_PROFILE_BUILD_INSERT);
if (error != NO_ERROR)
{
break; /* error_exit */
}
} /* while (qfile_scan_list_next (list_scan_id)) */
}
if (thread_is_on_trace (thread_p))
{
hjoin_trace_end (thread_p, &stats->build, &start_stats);
stats->build.read_rows = build->list_id->tuple_cnt;
assert (stats->build.read_keys == 0);
stats->build.qualified_rows = build->list_id->tuple_cnt;
#if HASHJOIN_COLLISION_RATE
if (hash_scan->hash_list_scan_type == HASH_METH_IN_MEM || hash_scan->hash_list_scan_type == HASH_METH_HYBRID)
{
stats->collision_rate = (double) hash_scan->memory.hash_table->ncollisions / build->list_id->tuple_cnt;
}
else
{
stats->collision_rate = 0;
}
#endif /* HASHJOIN_COLLISION_RATE */
}
/* qfile_close_scan is called by the caller. */
if (scan_code == S_ERROR || error != NO_ERROR)
{
error = (error == NO_ERROR) ? er_errid () : error;
goto error_exit;
}
#if HASHJOIN_DUMP_HASH_TABLE
if (build->list_id->tuple_cnt <= DUMP_HASH_TABLE_LIMIT)
{
#if defined (SERVER_MODE)
pthread_mutex_lock (&manager->dump_hash_table_mutex);
#endif /* defined (SERVER_MODE) */
HJOIN_DUMP_HASH_TABLE (thread_p, hash_scan, build->list_id);
#if defined (SERVER_MODE)
pthread_mutex_unlock (&manager->dump_hash_table_mutex);
#endif /* defined (SERVER_MODE) */
}
#endif /* HASHJOIN_DUMP_HASH_TABLE */
ASSERT_NO_ERROR_OR_INTERRUPTED ();
return NO_ERROR;
error_exit:
if (error == NO_ERROR || er_errid () == NO_ERROR)
{
assert_release_error (er_errid () != NO_ERROR);
error = er_errid ();
}
return error;
}
/*
* hjoin_build_key() -
* return: Error code (NO_ERROR if successful, error code otherwise)
* thread_p(in): Thread entry.
* hash_scan(in): Hash scan structure used for hash table operations.
* list_scan_id(in): Scan identifier for the build input.
* tuple_record(in): Tuple to be inserted into the hash table
*/
static int
hjoin_build_key (THREAD_ENTRY * thread_p, HASH_LIST_SCAN * hash_scan, QFILE_LIST_SCAN_ID * list_scan_id,
QFILE_TUPLE_RECORD * tuple_record)
{
HASH_SCAN_VALUE *hash_value = NULL;
TFTID tftid;
assert (thread_p != NULL);
assert (hash_scan != NULL);
assert (list_scan_id != NULL);
assert (tuple_record != NULL && tuple_record->tpl != NULL);
switch (hash_scan->hash_list_scan_type)
{
case HASH_METH_IN_MEM:
assert (hash_scan->memory.hash_table != NULL);
hash_value = qdata_alloc_hscan_value (thread_p, tuple_record->tpl);
if (hash_value == NULL)
{
assert_release_error (er_errid () != NO_ERROR);
return er_errid ();
}
if (mht_put_hls (hash_scan->memory.hash_table, (void *) &hash_scan->curr_hash_key, (void *) hash_value) == NULL)
{
qdata_free_hscan_value (thread_p, hash_value);
assert_release_error (er_errid () != NO_ERROR);
return er_errid ();
}
break;
case HASH_METH_HYBRID:
assert (hash_scan->memory.hash_table != NULL);
hash_value = qdata_alloc_hscan_value_OID (thread_p, list_scan_id);
if (hash_value == NULL)
{
assert_release_error (er_errid () != NO_ERROR);
return er_errid ();
}
if (mht_put_hls (hash_scan->memory.hash_table, (void *) &hash_scan->curr_hash_key, (void *) hash_value) == NULL)
{
qdata_free_hscan_value (thread_p, hash_value);
assert_release_error (er_errid () != NO_ERROR);
return er_errid ();
}
break;
case HASH_METH_HASH_FILE:
assert (hash_scan->file.hash_table != NULL);
SET_TFTID (tftid, list_scan_id->curr_vpid.volid, list_scan_id->curr_vpid.pageid, list_scan_id->curr_offset);
if (fhs_insert (thread_p, hash_scan->file.hash_table, (void *) &hash_scan->curr_hash_key, &tftid) == NULL)
{
assert_release_error (er_errid () != NO_ERROR);
return er_errid ();
}
break;
case HASH_METH_NOT_USE:
[[fallthrough]];
default:
/* impossible case */
assert_release_error (false);
return er_errid ();
}
ASSERT_NO_ERROR_OR_INTERRUPTED ();
return NO_ERROR;
}
/*
* hjoin_probe() -
* return: Error code (NO_ERROR if successful, error code otherwise)
* thread_p(in): Thread entry.
* manager(in): Hash join manager containing shared state.
* context(in): Hash join context containing per-partition state.
* list_id(in/out): List identifier containing the join result.
*/
static int
hjoin_probe (THREAD_ENTRY * thread_p, HASHJOIN_MANAGER * manager, HASHJOIN_CONTEXT * context, QFILE_LIST_ID * list_id)
{
QFILE_TUPLE_RECORD overflow_record = { NULL, 0 };
SCAN_CODE scan_code;
bool need_skip_next = false;
HASHJOIN_FETCH_INFO *outer, *inner;
HASHJOIN_FETCH_INFO *build = NULL, *probe = NULL;
QFILE_TUPLE tuple_value;
HASH_LIST_SCAN *hash_scan;
HASH_METHOD hash_method;
HASH_SCAN_KEY *key, *found_key;
int error = NO_ERROR;
assert (thread_p != NULL);
assert (manager != NULL);
assert (context != NULL);
assert (list_id != NULL);
HASHJOIN_STATS *stats = context->stats;
HASHJOIN_START_STATS start_stats = HASHJOIN_START_STATS_INITIALIZER;
#if HASHJOIN_PROFILE_TIME
HASHJOIN_START_STATS profile_start_stats = HASHJOIN_START_STATS_INITIALIZER;
#endif /* HASHJOIN_PROFILE_TIME */
assert (!thread_is_on_trace (thread_p) || stats != NULL);
outer = &context->outer;
inner = &context->inner;
assert (outer->list_scan_id.status != S_CLOSED);
assert (inner->list_scan_id.status != S_CLOSED);
build = context->build;
probe = context->probe;
assert (build != NULL);
assert (probe != NULL);
// *INDENT-OFF*
probe->tuple_record = { NULL, 0 };
build->tuple_record = { NULL, 0 };
// *INDENT-ON*
hash_scan = &context->hash_scan;
hash_method = hash_scan->hash_list_scan_type;
assert (hash_method != HASH_METH_NOT_USE);
key = hash_scan->temp_key;
found_key = hash_scan->temp_new_key;
assert (key != NULL);
assert (found_key != NULL);
if (thread_is_on_trace (thread_p))
{
hjoin_trace_start (thread_p, &start_stats);
}
while ((scan_code = qfile_scan_list_next (thread_p, &probe->list_scan_id, &probe->tuple_record, PEEK)) == S_SUCCESS)
{
HJOIN_PRINT_TUPLE (&probe->list_scan_id, probe->tuple_record.tpl, HASHJOIN_PRINT_READ_KEY);
if (manager->context_cnt == 0) /* HASHJOIN_STATUS_SINGLE */
{
HJOIN_PROFILE_START (thread_p, &profile_start_stats, HASHJOIN_PROFILE_PROBE_FETCH);
error = hjoin_fetch_key (thread_p, probe, &probe->tuple_record, key, NULL /* compare_key */ ,
&need_skip_next);
HJOIN_PROFILE_END (thread_p, &stats->profile, &profile_start_stats, HASHJOIN_PROFILE_PROBE_FETCH);
if (error != NO_ERROR)
{
break; /* error_exit */
}
else if (need_skip_next)
{
need_skip_next = false; /* init */
continue;
}
else
{
/* fall through */
}
HJOIN_PROFILE_START (thread_p, &profile_start_stats, HASHJOIN_PROFILE_PROBE_HASH);
hash_scan->curr_hash_key = qdata_hash_scan_key (key, UINT_MAX, hash_method);
HJOIN_PROFILE_END (thread_p, &stats->profile, &profile_start_stats, HASHJOIN_PROFILE_PROBE_HASH);
}
else
{
HJOIN_PROFILE_START (thread_p, &profile_start_stats, HASHJOIN_PROFILE_PROBE_FETCH);
tuple_value = probe->tuple_record.tpl + QFILE_TUPLE_LENGTH_SIZE;
assert (QFILE_GET_TUPLE_VALUE_FLAG (tuple_value) == V_BOUND);
assert (QFILE_GET_TUPLE_VALUE_LENGTH (tuple_value) == MAX_ALIGNMENT);
tuple_value += QFILE_TUPLE_VALUE_HEADER_LENGTH;
HJOIN_PROFILE_END (thread_p, &stats->profile, &profile_start_stats, HASHJOIN_PROFILE_PROBE_FETCH);
HJOIN_PROFILE_START (thread_p, &profile_start_stats, HASHJOIN_PROFILE_PROBE_HASH);
hash_scan->curr_hash_key = (UINT32) OR_GET_INT (tuple_value);
HJOIN_PROFILE_END (thread_p, &stats->profile, &profile_start_stats, HASHJOIN_PROFILE_PROBE_HASH);
}
do
{
HJOIN_PROFILE_START (thread_p, &profile_start_stats, HASHJOIN_PROFILE_PROBE_SEARCH);
error = hjoin_probe_key (thread_p, hash_scan, &build->list_scan_id, &build->tuple_record);
HJOIN_PROFILE_END (thread_p, &stats->profile, &profile_start_stats, HASHJOIN_PROFILE_PROBE_SEARCH);
if (error != NO_ERROR)
{
break; /* error_exit */
}
if (build->tuple_record.tpl == NULL)
{
break; /* not found */
}
if (thread_is_on_trace (thread_p))
{
stats->probe.read_keys++; /* found */
}
if (manager->context_cnt != 0) /* HASHJOIN_STATUS_PARTITION or HASHJOIN_STATUS_PARALLEL */
{
HJOIN_PROFILE_START (thread_p, &profile_start_stats, HASHJOIN_PROFILE_PROBE_FETCH);
error = hjoin_fetch_key (thread_p, probe, &probe->tuple_record, key, NULL /* compare_key */ ,
&need_skip_next);
HJOIN_PROFILE_END (thread_p, &stats->profile, &profile_start_stats, HASHJOIN_PROFILE_PROBE_FETCH);
if (error != NO_ERROR)
{
break; /* error_exit */
}
else if (need_skip_next)
{
need_skip_next = false; /* init */
/* impossible case */
assert_release_error (false);
error = er_errid ();
break;
}
else
{
/* fall through */
}
}
HJOIN_PROFILE_START (thread_p, &profile_start_stats, HASHJOIN_PROFILE_PROBE_MATCH);
error = hjoin_fetch_key (thread_p, build, &build->tuple_record, found_key, key /* compare_key */ ,
&need_skip_next);
HJOIN_PROFILE_END (thread_p, &stats->profile, &profile_start_stats, HASHJOIN_PROFILE_PROBE_MATCH);
if (error != NO_ERROR)
{
break; /* error_exit */
}
else if (need_skip_next)
{
HJOIN_PRINT_TUPLE (&build->list_scan_id, build->tuple_record.tpl, HASHJOIN_PRINT_NOT_MATCHED_KEY);
need_skip_next = false; /* init */
continue;
}
else
{
/* fall through */
}
HJOIN_PRINT_TUPLE (&build->list_scan_id, build->tuple_record.tpl, HASHJOIN_PRINT_QUALIFIED_KEY);
HJOIN_PROFILE_START (thread_p, &profile_start_stats, HASHJOIN_PROFILE_PROBE_ADD);
error =
hjoin_merge_tuple_to_list_id (thread_p, list_id, &outer->tuple_record, &inner->tuple_record,
manager->merge_info, &overflow_record);
HJOIN_PROFILE_END (thread_p, &stats->profile, &profile_start_stats, HASHJOIN_PROFILE_PROBE_ADD);
if (error != NO_ERROR)
{
break; /* error_exit */
}
}
while (true);
if (error != NO_ERROR)
{
break; /* error_exit */
}
} /* while (qfile_scan_list_next (list_scan_id)) */
if (thread_is_on_trace (thread_p))
{
hjoin_trace_end (thread_p, &stats->probe, &start_stats);
stats->probe.read_rows = probe->list_id->tuple_cnt;
stats->probe.qualified_rows = list_id->tuple_cnt;
}
/* qfile_close_scan is called by the caller. */
if (scan_code == S_ERROR || error != NO_ERROR)
{
error = (error == NO_ERROR) ? er_errid () : error;
goto error_exit;
}
ASSERT_NO_ERROR_OR_INTERRUPTED ();
cleanup:
if (overflow_record.tpl != NULL)
{
db_private_free_and_init (thread_p, overflow_record.tpl);
}
return error;
error_exit:
if (error == NO_ERROR || er_errid () == NO_ERROR)
{
assert_release_error (er_errid () != NO_ERROR);
error = er_errid ();
}
goto cleanup;
}
/*
* hjoin_outer_probe() -
* return: Error code (NO_ERROR if successful, error code otherwise)
* thread_p(in): Thread entry.
* manager(in): Hash join manager containing shared state.
* context(in): Hash join context containing per-partition state.
* list_id(in/out): List identifier containing the join result.
*/
static int
hjoin_outer_probe (THREAD_ENTRY * thread_p, HASHJOIN_MANAGER * manager, HASHJOIN_CONTEXT * context,
QFILE_LIST_ID * list_id)
{
QFILE_TUPLE_RECORD overflow_record = { NULL, 0 };
SCAN_CODE scan_code;
bool need_skip_next = false;
bool any_record_added;
HASHJOIN_FETCH_INFO *outer, *inner;
HASHJOIN_FETCH_INFO *build = NULL, *probe = NULL;
QFILE_TUPLE tuple_value;
HASH_LIST_SCAN *hash_scan;
HASH_METHOD hash_method;
HASH_SCAN_KEY *key, *found_key;
int error = NO_ERROR, save_error = NO_ERROR;
assert (thread_p != NULL);
assert (manager != NULL);
assert (context != NULL);
assert (list_id != NULL);
HASHJOIN_STATS *stats = context->stats;
HASHJOIN_START_STATS start_stats = HASHJOIN_START_STATS_INITIALIZER;
#if HASHJOIN_PROFILE_TIME
HASHJOIN_START_STATS profile_start_stats = HASHJOIN_START_STATS_INITIALIZER;
#endif /* HASHJOIN_PROFILE_TIME */
assert (!thread_is_on_trace (thread_p) || stats != NULL);
outer = &context->outer;
inner = &context->inner;
assert (outer->list_scan_id.status != S_CLOSED);
assert (inner->list_scan_id.status != S_CLOSED);
assert (outer->fill_record == NULL || outer->fill_record->tpl == NULL);
build = context->build;
probe = context->probe;
assert (build != NULL);
assert (probe != NULL);
// *INDENT-OFF*
probe->tuple_record = { NULL, 0 };
build->tuple_record = { NULL, 0 };
// *INDENT-ON*
hash_scan = &context->hash_scan;
hash_method = hash_scan->hash_list_scan_type;
assert (hash_method != HASH_METH_NOT_USE);
key = hash_scan->temp_key;
found_key = hash_scan->temp_new_key;
assert (key != NULL);
assert (found_key != NULL);
if (thread_is_on_trace (thread_p))
{
hjoin_trace_start (thread_p, &start_stats);
}
while ((scan_code = qfile_scan_list_next (thread_p, &probe->list_scan_id, &probe->tuple_record, PEEK)) == S_SUCCESS)
{
HJOIN_PRINT_TUPLE (&probe->list_scan_id, probe->tuple_record.tpl, HASHJOIN_PRINT_READ_KEY);
if (manager->context_cnt == 0) /* HASHJOIN_STATUS_SINGLE */
{
HJOIN_PROFILE_START (thread_p, &profile_start_stats, HASHJOIN_PROFILE_PROBE_FETCH);
error = hjoin_fetch_key (thread_p, probe, &probe->tuple_record, key, NULL /* compare_key */ ,
&need_skip_next);
HJOIN_PROFILE_END (thread_p, &stats->profile, &profile_start_stats, HASHJOIN_PROFILE_PROBE_FETCH);
if (error != NO_ERROR)
{
break; /* error_exit */
}
else if (need_skip_next)
{
HJOIN_PRINT_TUPLE (&probe->list_scan_id, probe->tuple_record.tpl, HASHJOIN_PRINT_FILL_EMPTY_KEY);
HJOIN_PROFILE_START (thread_p, &profile_start_stats, HASHJOIN_PROFILE_PROBE_ADD);
error =
hjoin_merge_tuple_to_list_id (thread_p, list_id, outer->fill_record, inner->fill_record,
manager->merge_info, &overflow_record);
HJOIN_PROFILE_END (thread_p, &stats->profile, &profile_start_stats, HASHJOIN_PROFILE_PROBE_ADD);
if (error != NO_ERROR)
{
break; /* error_exit */
}
need_skip_next = false; /* init */
continue;
} /* else if (need_skip_next) */
else
{
/* fall through */
}
HJOIN_PROFILE_START (thread_p, &profile_start_stats, HASHJOIN_PROFILE_PROBE_HASH);
hash_scan->curr_hash_key = qdata_hash_scan_key (key, UINT_MAX, hash_method);
HJOIN_PROFILE_END (thread_p, &stats->profile, &profile_start_stats, HASHJOIN_PROFILE_PROBE_HASH);
}
else
{
HJOIN_PROFILE_START (thread_p, &profile_start_stats, HASHJOIN_PROFILE_PROBE_FETCH);
tuple_value = probe->tuple_record.tpl + QFILE_TUPLE_LENGTH_SIZE;
assert (QFILE_GET_TUPLE_VALUE_FLAG (tuple_value) == V_BOUND);
assert (QFILE_GET_TUPLE_VALUE_LENGTH (tuple_value) == MAX_ALIGNMENT);
tuple_value += QFILE_TUPLE_VALUE_HEADER_LENGTH;
HJOIN_PROFILE_END (thread_p, &stats->profile, &profile_start_stats, HASHJOIN_PROFILE_PROBE_FETCH);
HJOIN_PROFILE_START (thread_p, &profile_start_stats, HASHJOIN_PROFILE_PROBE_HASH);
hash_scan->curr_hash_key = (UINT32) OR_GET_INT (tuple_value);
HJOIN_PROFILE_END (thread_p, &stats->profile, &profile_start_stats, HASHJOIN_PROFILE_PROBE_HASH);
}
any_record_added = false;
do
{
HJOIN_PROFILE_START (thread_p, &profile_start_stats, HASHJOIN_PROFILE_PROBE_SEARCH);
error = hjoin_probe_key (thread_p, hash_scan, &build->list_scan_id, &build->tuple_record);
HJOIN_PROFILE_END (thread_p, &stats->profile, &profile_start_stats, HASHJOIN_PROFILE_PROBE_SEARCH);
if (error != NO_ERROR)
{
break; /* error_exit */
}
if (build->tuple_record.tpl == NULL)
{
break; /* not found */
}
if (thread_is_on_trace (thread_p))
{
stats->probe.read_keys++; /* found */
}
if (manager->context_cnt != 0) /* HASHJOIN_STATUS_PARTITION or HASHJOIN_STATUS_PARALLEL */
{
HJOIN_PROFILE_START (thread_p, &profile_start_stats, HASHJOIN_PROFILE_PROBE_FETCH);
error = hjoin_fetch_key (thread_p, probe, &probe->tuple_record, key, NULL /* compare_key */ ,
&need_skip_next);
HJOIN_PROFILE_END (thread_p, &stats->profile, &profile_start_stats, HASHJOIN_PROFILE_PROBE_FETCH);
if (error != NO_ERROR)
{
break; /* error_exit */
}
else if (need_skip_next)
{
need_skip_next = false; /* init */
/* impossible case */
assert_release_error (false);
save_error = er_errid ();
HJOIN_PRINT_TUPLE (&probe->list_scan_id, probe->tuple_record.tpl, HASHJOIN_PRINT_FILL_EMPTY_KEY);
HJOIN_PROFILE_START (thread_p, &profile_start_stats, HASHJOIN_PROFILE_PROBE_ADD);
error =
hjoin_merge_tuple_to_list_id (thread_p, list_id, outer->fill_record, inner->fill_record,
manager->merge_info, &overflow_record);
HJOIN_PROFILE_END (thread_p, &stats->profile, &profile_start_stats, HASHJOIN_PROFILE_PROBE_ADD);
if (error != NO_ERROR)
{
break; /* error_exit */
}
error = save_error;
any_record_added = true; /* meaningless */
break;
}
else
{
/* fall through */
}
}
HJOIN_PROFILE_START (thread_p, &profile_start_stats, HASHJOIN_PROFILE_PROBE_MATCH);
error = hjoin_fetch_key (thread_p, build, &build->tuple_record, found_key, key /* compare_key */ ,
&need_skip_next);
HJOIN_PROFILE_END (thread_p, &stats->profile, &profile_start_stats, HASHJOIN_PROFILE_PROBE_MATCH);
if (error != NO_ERROR)
{
break; /* error_exit */
}
else if (need_skip_next)
{
HJOIN_PRINT_TUPLE (&build->list_scan_id, build->tuple_record.tpl, HASHJOIN_PRINT_NOT_MATCHED_KEY);
need_skip_next = false; /* init */
continue;
}
else
{
/* fall through */
}
if (context->during_join_pred != NULL)
{
DB_LOGICAL ev_res = V_UNKNOWN;
HJOIN_PROFILE_START (thread_p, &profile_start_stats, HASHJOIN_PROFILE_PROBE_MATCH);
do
{
error =
fetch_val_list (thread_p, probe->regu_list_pred, context->val_descr, NULL, NULL,
probe->tuple_record.tpl, PEEK);
if (error != NO_ERROR)
{
break; /* error_exit */
}
error =
fetch_val_list (thread_p, build->regu_list_pred, context->val_descr, NULL, NULL,
build->tuple_record.tpl, PEEK);
if (error != NO_ERROR)
{
break; /* error_exit */
}
ev_res = eval_pred (thread_p, context->during_join_pred, context->val_descr, NULL);
if (ev_res == V_ERROR)
{
error = ER_FAILED;
break; /* error_exit */
}
}
while (false);
HJOIN_PROFILE_END (thread_p, &stats->profile, &profile_start_stats, HASHJOIN_PROFILE_PROBE_MATCH);
if (error != NO_ERROR)
{
break; /* error_exit */
}
/* Search the next hash entry if additional conditions are not satisfied */
if (ev_res != V_TRUE)
{
HJOIN_PRINT_TUPLE (&build->list_scan_id, build->tuple_record.tpl, HASHJOIN_PRINT_NOT_QUALIFIED_KEY);
assert (need_skip_next == false);
continue;
}
} /* if (context->during_join_pred != NULL) */
HJOIN_PRINT_TUPLE (&build->list_scan_id, build->tuple_record.tpl, HASHJOIN_PRINT_QUALIFIED_KEY);
HJOIN_PROFILE_START (thread_p, &profile_start_stats, HASHJOIN_PROFILE_PROBE_ADD);
error =
hjoin_merge_tuple_to_list_id (thread_p, list_id, &outer->tuple_record, &inner->tuple_record,
manager->merge_info, &overflow_record);
HJOIN_PROFILE_END (thread_p, &stats->profile, &profile_start_stats, HASHJOIN_PROFILE_PROBE_ADD);
if (error != NO_ERROR)
{
break; /* error_exit */
}
any_record_added = true;
}
while (true);
if (error != NO_ERROR)
{
break; /* error_exit */
}
if (!any_record_added)
{
HJOIN_PRINT_TUPLE (&probe->list_scan_id, probe->tuple_record.tpl, HASHJOIN_PRINT_FILL_EMPTY_KEY);
HJOIN_PROFILE_START (thread_p, &profile_start_stats, HASHJOIN_PROFILE_PROBE_ADD);
error =
hjoin_merge_tuple_to_list_id (thread_p, list_id, outer->fill_record, inner->fill_record,
manager->merge_info, &overflow_record);
HJOIN_PROFILE_END (thread_p, &stats->profile, &profile_start_stats, HASHJOIN_PROFILE_PROBE_ADD);
if (error != NO_ERROR)
{
break; /* error_exit */
}
} /* if (!any_record_added) */
} /* while (qfile_scan_list_next (probe_scan_id)) */
if (thread_is_on_trace (thread_p))
{
hjoin_trace_end (thread_p, &stats->probe, &start_stats);
stats->probe.read_rows = probe->list_id->tuple_cnt;
stats->probe.qualified_rows = list_id->tuple_cnt;
}
/* qfile_close_scan is called by the caller. */
if (scan_code == S_ERROR || error != NO_ERROR)
{
error = (error == NO_ERROR) ? er_errid () : error;
goto error_exit;
}
ASSERT_NO_ERROR_OR_INTERRUPTED ();
cleanup:
if (overflow_record.tpl != NULL)
{
db_private_free_and_init (thread_p, overflow_record.tpl);
}
return error;
error_exit:
if (error == NO_ERROR || er_errid () == NO_ERROR)
{
assert_release_error (er_errid () != NO_ERROR);
error = er_errid ();
}
goto cleanup;
}
/*
* hjoin_probe_key() -
* return: Error code (NO_ERROR if successful, error code otherwise).
* thread_p(in): Thread entry.
* hash_scan(in): Hash scan structure used for hash table operations.
* list_scan_id(in): Scan identifier for the probe input.
* tuple_record(in/out): Tuple found in the hash table.
*/
static int
hjoin_probe_key (THREAD_ENTRY * thread_p, HASH_LIST_SCAN * hash_scan, QFILE_LIST_SCAN_ID * list_scan_id,
QFILE_TUPLE_RECORD * tuple_record)
{
HASH_SCAN_VALUE *hash_value = NULL;
TFTID tftid;
EH_SEARCH eh_search;
QFILE_TUPLE_POSITION tuple_position;
SCAN_CODE scan_code;
assert (thread_p != NULL);
assert (hash_scan != NULL);
assert (list_scan_id != NULL);
assert (tuple_record != NULL);
switch (hash_scan->hash_list_scan_type)
{
case HASH_METH_IN_MEM:
assert (hash_scan->memory.hash_table != NULL);
if (tuple_record->tpl == NULL)
{
hash_value =
(HASH_SCAN_VALUE *) mht_get_hls (hash_scan->memory.hash_table, (void *) &hash_scan->curr_hash_key,
(void **) &hash_scan->memory.curr_hash_entry);
}
else
{
hash_value =
(HASH_SCAN_VALUE *) mht_get_next_hls (hash_scan->memory.hash_table, (void *) &hash_scan->curr_hash_key,
(void **) &hash_scan->memory.curr_hash_entry);
}
if (hash_value != NULL)
{
tuple_record->tpl = hash_value->tuple;
tuple_record->size = QFILE_GET_TUPLE_VALUE_LENGTH (tuple_record->tpl);
}
else
{
/* not found */
tuple_record->tpl = NULL;
tuple_record->size = 0;
}
break; /* HASH_METH_IN_MEM */
case HASH_METH_HYBRID:
assert (hash_scan->memory.hash_table != NULL);
if (tuple_record->tpl == NULL)
{
hash_value =
(HASH_SCAN_VALUE *) mht_get_hls (hash_scan->memory.hash_table, (void *) &hash_scan->curr_hash_key,
(void **) &hash_scan->memory.curr_hash_entry);
}
else
{
hash_value =
(HASH_SCAN_VALUE *) mht_get_next_hls (hash_scan->memory.hash_table,
(void *) &hash_scan->curr_hash_key,
(void **) &hash_scan->memory.curr_hash_entry);
}
if (hash_value != NULL)
{
MAKE_TUPLE_POSTION (tuple_position, hash_value->pos, list_scan_id);
scan_code = qfile_jump_scan_tuple_position (thread_p, list_scan_id, &tuple_position, tuple_record, PEEK);
if (scan_code != S_SUCCESS)
{
assert_release_error (er_errid () != NO_ERROR);
return er_errid ();
}
}
else
{
/* not found */
tuple_record->tpl = NULL;
tuple_record->size = 0;
}
break; /* HASH_METH_HYBRID */
case HASH_METH_HASH_FILE:
assert (hash_scan->file.hash_table != NULL);
if (tuple_record->tpl == NULL)
{
eh_search = fhs_search (thread_p, hash_scan, &tftid);
}
else
{
eh_search = fhs_search_next (thread_p, hash_scan, &tftid);
}
if (eh_search == EH_KEY_FOUND)
{
MAKE_TFTID_TO_TUPLE_POSTION (tuple_position, tftid, list_scan_id);
scan_code = qfile_jump_scan_tuple_position (thread_p, list_scan_id, &tuple_position, tuple_record, PEEK);
if (scan_code != S_SUCCESS)
{
assert_release_error (er_errid () != NO_ERROR);
return er_errid ();
}
}
else if (eh_search == EH_KEY_NOTFOUND)
{
/* not found */
tuple_record->tpl = NULL;
tuple_record->size = 0;
}
else
{
assert_release_error (er_errid () != NO_ERROR);
return er_errid ();
}
break; /* HASH_METH_HASH_FILE */
case HASH_METH_NOT_USE:
[[fallthrough]];
default:
/* impossible case */
assert_release_error (false);
return er_errid ();
}
ASSERT_NO_ERROR_OR_INTERRUPTED ();
return NO_ERROR;
}
/*
* hjoin_merge_tuple_to_list_id() -
* return: Error code (NO_ERROR if successful, error code otherwise).
* thread_p(in): Thread entry.
* list_id(in/out): List identifier to be merged.
* outer_record(in): Outer tuple to merge. (can be NULL).
* inner_record(in): Inner tuple to merge. (can be NULL).
* merge_info(in): Information used to merge the joined result.
* overflow_record(in/out): Space used for merging tuples too large to fit on a single page.
*/
static int
hjoin_merge_tuple_to_list_id (THREAD_ENTRY * thread_p, QFILE_LIST_ID * list_id,
QFILE_TUPLE_RECORD * outer_record,
QFILE_TUPLE_RECORD * inner_record, QFILE_LIST_MERGE_INFO * merge_info,
QFILE_TUPLE_RECORD * overflow_record)
{
QFILE_TUPLE_DESCRIPTOR *tuple_descriptor;
int max_record_size, max_unbound_size;
int error = NO_ERROR;
assert (thread_p != NULL);
assert (list_id != NULL);
assert (outer_record != NULL || inner_record != NULL);
assert (merge_info != NULL);
assert (overflow_record != NULL);
max_unbound_size = QFILE_TUPLE_VALUE_HEADER_SIZE * (merge_info->ls_pos_cnt);
max_record_size = (outer_record != NULL) ? QFILE_GET_TUPLE_LENGTH (outer_record->tpl) : max_unbound_size;
max_record_size += (inner_record != NULL) ? QFILE_GET_TUPLE_LENGTH (inner_record->tpl) : max_unbound_size;
max_record_size = DB_ALIGN (max_record_size, MAX_ALIGNMENT);
if (max_record_size < QFILE_MAX_TUPLE_SIZE_IN_PAGE)
{
tuple_descriptor = &list_id->tpl_descr;
tuple_descriptor->tpl_size = max_record_size;
tuple_descriptor->tplrec1 = outer_record;
tuple_descriptor->tplrec2 = inner_record;
tuple_descriptor->merge_info = merge_info;
error = qfile_generate_tuple_into_list (thread_p, list_id, T_MERGE);
}
else
{
error = hjoin_merge_tuple (thread_p, outer_record, inner_record, merge_info, overflow_record);
if (error != NO_ERROR)
{
goto error_exit;
}
error = qfile_add_tuple_to_list (thread_p, list_id, overflow_record->tpl);
}
if (error != NO_ERROR)
{
goto error_exit;
}
ASSERT_NO_ERROR_OR_INTERRUPTED ();
return NO_ERROR;
error_exit:
assert_release_error (er_errid () != NO_ERROR);
return er_errid ();
}
/*
* hjoin_merge_tuple() -
* return: Error code (NO_ERROR if successful, error code otherwise).
* thread_p(in): Thread entry.
* outer_record(in): Outer tuple to merge. (can be NULL).
* inner_record(in): Inner tuple to merge. (can be NULL).
* merge_info(in): Information used to merge the joined result.
* overflow_record(in/out): Space used for merging tuples too large to fit on a single page.
*/
static int
hjoin_merge_tuple (THREAD_ENTRY * thread_p, QFILE_TUPLE_RECORD * outer_record,
QFILE_TUPLE_RECORD * inner_record, QFILE_LIST_MERGE_INFO * merge_info,
QFILE_TUPLE_RECORD * overflow_record)
{
QFILE_TUPLE_RECORD *tuple_record;
QFILE_TUPLE outer_record_end, inner_record_end, tuple_record_end;
QFILE_TUPLE tuple_value;
INT32 unbound_value[2] = { 0, 0 }; /* QFILE_TUPLE_VALUE_HEADER */
int available_size, realloc_size, offset, value_size;
int pos_index, value_index, skip_index;
int error = NO_ERROR;
assert (thread_p != NULL);
assert (outer_record != NULL || inner_record != NULL);
assert (merge_info != NULL);
assert (overflow_record != NULL);
QFILE_PUT_TUPLE_VALUE_FLAG ((char *) unbound_value, V_UNBOUND);
QFILE_PUT_TUPLE_VALUE_LENGTH ((char *) unbound_value, 0);
outer_record_end = outer_record->tpl + QFILE_GET_TUPLE_LENGTH (outer_record->tpl);
inner_record_end = inner_record->tpl + QFILE_GET_TUPLE_LENGTH (inner_record->tpl);
offset = QFILE_TUPLE_LENGTH_SIZE;
for (pos_index = 0; pos_index < merge_info->ls_pos_cnt; pos_index++)
{
if (merge_info->ls_outer_inner_list[pos_index] == QFILE_OUTER_LIST)
{
tuple_record = outer_record;
tuple_record_end = outer_record_end;
}
else if (merge_info->ls_outer_inner_list[pos_index] == QFILE_INNER_LIST)
{
tuple_record = inner_record;
tuple_record_end = inner_record_end;
}
else
{
/* impossible case */
assert_release_error (false);
return er_errid ();
}
if (tuple_record != NULL)
{
value_index = merge_info->ls_pos_list[pos_index];
tuple_value = tuple_record->tpl + QFILE_TUPLE_LENGTH_SIZE;
for (skip_index = 0; skip_index < value_index; skip_index++)
{
tuple_value += QFILE_TUPLE_VALUE_HEADER_SIZE + QFILE_GET_TUPLE_VALUE_LENGTH (tuple_value);
}
if (tuple_value >= tuple_record_end)
{
/* impossible case */
assert (false);
error = ER_TF_BUFFER_OVERFLOW;
er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, error, 0);
return error;
}
}
else
{
tuple_value = (char *) unbound_value;
}
value_size = QFILE_TUPLE_VALUE_HEADER_SIZE + QFILE_GET_TUPLE_VALUE_LENGTH (tuple_value);
available_size = overflow_record->size - offset;
if (value_size > available_size)
{
realloc_size = CEIL_PTVDIV (overflow_record->size + (value_size - available_size), DB_PAGESIZE) * DB_PAGESIZE;
/* overflow_record is managed and cleaned up by the caller. */
error = qfile_reallocate_tuple (overflow_record, realloc_size);
if (error != NO_ERROR)
{
assert_release_error (er_errid () != NO_ERROR);
return er_errid ();
}
}
memcpy (overflow_record->tpl + offset, tuple_value, value_size);
offset += value_size;
} /* for (pos_index < merge_info->ls_pos_cnt) */
QFILE_PUT_TUPLE_LENGTH (overflow_record->tpl, offset);
ASSERT_NO_ERROR_OR_INTERRUPTED ();
return NO_ERROR;
}
/*
* hjoin_trace_start() -
* return: None.
* thread_p(in): Thread entry.
* start_stats(in/out): Profiling data captured at the start of the step.
*/
void
hjoin_trace_start (THREAD_ENTRY * thread_p, HASHJOIN_START_STATS * start_stats)
{
assert (thread_p != NULL);
assert (start_stats != NULL);
tsc_getticks (&start_stats->tick);
start_stats->fetches = perfmon_get_from_statistic (thread_p, PSTAT_PB_NUM_FETCHES);
start_stats->ioreads = perfmon_get_from_statistic (thread_p, PSTAT_PB_NUM_IOREADS);
}
/*
* hjoin_trace_end() -
* return: None.
* thread_p(in): Thread entry.
* stats(in/out): Profiling data to accumulate.
* start_stats(in): Profiling data captured at the start of the step.
*/
void
hjoin_trace_end (THREAD_ENTRY * thread_p, HASHJOIN_INPUT_STATS * stats, HASHJOIN_START_STATS * start_stats)
{
TSC_TICKS end_tick;
TSCTIMEVAL tv_diff;
assert (thread_p != NULL);
assert (stats != NULL);
assert (start_stats != NULL);
tsc_getticks (&end_tick);
tsc_elapsed_time_usec (&tv_diff, end_tick, start_stats->tick);
TSC_ADD_TIMEVAL (stats->elapsed_time, tv_diff);
stats->fetches += perfmon_get_from_statistic (thread_p, PSTAT_PB_NUM_FETCHES) - start_stats->fetches;
stats->ioreads += perfmon_get_from_statistic (thread_p, PSTAT_PB_NUM_IOREADS) - start_stats->ioreads;
}
#if HASHJOIN_PROFILE_TIME
/*
* hjoin_profile_start() -
* return: None.
* thread_p(in): Thread entry.
* start_stats(in/out): Profiling data captured at the start of the step.
* step(in): Hash join profiling step to measure.
*/
void
hjoin_profile_start (THREAD_ENTRY * thread_p, HASHJOIN_START_STATS * start_stats, HASHJOIN_PROFILE_STEP step)
{
assert (thread_p != NULL);
assert (start_stats != NULL);
tsc_getticks (&start_stats->tick);
start_stats->step = step;
switch (step)
{
case HASHJOIN_PROFILE_BUILD_FETCH:
case HASHJOIN_PROFILE_BUILD_HASH:
case HASHJOIN_PROFILE_BUILD_INSERT:
case HASHJOIN_PROFILE_PROBE_FETCH:
case HASHJOIN_PROFILE_PROBE_HASH:
case HASHJOIN_PROFILE_PROBE_SEARCH:
case HASHJOIN_PROFILE_PROBE_MATCH:
case HASHJOIN_PROFILE_PROBE_ADD:
/* nothing to do */
break;
case HASHJOIN_PROFILE_MERGE:
start_stats->fetches = perfmon_get_from_statistic (thread_p, PSTAT_PB_NUM_FETCHES);
start_stats->ioreads = perfmon_get_from_statistic (thread_p, PSTAT_PB_NUM_IOREADS);
break;
default:
/* impossible case */
assert (false);
break;
} /* switch (step) */
}
#endif /* HASHJOIN_PROFILE_TIME */
#if HASHJOIN_PROFILE_TIME
/*
* hjoin_profile_end() -
* return: None.
* thread_p(in): Thread entry.
* stats(in/out): Profiling data to accumulate.
* start_stats(in): Profiling data captured at the start of the step.
* step(in): Hash join profiling step being measured.
*/
void
hjoin_profile_end (THREAD_ENTRY * thread_p, HASHJOIN_PROFILE_STATS * stats,
HASHJOIN_START_STATS * start_stats, HASHJOIN_PROFILE_STEP step)
{
TSC_TICKS end_tick;
TSCTIMEVAL tv_diff;
assert (thread_p != NULL);
assert (stats != NULL);
assert (start_stats != NULL);
assert (start_stats->step == step);
tsc_getticks (&end_tick);
tsc_elapsed_time_usec (&tv_diff, end_tick, start_stats->tick);
switch (step)
{
case HASHJOIN_PROFILE_BUILD_FETCH:
TSC_ADD_TIMEVAL (stats->build.fetch, tv_diff);
break;
case HASHJOIN_PROFILE_BUILD_HASH:
TSC_ADD_TIMEVAL (stats->build.hash, tv_diff);
break;
case HASHJOIN_PROFILE_BUILD_INSERT:
TSC_ADD_TIMEVAL (stats->build.insert, tv_diff);
break;
case HASHJOIN_PROFILE_PROBE_FETCH:
TSC_ADD_TIMEVAL (stats->probe.fetch, tv_diff);
break;
case HASHJOIN_PROFILE_PROBE_HASH:
TSC_ADD_TIMEVAL (stats->probe.hash, tv_diff);
break;
case HASHJOIN_PROFILE_PROBE_SEARCH:
TSC_ADD_TIMEVAL (stats->probe.search, tv_diff);
break;
case HASHJOIN_PROFILE_PROBE_MATCH:
TSC_ADD_TIMEVAL (stats->probe.match, tv_diff);
break;
case HASHJOIN_PROFILE_PROBE_ADD:
TSC_ADD_TIMEVAL (stats->probe.add, tv_diff);
break;
case HASHJOIN_PROFILE_MERGE:
TSC_ADD_TIMEVAL (stats->merge.elapsed_time, tv_diff);
stats->merge.fetches += perfmon_get_from_statistic (thread_p, PSTAT_PB_NUM_FETCHES) - start_stats->fetches;
stats->merge.ioreads += perfmon_get_from_statistic (thread_p, PSTAT_PB_NUM_IOREADS) - start_stats->ioreads;
break;
default:
/* impossible case */
assert (false);
break;
} /* switch (step) */
}
#endif /* HASHJOIN_PROFILE_TIME */
/*
* hjoin_trace_merge_stats() -
* return: None.
* stats(in/out): Profiling data to be merged.
* context_stats(in): Profiling data per-partition.
*/
void
hjoin_trace_merge_stats (HASHJOIN_STATS * stats, HASHJOIN_STATS * context_stats)
{
assert (stats != NULL);
assert (context_stats != NULL);
if (stats == NULL || context_stats == NULL)
{
/* impossible case */
assert (false);
return;
}
TSC_ADD_TIMEVAL (stats->build.elapsed_time, context_stats->build.elapsed_time);
stats->build.fetches += context_stats->build.fetches;
stats->build.ioreads += context_stats->build.ioreads;
stats->build.read_rows += context_stats->build.read_rows;
stats->build.read_keys += context_stats->build.read_keys;
stats->build.qualified_rows += context_stats->build.qualified_rows;
#if HASHJOIN_COLLISION_RATE
stats->collision_rate = MAX (stats->collision_rate, context_stats->collision_rate);
#endif /* HASHJOIN_COLLISION_RATE */
#if HASHJOIN_PROFILE_TIME
TSC_ADD_TIMEVAL (stats->profile.build.fetch, context_stats->profile.build.fetch);
TSC_ADD_TIMEVAL (stats->profile.build.hash, context_stats->profile.build.hash);
TSC_ADD_TIMEVAL (stats->profile.build.insert, context_stats->profile.build.insert);
#endif /* HASHJOIN_PROFILE_TIME */
switch (context_stats->hash_method)
{
case HASH_METH_IN_MEM:
stats->use_hash_memory = true;
break;
case HASH_METH_HYBRID:
stats->use_hash_hybrid = true;
break;
case HASH_METH_HASH_FILE:
stats->use_hash_file = true;
break;
case HASH_METH_NOT_USE:
stats->use_hash_skip = true;
break;
default:
/* impossible case */
assert (false);
return;
}
TSC_ADD_TIMEVAL (stats->probe.elapsed_time, context_stats->probe.elapsed_time);
stats->probe.fetches += context_stats->probe.fetches;
stats->probe.ioreads += context_stats->probe.ioreads;
stats->probe.read_rows += context_stats->probe.read_rows;
stats->probe.read_keys += context_stats->probe.read_keys;
stats->probe.qualified_rows += context_stats->probe.qualified_rows;
#if HASHJOIN_PROFILE_TIME
TSC_ADD_TIMEVAL (stats->profile.probe.fetch, context_stats->profile.probe.fetch);
TSC_ADD_TIMEVAL (stats->profile.probe.hash, context_stats->profile.probe.hash);
TSC_ADD_TIMEVAL (stats->profile.probe.search, context_stats->profile.probe.search);
TSC_ADD_TIMEVAL (stats->profile.probe.match, context_stats->profile.probe.match);
TSC_ADD_TIMEVAL (stats->profile.probe.add, context_stats->profile.probe.add);
#endif /* HASHJOIN_PROFILE_TIME */
}
/*
* hjoin_trace_get_worker_stats() -
* return: Parallel worker stats at index.
* manager(in): Hash join manager containing shared state.
* index(in): Parallel worker index.
*/
UINT64 *
hjoin_trace_get_worker_stats (HASHJOIN_MANAGER * manager, int index)
{
assert (manager != NULL);
assert (manager->num_parallel_threads > 1);
assert (index >= 0 && index < manager->num_parallel_threads);
if (manager->px_worker_stats == NULL)
{
assert (false);
return NULL;
}
/* immutable */
static const int n_stat_values = perfmon_get_number_of_statistic_values ();
return manager->px_worker_stats + index * n_stat_values;
}
/*
* hjoin_trace_drain_worker_stats() -
* return: None.
* thread_p(in): Thread entry.
* manager(in): Hash join manager containing shared state.
*/
void
hjoin_trace_drain_worker_stats (THREAD_ENTRY * thread_p, HASHJOIN_MANAGER * manager)
{
UINT64 *worker_stats;
int task_cnt, task_index;
int stats_cnt, stats_index;
assert (thread_p->m_px_stats != NULL);
assert (manager != NULL);
assert (manager->num_parallel_threads > 1);
assert (manager->px_worker_stats != NULL);
/* immutable */
static const int offsets[] = {
pstat_Metadata[PSTAT_PB_NUM_FETCHES].start_offset,
pstat_Metadata[PSTAT_PB_NUM_IOREADS].start_offset,
pstat_Metadata[PSTAT_PB_PAGE_FIX_ACQUIRE_TIME_10USEC].start_offset
};
task_cnt = manager->num_parallel_threads;
stats_cnt = sizeof (offsets) / sizeof (offsets[0]);
for (task_index = 0; task_index < task_cnt; task_index++)
{
worker_stats = hjoin_trace_get_worker_stats (manager, task_index);
for (stats_index = 0; stats_index < stats_cnt; stats_index++)
{
const int offset = offsets[stats_index];
thread_p->m_px_stats[offset] += worker_stats[offset];
worker_stats[offset] = 0;
}
}
perfmon_merge_parallel_stats_to_tran_stats (thread_p);
}
#if HASHJOIN_DUMP_HASH_TABLE
/*
* hjoin_dump_hash_table() -
* return: None.
* thread_p(in): Thread entry.
* hash_scan(in): Hash scan structure containing the hash table.
* list_id(in): List identifier used as build input.
*/
static void
hjoin_dump_hash_table (THREAD_ENTRY * thread_p, HASH_LIST_SCAN * hash_scan, QFILE_LIST_ID * list_id)
{
assert (thread_p != NULL);
assert (hash_scan != NULL);
assert (list_id != NULL);
if (list_id->tuple_cnt > DUMP_HASH_TABLE_LIMIT)
{
return;
}
switch (hash_scan->hash_list_scan_type)
{
case HASH_METH_IN_MEM:
case HASH_METH_HYBRID:
assert (hash_scan->memory.hash_table != NULL);
mht_dump_hls (thread_p, stdout, hash_scan->memory.hash_table, 1, qdata_print_hash_scan_entry, &list_id->type_list,
(void *) &hash_scan->hash_list_scan_type);
printf ("temp file : tuple count = %ld, file_size = %dK\n", list_id->tuple_cnt, list_id->page_cnt * 16);
break;
case HASH_METH_HASH_FILE:
assert (hash_scan->file.hash_table != NULL);
fhs_dump (thread_p, hash_scan->file.hash_table);
break;
case HASH_METH_NOT_USE:
/* Nothing to do */
break;
default:
/* impossible case */
assert (false);
break;
}
}
#endif /* HASHJOIN_DUMP_HASH_TABLE */
#if !defined(NDEBUG) && HASHJOIN_DUMP_PROBE
/*
* hjoin_print_tuple() -
* return: None.
* list_scan_id(in): Scan identifier for the given tuple.
* tuple(in): Tuple to be printed.
* step(in): Step at which the tuple is printed.
*/
static void
hjoin_print_tuple (QFILE_LIST_SCAN_ID * list_scan_id, QFILE_TUPLE tuple, HASHJOIN_PRINT_STEP step)
{
assert (list_scan_id != NULL);
assert (tuple != NULL);
if (list_scan_id->list_id.tuple_cnt > DUMP_PROBE_LIMIT)
{
return;
}
switch (step)
{
case HASHJOIN_PRINT_READ_KEY:
fprintf (stdout, "\nRead Key (Probe): ");
break;
case HASHJOIN_PRINT_NOT_MATCHED_KEY:
fprintf (stdout, "\nNot Matched Key (Build): ");
break;
case HASHJOIN_PRINT_NOT_QUALIFIED_KEY:
fprintf (stdout, "\nNot Qualified Key (Build): ");
break;
case HASHJOIN_PRINT_QUALIFIED_KEY:
fprintf (stdout, "\nQualified Key (Build): ");
break;
case HASHJOIN_PRINT_FILL_EMPTY_KEY:
fprintf (stdout, "\nFill Empty Key (Probe): ");
break;
default:
/* impossible case */
assert (false);
/* Nothing to do */
break;
}
qfile_print_tuple (&list_scan_id->list_id.type_list, tuple);
}
#endif /* !NDEBUG && HASHJOIN_DUMP_PROBE */