File px_hash_join.cpp
File List > cubrid > src > query > parallel > px_hash_join > px_hash_join.cpp
Go to the documentation of this file
/*
* Copyright 2008 Search Solution Corporation
* Copyright 2016 CUBRID Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
/*
* px_hash_join.cpp
*/
#include "px_hash_join.hpp"
#include "px_hash_join_task_manager.hpp"
#include "error_manager.h" /* er_errid, NO_ERROR, assert_release_error, ASSERT_NO_ERROR_OR_INTERRUPTED */
#include "list_file.h" /* qfile_destroy_list, QFILE_FREE_AND_INIT_LIST_ID, qfile_collect_list_sector_info */
#include "query_manager.h" /* QMGR_TEMP_FILE (qmgr_temp_file) */
#include "storage_common.h" /* S_BEFORE, VPID_SET_NULL */
// XXX: SHOULD BE THE LAST INCLUDE HEADER
#include "memory_wrapper.hpp"
namespace parallel_query
{
namespace hash_join
{
/*
 * build_partitions () - run the parallel split phase over the outer input, then over the inner input
 *
 * return          : NO_ERROR on success, otherwise the error code reported by er_errid ()
 * thread_ref (in) : entry of the calling thread
 * manager (in)    : hash join manager; provides the worker manager, the number of parallel
 *                   threads and the trace statistics
 * split_info (in) : holds the outer and inner input split descriptors
 *
 * For each input, the data page sectors of its list file are collected into the shared split
 * info, the shared counters are reset, and one split_task per parallel thread is pushed to the
 * task manager and joined. The shared split info is released on every exit path after it has
 * been initialized.
 */
int
build_partitions (cubthread::entry &thread_ref, HASHJOIN_MANAGER *manager, HASHJOIN_SPLIT_INFO *split_info)
{
  HASHJOIN_INPUT_SPLIT_INFO *outer, *inner;
  HASHJOIN_SHARED_SPLIT_INFO shared_info;
  UINT32 task_cnt, task_index;

  assert (manager != nullptr);
  assert (split_info != nullptr);

  HASHJOIN_STATS *stats = manager->single_context.stats;
  HASHJOIN_START_STATS start_stats = HASHJOIN_START_STATS_INITIALIZER;

  /* stats is dereferenced below whenever tracing is enabled */
  assert (!thread_is_on_trace (&thread_ref) || stats != nullptr);

  outer = &split_info->outer;
  inner = &split_info->inner;
  task_cnt = manager->num_parallel_threads;

  if (hjoin_init_shared_split_info (&thread_ref, manager, &shared_info) != NO_ERROR)
    {
      assert_release_error (er_errid () != NO_ERROR);
      return er_errid ();
    }

  THREAD_ENTRY *main_thread_p = thread_get_main_thread (&thread_ref);
  task_manager task_manager (manager->px_worker_manager, *main_thread_p);
  split_task *task = nullptr;

  /*
   * Phase 1: outer relation
   */
  if (thread_is_on_trace (&thread_ref))
    {
      hjoin_trace_start (&thread_ref, &start_stats);
    }

  /* collect data page sectors for outer relation */
  if (qfile_collect_list_sector_info (&thread_ref, outer->fetch_info->list_id, &shared_info.sector_info) != NO_ERROR)
    {
      hjoin_clear_shared_split_info (&thread_ref, manager, &shared_info);
      /* same guard as the other failure paths: do not hand NO_ERROR back for a failed call */
      assert_release_error (er_errid () != NO_ERROR);
      return er_errid ();
    }

  /* reset the shared state before the workers start */
  shared_info.membuf_claimed.store (false, std::memory_order_relaxed);
  shared_info.next_sector_index.store (0, std::memory_order_relaxed);

  for (task_index = 0; task_index < task_cnt; task_index++)
    {
      task = new split_task (task_manager, manager, outer, &shared_info, task_index);
      task_manager.push_task (task);
    }
  task_manager.join ();

  if (thread_is_on_trace (&thread_ref))
    {
      hjoin_trace_drain_worker_stats (&thread_ref, manager);
      hjoin_trace_end (&thread_ref, &stats->split, &start_stats);
    }

  if (task_manager.has_error ())
    {
      /* cleanup */
      hjoin_clear_shared_split_info (&thread_ref, manager, &shared_info);
      assert_release_error (er_errid () != NO_ERROR);
      task_manager.clear_interrupt (thread_ref);
      return er_errid ();
    }

  /*
   * Phase 2: inner relation
   */
  if (thread_is_on_trace (&thread_ref))
    {
      hjoin_trace_start (&thread_ref, &start_stats);
    }

  /* collect data page sectors for inner relation
   * (outer's sector_info is freed internally by qfile_collect_list_sector_info) */
  if (qfile_collect_list_sector_info (&thread_ref, inner->fetch_info->list_id, &shared_info.sector_info) != NO_ERROR)
    {
      hjoin_clear_shared_split_info (&thread_ref, manager, &shared_info);
      /* same guard as the other failure paths: do not hand NO_ERROR back for a failed call */
      assert_release_error (er_errid () != NO_ERROR);
      return er_errid ();
    }

  /* reset the shared state again; it was consumed by the outer phase */
  shared_info.membuf_claimed.store (false, std::memory_order_relaxed);
  shared_info.next_sector_index.store (0, std::memory_order_relaxed);

  for (task_index = 0; task_index < task_cnt; task_index++)
    {
      task = new split_task (task_manager, manager, inner, &shared_info, task_index);
      task_manager.push_task (task);
    }
  task_manager.join ();

  if (thread_is_on_trace (&thread_ref))
    {
      hjoin_trace_drain_worker_stats (&thread_ref, manager);
      hjoin_trace_end (&thread_ref, &stats->split, &start_stats);
    }

  /* cleanup (needed on both the success and the failure path) */
  hjoin_clear_shared_split_info (&thread_ref, manager, &shared_info);

  if (task_manager.has_error ())
    {
      assert_release_error (er_errid () != NO_ERROR);
      task_manager.clear_interrupt (thread_ref);
      return er_errid ();
    }

  ASSERT_NO_ERROR_OR_INTERRUPTED ();
  return NO_ERROR;
}
/*
 * execute_partitions () - run the join tasks in parallel and merge the per-context results
 *
 * return          : NO_ERROR on success, otherwise an error code
 * thread_ref (in) : entry of the calling thread
 * manager (in)    : hash join manager; provides the worker manager, the number of parallel
 *                   threads, the partition contexts and the trace statistics
 *
 * One join_task per parallel thread is pushed and joined. Afterwards every context is visited
 * in index order: its stats are merged when tracing is on, empty results are dropped, and
 * non-empty results are merged through hjoin_merge_qlist ().
 */
int
execute_partitions (cubthread::entry &thread_ref, HASHJOIN_MANAGER *manager)
{
  /* NOTE(review): no explicit init call for this object is visible here; confirm that its type
   * initializes the range_time fields that are copied into stats below. */
  HASHJOIN_SHARED_JOIN_INFO join_shared;

  assert (manager != nullptr);

  HASHJOIN_STATS *stats = manager->single_context.stats;
  HASHJOIN_START_STATS trace_start = HASHJOIN_START_STATS_INITIALIZER;
#if HASHJOIN_PROFILE_TIME
  HASHJOIN_START_STATS profile_start_stats = HASHJOIN_START_STATS_INITIALIZER;
#endif /* HASHJOIN_PROFILE_TIME */

  /* stats is dereferenced below whenever tracing is enabled */
  assert (!thread_is_on_trace (&thread_ref) || stats != nullptr);

  const UINT32 worker_cnt = manager->num_parallel_threads;
  THREAD_ENTRY *main_thread = thread_get_main_thread (&thread_ref);
  task_manager workers (manager->px_worker_manager, *main_thread);

  if (thread_is_on_trace (&thread_ref))
    {
      hjoin_trace_start (&thread_ref, &trace_start);
    }

  /* dispatch one join task per parallel thread, then wait for all of them */
  for (UINT32 worker_idx = 0; worker_idx < worker_cnt; worker_idx++)
    {
      workers.push_task (new join_task (workers, manager, manager->contexts, &join_shared, worker_idx));
    }
  workers.join ();

  if (thread_is_on_trace (&thread_ref))
    {
      hjoin_trace_drain_worker_stats (&thread_ref, manager);
      hjoin_trace_end (&thread_ref, &stats->parallel, &trace_start);

      /* publish the build/probe time ranges gathered in the shared join info */
      stats->build.range_time.min = join_shared.build_range_time.min;
      stats->build.range_time.max = join_shared.build_range_time.max;
      stats->probe.range_time.min = join_shared.probe_range_time.min;
      stats->probe.range_time.max = join_shared.probe_range_time.max;
    }

  if (workers.has_error ())
    {
      assert_release_error (er_errid () != NO_ERROR);
      workers.clear_interrupt (thread_ref);
      return er_errid ();
    }

  /* merge phase: fold every context's result into the single context */
  for (UINT32 ctx_idx = 0; ctx_idx < manager->context_cnt; ctx_idx++)
    {
      HASHJOIN_CONTEXT *ctx = &manager->contexts[ctx_idx];

      if (thread_is_on_trace (&thread_ref))
        {
          hjoin_trace_merge_stats (stats, ctx->stats);
        }

      if (ctx->list_id == nullptr)
        {
          const int pending_error = er_errid ();
          if (pending_error != NO_ERROR)
            {
              return pending_error;
            }

          /* A NULL list_id without a pending error means the join result is empty,
           * which is not a failure. */
          continue;
        }

      if (ctx->list_id->tuple_cnt == 0)
        {
          /* empty context: release its list and move on */
          qfile_destroy_list (&thread_ref, ctx->list_id);
          QFILE_FREE_AND_INIT_LIST_ID (ctx->list_id);
          continue;
        }

      HJOIN_PROFILE_START (&thread_ref, &profile_start_stats, HASHJOIN_PROFILE_MERGE);
      const int merge_error = hjoin_merge_qlist (&thread_ref, manager, ctx);
      HJOIN_PROFILE_MERGE_END (&thread_ref, &stats->profile, &profile_start_stats, HASHJOIN_PROFILE_MERGE,
                               manager->single_context.list_id->tuple_cnt);
      if (merge_error != NO_ERROR)
        {
          assert_release_error (er_errid () != NO_ERROR);
          return er_errid ();
        }
    }

  ASSERT_NO_ERROR_OR_INTERRUPTED ();
  return NO_ERROR;
}
} /* namespace hash_join */
} /* namespace parallel_query */