Skip to content

File px_hash_join.cpp

File List > cubrid > src > query > parallel > px_hash_join > px_hash_join.cpp

Go to the documentation of this file

/*
 * Copyright 2008 Search Solution Corporation
 * Copyright 2016 CUBRID Corporation
 *
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 *
 */

/*
 * px_hash_join.cpp
 */

#include "px_hash_join.hpp"
#include "px_hash_join_task_manager.hpp"

#include "error_manager.h"      /* assert_release_error, er_errid, NO_ERROR, ... */
#include "list_file.h"          /* qfile_open_list, qfile_open_list_scan, qfile_close_scan, ... */
#include "query_manager.h"      /* QMGR_TEMP_FILE (qmgr_temp_file) */
#include "memory_alloc.h"       /* db_private_alloc, db_private_free_and_init */
#include "storage_common.h"     /* OID_INITIALIZER, S_CLOSED, VPID_SET_NULL, ... */

// XXX: SHOULD BE THE LAST INCLUDE HEADER
#include "memory_wrapper.hpp"

namespace parallel_query
{
  namespace hash_join
  {
    /*
     * build_partitions () - run the parallel split phase over the outer input and then the inner input
     *   return: NO_ERROR on success, otherwise the error code of the failed step (taken from er_errid ()
     *           when no code was returned directly)
     *   thread_ref(in): thread entry of the caller
     *   manager(in): hash join manager; supplies num_parallel_threads, px_worker_manager and the
     *                single_context stats used when tracing
     *   split_info(in): supplies the outer and inner split inputs
     *
     * Note: For each input, a sector scan is opened on that input's list file, one split_task per
     *       parallel thread is pushed to the task manager, and join () is called before has_error ()
     *       is checked. Success and error paths both leave through the cleanup label.
     */

    int
    build_partitions (cubthread::entry &thread_ref, HASHJOIN_MANAGER *manager, HASHJOIN_SPLIT_INFO *split_info)
    {
      HASHJOIN_INPUT_SPLIT_INFO *outer, *inner;
      HASHJOIN_SHARED_SPLIT_INFO shared_info;
      UINT32 task_cnt, task_index;
      int error = NO_ERROR;

      assert (manager != nullptr);
      assert (split_info != nullptr);

      /* trace counters are kept in the single context; they must exist whenever tracing is on */
      HASHJOIN_STATS *stats = manager->single_context.stats;
      HASHJOIN_START_STATS start_stats = HASHJOIN_START_STATS_INITIALIZER;
      assert (!thread_is_on_trace (&thread_ref) || stats != nullptr);

      outer = &split_info->outer;
      inner = &split_info->inner;

      /* one split task is pushed per parallel thread */
      task_cnt = manager->num_parallel_threads;

      /* the task manager is constructed with the main thread entry of thread_ref */
      THREAD_ENTRY *main_thread_p = thread_get_main_thread (&thread_ref);
      task_manager task_manager (manager->px_worker_manager, *main_thread_p);
      split_task *task = nullptr;

      /* NOTE(review): on failure the cleanup label still calls hjoin_clear_shared_split_info () on
       * shared_info - confirm that clearing is safe after a failed init. */
      error = hjoin_init_shared_split_info (&thread_ref, manager, &shared_info);
      if (error != NO_ERROR)
    {
      goto error_exit;
    }

      /* start timing the outer split; the matching hjoin_trace_end () follows join () below */
      if (thread_is_on_trace (&thread_ref))
    {
      hjoin_trace_start (&thread_ref, &start_stats);
    }

      /* collect data page sectors for outer relation */
      error = qfile_open_list_sector_scan (&thread_ref, outer->fetch_info->list_id, &shared_info.sector_scan);
      if (error != NO_ERROR)
    {
      /* NOTE(review): the trace started above is not ended on this path - confirm that is intended */
      goto error_exit;
    }

      /* NOTE(review): tasks are allocated with new and not deleted in this function; ownership
       * presumably passes to the task manager through push_task () - confirm. */
      for (task_index = 0; task_index < task_cnt; task_index++)
    {
      task = new split_task (task_manager, manager, outer, &shared_info, task_index);
      task_manager.push_task (task);
    }

      task_manager.join ();

      /* the elapsed time of the outer split is recorded into stats->split */
      if (thread_is_on_trace (&thread_ref))
    {
      hjoin_trace_drain_worker_stats (&thread_ref, manager);
      hjoin_trace_end (&thread_ref, &stats->split, &start_stats);
    }

      /* error is still NO_ERROR here; error_exit picks the code up from er_errid () */
      if (task_manager.has_error ())
    {
      goto error_exit;
    }

      /* restart the timer for the inner split; it is recorded into the same stats->split below */
      if (thread_is_on_trace (&thread_ref))
    {
      hjoin_trace_start (&thread_ref, &start_stats);
    }

      /* collect data page sectors for inner relation
       * (outer's sector_info is freed internally by qfile_collect_list_sector_info)
       * NOTE(review): the call below is qfile_open_list_sector_scan (), reusing the same sector_scan;
       * confirm that it is the path that reaches qfile_collect_list_sector_info. */
      error = qfile_open_list_sector_scan (&thread_ref, inner->fetch_info->list_id, &shared_info.sector_scan);
      if (error != NO_ERROR)
    {
      goto error_exit;
    }

      /* same dispatch as above, this time over the inner input */
      for (task_index = 0; task_index < task_cnt; task_index++)
    {
      task = new split_task (task_manager, manager, inner, &shared_info, task_index);
      task_manager.push_task (task);
    }

      task_manager.join ();

      if (thread_is_on_trace (&thread_ref))
    {
      hjoin_trace_drain_worker_stats (&thread_ref, manager);
      hjoin_trace_end (&thread_ref, &stats->split, &start_stats);
    }

      if (task_manager.has_error ())
    {
      goto error_exit;
    }

      ASSERT_NO_ERROR_OR_INTERRUPTED ();

      /* common exit: reached by falling through on success and by the jump at the end of error_exit */
cleanup:
      hjoin_clear_shared_split_info (&thread_ref, manager, &shared_info);

      return error;

error_exit:
      task_manager.clear_interrupt (thread_ref);

      /* When either the local code or er_errid () is still NO_ERROR, assert that an error has been
       * set and return whatever er_errid () reports. */
      if (error == NO_ERROR || er_errid () == NO_ERROR)
    {
      assert_release_error (er_errid () != NO_ERROR);
      error = er_errid ();
    }

      goto cleanup;
    }

    /*
     * execute_partitions () - push one join_task per parallel thread, wait for them, then merge the
     *                         result list of every context
     *   return: NO_ERROR, or the error code of the first failure
     *   thread_ref(in): thread entry of the caller
     *   manager(in): hash join manager; supplies the contexts, the worker manager and the trace stats
     */

    int
    execute_partitions (cubthread::entry &thread_ref, HASHJOIN_MANAGER *manager)
    {
      HASHJOIN_SHARED_JOIN_INFO shared_info;

      assert (manager != nullptr);

      HASHJOIN_STATS *stats = manager->single_context.stats;
      HASHJOIN_START_STATS start_stats = HASHJOIN_START_STATS_INITIALIZER;
#if HASHJOIN_PROFILE_TIME
      HASHJOIN_START_STATS profile_start_stats = HASHJOIN_START_STATS_INITIALIZER;
#endif /* HASHJOIN_PROFILE_TIME */
      assert (!thread_is_on_trace (&thread_ref) || stats != nullptr);

      const UINT32 worker_cnt = manager->num_parallel_threads;

      THREAD_ENTRY *owner_thread_p = thread_get_main_thread (&thread_ref);
      task_manager dispatcher (manager->px_worker_manager, *owner_thread_p);

      if (thread_is_on_trace (&thread_ref))
        {
          hjoin_trace_start (&thread_ref, &start_stats);
        }

      /* every task receives the whole context array and the same shared info */
      for (UINT32 worker = 0; worker < worker_cnt; worker++)
        {
          join_task *job = new join_task (dispatcher, manager, manager->contexts, &shared_info, worker);
          dispatcher.push_task (job);
        }

      dispatcher.join ();

      if (thread_is_on_trace (&thread_ref))
        {
          hjoin_trace_drain_worker_stats (&thread_ref, manager);
          hjoin_trace_end (&thread_ref, &stats->parallel, &start_stats);

          /* publish the min/max build and probe times gathered in the shared info */
          stats->build.range_elapsed_time.min = shared_info.build_range_time.min;
          stats->build.range_elapsed_time.max = shared_info.build_range_time.max;
          stats->probe.range.elapsed_time.min = shared_info.probe_range_time.min;
          stats->probe.range.elapsed_time.max = shared_info.probe_range_time.max;
        }

      if (dispatcher.has_error ())
        {
          assert_release_error (er_errid () != NO_ERROR);
          dispatcher.clear_interrupt (thread_ref);
          return er_errid ();
        }

      for (UINT32 slot = 0; slot < manager->context_cnt; slot++)
        {
          HASHJOIN_CONTEXT *ctx = manager->contexts + slot;

          if (thread_is_on_trace (&thread_ref))
            {
              hjoin_trace_merge_stats (stats, ctx->stats, manager->single_context.status);
            }

          if (ctx->list_id == nullptr)
            {
              /* A missing list is an error only when one has been set;
               * otherwise the join result of this context is simply empty. */
              const int pending = er_errid ();
              if (pending != NO_ERROR)
                {
                  return pending;
                }
              continue;
            }

          if (ctx->list_id->tuple_cnt == 0)
            {
              /* empty context: drop its list, there is nothing to merge */
              qfile_destroy_list (&thread_ref, ctx->list_id);
              QFILE_FREE_AND_INIT_LIST_ID (ctx->list_id);
              continue;
            }

          HJOIN_PROFILE_START (&thread_ref, &profile_start_stats, HASHJOIN_PROFILE_MERGE);
          const int merge_rc = hjoin_merge_qlist (&thread_ref, manager, ctx);
          HJOIN_PROFILE_MERGE_END (&thread_ref, &stats->profile, &profile_start_stats, HASHJOIN_PROFILE_MERGE,
                                   (manager->single_context.list_id != nullptr)
                                   ? manager->single_context.list_id->tuple_cnt : 0);

          if (merge_rc != NO_ERROR)
            {
              assert_release_error (er_errid () != NO_ERROR);
              return er_errid ();
            }
        }

      ASSERT_NO_ERROR_OR_INTERRUPTED ();
      return NO_ERROR;
    }

    /*
     * parallel_probe
     */

    /*
     * init_context () - set up one context for the parallel probe from the manager's single context
     *   return: NO_ERROR, or the error code reported by er_errid () on failure
     *   thread_ref(in): thread entry of the caller
     *   manager(in): hash join manager; supplies single_context, join_type and the list file parameters
     *   context(in/out): context to fill; its list_id must be NULL on entry
     *
     * Note: Input descriptors are shared with the single context, while the context gets its own
     *       result list from qfile_open_list (). On failure the context is cleared again.
     */
    int
    init_context (cubthread::entry &thread_ref, HASHJOIN_MANAGER *manager, HASHJOIN_CONTEXT *context)
    {
      int error = NO_ERROR;

      assert (manager != nullptr);
      assert (context != nullptr);

      HASHJOIN_CONTEXT *const origin = &manager->single_context;

      /* both inputs refer to the same lists, domains and predicates as the single context */
      context->outer.list_id = origin->outer.list_id;
      context->inner.list_id = origin->inner.list_id;

      context->outer.input = origin->outer.input;
      context->inner.input = origin->inner.input;

      context->outer.coerce_domains = origin->outer.coerce_domains;
      context->inner.coerce_domains = origin->inner.coerce_domains;

      context->outer.need_coerce_domains = origin->outer.need_coerce_domains;
      context->inner.need_coerce_domains = origin->inner.need_coerce_domains;

      context->outer.regu_list_pred = origin->outer.regu_list_pred;
      context->inner.regu_list_pred = origin->inner.regu_list_pred;

      assert (context->list_id == nullptr);

      /* Prevent faults when qfile_close_scan is called */
      context->inner.list_scan_id.status = S_CLOSED;
      context->outer.list_scan_id.status = S_CLOSED;

      /* only the side selected by the join type gets a fill record */
      if (manager->join_type == JOIN_INNER)
        {
          context->outer.fill_record = nullptr;
          context->inner.fill_record = nullptr;
        }
      else if (manager->join_type == JOIN_LEFT)
        {
          context->outer.fill_record = &context->outer.tuple_record;
          context->inner.fill_record = nullptr;
        }
      else if (manager->join_type == JOIN_RIGHT)
        {
          context->outer.fill_record = nullptr;
          context->inner.fill_record = &context->inner.tuple_record;
        }
      else
        {
          /* impossible case */
          assert_release_error (false);
          goto error_exit;
        }

      /* keep the same build/probe orientation as the single context */
      if (origin->build != &origin->outer)
        {
          /* swap_join_inputs == false */
          context->build = &context->inner;
          context->probe = &context->outer;
        }
      else
        {
          /* swap_join_inputs == true */
          context->build = &context->outer;
          context->probe = &context->inner;
        }

      context->list_id = qfile_open_list (&thread_ref, &manager->type_list, nullptr,
                                          manager->query_id, manager->qlist_flag, nullptr);
      if (context->list_id == nullptr)
        {
          goto error_exit;
        }

      context->val_descr = origin->val_descr;
      context->during_join_pred = origin->during_join_pred;

      context->status = HASHJOIN_STATUS_PARALLEL_PROBE;

      ASSERT_NO_ERROR_OR_INTERRUPTED ();
      return NO_ERROR;

error_exit:
      clear_context (thread_ref, context);

      /* unless both the local code and er_errid () already hold an error, take it from er_errid () */
      if (! (error != NO_ERROR && er_errid () != NO_ERROR))
        {
          assert_release_error (er_errid () != NO_ERROR);
          error = er_errid ();
        }

      return error;
    }

    /*
     * clear_context () - release the result list held by a context, if any
     *   return: void
     *   thread_ref(in): thread entry of the caller
     *   context(in/out): context whose list_id is closed, destroyed and reset
     *
     * Note: Neither input scan may still reference a current page when this is called (asserted).
     */
    void
    clear_context (cubthread::entry &thread_ref, HASHJOIN_CONTEXT *context)
    {
      assert (context != nullptr);

      if (context->list_id == nullptr)
        {
          /* no result list was opened; nothing to release */
        }
      else
        {
          qfile_close_list (&thread_ref, context->list_id);
          qfile_destroy_list (&thread_ref, context->list_id);
          QFILE_FREE_AND_INIT_LIST_ID (context->list_id);
        }

      assert (context->outer.list_scan_id.curr_pgptr == nullptr);
      assert (context->inner.list_scan_id.curr_pgptr == nullptr);
    }

    /*
     * probe_prepare () - allocate and initialize one context per parallel thread for the parallel probe
     *   return: NO_ERROR, or the error code of the failed step
     *   thread_ref(in): thread entry of the caller
     *   manager(in/out): hash join manager; contexts and context_cnt are filled on success and reset
     *                    to nullptr / 0 on failure
     *
     * Note: When tracing is on, a zeroed HASHJOIN_STATS array is also allocated, one entry is attached
     *       to each context, and the array is registered in manager->stats_group.
     */
    int
    probe_prepare (cubthread::entry &thread_ref, HASHJOIN_MANAGER *manager)
    {
      HASHJOIN_CONTEXT *ctx_array = nullptr;
      HASHJOIN_STATS *stats_array = nullptr;
      int error = NO_ERROR;

      assert (manager != nullptr);
      assert (manager->contexts == nullptr);
      assert (manager->context_cnt == 0);

      const UINT32 total = manager->num_parallel_threads;
      assert (total > 1);

      const size_t ctx_bytes = total * sizeof (HASHJOIN_CONTEXT);
      const size_t stats_bytes = total * sizeof (HASHJOIN_STATS);

      ctx_array = (HASHJOIN_CONTEXT *) db_private_alloc (&thread_ref, ctx_bytes);
      if (ctx_array == nullptr)
        {
          goto error_exit;
        }
      memset (ctx_array, 0, ctx_bytes);

      /* context_cnt counts only fully initialized contexts, which is what error_exit clears */
      for (UINT32 i = 0; i < total; i++)
        {
          error = init_context (thread_ref, manager, ctx_array + i);
          if (error != NO_ERROR)
            {
              goto error_exit;
            }

          manager->context_cnt++;
        }

      manager->contexts = ctx_array;

      if (!thread_is_on_trace (&thread_ref))
        {
          assert (manager->stats_group == nullptr);
        }
      else
        {
          stats_array = (HASHJOIN_STATS *) malloc (stats_bytes);
          if (stats_array == nullptr)
            {
              error = ER_OUT_OF_VIRTUAL_MEMORY;
              er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, error, 1, stats_bytes);
              goto error_exit;
            }
          memset (stats_array, 0, stats_bytes);

          /* each context records into its own slot of the stats array */
          for (UINT32 i = 0; i < total; i++)
            {
              ctx_array[i].stats = stats_array + i;
            }

          assert (manager->stats_group != nullptr);
          manager->stats_group->context_stats = stats_array;
          manager->stats_group->context_cnt = total;
        }

      ASSERT_NO_ERROR_OR_INTERRUPTED ();
      return NO_ERROR;

error_exit:
      if (ctx_array != nullptr)
        {
          for (UINT32 i = 0; i < manager->context_cnt; i++)
            {
              clear_context (thread_ref, ctx_array + i);
            }

          db_private_free_and_init (&thread_ref, ctx_array);
        }

      if (!thread_is_on_trace (&thread_ref))
        {
          assert (stats_array == nullptr);
          assert (manager->stats_group == nullptr);
        }
      else
        {
          if (stats_array != nullptr)
            {
              free_and_init (stats_array);
            }

          assert (manager->stats_group != nullptr);
          manager->stats_group->context_stats = nullptr;
          manager->stats_group->context_cnt = 0;
        }

      /* leave the manager as it was on entry */
      manager->contexts = nullptr;
      manager->context_cnt = 0;

      /* unless both the local code and er_errid () already hold an error, take it from er_errid () */
      if (! (error != NO_ERROR && er_errid () != NO_ERROR))
        {
          assert_release_error (er_errid () != NO_ERROR);
          error = er_errid ();
        }

      return error;
    }

    /*
     * probe_execute () - run one probe_task per parallel thread over the probe input, then merge the
     *                    result list of every context
     *   return: NO_ERROR on success, otherwise the error code of the failed step (taken from er_errid ()
     *           when no code was returned directly)
     *   thread_ref(in): thread entry of the caller
     *   manager(in): hash join manager; its single context must be in HASHJOIN_STATUS_PARALLEL_PROBE
     *                and px_worker_manager must be set (both asserted)
     *
     * Note: Task i is given contexts[i]. Success and error paths both leave through the cleanup label,
     *       which closes the sector scan.
     */
    int
    probe_execute (cubthread::entry &thread_ref, HASHJOIN_MANAGER *manager)
    {
      HASHJOIN_CONTEXT *contexts = nullptr, *current_context;
      HASHJOIN_SHARED_PROBE_INFO shared_info;
      UINT32 context_index;
      UINT32 task_cnt, task_index;
      int error = NO_ERROR;

      assert (manager != nullptr);
      assert (manager->single_context.status == HASHJOIN_STATUS_PARALLEL_PROBE);
      assert (manager->px_worker_manager != nullptr);

      /* trace counters are kept in the single context; they must exist whenever tracing is on */
      HASHJOIN_STATS *stats = manager->single_context.stats;
      HASHJOIN_START_STATS start_stats = HASHJOIN_START_STATS_INITIALIZER;
#if HASHJOIN_PROFILE_TIME
      HASHJOIN_START_STATS profile_start_stats = HASHJOIN_START_STATS_INITIALIZER;
#endif /* HASHJOIN_PROFILE_TIME */
      assert (!thread_is_on_trace (&thread_ref) || stats != nullptr);

      contexts = manager->contexts;

      /* one probe task is pushed per parallel thread */
      task_cnt = manager->num_parallel_threads;

      /* the task manager is constructed with the main thread entry of thread_ref */
      THREAD_ENTRY *main_thread_p = thread_get_main_thread (&thread_ref);
      task_manager task_manager (manager->px_worker_manager, *main_thread_p);
      probe_task *task = nullptr;

      if (thread_is_on_trace (&thread_ref))
    {
      hjoin_trace_start (&thread_ref, &start_stats);
    }

      /* collect data page sectors for probe relation */
      error = qfile_open_list_sector_scan (&thread_ref, manager->single_context.probe->list_id, &shared_info.sector_scan);
      if (error != NO_ERROR)
    {
      /* NOTE(review): cleanup still calls qfile_close_list_sector_scan () on this sector_scan, and the
       * trace started above is not ended - confirm both are safe after a failed open. */
      goto error_exit;
    }

      /* NOTE(review): tasks are allocated with new and not deleted in this function; ownership
       * presumably passes to the task manager through push_task () - confirm. */
      for (task_index = 0; task_index < task_cnt; task_index++)
    {
      task = new probe_task (task_manager, manager, &contexts[task_index], &shared_info, task_index);
      task_manager.push_task (task);
    }

      task_manager.join ();

      if (thread_is_on_trace (&thread_ref))
    {
      hjoin_trace_drain_worker_stats (&thread_ref, manager);
      hjoin_trace_end (&thread_ref, &stats->probe, &start_stats);

      /* publish the min/max probe figures gathered in the shared info */
      stats->probe.range.elapsed_time.min = shared_info.probe_range.elapsed_time.min;
      stats->probe.range.elapsed_time.max = shared_info.probe_range.elapsed_time.max;
      stats->probe.range.read_rows.min = shared_info.probe_range.read_rows.min;
      stats->probe.range.read_rows.max = shared_info.probe_range.read_rows.max;
      stats->probe.range.read_keys.min = shared_info.probe_range.read_keys.min;
      stats->probe.range.read_keys.max = shared_info.probe_range.read_keys.max;
      stats->probe.range.qualified_rows.min = shared_info.probe_range.qualified_rows.min;
      stats->probe.range.qualified_rows.max = shared_info.probe_range.qualified_rows.max;
    }

      /* error is still NO_ERROR here; error_exit picks the code up from er_errid () */
      if (task_manager.has_error ())
    {
      goto error_exit;
    }

      /* merge phase: visit every context and merge its non-empty result list */
      for (context_index = 0; context_index < manager->context_cnt; context_index++)
    {
      current_context = &contexts[context_index];

      if (thread_is_on_trace (&thread_ref))
        {
          hjoin_trace_merge_stats (stats, current_context->stats, manager->single_context.status);
        }

      if (current_context->list_id == nullptr)
        {
          error = er_errid ();
          if (error != NO_ERROR)
        {
          goto error_exit;
        }
          else
        {
          /* list_id can be NULL when the join result is empty.
           * In this case, it is NO_ERROR. */
          continue;
        }
        }

      if (current_context->list_id->tuple_cnt == 0)
        {
          /* nothing to merge: destroy and free the empty list */
          qfile_destroy_list (&thread_ref, current_context->list_id);
          QFILE_FREE_AND_INIT_LIST_ID (current_context->list_id);

          /* empty context */
          continue;
        }

      /* presumably merges this context's list into manager->single_context.list_id, whose tuple
       * count is what the profile macro reports - TODO confirm in hjoin_merge_qlist () */
      HJOIN_PROFILE_START (&thread_ref, &profile_start_stats, HASHJOIN_PROFILE_MERGE);
      error = hjoin_merge_qlist (&thread_ref, manager, current_context);
      HJOIN_PROFILE_MERGE_END (&thread_ref, &stats->profile, &profile_start_stats, HASHJOIN_PROFILE_MERGE,
                   (manager->single_context.list_id != nullptr) ? manager->single_context.list_id->tuple_cnt : 0);

      if (error != NO_ERROR)
        {
          goto error_exit;
        }
    }

      ASSERT_NO_ERROR_OR_INTERRUPTED ();

      /* common exit: reached by falling through on success and by the jump at the end of error_exit */
cleanup:
      qfile_close_list_sector_scan (&thread_ref, &shared_info.sector_scan);

      return error;

error_exit:
      task_manager.clear_interrupt (thread_ref);

      /* When either the local code or er_errid () is still NO_ERROR, assert that an error has been
       * set and return whatever er_errid () reports. */
      if (error == NO_ERROR || er_errid () == NO_ERROR)
    {
      assert_release_error (er_errid () != NO_ERROR);
      error = er_errid ();
    }

      goto cleanup;
    }

  } /* namespace hash_join */
} /* namespace parallel_query */