Skip to content

File px_heap_scan.cpp

File List > cubrid > src > query > parallel > px_heap_scan > px_heap_scan.cpp

Go to the documentation of this file

/*
 *
 * Copyright 2016 CUBRID Corporation
 *
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 *
 */

/*
 * px_heap_scan.cpp - manager for parallel heap scans executed within a single XASL
 */

#include "px_heap_scan.hpp"

#include "error_code.h"
#include "object_primitive.h"
#include "perf_monitor.h"
#include "query_evaluator.h"
#include "error_context.hpp"
#include "query_executor.h"
#include "system.h"
#include "xasl.h"
#include "fetch.h"
#include "px_heap_scan_task.hpp"
#include "px_heap_scan_input_handler_ftabs.hpp"
#include "px_parallel.hpp"          /* parallel_query::compute_parallel_degree */

// XXX: SHOULD BE THE LAST INCLUDE HEADER
#include "memory_wrapper.hpp"

extern "C"
{
  SCAN_CODE
  scan_next_parallel_heap_scan (THREAD_ENTRY *thread_p, SCAN_ID *scan_id)
  {
    switch (scan_id->s.phsid.result_type)
      {
      case parallel_heap_scan::RESULT_TYPE::MERGEABLE_LIST:
      {
    using manager_type = parallel_heap_scan::manager < parallel_heap_scan::RESULT_TYPE::MERGEABLE_LIST >;
    manager_type *manager_p = (manager_type *) scan_id->s.phsid.manager;
    return manager_p->next();
      }

      case parallel_heap_scan::RESULT_TYPE::XASL_SNAPSHOT:
      {
    using manager_type = parallel_heap_scan::manager < parallel_heap_scan::RESULT_TYPE::XASL_SNAPSHOT >;
    manager_type *manager_p = (manager_type *) scan_id->s.phsid.manager;
    return manager_p->next();
      }

      case parallel_heap_scan::RESULT_TYPE::BUILDVALUE_OPT:
      {
    using manager_type = parallel_heap_scan::manager < parallel_heap_scan::RESULT_TYPE::BUILDVALUE_OPT >;
    manager_type *manager_p = (manager_type *) scan_id->s.phsid.manager;
    return manager_p->next();
      }

      default:
    /* impossible case */
    assert_release_error (false);
    return S_ERROR;
      }
  }

  int
  scan_reset_scan_block_parallel_heap_scan (THREAD_ENTRY *thread_p, SCAN_ID *scan_id)
  {
    /* TODO: cleared by scan_reset_scan_block; redundant? */
    scan_id->single_fetched = false;
    scan_id->null_fetched = false;

    /* TODO: cleared by scan_next_scan_block; redundant? */
    scan_id->qualified_block = false;

    /* reset for S_HEAP_SCAN in scan_reset_scan_block */
    scan_id->position = (scan_id->direction == S_FORWARD) ? S_BEFORE : S_AFTER;
    OID_SET_NULL (&scan_id->s.phsid.curr_oid);

    using accumulative_trace_storage = parallel_heap_scan::accumulative_trace_storage;

    switch (scan_id->s.phsid.result_type)
      {
      case parallel_heap_scan::RESULT_TYPE::MERGEABLE_LIST:
      {
    using manager_type = parallel_heap_scan::manager < parallel_heap_scan::RESULT_TYPE::MERGEABLE_LIST >;
    manager_type *manager_p = (manager_type *) scan_id->s.phsid.manager;

    manager_p->merge_stats();

    if (thread_p->on_trace)
      {
        if (scan_id->s.phsid.trace_storage == nullptr)
          {
        size_t alloc_size = sizeof (accumulative_trace_storage);
        scan_id->s.phsid.trace_storage = ( accumulative_trace_storage *) malloc (alloc_size);
        if (scan_id->s.phsid.trace_storage == nullptr)
          {
            er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_OUT_OF_VIRTUAL_MEMORY, 1, alloc_size);
            manager_p->reset ();
            break;
          }

        scan_id->s.phsid.trace_storage = placement_new (( accumulative_trace_storage *) scan_id->s.phsid.trace_storage,
                         manager_p->get_result_type());
        assert (scan_id->s.phsid.trace_storage != nullptr);
          }

        scan_id->s.phsid.trace_storage->add_stats (manager_p->get_trace_handler());
      }

    return manager_p->reset ();
      }

      case parallel_heap_scan::RESULT_TYPE::XASL_SNAPSHOT:
      {
    using manager_type = parallel_heap_scan::manager < parallel_heap_scan::RESULT_TYPE::XASL_SNAPSHOT >;
    manager_type *manager_p = (manager_type *) scan_id->s.phsid.manager;

    manager_p->merge_stats();

    if (thread_p->on_trace)
      {
        if (scan_id->s.phsid.trace_storage == nullptr)
          {
        size_t alloc_size = sizeof (accumulative_trace_storage);
        scan_id->s.phsid.trace_storage = ( accumulative_trace_storage *) malloc (alloc_size);
        if (scan_id->s.phsid.trace_storage == nullptr)
          {
            er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_OUT_OF_VIRTUAL_MEMORY, 1, alloc_size);
            manager_p->reset ();
            break;
          }

        scan_id->s.phsid.trace_storage = placement_new (( accumulative_trace_storage *) scan_id->s.phsid.trace_storage,
                         manager_p->get_result_type());
        assert (scan_id->s.phsid.trace_storage != nullptr);
          }

        scan_id->s.phsid.trace_storage->add_stats (manager_p->get_trace_handler());
      }

    return manager_p->reset ();
      }

      case parallel_heap_scan::RESULT_TYPE::BUILDVALUE_OPT:
      {
    using manager_type = parallel_heap_scan::manager < parallel_heap_scan::RESULT_TYPE::BUILDVALUE_OPT >;
    manager_type *manager_p = (manager_type *) scan_id->s.phsid.manager;

    manager_p->merge_stats();

    if (thread_p->on_trace)
      {
        if (scan_id->s.phsid.trace_storage == nullptr)
          {
        size_t alloc_size = sizeof (accumulative_trace_storage);
        scan_id->s.phsid.trace_storage = ( accumulative_trace_storage *) malloc (alloc_size);
        if (scan_id->s.phsid.trace_storage == nullptr)
          {
            er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_OUT_OF_VIRTUAL_MEMORY, 1, alloc_size);
            manager_p->reset ();
            break;
          }

        scan_id->s.phsid.trace_storage = placement_new (( accumulative_trace_storage *) scan_id->s.phsid.trace_storage,
                         manager_p->get_result_type());
        assert (scan_id->s.phsid.trace_storage != nullptr);
          }

        scan_id->s.phsid.trace_storage->add_stats (manager_p->get_trace_handler());
      }

    return manager_p->reset ();
      }

      default:
    /* impossible case */
    assert_release_error (false);
    return er_errid ();
      }

    return er_errid ();
  }

  void
  scan_end_parallel_heap_scan (THREAD_ENTRY *thread_p, SCAN_ID *scan_id)
  {
    if (scan_id->direction == S_FORWARD)
      {
    scan_id->direction = S_BACKWARD;
      }
    else
      {
    scan_id->direction = S_FORWARD;
      }

    switch (scan_id->s.phsid.result_type)
      {
      case parallel_heap_scan::RESULT_TYPE::MERGEABLE_LIST:
      {
    using manager_type = parallel_heap_scan::manager < parallel_heap_scan::RESULT_TYPE::MERGEABLE_LIST >;
    manager_type *manager_p = (manager_type *) scan_id->s.phsid.manager;
    manager_p->end();
    break;
      }

      case parallel_heap_scan::RESULT_TYPE::XASL_SNAPSHOT:
      {
    using manager_type = parallel_heap_scan::manager < parallel_heap_scan::RESULT_TYPE::XASL_SNAPSHOT >;
    manager_type *manager_p = (manager_type *) scan_id->s.phsid.manager;
    manager_p->end();
    break;
      }

      case parallel_heap_scan::RESULT_TYPE::BUILDVALUE_OPT:
      {
    using manager_type = parallel_heap_scan::manager < parallel_heap_scan::RESULT_TYPE::BUILDVALUE_OPT >;
    manager_type *manager_p = (manager_type *) scan_id->s.phsid.manager;
    manager_p->end();
    break;
      }

      default:
    /* impossible case */
    assert_release_error (false);
    break;
      }
  }

  void
  scan_close_parallel_heap_scan (THREAD_ENTRY *thread_p, SCAN_ID *scan_id)
  {
    using accumulative_trace_storage = parallel_heap_scan::accumulative_trace_storage;

    switch (scan_id->s.phsid.result_type)
      {
      case parallel_heap_scan::RESULT_TYPE::MERGEABLE_LIST:
      {
    using manager_type = parallel_heap_scan::manager < parallel_heap_scan::RESULT_TYPE::MERGEABLE_LIST >;

    manager_type *manager_p = (manager_type *) scan_id->s.phsid.manager;

    if (thread_p->on_trace)
      {
        if (scan_id->s.phsid.trace_storage == nullptr)
          {
        size_t alloc_size = sizeof (accumulative_trace_storage);
        scan_id->s.phsid.trace_storage = (accumulative_trace_storage *) malloc (alloc_size);
        if (scan_id->s.phsid.trace_storage == nullptr)
          {
            er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_OUT_OF_VIRTUAL_MEMORY, 1, alloc_size);
            manager_p->close();
            break;
          }

        scan_id->s.phsid.trace_storage = placement_new ((accumulative_trace_storage *) scan_id->s.phsid.trace_storage,
                         manager_p->get_result_type());
        assert (scan_id->s.phsid.trace_storage != nullptr);
          }

        scan_id->s.phsid.trace_storage->add_stats (manager_p->get_trace_handler());
      }

    manager_p->close();
    break;
      }

      case parallel_heap_scan::RESULT_TYPE::XASL_SNAPSHOT:
      {
    using manager_type = parallel_heap_scan::manager < parallel_heap_scan::RESULT_TYPE::XASL_SNAPSHOT >;

    manager_type *manager_p = (manager_type *) scan_id->s.phsid.manager;

    if (thread_p->on_trace)
      {
        if (scan_id->s.phsid.trace_storage == nullptr)
          {
        size_t alloc_size = sizeof (accumulative_trace_storage);
        scan_id->s.phsid.trace_storage = ( accumulative_trace_storage *) malloc (alloc_size);
        if (scan_id->s.phsid.trace_storage == nullptr)
          {
            er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_OUT_OF_VIRTUAL_MEMORY, 1, alloc_size);
            manager_p->close();
            break;
          }

        scan_id->s.phsid.trace_storage = placement_new ((accumulative_trace_storage *) scan_id->s.phsid.trace_storage,
                         manager_p->get_result_type());
        assert (scan_id->s.phsid.trace_storage != nullptr);
          }

        scan_id->s.phsid.trace_storage->add_stats (manager_p->get_trace_handler());
      }

    manager_p->close();
    break;
      }

      case parallel_heap_scan::RESULT_TYPE::BUILDVALUE_OPT:
      {
    using manager_type = parallel_heap_scan::manager < parallel_heap_scan::RESULT_TYPE::BUILDVALUE_OPT >;

    manager_type *manager_p = (manager_type *) scan_id->s.phsid.manager;

    if (thread_p->on_trace)
      {
        if (scan_id->s.phsid.trace_storage == nullptr)
          {
        size_t alloc_size = sizeof (accumulative_trace_storage);
        scan_id->s.phsid.trace_storage = ( accumulative_trace_storage *) malloc (alloc_size);
        if (scan_id->s.phsid.trace_storage == nullptr)
          {
            er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_OUT_OF_VIRTUAL_MEMORY, 1, alloc_size);
            manager_p->close();
            break;
          }

        scan_id->s.phsid.trace_storage = placement_new ((accumulative_trace_storage *) scan_id->s.phsid.trace_storage,
                         manager_p->get_result_type());
        assert (scan_id->s.phsid.trace_storage != nullptr);
          }

        scan_id->s.phsid.trace_storage->add_stats (manager_p->get_trace_handler());
      }

    manager_p->close();
    break;
      }

      default:
    /* impossible case */
    assert_release_error (false);
    break;
      }
  }

  int
  scan_open_parallel_heap_scan (THREAD_ENTRY *thread_p, SCAN_ID *scan_id, bool mvcc_select_lock_needed, int fixed_scan,
                int grouped_scan, VAL_DESCR *vd, ACCESS_SPEC_TYPE *spec, OID *class_oid, HFID *class_hfid, XASL_NODE *xasl,
                QUERY_ID query_id)
  {
    int num_user_pages = -1;
    parallel_query::worker_manager *worker_manager_p = nullptr;
    int num_parallel_threads;
    int error = NO_ERROR;

    assert (thread_p != nullptr);
    assert (scan_id != nullptr);
    assert (spec != nullptr);
    assert (spec->type == TARGET_CLASS);
    assert (spec->access == ACCESS_METHOD_SEQUENTIAL);
    assert (xasl != nullptr);
    assert (query_id != NULL_QUERY_ID);
    assert (vd != nullptr);

    scan_id->type = S_HEAP_SCAN;

    if (spec->curent == nullptr)
      {
    /* DB_PARTITION_CLASS will be parallel-heap-scanned, not DB_PARTITIONED_CLASS */
    if (spec->pruning_type == DB_PARTITIONED_CLASS)
      {
        /* try single-thread heap scan */
        return NO_ERROR;
      }

    if (oid_is_system_class (class_oid)
        || mvcc_is_mvcc_disabled_class (class_oid) || mvcc_select_lock_needed
        /* Why thread_p->private_heap_id != 0 ?
         * Because, if it is 0, it means that the scan is not executed in main thread.
         * So, we can't use parallel heap scan. */
        || thread_p->private_heap_id == 0)
      {
        /* parallel-thread heap scan not supported */
        ACCESS_SPEC_SET_FLAG (spec, ACCESS_SPEC_FLAG_NO_PARALLEL_HEAP_SCAN);
      }
      }

    if (ACCESS_SPEC_IS_FLAGED (spec, ACCESS_SPEC_FLAG_NO_PARALLEL_HEAP_SCAN) || HFID_IS_NULL (class_hfid))
      {
    /* try single-thread heap scan */
    return NO_ERROR;
      }

    /*
     * try parallel-thread heap scan
     */

    /* check if pages are enough for parallel-thread heap scan */
    error = file_get_num_user_pages (thread_p, &class_hfid->vfid, &num_user_pages);
    if (error != NO_ERROR)
      {
    assert_release_error (er_errid () != NO_ERROR);
    return er_errid ();
      }

    assert (spec->num_parallel_threads == -1 /* auto-compute */
        || ACCESS_SPEC_IS_FLAGED (spec, ACCESS_SPEC_FLAG_NUM_PARALLEL_THREADS));

    num_parallel_threads = parallel_query::compute_parallel_degree (parallel_query::parallel_type::HEAP_SCAN,
               num_user_pages, spec->num_parallel_threads /* hint */);
    if (num_parallel_threads < 2)
      {
    /* try single-thread heap scan */
    assert (scan_id->type == S_HEAP_SCAN);
    return NO_ERROR;
      }

    worker_manager_p = parallel_query::worker_manager::try_reserve_workers (num_parallel_threads);
    if (worker_manager_p == nullptr)
      {
    /* try single-thread heap scan */
    assert (scan_id->type == S_HEAP_SCAN);
    return NO_ERROR;
      }

    /* update to actual reserved workers */
    num_parallel_threads = worker_manager_p->get_reserved_workers ();

    if (xasl->topn_items || XASL_IS_FLAGED (xasl, XASL_TO_BE_CACHED))
      {
    ACCESS_SPEC_UNSET_FLAG (spec, ACCESS_SPEC_FLAG_MERGEABLE_LIST);
      }

    /* should check LIST_MERGE in checker */
    if (ACCESS_SPEC_IS_FLAGED (spec, ACCESS_SPEC_FLAG_MERGEABLE_LIST))
      {
    scan_id->s.phsid.result_type = parallel_heap_scan::RESULT_TYPE::MERGEABLE_LIST;
      }
    else if (ACCESS_SPEC_IS_FLAGED (spec, ACCESS_SPEC_FLAG_BUILDVALUE_OPT))
      {
    scan_id->s.phsid.result_type = parallel_heap_scan::RESULT_TYPE::BUILDVALUE_OPT;
      }
    else
      {
    scan_id->s.phsid.result_type = parallel_heap_scan::RESULT_TYPE::XASL_SNAPSHOT;
      }

    scan_id->s.phsid.manager = nullptr; /* init */

    switch (scan_id->s.phsid.result_type)
      {
      case parallel_heap_scan::RESULT_TYPE::MERGEABLE_LIST:
      {
    using manager_type =
        parallel_heap_scan::manager < parallel_heap_scan::RESULT_TYPE::MERGEABLE_LIST >;

    scan_id->s.phsid.manager = (void *) db_private_alloc (thread_p, sizeof (manager_type));
    if (scan_id->s.phsid.manager == nullptr)
      {
        assert_release_error (er_errid () != NO_ERROR);
        error = er_errid ();
        break;
      }

    scan_id->s.phsid.manager = placement_new ((manager_type *) scan_id->s.phsid.manager, thread_p, query_id, scan_id, xasl,
                   num_parallel_threads, *class_hfid, *class_oid, vd, (bool) fixed_scan, (bool) grouped_scan, worker_manager_p);
    assert (scan_id->s.phsid.manager != nullptr);

    error = ((manager_type *) scan_id->s.phsid.manager)->open ();
    if (error != NO_ERROR)
      {
        /* cleanup */
        ((manager_type *) scan_id->s.phsid.manager)->~manager (); /* will release worker_manager_p */
        db_private_free_and_init (thread_p, scan_id->s.phsid.manager);
        worker_manager_p = nullptr;

        assert_release_error (er_errid () != NO_ERROR);
        error = er_errid ();
      }

    break;
      }

      case parallel_heap_scan::RESULT_TYPE::XASL_SNAPSHOT:
      {
    using manager_type =
        parallel_heap_scan::manager < parallel_heap_scan::RESULT_TYPE::XASL_SNAPSHOT >;

    scan_id->s.phsid.manager = (void *) db_private_alloc (thread_p, sizeof (manager_type));
    if (scan_id->s.phsid.manager == nullptr)
      {
        assert_release_error (er_errid () != NO_ERROR);
        error = er_errid ();
        break;
      }

    scan_id->s.phsid.manager = placement_new ((manager_type *) scan_id->s.phsid.manager, thread_p, query_id, scan_id, xasl,
                   num_parallel_threads, *class_hfid, *class_oid, vd, (bool) fixed_scan, (bool) grouped_scan, worker_manager_p);
    assert (scan_id->s.phsid.manager != nullptr);

    error = ((manager_type *) scan_id->s.phsid.manager)->open ();
    if (error != NO_ERROR)
      {
        /* cleanup */
        ((manager_type *) scan_id->s.phsid.manager)->~manager (); /* will release worker_manager_p */
        db_private_free_and_init (thread_p, scan_id->s.phsid.manager);
        worker_manager_p = nullptr;

        assert_release_error (er_errid () != NO_ERROR);
        error = er_errid ();
      }

    break;
      }

      case parallel_heap_scan::RESULT_TYPE::BUILDVALUE_OPT:
      {
    using manager_type =
        parallel_heap_scan::manager < parallel_heap_scan::RESULT_TYPE::BUILDVALUE_OPT >;

    scan_id->s.phsid.manager = (void *) db_private_alloc (thread_p, sizeof (manager_type));
    if (scan_id->s.phsid.manager == nullptr)
      {
        assert_release_error (er_errid () != NO_ERROR);
        error = er_errid ();
        break;
      }

    scan_id->s.phsid.manager = placement_new ((manager_type *) scan_id->s.phsid.manager, thread_p, query_id, scan_id, xasl,
                   num_parallel_threads, *class_hfid, *class_oid, vd, (bool) fixed_scan, (bool) grouped_scan, worker_manager_p);
    assert (scan_id->s.phsid.manager != nullptr);

    error = ((manager_type *) scan_id->s.phsid.manager)->open ();
    if (error != NO_ERROR)
      {
        /* cleanup */
        ((manager_type *) scan_id->s.phsid.manager)->~manager (); /* will release worker_manager_p */
        db_private_free_and_init (thread_p, scan_id->s.phsid.manager);
        worker_manager_p = nullptr;

        assert_release_error (er_errid () != NO_ERROR);
        error = er_errid ();
      }

    break;
      }

      default:
    /* impossible case */
    assert_release_error (false);
    error = er_errid ();
    break;
      } /* switch (s_id->s.phsid.result_type) */

    if (error != NO_ERROR)
      {
    /* cleanup */
    if (worker_manager_p != nullptr)
      {
        worker_manager_p->release_workers ();
        worker_manager_p = nullptr;
      }

    if (error == ER_INTERRUPTED || er_errid () == ER_INTERRUPTED)
      {
        ASSERT_ERROR ();
        return error;
      }

    /* fallback to single-thread heap scan */
    er_clear ();
    assert (scan_id->type == S_HEAP_SCAN);
    return NO_ERROR;
      }

    er_log_debug (ARG_FILE_LINE, "parallel heap scan started.");
    scan_id->type = S_PARALLEL_HEAP_SCAN;

    ASSERT_NO_ERROR_OR_INTERRUPTED ();
    return NO_ERROR;
  }

  int
  scan_start_parallel_heap_scan (THREAD_ENTRY *thread_p, SCAN_ID *scan_id)
  {
    scan_id->position = S_ON;
    return NO_ERROR;
  }
}

namespace parallel_heap_scan
{
  template <RESULT_TYPE result_type>
  manager<result_type>::~manager()
  {
    if (m_worker_manager != nullptr)
      {
    m_worker_manager->release_workers ();
    m_worker_manager = nullptr;
      }
    if (m_input_handler != nullptr)
      {
    m_input_handler->~input_handler();
    db_private_free (m_thread_p, m_input_handler);
    m_input_handler = nullptr;
      }
    if (m_result_handler != nullptr)
      {
    m_result_handler->read_finalize (m_thread_p);
    m_result_handler->~result_handler();
    db_private_free (m_thread_p, m_result_handler);
    m_result_handler = nullptr;
      }
    if (m_vd != nullptr)
      {
    if (m_vd->dbval_cnt > 0)
      {
        for (int i = 0; i < m_vd->dbval_cnt; i++)
          {
        pr_clear_value (&m_vd->dbval_ptr[i]);
          }
        db_private_free (m_thread_p, m_vd->dbval_ptr);
      }
    db_private_free (m_thread_p, m_vd);
    m_vd = nullptr;
      }
  }

  template <RESULT_TYPE result_type>
  int manager<result_type>::open()
  {
    int h;
    VAL_DESCR *new_vd;
    /* TODO: should check instnum, parse instnum, result type */

    m_query_entry = qmgr_get_query_entry (m_thread_p, m_query_id, m_thread_p->tran_index);
    if (m_query_entry == nullptr)
      {
    return ER_FAILED;
      }
    h = m_query_entry->xasl_id.sha1.h[0]|m_query_entry->xasl_id.sha1.h[1]|m_query_entry->xasl_id.sha1.h[2]|m_query_entry->xasl_id.sha1.h[3]|m_query_entry->xasl_id.sha1.h[4];
    if (h == 0)
      {
    m_uses_xasl_clone = false;

    THREAD_ENTRY *main_thread_p = thread_get_main_thread (m_thread_p);
    if (main_thread_p->xasl_unpack_info_ptr)
      {
        /* use unpack info ptr for execute. */
      }
    else
      {
        assert (false);
        return ER_FAILED;
      }
      }
    else
      {
    m_uses_xasl_clone = true;
      }
    new_vd = (VAL_DESCR *) db_private_alloc (m_thread_p, sizeof (VAL_DESCR));
    if (new_vd == nullptr)
      {
    er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_OUT_OF_VIRTUAL_MEMORY, 0);
    return ER_FAILED;
      }
    memcpy (new_vd, m_orig_vd, sizeof (VAL_DESCR));
    if (m_orig_vd->dbval_cnt > 0)
      {
    new_vd->dbval_ptr = (DB_VALUE *) db_private_alloc (m_thread_p, sizeof (DB_VALUE) * m_orig_vd->dbval_cnt);
    if (new_vd->dbval_ptr == nullptr)
      {
        er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_OUT_OF_VIRTUAL_MEMORY, 0);
        return ER_FAILED;
      }
    for (int i = 0; i < m_orig_vd->dbval_cnt; i++)
      {
        pr_clone_value (&m_orig_vd->dbval_ptr[i], &new_vd->dbval_ptr[i]);
      }
      }
    m_vd = new_vd;
    m_input_handler = (input_handler *) db_private_alloc (m_thread_p, sizeof (input_handler));
    if (m_input_handler == nullptr)
      {
    er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_OUT_OF_VIRTUAL_MEMORY, 0);
    return ER_FAILED;
      }
    m_input_handler = placement_new ((input_handler *) m_input_handler, &m_interrupt, &m_err_messages);

    if (m_input_handler->init_on_main (m_thread_p, m_hfid, m_parallelism) != NO_ERROR)
      {
    m_input_handler->~input_handler();
    db_private_free_and_init (m_thread_p, m_input_handler);
    return ER_FAILED;
      }

    if constexpr (result_type == RESULT_TYPE::MERGEABLE_LIST)
      {
    m_result_handler = (result_handler<RESULT_TYPE::MERGEABLE_LIST> *) db_private_alloc (m_thread_p,
               sizeof (result_handler<RESULT_TYPE::MERGEABLE_LIST>));
    if (m_result_handler == nullptr)
      {
        er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_OUT_OF_VIRTUAL_MEMORY, 0);
        return ER_FAILED;
      }
    if (m_xasl->type == BUILDLIST_PROC && m_xasl->proc.buildlist.g_agg_list != NULL &&
        !m_xasl->proc.buildlist.g_agg_domains_resolved)
      {
        m_g_agg_domain_resolve_need = true;
      }
    m_result_handler = placement_new ((result_handler<RESULT_TYPE::MERGEABLE_LIST> *) m_result_handler, m_query_id,
                      &m_interrupt, &m_err_messages, m_parallelism, m_g_agg_domain_resolve_need, m_xasl);
      }
    else if constexpr (result_type == RESULT_TYPE::XASL_SNAPSHOT)
      {
    m_result_handler = (result_handler<RESULT_TYPE::XASL_SNAPSHOT> *) db_private_alloc (m_thread_p,
               sizeof (result_handler<RESULT_TYPE::XASL_SNAPSHOT>));
    if (m_result_handler == nullptr)
      {
        er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_OUT_OF_VIRTUAL_MEMORY, 0);
        return ER_FAILED;
      }
    m_result_handler = placement_new ((result_handler<RESULT_TYPE::XASL_SNAPSHOT> *) m_result_handler, m_query_id,
                      &m_interrupt, &m_err_messages, m_parallelism, m_g_agg_domain_resolve_need, m_xasl);
      }
    else if constexpr (result_type == RESULT_TYPE::BUILDVALUE_OPT)
      {
    m_result_handler = (result_handler<RESULT_TYPE::BUILDVALUE_OPT> *) db_private_alloc (m_thread_p,
               sizeof (result_handler<RESULT_TYPE::BUILDVALUE_OPT>));
    if (m_result_handler == nullptr)
      {
        er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_OUT_OF_VIRTUAL_MEMORY, 0);
        return ER_FAILED;
      }
    m_result_handler = placement_new ((result_handler<RESULT_TYPE::BUILDVALUE_OPT> *) m_result_handler, m_query_id,
                      &m_interrupt, &m_err_messages, m_parallelism, m_xasl->proc.buildvalue.agg_list);
      }
    else
      {
    assert (false);
    return ER_FAILED;
      }
    m_on_trace = m_thread_p->on_trace;
    if (m_thread_p->m_px_orig_thread_entry == NULL)
      {
    m_thread_p->m_px_orig_thread_entry = m_thread_p;
      }
    if (m_on_trace)
      {
    if (m_thread_p->m_px_orig_thread_entry != m_thread_p)
      {
        /* this is child thread, so we need to use px_stats */
        if (m_thread_p->m_uses_px_stats)
          {
        /* already initialized */
        m_px_stats_initialized_by_me = false;
          }
        else
          {
        /* not initialized - cannot be happened */
        assert (false);
        perfmon_initialize_parallel_stats (m_thread_p);
        m_px_stats_initialized_by_me = true;
          }
      }
    else
      {
        /* this is main thread */
        if (m_thread_p->m_uses_px_stats)
          {
        /* already initialized */
        m_px_stats_initialized_by_me = false;
        /* do nothing */
          }
        else
          {
        /* not initialized */
        perfmon_initialize_parallel_stats (m_thread_p);
        m_thread_p->m_uses_px_stats = false;
        m_px_stats_initialized_by_me = true;
          }
      }
    m_trace_handler.m_trace_storage_for_sibling_xasl.set_main_xasl_tree (m_xasl);
      }
    m_result_handler_read_initialized = false;
    m_task_started = false;
    m_interrupt.clear();

    return NO_ERROR;
  }

  template <RESULT_TYPE result_type>
  int manager<result_type>::start_tasks()
  {
    for (int i = 0; i < m_parallelism; i++)
      {
    task<result_type> *task_p = (task<result_type> *) malloc (sizeof (task<result_type>));
    if (task_p == nullptr)
      {
        er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_OUT_OF_VIRTUAL_MEMORY, 0);
        return ER_FAILED;
      }
    trace_handler *trace_handler_p = m_on_trace ? &m_trace_handler : nullptr;
    task_p = placement_new ((task<result_type> *) task_p, m_thread_p, m_query_entry, m_result_handler,
                m_input_handler, &m_interrupt, &m_err_messages, m_vd, trace_handler_p, m_worker_manager, m_xasl->header.id, m_hfid,
                m_cls_oid, m_is_fixed,
                m_is_grouped, m_uses_xasl_clone, m_xasl, &m_join_info);
    m_worker_manager->push_task (task_p);
      }
    m_task_started = true;
    return NO_ERROR;
  }

  template <RESULT_TYPE result_type>
  SCAN_CODE manager<result_type>::next()
  {
    SCAN_CODE scan_code = S_SUCCESS;
    int err_code = NO_ERROR;

    if constexpr (result_type == RESULT_TYPE::MERGEABLE_LIST)
      {
    XASL_NODE *xptr;
    for (xptr = m_xasl; xptr != nullptr; xptr = xptr->scan_ptr)
      {
        QPROC_DB_VALUE_LIST valp = xptr->val_list->valp;
        for (int i=0; i<xptr->val_list->val_cnt; i++)
          {
        pr_clear_value (valp->val);
        valp = valp->next;
          }
      }
      }

    if (unlikely (!m_task_started))
      {
    if constexpr (result_type == RESULT_TYPE::MERGEABLE_LIST || result_type == RESULT_TYPE::BUILDVALUE_OPT)
      {
        if (m_xasl->scan_ptr)
          {
        m_join_info.capture_join_info (m_xasl);
        for (XASL_NODE *xptr = m_xasl->scan_ptr; xptr; xptr=xptr->scan_ptr)
          {
            if (xptr->spec_list->type == TARGET_LIST)
              {
            scan_end_scan (m_thread_p, &xptr->spec_list->s_id);
              }
          }
          }
      }
    err_code = start_tasks();
    if (err_code != NO_ERROR)
      {
        return S_ERROR;
      }
      }

    if (m_result_handler_read_initialized == false)
      {
    m_result_handler->read_initialize (m_thread_p);
    m_result_handler_read_initialized = true;
      }

    if constexpr (result_type == RESULT_TYPE::MERGEABLE_LIST)
      {
    scan_code = m_result_handler->read (m_thread_p, m_xasl->list_id);
    if (scan_code == S_ERROR)
      {
        if (m_interrupt.get_code() == parallel_query::interrupt::interrupt_code::NO_INTERRUPT)
          {
        m_interrupt.set_code (parallel_query::interrupt::interrupt_code::ERROR_INTERRUPTED_FROM_MAIN_THREAD);
          }
        if (m_worker_manager != nullptr)
          {
        m_worker_manager->release_workers ();
        m_worker_manager = nullptr;
          }
      }

    if (m_xasl->scan_ptr)
      {
        m_join_info.apply_join_info (m_xasl);
      }

    XASL_NODE *xptr = m_xasl;

    for (xptr = m_xasl; xptr != nullptr; xptr = xptr->scan_ptr)
      {
        std::vector<DB_VALUE> dbval_container (xptr->val_list->val_cnt);
        QPROC_DB_VALUE_LIST valp = xptr->val_list->valp;
        for (int i = 0; i < xptr->val_list->val_cnt; i++)
          {
        pr_clone_value (valp->val, &dbval_container[i]);
        valp = valp->next;
          }

        HL_HEAPID heap_id = db_change_private_heap (m_thread_p, 0);
        valp = xptr->val_list->valp;
        for (int i = 0; i < xptr->val_list->val_cnt; i++)
          {
        pr_clear_value (valp->val);
        valp = valp->next;
          }
        db_change_private_heap (m_thread_p, heap_id);
        valp = xptr->val_list->valp;
        for (int i=0; i<xptr->val_list->val_cnt; i++)
          {
        pr_clone_value (&dbval_container[i], valp->val);
        pr_clear_value (&dbval_container[i]);
        valp = valp->next;
          }
      }

    fetch_val_list (m_thread_p, m_xasl->outptr_list->valptrp, m_vd, nullptr, nullptr, NULL, true);
    if (m_g_agg_domain_resolve_need)
      {
        qexec_resolve_domains_for_aggregation_for_parallel_heap_scan_g_agg (m_thread_p, m_xasl, m_vd,
        &m_xasl->proc.buildlist.g_agg_domains_resolved);
      }
      }
    else if constexpr (result_type == RESULT_TYPE::XASL_SNAPSHOT)
      {
    scan_code = m_result_handler->read (m_thread_p, m_xasl->val_list);
      }
    else if constexpr (result_type == RESULT_TYPE::BUILDVALUE_OPT)
      {
    scan_code = m_result_handler->read (m_thread_p, m_xasl->proc.buildvalue.agg_list);
    if (m_xasl->scan_ptr)
      {
        m_join_info.apply_join_info (m_xasl);
      }
      }
    else
      {
    assert (false);
    return S_ERROR;
      }

    if (unlikely (scan_code == S_ERROR))
      {
    if (m_interrupt.get_code() == parallel_query::interrupt::interrupt_code::NO_INTERRUPT)
      {
        m_interrupt.set_code (parallel_query::interrupt::interrupt_code::ERROR_INTERRUPTED_FROM_MAIN_THREAD);
      }
    if (m_worker_manager != nullptr)
      {
        m_worker_manager->release_workers ();
        m_worker_manager = nullptr;
      }
      }

    if (unlikely (m_interrupt.get_code() != parallel_query::interrupt::interrupt_code::NO_INTERRUPT))
      {
    switch (m_interrupt.get_code())
      {
      case parallel_query::interrupt::interrupt_code::ERROR_INTERRUPTED_FROM_MAIN_THREAD:
      case parallel_query::interrupt::interrupt_code::USER_INTERRUPTED_FROM_MAIN_THREAD:
        break;
      case parallel_query::interrupt::interrupt_code::ERROR_INTERRUPTED_FROM_WORKER_THREAD:
      {
        std::lock_guard<std::mutex> lock (m_err_messages.m_mutex);
        cuberr::context::get_thread_local_error().swap (*m_err_messages.m_error_messages[0]);
        return S_ERROR;
      }
      break;
      case parallel_query::interrupt::interrupt_code::USER_INTERRUPTED_FROM_WORKER_THREAD:
      {
        std::lock_guard<std::mutex> lock (m_err_messages.m_mutex);
        cuberr::context::get_thread_local_error().swap (*m_err_messages.m_error_messages[0]);
        return S_ERROR;
      }
      break;
      case parallel_query::interrupt::interrupt_code::INST_NUM_SATISFIED:
      case parallel_query::interrupt::interrupt_code::JOB_ENDED:
      {
        return S_END;
      }
      break;
      default:
        break;
      }
      }
    return scan_code;
  }

  template <RESULT_TYPE result_type>
  int manager<result_type>::reset ()
  {
    int err_code = NO_ERROR;

    if (m_interrupt.get_code() != parallel_query::interrupt::interrupt_code::JOB_ENDED)
      {
    m_interrupt.set_code (parallel_query::interrupt::interrupt_code::JOB_ENDED);
      }

    /* Release worker manager */
    if (m_worker_manager != nullptr)
      {
    m_worker_manager->wait_workers ();
      }

    m_result_handler->read_finalize (m_thread_p);

    /* Clean up input handler */
    if (m_input_handler != nullptr)
      {
    m_input_handler->~input_handler ();
    db_private_free (m_thread_p, m_input_handler);
    m_input_handler = nullptr;
      }

    /* Clean up result handler */
    if (m_result_handler != nullptr)
      {
    m_result_handler->~result_handler ();
    db_private_free (m_thread_p, m_result_handler);
    m_result_handler = nullptr;
      }

    /* Clean up previous value descriptor */
    if (m_vd != nullptr)
      {
    if (m_vd->dbval_cnt > 0)
      {
        for (int i = 0; i < m_vd->dbval_cnt; i++)
          {
        pr_clear_value (&m_vd->dbval_ptr[i]);
          }
        db_private_free (m_thread_p, m_vd->dbval_ptr);
      }
    db_private_free (m_thread_p, m_vd);
    m_vd = nullptr;
      }

    m_trace_handler.clear();

    /* Open and initialize */
    err_code = open ();
    if (err_code != NO_ERROR)
      {
    m_worker_manager->release_workers ();
    m_worker_manager=nullptr;
    return err_code;
      }

    /* Finalize setup */
    m_scan_id->s.phsid.manager = this;
    return NO_ERROR;
  }

  template <RESULT_TYPE result_type>
  int manager<result_type>::merge_stats()
  {
    int error = NO_ERROR;

    if (m_on_trace)
      {
    if (m_thread_p->m_px_orig_thread_entry != m_thread_p)
      {
        /* child thread */
        if (m_px_stats_initialized_by_me)
          {
        perfmon_destroy_parallel_stats (m_thread_p);
        assert (false);
        error = ER_FAILED;
          }
      }
    else
      {
        /* main thread */
        if (m_px_stats_initialized_by_me)
          {
        perfmon_destroy_parallel_stats (m_thread_p);
          }
      }

    m_trace_handler.merge_stats (m_thread_p, &m_scan_id->scan_stats);
      }

    return error;
  }

  template <RESULT_TYPE result_type>
  int manager<result_type>::end()
  {
    int err_code = NO_ERROR;
    if (m_interrupt.get_code() != parallel_query::interrupt::interrupt_code::JOB_ENDED)
      {
    m_interrupt.set_code (parallel_query::interrupt::interrupt_code::JOB_ENDED);
      }
    if (m_worker_manager != nullptr)
      {
    m_worker_manager->release_workers ();
    m_worker_manager = nullptr;
      }
    err_code = merge_stats();
    m_result_handler->read_finalize (m_thread_p);
    return err_code;
  }

  template <RESULT_TYPE result_type>
  int manager<result_type>::close()
  {
    THREAD_ENTRY *thread_p = m_thread_p;
    m_scan_id->s.phsid.manager = nullptr;
    this->~manager();
    db_private_free (thread_p, this);
    return NO_ERROR;
  }

  // Explicit template instantiations
  template class manager<RESULT_TYPE::MERGEABLE_LIST>;
  template class manager<RESULT_TYPE::XASL_SNAPSHOT>;
  template class manager<RESULT_TYPE::BUILDVALUE_OPT>;
}