/* File: cubrid/src/query/parallel/px_hash_join/px_hash_join_task_manager.cpp */

/*
 * Copyright 2008 Search Solution Corporation
 * Copyright 2016 CUBRID Corporation
 *
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 *
 */

/*
 * px_hash_join_task_manager.cpp
 */

#include "px_hash_join_spawn_manager.hpp"
#include "px_hash_join_task_manager.hpp"

#include "bit.h"            /* bit64_count_trailing_zeros */
#include "error_manager.h"      /* er_errid, er_set, NO_ERROR, assert_release_error */
#include "log_impl.h"           /* logtb_set_tran_index_interrupt, logtb_get_check_interrupt, logtb_is_interrupted_tran */
#include "memory_alloc.h"       /* db_private_alloc, db_private_free_and_init */
#include "object_representation.h"  /* QFILE_GET_TUPLE_COUNT, QFILE_GET_NEXT_VPID */
#include "perf_monitor.h"       /* perfmon_update_min_timeval, perfmon_update_max_timeval */
#include "query_manager.h"      /* qmgr_get_old_page, qfile_has_next_page, qmgr_set_dirty_page, ... */

// XXX: SHOULD BE THE LAST INCLUDE HEADER
#include "memory_wrapper.hpp"

namespace parallel_query
{
  namespace hash_join
  {
    /*
     * task_manager
     */

    /*
     * task_manager () - construct the coordinator for one batch of parallel
     *                   hash-join tasks
     *   worker_manager (in): worker pool the tasks will be pushed to (non-null)
     *   main_thread_ref (in): the query's main thread; its error context is
     *                         captured so worker errors can be published back
     *                         to it (see handle_error)
     */
    task_manager::task_manager (worker_manager *worker_manager, cubthread::entry &main_thread_ref)
      : m_worker_manager (worker_manager)
      , m_main_thread_ref (main_thread_ref)
      , m_main_error_context (main_thread_ref.get_error_context())
      , m_all_tasks_done_cv ()
      , m_active_tasks_mutex ()
      , m_active_tasks (0)        /* number of tasks pushed but not yet retired */
      , m_has_error (false)       /* latched by the first worker that fails */
    {
      assert (m_worker_manager != nullptr);
    }

    /*
     * push_task () - register a task and hand it to the worker pool
     *   task (in): heap-allocated task; the pool will run it and retire () it
     *
     * The active-task counter is bumped before the task is made runnable, so
     * a task that completes immediately can never drive the count negative.
     */
    void
    task_manager::push_task (base_task *task)
    {
      assert (task != nullptr);

      {
        std::lock_guard<std::mutex> count_guard (m_active_tasks_mutex);
        m_active_tasks++;
      }

      m_worker_manager->push_task (task);
    }

    /*
     * end_task () - account for one finished task; called from retire ()
     *
     * Decrements the active-task counter, returns the slot to the worker pool
     * and, when the last outstanding task has finished, wakes join ().  The
     * notification is issued while still holding the mutex so it cannot race
     * a waiter between its predicate test and its block on the condvar.
     */
    void
    task_manager::end_task ()
    {
      std::lock_guard<std::mutex> count_guard (m_active_tasks_mutex);

      m_active_tasks--;
      m_worker_manager->pop_task ();

      if (m_active_tasks == 0)
        {
          m_all_tasks_done_cv.notify_all ();
        }
    }

    /*
     * join () - block the caller until every pushed task has retired, then
     *           wait for the worker pool's threads as well
     */
    void
    task_manager::join ()
    {
      std::unique_lock<std::mutex> lock (m_active_tasks_mutex);

      m_all_tasks_done_cv.wait (lock, [this] ()
      {
        return m_active_tasks == 0;
      });

      m_worker_manager->wait_workers ();
    }

    /*
     * handle_error () - record the first error raised by any worker and
     *                   interrupt the transaction so the other workers stop
     *   thread_ref (in): worker thread that encountered the error
     *
     * Only the first caller wins the atomic exchange; it moves its
     * thread-local error into the main thread's error context (so the query
     * thread reports it) and pokes the condition variable.  Every caller,
     * winner or not, flags the transaction as interrupted, which makes
     * check_interrupt () trip in the remaining workers.
     */
    void
    task_manager::handle_error (cubthread::entry &thread_ref)
    {
      if (!m_has_error.exchange (true, std::memory_order_acq_rel))
        {
          /* publish this worker's error to the main thread's context */
          m_main_error_context.get_current_error_level ().swap (cuberr::context::get_thread_local_error ());
          notify_stop ();
        }
      logtb_set_tran_index_interrupt (&thread_ref, thread_ref.tran_index, true);
    }

    /*
     * notify_stop () - wake any thread blocked in join ()
     *
     * The mutex is taken so the notification cannot slip between a waiter's
     * predicate test and its block on the condition variable.
     */
    void
    task_manager::notify_stop ()
    {
      std::lock_guard<std::mutex> guard (m_active_tasks_mutex);

      m_all_tasks_done_cv.notify_all ();
    }

    /*
     * check_interrupt () - test whether the transaction has been interrupted
     *   return: true when interrupted (error has been recorded), false otherwise
     *   thread_ref (in): executing worker thread
     */
    bool
    task_manager::check_interrupt (cubthread::entry &thread_ref)
    {
      bool unused = false;

      if (!logtb_get_check_interrupt (&thread_ref))
        {
          return false;
        }

      if (!logtb_is_interrupted_tran (&thread_ref, true, &unused, thread_ref.tran_index))
        {
          return false;
        }

      /* logtb_set_tran_index_interrupt sets ER_INTERRUPTING with ER_NOTIFICATION_SEVERITY,
       * so er_errid may return NO_ERROR in this case. */
      if (er_errid () == NO_ERROR)
        {
          er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_INTERRUPTED, 0);
        }

      handle_error (thread_ref);
      return true;
    }

    /*
     * clear_interrupt () - consume (and thereby reset) any pending interrupt
     *                      flag on the transaction; the result is discarded
     *   thread_ref (in): executing worker thread
     */
    void
    task_manager::clear_interrupt (cubthread::entry &thread_ref)
    {
      bool unused = false;

      if (!logtb_get_check_interrupt (&thread_ref))
        {
          return;
        }

      (void) logtb_is_interrupted_tran (&thread_ref, true, &unused, thread_ref.tran_index);
    }

    /*
     * base_task
     */

    /*
     * base_task () - common state shared by split and join tasks
     *   task_manager (in): coordinator that tracks this task's lifetime
     *   manager (in): shared hash-join manager (non-null)
     *   index (in): this worker's slot, used for per-worker statistics
     */
    base_task::base_task (task_manager &task_manager, HASHJOIN_MANAGER *manager, int index)
      : m_task_manager (task_manager)
      , m_manager (manager)
      , m_index (index)
    {
      assert (m_manager != nullptr);
      /* parallel execution only makes sense with more than one context */
      assert (m_manager->context_cnt > 1);
    }

    /*
     * retire () - invoked by the worker pool once execute () has returned;
     * reports completion to the task manager and self-destructs.
     * end_task () must run before `delete this` since it reads a member.
     * NOTE(review): assumes task objects are heap-allocated by the spawner —
     * confirm against the allocation site of push_task callers.
     */
    void base_task::retire ()
    {
      m_task_manager.end_task ();
      delete this;
    }

    /*
     * split_task
     */

    /*
     * split_task () - task that partitions one input list file by hash key
     *   task_manager (in): coordinator that tracks this task's lifetime
     *   manager (in): shared hash-join manager
     *   split_info (in): input list file plus the per-partition output lists
     *   shared_info (in): state shared between split workers (partition
     *                     mutexes, sector cursor, membuf claim flag)
     *   index (in): this worker's slot, used for per-worker statistics
     *
     * m_membuf_index / m_sector_index / m_current_bitmap / m_current_vsid /
     * m_current_tfile form the page-scan cursor consumed by get_next_page ().
     */
    split_task::split_task (task_manager &task_manager, HASHJOIN_MANAGER *manager, HASHJOIN_INPUT_SPLIT_INFO *split_info,
                HASHJOIN_SHARED_SPLIT_INFO *shared_info, int index)
      : base_task (task_manager, manager, index)
      , m_split_info (split_info)
      , m_shared_info (shared_info)
      , m_membuf_index (-1)       /* -1: membuf not (yet) claimed by this worker */
      , m_sector_index (-1)       /* -1: no sector claimed yet */
      , m_current_bitmap (0)
      , m_current_vsid (VSID_INITIALIZER)
      , m_current_tfile (nullptr)
    {
      assert (m_split_info != nullptr);
      assert (m_split_info->fetch_info != nullptr);
      assert (m_split_info->fetch_info->list_id != nullptr);

      assert (m_shared_info != nullptr);
      assert (m_shared_info->part_mutexes != nullptr);
    }

    /*
     * execute () - scan this worker's share of the input list file and scatter
     *              its tuples into the per-partition list files
     *   thread_ref (in): executing worker thread
     *
     * Pages are claimed via get_next_page ().  Each tuple is hashed on the
     * join key and appended to a thread-local partition list
     * (temp_part_list_id); the thread-local list is merged into the shared
     * partition list (part_list_id) under the matching partition mutex, either
     * when the local membuf fills up or at end of scan.  Tuples spanning
     * overflow page chains are reassembled into a private buffer and appended
     * directly to the shared list under the partition mutex.
     *
     * On any failure the error is published through
     * m_task_manager.handle_error () and the scan stops; leftover thread-local
     * lists are destroyed in the error-cleanup pass at the bottom.
     */
    void
    split_task::execute (cubthread::entry &thread_ref)
    {
      task_execution_guard guard (thread_ref, m_task_manager);

      QFILE_LIST_ID *list_id;
      QFILE_LIST_ID **part_list_id;
      QFILE_LIST_ID **temp_part_list_id = nullptr;

      PAGE_PTR page = nullptr;
      QFILE_TUPLE_RECORD tuple_record = { nullptr, 0 };
      int tuple_cnt, tuple_index, tuple_length;

      VPID overflow_vpid = VPID_INITIALIZER;
      PAGE_PTR overflow_page = nullptr;
      QFILE_TUPLE_RECORD overflow_record = { nullptr, 0 };
      int copy_offset, copy_size;

      HASH_SCAN_KEY *temp_key = nullptr;
      unsigned int hash_key;
      UINT32 part_cnt, part_index, part_id;

      bool is_outer_join = false;
      bool need_skip_next = false;

      int error = NO_ERROR;
      bool has_error = false;

      /* Do not perform NULL checks;
      * validation is expected to be handled by the constructor */
      list_id = m_split_info->fetch_info->list_id;
      part_list_id = m_split_info->part_list_id;
      part_cnt = m_manager->context_cnt;

      is_outer_join = IS_OUTER_JOIN_TYPE (m_manager->join_type);

      temp_part_list_id = (QFILE_LIST_ID **) db_private_alloc (&thread_ref, part_cnt * sizeof (QFILE_LIST_ID *));
      if (temp_part_list_id == nullptr)
        {
          assert_release_error (er_errid () != NO_ERROR);
          m_task_manager.handle_error (thread_ref);
          return;
        }
      memset (temp_part_list_id, 0, part_cnt * sizeof (QFILE_LIST_ID *));

      temp_key = qdata_alloc_hscan_key (&thread_ref, m_manager->key_cnt, true);
      if (temp_key == nullptr)
        {
          /* cleanup */
          db_private_free_and_init (&thread_ref, temp_part_list_id);

          assert_release_error (er_errid () != NO_ERROR);
          m_task_manager.handle_error (thread_ref);
          return;
        }

      if (thread_is_on_trace (&thread_ref))
        {
          thread_ref.m_px_stats = hjoin_trace_get_worker_stats (m_manager, m_index);
          thread_ref.m_uses_px_stats = true;
        }
      else
        {
          assert (thread_ref.m_px_stats == nullptr);
        }

      /* next page */
      do
        {
          if (m_task_manager.has_error () || m_task_manager.check_interrupt (thread_ref))
            {
              has_error = true;
              break;            /* error_exit */
            }

          page = get_next_page (thread_ref);
          if (page == nullptr)
            {
              if (er_errid () != NO_ERROR)
                {
                  m_task_manager.handle_error (thread_ref);
                  has_error = true;
                }

              /* end */
              break;
            }

          tuple_cnt = QFILE_GET_TUPLE_COUNT (page);
          if (tuple_cnt == 0)
            {
              /* empty page */
              continue;
            }

          tuple_index = -1;

          /* first tuple */
          tuple_record.tpl = (char *) page + QFILE_PAGE_HEADER_SIZE;

          /* overflow page: reassemble the single spanning tuple into
           * overflow_record before hashing it */
          if (QFILE_GET_OVERFLOW_PAGE_ID (page) != NULL_PAGEID)
            {
              assert (tuple_cnt == 1);

              overflow_page = page;

              tuple_length = QFILE_GET_TUPLE_LENGTH (tuple_record.tpl);

              if (overflow_record.size < tuple_length)
                {
                  if (qfile_reallocate_tuple (&overflow_record, tuple_length) != NO_ERROR)
                    {
                      assert_release_error (er_errid () != NO_ERROR);
                      m_task_manager.handle_error (thread_ref);
                      has_error = true;
                      break;            /* error_exit */
                    }
                }

              copy_offset = 0;

              do
                {
                  copy_size = MIN (tuple_length - copy_offset, QFILE_MAX_TUPLE_SIZE_IN_PAGE);

                  memcpy (overflow_record.tpl + copy_offset, (char *) overflow_page + QFILE_PAGE_HEADER_SIZE, copy_size);

                  copy_offset += copy_size;
                  assert (copy_offset <= tuple_length);

                  QFILE_GET_OVERFLOW_VPID (&overflow_vpid, overflow_page);

                  if (overflow_page != page)
                    {
                      /* overflow continuation pages share the same tfile as the start page
                       * (see qfile_allocate_new_ovf_page) */
                      qmgr_free_old_page_and_init (&thread_ref, overflow_page, m_current_tfile);
                    }

                  if (VPID_ISNULL (&overflow_vpid))
                    {
                      /* end */
                      break;
                    }

                  /* next overflow page */
                  overflow_page = qmgr_get_old_page (&thread_ref, &overflow_vpid, m_current_tfile);
                  if (overflow_page == nullptr)
                    {
                      assert_release_error (er_errid () != NO_ERROR);
                      m_task_manager.handle_error (thread_ref);
                      has_error = true;
                      break;            /* error_exit */
                    }
                }
              while (!VPID_ISNULL (&overflow_vpid));

              if (has_error)
                {
                  break;        /* error_exit */
                }

              tuple_record.tpl = overflow_record.tpl;
            }   /* if (QFILE_GET_OVERFLOW_PAGE_ID (page) != NULL_PAGEID) */

          assert (has_error == false);

          /* next tuple */
          do
            {
              if (tuple_index == -1)
                {
                  /* first tuple */
                }
              else if (tuple_index < tuple_cnt - 1)
                {
                  /* next tuple */
                  tuple_length = QFILE_GET_TUPLE_LENGTH (tuple_record.tpl);
                  tuple_record.tpl += tuple_length;
                }
              else
                {
                  /* next page */
                  assert (tuple_index == tuple_cnt - 1);
                  break;
                }

              tuple_index++;

              error = hjoin_fetch_key (&thread_ref, m_split_info->fetch_info, &tuple_record, temp_key, nullptr /* compare_key */,
                                       &need_skip_next);
              if (error != NO_ERROR)
                {
                  assert_release_error (er_errid () != NO_ERROR);
                  m_task_manager.handle_error (thread_ref);
                  has_error = true;
                  break;        /* error_exit */
                }
              else if (need_skip_next)
                {
                  need_skip_next = false;       /* init */

                  if (is_outer_join)
                    {
                      /* In outer joins, tuples with NULL in any join column are placed in the last partition.
                      * HASHJOIN_STATUS_FILL_NULL_VALUES is triggered for all tuples in that partition. */
                      part_id = part_cnt - 1;
                    }
                  else
                    {
                      /* next tuple */
                      continue;
                    }
                }               /* else if (need_skip_next) */
              else
                {
                  hash_key = qdata_hash_scan_key (temp_key, UINT_MAX, HASH_METH_IN_MEM);
                  part_id = (is_outer_join) ? hash_key % (part_cnt - 1) : hash_key % (part_cnt);

                  hjoin_update_tuple_hash_key (&thread_ref, &tuple_record, hash_key);
                }

              /* overflow page: the reassembled tuple goes straight into the
               * shared partition list under the partition mutex */
              if (QFILE_GET_OVERFLOW_PAGE_ID (page) != NULL_PAGEID)
                {
                  std::unique_lock lock (m_shared_info->part_mutexes[part_id]);

                  assert (part_list_id[part_id]->last_pgptr == nullptr);

                  if (qfile_reopen_list_as_append_mode (&thread_ref, part_list_id[part_id]) != NO_ERROR)
                    {
                      /* BUG FIX: propagate the error; previously this broke out
                       * without setting has_error, so the page loop silently
                       * kept scanning after a failure */
                      assert_release_error (er_errid () != NO_ERROR);
                      m_task_manager.handle_error (thread_ref);
                      has_error = true;
                      break;            /* error_exit */
                    }

                  error = qfile_add_tuple_to_list (&thread_ref, part_list_id[part_id], tuple_record.tpl);
                  if (error != NO_ERROR)
                    {
                      /* BUG FIX: close the list reopened above and propagate
                       * the error (see comment on the reopen failure path) */
                      qfile_close_list (&thread_ref, part_list_id[part_id]);
                      assert_release_error (er_errid () != NO_ERROR);
                      m_task_manager.handle_error (thread_ref);
                      has_error = true;
                      break;            /* error_exit */
                    }

                  qfile_close_list (&thread_ref, part_list_id[part_id]);

                  /* next page */
                  break;
                }

              /* thread-local list's membuf is full and the tuple would not fit:
               * merge the local list into the shared one, then reuse it */
              if (temp_part_list_id[part_id] != nullptr
                  && (temp_part_list_id[part_id]->tfile_vfid->membuf_last == temp_part_list_id[part_id]->tfile_vfid->membuf_npages - 1)
                  && (temp_part_list_id[part_id]->last_offset + QFILE_GET_TUPLE_LENGTH (tuple_record.tpl)) > DB_PAGESIZE)
                {
                  qfile_close_list (&thread_ref, temp_part_list_id[part_id]);   /* may be meaningless since only memory buffer is used */

                  {
                    std::unique_lock lock (m_shared_info->part_mutexes[part_id]);

                    assert (part_list_id[part_id]->last_pgptr == nullptr);

                    if (part_list_id[part_id]->tuple_cnt > 0)
                      {
                        error = qfile_append_list (&thread_ref, part_list_id[part_id], temp_part_list_id[part_id]);
                        if (error != NO_ERROR)
                          {
                            assert_release_error (er_errid () != NO_ERROR);
                            m_task_manager.handle_error (thread_ref);
                            has_error = true;
                            break;
                          }

                        error = qfile_truncate_list (&thread_ref, temp_part_list_id[part_id]);
                        if (error != NO_ERROR)
                          {
                            assert_release_error (er_errid () != NO_ERROR);
                            m_task_manager.handle_error (thread_ref);
                            has_error = true;
                            break;
                          }
                      }
                    else
                      {
                        /* shared list still empty: adopt the local list wholesale */
                        qfile_destroy_list (&thread_ref, part_list_id[part_id]);
                        qfile_copy_list_id (part_list_id[part_id], temp_part_list_id[part_id], false, QFILE_PROHIBIT_DEPENDENT);
                        QFILE_FREE_AND_INIT_LIST_ID (temp_part_list_id[part_id]);
                      }
                  }
                }

              if (temp_part_list_id[part_id] == nullptr)
                {
                  temp_part_list_id[part_id] =
                      qfile_open_list (&thread_ref, &list_id->type_list, nullptr, list_id->query_id, QFILE_FLAG_ALL, nullptr);
                  if (temp_part_list_id[part_id] == nullptr)
                    {
                      assert_release_error (er_errid () != NO_ERROR);
                      m_task_manager.handle_error (thread_ref);
                      has_error = true;
                      break;
                    }
                }

              error = qfile_add_tuple_to_list (&thread_ref, temp_part_list_id[part_id], tuple_record.tpl);
              if (error != NO_ERROR)
                {
                  assert_release_error (er_errid () != NO_ERROR);
                  m_task_manager.handle_error (thread_ref);
                  has_error = true;
                  break;
                }
              assert (VFID_ISNULL (&temp_part_list_id[part_id]->tfile_vfid->temp_vfid));
            }
          while (true);         /* next tuple */

          if (page != nullptr)
            {
              qmgr_free_old_page_and_init (&thread_ref, page, m_current_tfile);
            }

          if (has_error)
            {
              break;
            }
        }
      while (true);     /* next page */

      /* a break out of the page loop before the per-page free (e.g. overflow
       * reassembly failure) can leave the page still fixed */
      if (page != nullptr)
        {
          qmgr_free_old_page_and_init (&thread_ref, page, m_current_tfile);
        }

      assert (temp_part_list_id != nullptr);
      assert (temp_key != nullptr);

      if (!has_error)
        {
          /* end of scan: merge every thread-local list into its shared partition */
          for (part_index = 0; part_index < part_cnt; part_index++)
            {
              if (temp_part_list_id[part_index] == nullptr)
                {
                  continue;
                }

              qfile_close_list (&thread_ref, temp_part_list_id[part_index]);    /* may be meaningless since only memory buffer is used */

              if (temp_part_list_id[part_index]->tuple_cnt > 0)
                {
                  std::unique_lock lock (m_shared_info->part_mutexes[part_index]);

                  assert (part_list_id[part_index]->last_pgptr == nullptr);

                  if (part_list_id[part_index]->tuple_cnt > 0)
                    {
                      error = qfile_append_list (&thread_ref, part_list_id[part_index], temp_part_list_id[part_index]);
                      if (error != NO_ERROR)
                        {
                          assert_release_error (er_errid () != NO_ERROR);
                          m_task_manager.handle_error (thread_ref);
                          has_error = true;
                          break;
                        }

                      qfile_destroy_list (&thread_ref, temp_part_list_id[part_index]);
                    }
                  else
                    {
                      qfile_destroy_list (&thread_ref, part_list_id[part_index]);
                      qfile_copy_list_id (part_list_id[part_index], temp_part_list_id[part_index], false, QFILE_PROHIBIT_DEPENDENT);
                    }
                }
              else
                {
                  qfile_destroy_list (&thread_ref, temp_part_list_id[part_index]);
                }

              QFILE_FREE_AND_INIT_LIST_ID (temp_part_list_id[part_index]);
            }
        }

      /* must be a separate `if`, not an `else` of the block above:
       * the merge loop above may set has_error = true via break, and that case still needs this cleanup to run. */
      if (has_error)
        {
          for (part_index = 0; part_index < part_cnt; part_index++)
            {
              if (temp_part_list_id[part_index] != nullptr)
                {
                  qfile_close_list (&thread_ref, temp_part_list_id[part_index]);
                  qfile_destroy_list (&thread_ref, temp_part_list_id[part_index]);
                  QFILE_FREE_AND_INIT_LIST_ID (temp_part_list_id[part_index]);
                }
            }
        }

      /* cleanup */
      db_private_free_and_init (&thread_ref, temp_part_list_id);

      qdata_free_hscan_key (&thread_ref, temp_key, m_manager->key_cnt);

      if (overflow_record.tpl != nullptr)
        {
          db_private_free_and_init (&thread_ref, overflow_record.tpl);
        }

      thread_ref.m_px_stats = nullptr;
      thread_ref.m_uses_px_stats = false;
    }

    /*
     * get_next_page () - claim the next data page of the input list file for
     *                    this worker
     *   return: fixed page pointer, or nullptr when the scan is exhausted or a
     *           page fix failed (er_errid () distinguishes the two)
     *   thread_ref (in): executing worker thread
     *
     * Phase 1 gives the list file's entire in-memory buffer (membuf) to a
     * single worker: the winner of the membuf_claimed CAS walks the membuf
     * pages sequentially.  Phase 2 distributes on-disk pages: workers grab
     * sectors through the shared atomic cursor next_sector_index and walk each
     * claimed sector's page bitmap bit by bit.  Overflow continuation pages
     * are skipped in both phases; they are followed via the VPID chain by the
     * worker that owns the overflow start page.  m_current_tfile records the
     * temp file of the returned page so the caller can unfix it correctly.
     */
    PAGE_PTR
    split_task::get_next_page (cubthread::entry &thread_ref)
    {
      QFILE_LIST_SECTOR_INFO *sector_info = &m_shared_info->sector_info;
      FILE_PARTIAL_SECTOR *sectors = sector_info->sectors;
      void **tfiles = sector_info->tfiles;
      int sector_index;

      /* Phase 1: membuf pages — the CAS winner claims the entire membuf region
       *                          and iterates it sequentially. Non-owners fall
       *                          through to Phase 2 directly. */
      while (true)
	{
	  if (m_membuf_index >= 0)
	    {
	      /* this worker is the membuf owner */
	      if (m_membuf_index <= sector_info->membuf_tfile->membuf_last)
		{
		  /* membuf pages are addressed by index with a NULL volume id */
		  VPID vpid;
		  vpid.volid = NULL_VOLID;
		  vpid.pageid = m_membuf_index++;

		  PAGE_PTR page = qmgr_get_old_page (&thread_ref, &vpid, sector_info->membuf_tfile);
		  if (page == nullptr)
		    {
		      assert_release_error (er_errid () != NO_ERROR);
		      return nullptr;
		    }

		  /* skip overflow continuation pages */
		  if (QFILE_GET_TUPLE_COUNT (page) == QFILE_OVERFLOW_TUPLE_COUNT_FLAG)
		    {
		      qmgr_free_old_page_and_init (&thread_ref, page, sector_info->membuf_tfile);
		      continue;
		    }

		  m_current_tfile = sector_info->membuf_tfile;
		  return page;
		}

	      /* membuf exhausted — fall through to Phase 2 */
	      m_membuf_index = -1;
	      break;
	    }

	  if (m_sector_index == -1 && sector_info->membuf_tfile != nullptr)
	    {
	      /* first call: try to claim membuf (exactly one winner) */
	      bool expected = false;

	      if (m_shared_info->membuf_claimed.compare_exchange_strong (expected, true, std::memory_order_acq_rel))
		{
		  assert (m_membuf_index == -1);
		  m_membuf_index = 0;
		  continue;     /* re-enter Phase 1 as the owner */
		}
	    }

	  /* not the owner — proceed to Phase 2 */
	  break;
	}

      /* Phase 2: sector-based disk pages */
      while (true)
	{
	  /* find next set bit in current sector bitmap */
	  while (m_current_bitmap != 0)
	    {
#if defined(__GNUC__) || defined(__clang__)
	      int bit_pos = __builtin_ctzll (m_current_bitmap);
#else
	      int bit_pos = bit64_count_trailing_zeros (m_current_bitmap);
#endif
	      m_current_bitmap &= m_current_bitmap - 1; /* clear lowest set bit */

	      /* bit N of the sector bitmap maps to the sector's first page + N */
	      VPID vpid;
	      vpid.volid = m_current_vsid.volid;
	      vpid.pageid = SECTOR_FIRST_PAGEID (m_current_vsid.sectid) + bit_pos;

	      QMGR_TEMP_FILE *tfile = (QMGR_TEMP_FILE *) tfiles[m_sector_index];
	      assert (tfile != nullptr);

	      PAGE_PTR page = qmgr_get_old_page (&thread_ref, &vpid, tfile);
	      if (page == nullptr)
		{
		  assert_release_error (er_errid () != NO_ERROR);
		  return nullptr;
		}

	      /* skip overflow continuation pages — they are followed via VPID chain
	       * by the worker that owns the overflow start page */
	      if (QFILE_GET_TUPLE_COUNT (page) == QFILE_OVERFLOW_TUPLE_COUNT_FLAG)
		{
		  qmgr_free_old_page_and_init (&thread_ref, page, tfile);
		  continue;
		}

	      m_current_tfile = tfile;
	      return page;
	    }

	  /* current sector exhausted — grab next sector atomically */
	  sector_index = m_shared_info->next_sector_index.fetch_add (1, std::memory_order_relaxed);
	  if (sector_index >= sector_info->sector_cnt)
	    {
	      return nullptr;   /* all sectors distributed */
	    }

	  m_sector_index = sector_index;
	  m_current_vsid = sectors[sector_index].vsid;
	  m_current_bitmap = sectors[sector_index].page_bitmap;
	}
    }

    /*
     * join_task
     */

    /*
     * join_task () - task that runs the hash-join for one or more partition
     *                contexts handed out by get_next_context ()
     *   task_manager (in): coordinator that tracks this task's lifetime
     *   manager (in): shared hash-join manager
     *   contexts (in): array of per-partition execution contexts
     *   shared_info (in): state shared between join workers (context cursor,
     *                     statistics mutex and aggregates)
     *   index (in): this worker's slot, used for per-worker statistics
     */
    join_task::join_task (task_manager &task_manager, HASHJOIN_MANAGER *manager, HASHJOIN_CONTEXT *contexts,
              HASHJOIN_SHARED_JOIN_INFO *shared_info, int index)
      : base_task (task_manager, manager, index)
      , m_contexts (contexts)
      , m_shared_info (shared_info)
    {
      assert (m_manager != nullptr);
      assert (m_manager->context_cnt > 1);
      assert (m_contexts != nullptr);

      assert (m_shared_info != nullptr);
    }

    /*
     * execute () - process partition contexts until none remain or an error or
     *              interrupt is detected
     *   thread_ref (in): executing worker thread
     *
     * Each claimed context is given thread-local copies of the shared
     * predicate/descriptor structures from the spawn manager before
     * hjoin_execute () runs, and those pointers are reset to nullptr
     * afterwards (the spawn manager keeps ownership; see clear_spawner).
     * Build/probe times are accumulated locally and folded into the shared
     * min/max statistics under stats_mutex at the end.
     */
    void
    join_task::execute (cubthread::entry &thread_ref)
    {
      task_execution_guard guard (thread_ref, m_task_manager);

      spawn_manager *spawn_manager = nullptr;
      HASHJOIN_CONTEXT *context = nullptr;
      int error = NO_ERROR;

      TSCTIMEVAL total_build_time = { 0, 0 };
      TSCTIMEVAL total_probe_time = { 0, 0 };

      spawn_manager = spawn_manager::get_instance (thread_ref);
      if (spawn_manager == nullptr)
	{
	  assert_release_error (er_errid () != NO_ERROR);
	  m_task_manager.handle_error (thread_ref);
	  return;
	}

      if (thread_is_on_trace (&thread_ref))
	{
	  thread_ref.m_px_stats = hjoin_trace_get_worker_stats (m_manager, m_index);
	  thread_ref.m_uses_px_stats = true;
	}
      else
	{
	  assert (thread_ref.m_px_stats == nullptr);
	}

      /* next context */
      do
	{
	  if (m_task_manager.has_error () || m_task_manager.check_interrupt (thread_ref))
	    {
	      break;    /* error_exit */
	    }

	  context = get_next_context ();
	  if (context == nullptr)
	    {
	      if (er_errid () != NO_ERROR)
		{
		  m_task_manager.handle_error (thread_ref);
		}

	      /* end */
	      break;
	    }

	  /* reuse TLS variables if already set */
	  context->val_descr = spawn_manager->get_val_descr (m_manager->val_descr);
	  context->during_join_pred = spawn_manager->get_during_join_pred (m_manager->during_join_pred);
	  context->outer.regu_list_pred = spawn_manager->get_outer_regu_list_pred (m_manager->outer->regu_list_pred);
	  context->inner.regu_list_pred = spawn_manager->get_inner_regu_list_pred ( m_manager->inner->regu_list_pred);

	  /* the getters above report failure through the error stack, not a
	   * return value */
	  if (er_errid () != NO_ERROR)
	    {
	      m_task_manager.handle_error (thread_ref);
	      break;    /* error_exit */
	    }

	  error = hjoin_execute (&thread_ref, m_manager, context);

	  if (thread_is_on_trace (&thread_ref))
	    {
	      TSC_ADD_TIMEVAL (total_build_time, context->stats->build.elapsed_time);
	      TSC_ADD_TIMEVAL (total_probe_time, context->stats->probe.elapsed_time);
	    }

	  /* set to nullptr; cleaned up by clear_spawner after all tasks are done */
	  context->val_descr = nullptr;
	  context->during_join_pred = nullptr;
	  context->outer.regu_list_pred = nullptr;
	  context->inner.regu_list_pred = nullptr;

	  if (error != NO_ERROR)
	    {
	      assert_release_error (er_errid () != NO_ERROR);
	      m_task_manager.handle_error (thread_ref);
	      break;    /* error_exit */
	    }

	}
      while (true); /* next context */

      /* cleanup */
      spawn_manager::destroy_instance();

      if (thread_is_on_trace (&thread_ref))
	{
	  std::lock_guard<std::mutex> lock (m_shared_info->stats_mutex);

	  perfmon_update_min_timeval (&m_shared_info->build_range_time.min, &total_build_time);
	  perfmon_update_max_timeval (&m_shared_info->build_range_time.max, &total_build_time);
	  perfmon_update_min_timeval (&m_shared_info->probe_range_time.min, &total_probe_time);
	  perfmon_update_max_timeval (&m_shared_info->probe_range_time.max, &total_probe_time);
	}

      thread_ref.m_px_stats = nullptr;
      thread_ref.m_uses_px_stats = false;
    }

    /*
     * get_next_context () - hand out partition contexts to join workers, one
     *                       at a time
     *   return: next unprocessed context, or nullptr when all contexts have
     *           been distributed (or on an impossible internal state)
     *
     * Serialized by m_shared_info->scan_mutex.  scan_position tracks the
     * distribution: S_BEFORE (not started), S_ON (in progress), S_AFTER
     * (finished, next_index reset to 0).
     */
    HASHJOIN_CONTEXT *
    join_task::get_next_context ()
    {
      /* Do not perform NULL checks;
      * validation is expected to be handled by the constructor */
      HASHJOIN_CONTEXT *all_contexts = m_manager->contexts;

      std::lock_guard<std::mutex> guard (m_shared_info->scan_mutex);

      if (m_shared_info->scan_position == S_BEFORE)
        {
          if (m_shared_info->next_index != 0)
            {
              /* impossible case */
              assert_release_error (false);
              return nullptr;
            }

          m_shared_info->scan_position = S_ON;
          ++m_shared_info->next_index;
          return &all_contexts[0];
        }

      if (m_shared_info->scan_position == S_ON)
        {
          if (m_shared_info->next_index >= m_manager->context_cnt)
            {
              /* impossible case */
              assert_release_error (false);
              return nullptr;
            }

          HASHJOIN_CONTEXT *claimed = &all_contexts[m_shared_info->next_index];

          ++m_shared_info->next_index;
          if (m_shared_info->next_index == m_manager->context_cnt)
            {
              /* last context handed out */
              m_shared_info->scan_position = S_AFTER;
              m_shared_info->next_index = 0;
            }
          return claimed;
        }

      if (m_shared_info->scan_position == S_AFTER)
        {
          /* nothing to do */
          assert (m_shared_info->next_index == 0);
          return nullptr;
        }

      /* impossible case */
      assert_release_error (false);
      return nullptr;
    }
  } /* namespace hash_join */
} /* namespace parallel_query */