Skip to content

File px_hash_join.cpp

File List > cubrid > src > query > parallel > px_hash_join > px_hash_join.cpp

Go to the documentation of this file

/*
 * Copyright 2008 Search Solution Corporation
 * Copyright 2016 CUBRID Corporation
 *
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 *
 */

/*
 * px_hash_join.cpp
 */

#include "px_hash_join.hpp"
#include "px_hash_join_task_manager.hpp"

#include "error_manager.h"      /* er_errid, NO_ERROR, assert_release_error, ASSERT_NO_ERROR_OR_INTERRUPTED */
#include "list_file.h"      /* qfile_destroy_list, QFILE_FREE_AND_INIT_LIST_ID, qfile_collect_list_sector_info */
#include "query_manager.h"      /* QMGR_TEMP_FILE (qmgr_temp_file) */
#include "storage_common.h"     /* S_BEFORE, VPID_SET_NULL */

// XXX: SHOULD BE THE LAST INCLUDE HEADER
#include "memory_wrapper.hpp"

namespace parallel_query
{
  namespace hash_join
  {
    /*
     * build_partitions
     */

    int
    build_partitions (cubthread::entry &thread_ref, HASHJOIN_MANAGER *manager, HASHJOIN_SPLIT_INFO *split_info)
    {
      HASHJOIN_INPUT_SPLIT_INFO *outer, *inner;
      HASHJOIN_SHARED_SPLIT_INFO shared_info;
      UINT32 task_cnt, task_index;

      assert (manager != nullptr);
      assert (split_info != nullptr);

      HASHJOIN_STATS *stats = manager->single_context.stats;
      HASHJOIN_START_STATS start_stats = HASHJOIN_START_STATS_INITIALIZER;
      assert (!thread_is_on_trace (&thread_ref) || stats != nullptr);

      outer = &split_info->outer;
      inner = &split_info->inner;

      task_cnt = manager->num_parallel_threads;

      if (hjoin_init_shared_split_info (&thread_ref, manager, &shared_info) != NO_ERROR)
    {
      assert_release_error (er_errid () != NO_ERROR);
      return er_errid ();
    }

      THREAD_ENTRY *main_thread_p = thread_get_main_thread (&thread_ref);
      task_manager task_manager (manager->px_worker_manager, *main_thread_p);
      split_task *task = nullptr;

      if (thread_is_on_trace (&thread_ref))
    {
      hjoin_trace_start (&thread_ref, &start_stats);
    }

      /* collect data page sectors for outer relation */
      if (qfile_collect_list_sector_info (&thread_ref, outer->fetch_info->list_id, &shared_info.sector_info) != NO_ERROR)
    {
      hjoin_clear_shared_split_info (&thread_ref, manager, &shared_info);
      return er_errid ();
    }
      shared_info.membuf_claimed.store (false, std::memory_order_relaxed);
      shared_info.next_sector_index.store (0, std::memory_order_relaxed);

      for (task_index = 0; task_index < task_cnt; task_index++)
    {
      task = new split_task (task_manager, manager, outer, &shared_info, task_index);
      task_manager.push_task (task);
    }

      task_manager.join ();

      if (thread_is_on_trace (&thread_ref))
    {
      hjoin_trace_drain_worker_stats (&thread_ref, manager);
      hjoin_trace_end (&thread_ref, &stats->split, &start_stats);
    }

      if (task_manager.has_error ())
    {
      /* cleanup */
      hjoin_clear_shared_split_info (&thread_ref, manager, &shared_info);

      assert_release_error (er_errid () != NO_ERROR);
      task_manager.clear_interrupt (thread_ref);
      return er_errid ();
    }

      if (thread_is_on_trace (&thread_ref))
    {
      hjoin_trace_start (&thread_ref, &start_stats);
    }

      /* collect data page sectors for inner relation
       * (outer's sector_info is freed internally by qfile_collect_list_sector_info) */
      if (qfile_collect_list_sector_info (&thread_ref, inner->fetch_info->list_id, &shared_info.sector_info) != NO_ERROR)
    {
      hjoin_clear_shared_split_info (&thread_ref, manager, &shared_info);
      return er_errid ();
    }
      shared_info.membuf_claimed.store (false, std::memory_order_relaxed);
      shared_info.next_sector_index.store (0, std::memory_order_relaxed);

      for (task_index = 0; task_index < task_cnt; task_index++)
    {
      task = new split_task (task_manager, manager, inner, &shared_info, task_index);
      task_manager.push_task (task);
    }

      task_manager.join ();

      if (thread_is_on_trace (&thread_ref))
    {
      hjoin_trace_drain_worker_stats (&thread_ref, manager);
      hjoin_trace_end (&thread_ref, &stats->split, &start_stats);
    }

      /* cleanup */
      hjoin_clear_shared_split_info (&thread_ref, manager, &shared_info);

      if (task_manager.has_error ())
    {
      assert_release_error (er_errid () != NO_ERROR);
      task_manager.clear_interrupt (thread_ref);
      return er_errid ();
    }

      ASSERT_NO_ERROR_OR_INTERRUPTED ();
      return NO_ERROR;
    }

    /*
     * execute_partitions
     */

    int
    execute_partitions (cubthread::entry &thread_ref, HASHJOIN_MANAGER *manager)
    {
      HASHJOIN_CONTEXT *current_context;
      HASHJOIN_SHARED_JOIN_INFO shared_info;
      UINT32 context_index;
      UINT32 task_cnt, task_index;

      int error = NO_ERROR;

      assert (manager != nullptr);

      HASHJOIN_STATS *stats = manager->single_context.stats;
      HASHJOIN_START_STATS start_stats = HASHJOIN_START_STATS_INITIALIZER;
#if HASHJOIN_PROFILE_TIME
      HASHJOIN_START_STATS profile_start_stats = HASHJOIN_START_STATS_INITIALIZER;
#endif /* HASHJOIN_PROFILE_TIME */
      assert (!thread_is_on_trace (&thread_ref) || stats != nullptr);

      task_cnt = manager->num_parallel_threads;

      THREAD_ENTRY *main_thread_p = thread_get_main_thread (&thread_ref);
      task_manager task_manager (manager->px_worker_manager, *main_thread_p);
      join_task *task = nullptr;

      if (thread_is_on_trace (&thread_ref))
    {
      hjoin_trace_start (&thread_ref, &start_stats);
    }

      for (task_index = 0; task_index < task_cnt; task_index++)
    {
      task = new join_task (task_manager, manager, manager->contexts, &shared_info, task_index);
      task_manager.push_task (task);
    }

      task_manager.join ();

      if (thread_is_on_trace (&thread_ref))
    {
      hjoin_trace_drain_worker_stats (&thread_ref, manager);
      hjoin_trace_end (&thread_ref, &stats->parallel, &start_stats);

      stats->build.range_time.min = shared_info.build_range_time.min;
      stats->build.range_time.max = shared_info.build_range_time.max;
      stats->probe.range_time.min = shared_info.probe_range_time.min;
      stats->probe.range_time.max = shared_info.probe_range_time.max;
    }

      if (task_manager.has_error ())
    {
      assert_release_error (er_errid () != NO_ERROR);
      task_manager.clear_interrupt (thread_ref);
      return er_errid ();
    }

      for (context_index = 0; context_index < manager->context_cnt; context_index++)
    {
      current_context = &manager->contexts[context_index];

      if (thread_is_on_trace (&thread_ref))
        {
          hjoin_trace_merge_stats (stats, current_context->stats);
        }

      if (current_context->list_id == nullptr)
        {
          error = er_errid ();
          if (error != NO_ERROR)
        {
          return error;
        }
          else
        {
          /* list_id can be NULL when the join result is empty.
           * In this case, it is NO_ERROR. */
          continue;
        }
        }

      if (current_context->list_id->tuple_cnt == 0)
        {
          qfile_destroy_list (&thread_ref, current_context->list_id);
          QFILE_FREE_AND_INIT_LIST_ID (current_context->list_id);

          /* empty context */
          continue;
        }

      HJOIN_PROFILE_START (&thread_ref, &profile_start_stats, HASHJOIN_PROFILE_MERGE);
      error = hjoin_merge_qlist (&thread_ref, manager, current_context);
      HJOIN_PROFILE_MERGE_END (&thread_ref, &stats->profile, &profile_start_stats, HASHJOIN_PROFILE_MERGE,
                   manager->single_context.list_id->tuple_cnt);

      if (error != NO_ERROR)
        {
          assert_release_error (er_errid () != NO_ERROR);
          return er_errid ();
        }
    }

      ASSERT_NO_ERROR_OR_INTERRUPTED ();
      return NO_ERROR;
    }
  } /* namespace hash_join */
} /* namespace parallel_query */