Skip to content

File px_scan.cpp

File List > cubrid > src > query > parallel > px_scan > px_scan.cpp

Go to the documentation of this file

/*
 *
 * Copyright 2016 CUBRID Corporation
 *
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 *
 */

/*
 * px_scan.cpp - manager for parallel scans executed within a single XASL
 */

#include "px_scan.hpp"

#include "error_code.h"
#include "object_primitive.h"
#include "perf_monitor.h"
#include "query_evaluator.h"
#include "error_context.hpp"
#include "query_executor.h"
#include "system.h"
#include "xasl.h"
#include "fetch.h"
#include "px_scan_task.hpp"
#include "px_scan_input_handler_heap.hpp"
#include "px_parallel.hpp"          /* parallel_query::compute_parallel_degree */
#include "list_file.h"              /* qfile_close_list, qfile_destroy_list */
#include "heap_file.h"              /* heap_attrinfo_end */
#include "file_manager.h"           /* file_get_num_user_pages */

// XXX: SHOULD BE THE LAST INCLUDE HEADER
#include "memory_wrapper.hpp"

extern "C"
{
  SCAN_CODE
  scan_next_parallel_heap_scan (THREAD_ENTRY *thread_p, SCAN_ID *scan_id)
  {
    switch (scan_id->s.phsid.result_type)
      {
      case parallel_scan::RESULT_TYPE::MERGEABLE_LIST:
      {
    using manager_type = parallel_scan::manager
                 < parallel_scan::RESULT_TYPE::MERGEABLE_LIST, parallel_scan::SCAN_TYPE::HEAP >;
    manager_type *manager_p = (manager_type *) scan_id->s.phsid.manager;
    return manager_p->next();
      }

      case parallel_scan::RESULT_TYPE::XASL_SNAPSHOT:
      {
    using manager_type = parallel_scan::manager
                 < parallel_scan::RESULT_TYPE::XASL_SNAPSHOT, parallel_scan::SCAN_TYPE::HEAP >;
    manager_type *manager_p = (manager_type *) scan_id->s.phsid.manager;
    return manager_p->next();
      }

      case parallel_scan::RESULT_TYPE::BUILDVALUE_OPT:
      {
    using manager_type = parallel_scan::manager
                 < parallel_scan::RESULT_TYPE::BUILDVALUE_OPT, parallel_scan::SCAN_TYPE::HEAP >;
    manager_type *manager_p = (manager_type *) scan_id->s.phsid.manager;
    return manager_p->next();
      }

      default:
    /* impossible case */
    assert_release_error (false);
    return S_ERROR;
      }
  }

  int
  scan_reset_scan_block_parallel_heap_scan (THREAD_ENTRY *thread_p, SCAN_ID *scan_id)
  {
    scan_id->single_fetched = false;
    scan_id->null_fetched = false;
    scan_id->qualified_block = false;

    /* reset for S_HEAP_SCAN in scan_reset_scan_block */
    scan_id->position = (scan_id->direction == S_FORWARD) ? S_BEFORE : S_AFTER;
    OID_SET_NULL (&scan_id->s.phsid.curr_oid);

    using accumulative_trace_storage = parallel_scan::accumulative_trace_storage;

    switch (scan_id->s.phsid.result_type)
      {
      case parallel_scan::RESULT_TYPE::MERGEABLE_LIST:
      {
    using manager_type = parallel_scan::manager
                 < parallel_scan::RESULT_TYPE::MERGEABLE_LIST, parallel_scan::SCAN_TYPE::HEAP >;
    manager_type *manager_p = (manager_type *) scan_id->s.phsid.manager;

    manager_p->wait_for_workers();
    manager_p->merge_stats();

    if (thread_p->on_trace)
      {
        if (scan_id->s.phsid.trace_storage == nullptr)
          {
        size_t alloc_size = sizeof (accumulative_trace_storage);
        scan_id->s.phsid.trace_storage = ( accumulative_trace_storage *) malloc (alloc_size);
        if (scan_id->s.phsid.trace_storage == nullptr)
          {
            return manager_p->reset ();
          }

        scan_id->s.phsid.trace_storage = placement_new (( accumulative_trace_storage *) scan_id->s.phsid.trace_storage,
                         manager_p->get_result_type(), parallel_scan::SCAN_TYPE::HEAP);
        assert (scan_id->s.phsid.trace_storage != nullptr);
          }

        scan_id->s.phsid.trace_storage->add_stats (manager_p->get_trace_handler());
      }

    return manager_p->reset ();
      }

      case parallel_scan::RESULT_TYPE::XASL_SNAPSHOT:
      {
    using manager_type = parallel_scan::manager
                 < parallel_scan::RESULT_TYPE::XASL_SNAPSHOT, parallel_scan::SCAN_TYPE::HEAP >;
    manager_type *manager_p = (manager_type *) scan_id->s.phsid.manager;

    manager_p->wait_for_workers();
    manager_p->merge_stats();

    if (thread_p->on_trace)
      {
        if (scan_id->s.phsid.trace_storage == nullptr)
          {
        size_t alloc_size = sizeof (accumulative_trace_storage);
        scan_id->s.phsid.trace_storage = ( accumulative_trace_storage *) malloc (alloc_size);
        if (scan_id->s.phsid.trace_storage == nullptr)
          {
            return manager_p->reset ();
          }

        scan_id->s.phsid.trace_storage = placement_new (( accumulative_trace_storage *) scan_id->s.phsid.trace_storage,
                         manager_p->get_result_type(), parallel_scan::SCAN_TYPE::HEAP);
        assert (scan_id->s.phsid.trace_storage != nullptr);
          }

        scan_id->s.phsid.trace_storage->add_stats (manager_p->get_trace_handler());
      }

    return manager_p->reset ();
      }

      case parallel_scan::RESULT_TYPE::BUILDVALUE_OPT:
      {
    using manager_type = parallel_scan::manager
                 < parallel_scan::RESULT_TYPE::BUILDVALUE_OPT, parallel_scan::SCAN_TYPE::HEAP >;
    manager_type *manager_p = (manager_type *) scan_id->s.phsid.manager;

    manager_p->wait_for_workers();
    manager_p->merge_stats();

    if (thread_p->on_trace)
      {
        if (scan_id->s.phsid.trace_storage == nullptr)
          {
        size_t alloc_size = sizeof (accumulative_trace_storage);
        scan_id->s.phsid.trace_storage = ( accumulative_trace_storage *) malloc (alloc_size);
        if (scan_id->s.phsid.trace_storage == nullptr)
          {
            return manager_p->reset ();
          }

        scan_id->s.phsid.trace_storage = placement_new (( accumulative_trace_storage *) scan_id->s.phsid.trace_storage,
                         manager_p->get_result_type(), parallel_scan::SCAN_TYPE::HEAP);
        assert (scan_id->s.phsid.trace_storage != nullptr);
          }

        scan_id->s.phsid.trace_storage->add_stats (manager_p->get_trace_handler());
      }

    return manager_p->reset ();
      }

      default:
    /* impossible case */
    assert_release_error (false);
    return er_errid ();
      }

    return er_errid ();
  }

  void
  scan_end_parallel_heap_scan (THREAD_ENTRY *thread_p, SCAN_ID *scan_id)
  {
    if (scan_id->direction == S_FORWARD)
      {
    scan_id->direction = S_BACKWARD;
      }
    else
      {
    scan_id->direction = S_FORWARD;
      }

    switch (scan_id->s.phsid.result_type)
      {
      case parallel_scan::RESULT_TYPE::MERGEABLE_LIST:
      {
    using manager_type = parallel_scan::manager
                 < parallel_scan::RESULT_TYPE::MERGEABLE_LIST, parallel_scan::SCAN_TYPE::HEAP >;
    manager_type *manager_p = (manager_type *) scan_id->s.phsid.manager;
    manager_p->end();
    break;
      }

      case parallel_scan::RESULT_TYPE::XASL_SNAPSHOT:
      {
    using manager_type = parallel_scan::manager
                 < parallel_scan::RESULT_TYPE::XASL_SNAPSHOT, parallel_scan::SCAN_TYPE::HEAP >;
    manager_type *manager_p = (manager_type *) scan_id->s.phsid.manager;
    manager_p->end();
    break;
      }

      case parallel_scan::RESULT_TYPE::BUILDVALUE_OPT:
      {
    using manager_type = parallel_scan::manager
                 < parallel_scan::RESULT_TYPE::BUILDVALUE_OPT, parallel_scan::SCAN_TYPE::HEAP >;
    manager_type *manager_p = (manager_type *) scan_id->s.phsid.manager;
    manager_p->end();
    break;
      }

      default:
    /* impossible case */
    assert_release_error (false);
    break;
      }
  }

  void
  scan_close_parallel_heap_scan (THREAD_ENTRY *thread_p, SCAN_ID *scan_id)
  {
    using accumulative_trace_storage = parallel_scan::accumulative_trace_storage;

    switch (scan_id->s.phsid.result_type)
      {
      case parallel_scan::RESULT_TYPE::MERGEABLE_LIST:
      {
    using manager_type = parallel_scan::manager
                 < parallel_scan::RESULT_TYPE::MERGEABLE_LIST, parallel_scan::SCAN_TYPE::HEAP >;

    manager_type *manager_p = (manager_type *) scan_id->s.phsid.manager;

    if (thread_p->on_trace)
      {
        manager_p->wait_for_workers();
        if (scan_id->s.phsid.trace_storage == nullptr)
          {
        size_t alloc_size = sizeof (accumulative_trace_storage);
        scan_id->s.phsid.trace_storage = (accumulative_trace_storage *) malloc (alloc_size);
        if (scan_id->s.phsid.trace_storage == nullptr)
          {
            manager_p->close();
            break;
          }

        scan_id->s.phsid.trace_storage = placement_new ((accumulative_trace_storage *) scan_id->s.phsid.trace_storage,
                         manager_p->get_result_type(), parallel_scan::SCAN_TYPE::HEAP);
        assert (scan_id->s.phsid.trace_storage != nullptr);
          }

        scan_id->s.phsid.trace_storage->add_stats (manager_p->get_trace_handler());
      }

    manager_p->close();
    break;
      }

      case parallel_scan::RESULT_TYPE::XASL_SNAPSHOT:
      {
    using manager_type = parallel_scan::manager
                 < parallel_scan::RESULT_TYPE::XASL_SNAPSHOT, parallel_scan::SCAN_TYPE::HEAP >;

    manager_type *manager_p = (manager_type *) scan_id->s.phsid.manager;

    if (thread_p->on_trace)
      {
        manager_p->wait_for_workers();
        if (scan_id->s.phsid.trace_storage == nullptr)
          {
        size_t alloc_size = sizeof (accumulative_trace_storage);
        scan_id->s.phsid.trace_storage = ( accumulative_trace_storage *) malloc (alloc_size);
        if (scan_id->s.phsid.trace_storage == nullptr)
          {
            manager_p->close();
            break;
          }

        scan_id->s.phsid.trace_storage = placement_new ((accumulative_trace_storage *) scan_id->s.phsid.trace_storage,
                         manager_p->get_result_type(), parallel_scan::SCAN_TYPE::HEAP);
        assert (scan_id->s.phsid.trace_storage != nullptr);
          }

        scan_id->s.phsid.trace_storage->add_stats (manager_p->get_trace_handler());
      }

    manager_p->close();
    break;
      }

      case parallel_scan::RESULT_TYPE::BUILDVALUE_OPT:
      {
    using manager_type = parallel_scan::manager
                 < parallel_scan::RESULT_TYPE::BUILDVALUE_OPT, parallel_scan::SCAN_TYPE::HEAP >;

    manager_type *manager_p = (manager_type *) scan_id->s.phsid.manager;

    if (thread_p->on_trace)
      {
        manager_p->wait_for_workers();
        if (scan_id->s.phsid.trace_storage == nullptr)
          {
        size_t alloc_size = sizeof (accumulative_trace_storage);
        scan_id->s.phsid.trace_storage = ( accumulative_trace_storage *) malloc (alloc_size);
        if (scan_id->s.phsid.trace_storage == nullptr)
          {
            manager_p->close();
            break;
          }

        scan_id->s.phsid.trace_storage = placement_new ((accumulative_trace_storage *) scan_id->s.phsid.trace_storage,
                         manager_p->get_result_type(), parallel_scan::SCAN_TYPE::HEAP);
        assert (scan_id->s.phsid.trace_storage != nullptr);
          }

        scan_id->s.phsid.trace_storage->add_stats (manager_p->get_trace_handler());
      }

    manager_p->close();
    break;
      }

      default:
    /* impossible case */
    assert_release_error (false);
    break;
      }
  }

  int
  scan_open_parallel_heap_scan (THREAD_ENTRY *thread_p, SCAN_ID *scan_id, bool mvcc_select_lock_needed, int fixed_scan,
                int grouped_scan, VAL_DESCR *vd, ACCESS_SPEC_TYPE *spec, OID *class_oid, HFID *class_hfid, XASL_NODE *xasl,
                QUERY_ID query_id)
  {
    int num_user_pages = -1;
    parallel_query::worker_manager *worker_manager_p = nullptr;
    int num_parallel_threads;
    int error = NO_ERROR;

    assert (thread_p != nullptr);
    assert (scan_id != nullptr);
    assert (spec != nullptr);
    assert (spec->type == TARGET_CLASS);
    assert (spec->access == ACCESS_METHOD_SEQUENTIAL);
    assert (xasl != nullptr);
    assert (query_id != NULL_QUERY_ID);
    assert (vd != nullptr);

    scan_id->type = S_HEAP_SCAN;

    if (spec->curent == nullptr)
      {
    /* DB_PARTITION_CLASS will be parallel-heap-scanned, not DB_PARTITIONED_CLASS */
    if (spec->pruning_type == DB_PARTITIONED_CLASS)
      {
        /* try single-thread heap scan */
        return NO_ERROR;
      }

    if (oid_is_system_class (class_oid)
        || mvcc_is_mvcc_disabled_class (class_oid) || mvcc_select_lock_needed
        /* private_heap_id==0 means not main thread; parallel heap scan requires main thread. */
        || thread_p->private_heap_id == 0)
      {
        /* parallel-thread heap scan not supported */
        ACCESS_SPEC_SET_FLAG (spec, ACCESS_SPEC_FLAG_NO_PARALLEL_SCAN);
      }
      }

    if (ACCESS_SPEC_IS_FLAGED (spec, ACCESS_SPEC_FLAG_NO_PARALLEL_SCAN) || HFID_IS_NULL (class_hfid))
      {
    /* try single-thread heap scan */
    return NO_ERROR;
      }

    /* try parallel-thread heap scan */

    /* check if pages are enough for parallel-thread heap scan */
    error = file_get_num_user_pages (thread_p, &class_hfid->vfid, &num_user_pages);
    if (error != NO_ERROR)
      {
    assert_release_error (er_errid () != NO_ERROR);
    return er_errid ();
      }

    assert (spec->num_parallel_threads == -1 /* auto-compute */
        || ACCESS_SPEC_IS_FLAGED (spec, ACCESS_SPEC_FLAG_NUM_PARALLEL_THREADS));

    num_parallel_threads = parallel_query::compute_parallel_degree (parallel_query::parallel_type::SCAN,
               num_user_pages, spec->num_parallel_threads /* hint */);
    if (num_parallel_threads < 2)
      {
    /* try single-thread heap scan */
    assert (scan_id->type == S_HEAP_SCAN);
    return NO_ERROR;
      }

    worker_manager_p = parallel_query::worker_manager::try_reserve_workers (num_parallel_threads);
    if (worker_manager_p == nullptr)
      {
    /* try single-thread heap scan */
    assert (scan_id->type == S_HEAP_SCAN);
    return NO_ERROR;
      }

    /* update to actual reserved workers */
    num_parallel_threads = worker_manager_p->get_reserved_workers ();

    if (xasl->topn_items || XASL_IS_FLAGED (xasl, XASL_TO_BE_CACHED))
      {
    ACCESS_SPEC_UNSET_FLAG (spec, ACCESS_SPEC_FLAG_MERGEABLE_LIST);
      }

    /* should check LIST_MERGE in checker */
    if (ACCESS_SPEC_IS_FLAGED (spec, ACCESS_SPEC_FLAG_MERGEABLE_LIST))
      {
    scan_id->s.phsid.result_type = parallel_scan::RESULT_TYPE::MERGEABLE_LIST;
      }
    else if (ACCESS_SPEC_IS_FLAGED (spec, ACCESS_SPEC_FLAG_BUILDVALUE_OPT))
      {
    scan_id->s.phsid.result_type = parallel_scan::RESULT_TYPE::BUILDVALUE_OPT;
      }
    else
      {
    scan_id->s.phsid.result_type = parallel_scan::RESULT_TYPE::XASL_SNAPSHOT;
      }

    scan_id->s.phsid.manager = nullptr; /* init */

    switch (scan_id->s.phsid.result_type)
      {
      case parallel_scan::RESULT_TYPE::MERGEABLE_LIST:
      {
    using manager_type =
        parallel_scan::manager < parallel_scan::RESULT_TYPE::MERGEABLE_LIST, parallel_scan::SCAN_TYPE::HEAP >;

    scan_id->s.phsid.manager = (void *) db_private_alloc (thread_p, sizeof (manager_type));
    if (scan_id->s.phsid.manager == nullptr)
      {
        assert_release_error (er_errid () != NO_ERROR);
        error = er_errid ();
        break;
      }

    scan_id->s.phsid.manager = placement_new ((manager_type *) scan_id->s.phsid.manager, thread_p, query_id, scan_id, xasl,
                   num_parallel_threads, *class_hfid, *class_oid, vd, (bool) fixed_scan, (bool) grouped_scan, worker_manager_p);
    assert (scan_id->s.phsid.manager != nullptr);

    error = ((manager_type *) scan_id->s.phsid.manager)->open ();
    if (error != NO_ERROR)
      {
        /* cleanup */
        ((manager_type *) scan_id->s.phsid.manager)->~manager (); /* will release worker_manager_p */
        db_private_free_and_init (thread_p, scan_id->s.phsid.manager);
        worker_manager_p = nullptr;

        assert_release_error (er_errid () != NO_ERROR);
        error = er_errid ();
      }

    break;
      }

      case parallel_scan::RESULT_TYPE::XASL_SNAPSHOT:
      {
    using manager_type =
        parallel_scan::manager < parallel_scan::RESULT_TYPE::XASL_SNAPSHOT, parallel_scan::SCAN_TYPE::HEAP >;

    scan_id->s.phsid.manager = (void *) db_private_alloc (thread_p, sizeof (manager_type));
    if (scan_id->s.phsid.manager == nullptr)
      {
        assert_release_error (er_errid () != NO_ERROR);
        error = er_errid ();
        break;
      }

    scan_id->s.phsid.manager = placement_new ((manager_type *) scan_id->s.phsid.manager, thread_p, query_id, scan_id, xasl,
                   num_parallel_threads, *class_hfid, *class_oid, vd, (bool) fixed_scan, (bool) grouped_scan, worker_manager_p);
    assert (scan_id->s.phsid.manager != nullptr);

    error = ((manager_type *) scan_id->s.phsid.manager)->open ();
    if (error != NO_ERROR)
      {
        /* cleanup */
        ((manager_type *) scan_id->s.phsid.manager)->~manager (); /* will release worker_manager_p */
        db_private_free_and_init (thread_p, scan_id->s.phsid.manager);
        worker_manager_p = nullptr;

        assert_release_error (er_errid () != NO_ERROR);
        error = er_errid ();
      }

    break;
      }

      case parallel_scan::RESULT_TYPE::BUILDVALUE_OPT:
      {
    using manager_type =
        parallel_scan::manager < parallel_scan::RESULT_TYPE::BUILDVALUE_OPT, parallel_scan::SCAN_TYPE::HEAP >;

    scan_id->s.phsid.manager = (void *) db_private_alloc (thread_p, sizeof (manager_type));
    if (scan_id->s.phsid.manager == nullptr)
      {
        assert_release_error (er_errid () != NO_ERROR);
        error = er_errid ();
        break;
      }

    scan_id->s.phsid.manager = placement_new ((manager_type *) scan_id->s.phsid.manager, thread_p, query_id, scan_id, xasl,
                   num_parallel_threads, *class_hfid, *class_oid, vd, (bool) fixed_scan, (bool) grouped_scan, worker_manager_p);
    assert (scan_id->s.phsid.manager != nullptr);

    error = ((manager_type *) scan_id->s.phsid.manager)->open ();
    if (error != NO_ERROR)
      {
        /* cleanup */
        ((manager_type *) scan_id->s.phsid.manager)->~manager (); /* will release worker_manager_p */
        db_private_free_and_init (thread_p, scan_id->s.phsid.manager);
        worker_manager_p = nullptr;

        assert_release_error (er_errid () != NO_ERROR);
        error = er_errid ();
      }

    break;
      }

      default:
    /* impossible case */
    assert_release_error (false);
    error = er_errid ();
    break;
      }

    if (error != NO_ERROR)
      {
    /* cleanup */
    if (worker_manager_p != nullptr)
      {
        worker_manager_p->release_workers ();
        worker_manager_p = nullptr;
      }

    if (error == ER_INTERRUPTED || er_errid () == ER_INTERRUPTED)
      {
        ASSERT_ERROR ();
        return error;
      }

    /* fallback to single-thread heap scan */
    er_clear ();
    assert (scan_id->type == S_HEAP_SCAN);
    return NO_ERROR;
      }

    scan_id->type = S_PARALLEL_HEAP_SCAN;

    ASSERT_NO_ERROR_OR_INTERRUPTED ();
    return NO_ERROR;
  }

  int
  scan_start_parallel_heap_scan (THREAD_ENTRY *thread_p, SCAN_ID *scan_id)
  {
    scan_id->position = S_ON;
    return NO_ERROR;
  }

  SCAN_CODE
  scan_next_parallel_list_scan (THREAD_ENTRY *thread_p, SCAN_ID *scan_id)
  {
    switch (scan_id->s.pllsid_parallel.result_type)
      {
      case parallel_scan::RESULT_TYPE::MERGEABLE_LIST:
      {
    using manager_type = parallel_scan::manager
                 < parallel_scan::RESULT_TYPE::MERGEABLE_LIST, parallel_scan::SCAN_TYPE::LIST >;
    manager_type *manager_p = (manager_type *) scan_id->s.pllsid_parallel.manager;
    return manager_p->next();
      }

      case parallel_scan::RESULT_TYPE::BUILDVALUE_OPT:
      {
    using manager_type = parallel_scan::manager
                 < parallel_scan::RESULT_TYPE::BUILDVALUE_OPT, parallel_scan::SCAN_TYPE::LIST >;
    manager_type *manager_p = (manager_type *) scan_id->s.pllsid_parallel.manager;
    return manager_p->next();
      }

      default:
    /* impossible case */
    assert_release_error (false);
    return S_ERROR;
      }
  }

  int
  scan_reset_scan_block_parallel_list_scan (THREAD_ENTRY *thread_p, SCAN_ID *scan_id)
  {
    scan_id->single_fetched = false;
    scan_id->null_fetched = false;
    scan_id->qualified_block = false;
    scan_id->position = (scan_id->direction == S_FORWARD) ? S_BEFORE : S_AFTER;

    using accumulative_trace_storage = parallel_scan::accumulative_trace_storage;

    switch (scan_id->s.pllsid_parallel.result_type)
      {
      case parallel_scan::RESULT_TYPE::MERGEABLE_LIST:
      {
    using manager_type = parallel_scan::manager
                 < parallel_scan::RESULT_TYPE::MERGEABLE_LIST, parallel_scan::SCAN_TYPE::LIST >;
    manager_type *manager_p = (manager_type *) scan_id->s.pllsid_parallel.manager;

    manager_p->wait_for_workers();
    manager_p->merge_stats();

    if (thread_p->on_trace)
      {
        if (scan_id->s.pllsid_parallel.trace_storage == nullptr)
          {
        size_t alloc_size = sizeof (accumulative_trace_storage);
        scan_id->s.pllsid_parallel.trace_storage = (accumulative_trace_storage *) malloc (alloc_size);
        if (scan_id->s.pllsid_parallel.trace_storage == nullptr)
          {
            return manager_p->reset ();
          }

        scan_id->s.pllsid_parallel.trace_storage = placement_new ((accumulative_trace_storage *)
            scan_id->s.pllsid_parallel.trace_storage,
            manager_p->get_result_type(), parallel_scan::SCAN_TYPE::LIST);
        assert (scan_id->s.pllsid_parallel.trace_storage != nullptr);
          }

        scan_id->s.pllsid_parallel.trace_storage->add_stats (manager_p->get_trace_handler());
      }

    return manager_p->reset ();
      }

      case parallel_scan::RESULT_TYPE::BUILDVALUE_OPT:
      {
    using manager_type = parallel_scan::manager
                 < parallel_scan::RESULT_TYPE::BUILDVALUE_OPT, parallel_scan::SCAN_TYPE::LIST >;
    manager_type *manager_p = (manager_type *) scan_id->s.pllsid_parallel.manager;

    manager_p->wait_for_workers();
    manager_p->merge_stats();

    if (thread_p->on_trace)
      {
        if (scan_id->s.pllsid_parallel.trace_storage == nullptr)
          {
        size_t alloc_size = sizeof (accumulative_trace_storage);
        scan_id->s.pllsid_parallel.trace_storage = (accumulative_trace_storage *) malloc (alloc_size);
        if (scan_id->s.pllsid_parallel.trace_storage == nullptr)
          {
            return manager_p->reset ();
          }

        scan_id->s.pllsid_parallel.trace_storage = placement_new ((accumulative_trace_storage *)
            scan_id->s.pllsid_parallel.trace_storage,
            manager_p->get_result_type(), parallel_scan::SCAN_TYPE::LIST);
        assert (scan_id->s.pllsid_parallel.trace_storage != nullptr);
          }

        scan_id->s.pllsid_parallel.trace_storage->add_stats (manager_p->get_trace_handler());
      }

    return manager_p->reset ();
      }

      default:
    /* impossible case */
    assert_release_error (false);
    return er_errid ();
      }

    return er_errid ();
  }

  void
  scan_end_parallel_list_scan (THREAD_ENTRY *thread_p, SCAN_ID *scan_id)
  {
    if (scan_id->direction == S_FORWARD)
      {
    scan_id->direction = S_BACKWARD;
      }
    else
      {
    scan_id->direction = S_FORWARD;
      }

    switch (scan_id->s.pllsid_parallel.result_type)
      {
      case parallel_scan::RESULT_TYPE::MERGEABLE_LIST:
      {
    using manager_type = parallel_scan::manager
                 < parallel_scan::RESULT_TYPE::MERGEABLE_LIST, parallel_scan::SCAN_TYPE::LIST >;
    manager_type *manager_p = (manager_type *) scan_id->s.pllsid_parallel.manager;
    manager_p->end();
    break;
      }

      case parallel_scan::RESULT_TYPE::BUILDVALUE_OPT:
      {
    using manager_type = parallel_scan::manager
                 < parallel_scan::RESULT_TYPE::BUILDVALUE_OPT, parallel_scan::SCAN_TYPE::LIST >;
    manager_type *manager_p = (manager_type *) scan_id->s.pllsid_parallel.manager;
    manager_p->end();
    break;
      }

      default:
    /* impossible case */
    assert_release_error (false);
    break;
      }
  }

  void
  scan_close_parallel_list_scan (THREAD_ENTRY *thread_p, SCAN_ID *scan_id)
  {
    using accumulative_trace_storage = parallel_scan::accumulative_trace_storage;

    switch (scan_id->s.pllsid_parallel.result_type)
      {
      case parallel_scan::RESULT_TYPE::MERGEABLE_LIST:
      {
    using manager_type = parallel_scan::manager
                 < parallel_scan::RESULT_TYPE::MERGEABLE_LIST, parallel_scan::SCAN_TYPE::LIST >;

    manager_type *manager_p = (manager_type *) scan_id->s.pllsid_parallel.manager;

    if (thread_p->on_trace)
      {
        manager_p->wait_for_workers();
        if (scan_id->s.pllsid_parallel.trace_storage == nullptr)
          {
        size_t alloc_size = sizeof (accumulative_trace_storage);
        scan_id->s.pllsid_parallel.trace_storage = (accumulative_trace_storage *) malloc (alloc_size);
        if (scan_id->s.pllsid_parallel.trace_storage == nullptr)
          {
            manager_p->close();
            break;
          }

        scan_id->s.pllsid_parallel.trace_storage = placement_new ((accumulative_trace_storage *)
            scan_id->s.pllsid_parallel.trace_storage,
            manager_p->get_result_type(), parallel_scan::SCAN_TYPE::LIST);
        assert (scan_id->s.pllsid_parallel.trace_storage != nullptr);
          }

        scan_id->s.pllsid_parallel.trace_storage->add_stats (manager_p->get_trace_handler());
      }

    manager_p->close();
    break;
      }

      case parallel_scan::RESULT_TYPE::BUILDVALUE_OPT:
      {
    using manager_type = parallel_scan::manager
                 < parallel_scan::RESULT_TYPE::BUILDVALUE_OPT, parallel_scan::SCAN_TYPE::LIST >;

    manager_type *manager_p = (manager_type *) scan_id->s.pllsid_parallel.manager;

    if (thread_p->on_trace)
      {
        manager_p->wait_for_workers();
        if (scan_id->s.pllsid_parallel.trace_storage == nullptr)
          {
        size_t alloc_size = sizeof (accumulative_trace_storage);
        scan_id->s.pllsid_parallel.trace_storage = (accumulative_trace_storage *) malloc (alloc_size);
        if (scan_id->s.pllsid_parallel.trace_storage == nullptr)
          {
            manager_p->close();
            break;
          }

        scan_id->s.pllsid_parallel.trace_storage = placement_new ((accumulative_trace_storage *)
            scan_id->s.pllsid_parallel.trace_storage,
            manager_p->get_result_type(), parallel_scan::SCAN_TYPE::LIST);
        assert (scan_id->s.pllsid_parallel.trace_storage != nullptr);
          }

        scan_id->s.pllsid_parallel.trace_storage->add_stats (manager_p->get_trace_handler());
      }

    manager_p->close();
    break;
      }

      default:
    /* impossible case */
    assert_release_error (false);
    break;
      }
  }

  int
  scan_open_parallel_list_scan (THREAD_ENTRY *thread_p, SCAN_ID *scan_id,
                VAL_DESCR *vd, ACCESS_SPEC_TYPE *spec, QFILE_LIST_ID *list_id,
                XASL_NODE *xasl, QUERY_ID query_id)
  {
    parallel_query::worker_manager *worker_manager_p = nullptr;
    int num_parallel_threads;
    int error = NO_ERROR;

    assert (thread_p != nullptr);
    assert (scan_id != nullptr);
    assert (spec != nullptr);
    assert (list_id != nullptr);
    assert (xasl != nullptr);
    assert (query_id != NULL_QUERY_ID);
    assert (vd != nullptr);

    scan_id->type = S_LIST_SCAN;

    if (thread_p->private_heap_id == 0)
      {
    /* not main thread; cannot use parallel list scan */
    return NO_ERROR;
      }

    /* DML reads val_list directly from scan_id; result handler does not populate it as DML expects. */
    if (xasl->type == INSERT_PROC || xasl->type == UPDATE_PROC
    || xasl->type == DELETE_PROC || xasl->type == MERGE_PROC)
      {
    return NO_ERROR;
      }

    /* NO_PARALLEL_SCAN hint blocks parallel list scan */
    if (ACCESS_SPEC_IS_FLAGED (spec, ACCESS_SPEC_FLAG_NO_PARALLEL_SCAN))
      {
    return NO_ERROR;
      }

    if (xasl->topn_items || XASL_IS_FLAGED (xasl, XASL_TO_BE_CACHED))
      {
    return NO_ERROR;
      }

    assert (spec->num_parallel_threads == -1
        || ACCESS_SPEC_IS_FLAGED (spec, ACCESS_SPEC_FLAG_NUM_PARALLEL_THREADS));

    num_parallel_threads = parallel_query::compute_parallel_degree (parallel_query::parallel_type::SCAN,
               list_id->page_cnt, spec->num_parallel_threads /* hint */);
    if (num_parallel_threads < 2)
      {
    /* try single-thread list scan */
    assert (scan_id->type == S_LIST_SCAN);
    return NO_ERROR;
      }

    worker_manager_p = parallel_query::worker_manager::try_reserve_workers (num_parallel_threads);
    if (worker_manager_p == nullptr)
      {
    /* try single-thread list scan */
    assert (scan_id->type == S_LIST_SCAN);
    return NO_ERROR;
      }

    /* update to actual reserved workers */
    num_parallel_threads = worker_manager_p->get_reserved_workers ();

    /* local result type: pllsid_parallel overlaps llsid; keep llsid intact until open() succeeds. */
    parallel_scan::RESULT_TYPE local_result_type;
    if (ACCESS_SPEC_IS_FLAGED (spec, ACCESS_SPEC_FLAG_MERGEABLE_LIST))
      {
    local_result_type = parallel_scan::RESULT_TYPE::MERGEABLE_LIST;
      }
    else if (ACCESS_SPEC_IS_FLAGED (spec, ACCESS_SPEC_FLAG_BUILDVALUE_OPT))
      {
    local_result_type = parallel_scan::RESULT_TYPE::BUILDVALUE_OPT;
      }
    else
      {
    /* try single-thread list scan */
    worker_manager_p->release_workers ();
    assert (scan_id->type == S_LIST_SCAN);
    return NO_ERROR;
      }

    HFID null_hfid = HFID_INITIALIZER;
    OID null_oid = OID_INITIALIZER;

    /* local manager ptr: pllsid_parallel written only after open() succeeds; llsid stays intact for fallback. */
    void *local_manager = nullptr;

    switch (local_result_type)
      {
      case parallel_scan::RESULT_TYPE::MERGEABLE_LIST:
      {
    using manager_type =
        parallel_scan::manager < parallel_scan::RESULT_TYPE::MERGEABLE_LIST, parallel_scan::SCAN_TYPE::LIST >;

    local_manager = (void *) db_private_alloc (thread_p, sizeof (manager_type));
    if (local_manager == nullptr)
      {
        assert_release_error (er_errid () != NO_ERROR);
        error = er_errid ();
        break;
      }

    local_manager = placement_new ((manager_type *) local_manager,
                       thread_p, query_id, scan_id, xasl,
                       num_parallel_threads, null_hfid, null_oid, vd,
                       false, false, worker_manager_p, list_id);
    assert (local_manager != nullptr);

    error = ((manager_type *) local_manager)->open ();
    if (error != NO_ERROR)
      {
        ((manager_type *) local_manager)->~manager ();
        db_private_free_and_init (thread_p, local_manager);
        worker_manager_p = nullptr;

        assert_release_error (er_errid () != NO_ERROR);
        error = er_errid ();
      }

    break;
      }

      case parallel_scan::RESULT_TYPE::BUILDVALUE_OPT:
      {
    using manager_type =
        parallel_scan::manager < parallel_scan::RESULT_TYPE::BUILDVALUE_OPT, parallel_scan::SCAN_TYPE::LIST >;

    local_manager = (void *) db_private_alloc (thread_p, sizeof (manager_type));
    if (local_manager == nullptr)
      {
        assert_release_error (er_errid () != NO_ERROR);
        error = er_errid ();
        break;
      }

    local_manager = placement_new ((manager_type *) local_manager,
                       thread_p, query_id, scan_id, xasl,
                       num_parallel_threads, null_hfid, null_oid, vd,
                       false, false, worker_manager_p, list_id);
    assert (local_manager != nullptr);

    error = ((manager_type *) local_manager)->open ();
    if (error != NO_ERROR)
      {
        ((manager_type *) local_manager)->~manager ();
        db_private_free_and_init (thread_p, local_manager);
        worker_manager_p = nullptr;

        assert_release_error (er_errid () != NO_ERROR);
        error = er_errid ();
      }

    break;
      }

      default:
    /* impossible case */
    assert_release_error (false);
    error = er_errid ();
    break;
      } /* switch (local_result_type) */

    if (error != NO_ERROR)
      {
    /* cleanup */
    if (worker_manager_p != nullptr)
      {
        worker_manager_p->release_workers ();
        worker_manager_p = nullptr;
      }

    if (error == ER_INTERRUPTED || er_errid () == ER_INTERRUPTED)
      {
        ASSERT_ERROR ();
        return error;
      }

    /* fallback to single-thread list scan — llsid is still intact */
    er_clear ();
    assert (scan_id->type == S_LIST_SCAN);
    return NO_ERROR;
      }

    /* success: manager open, workers ready — safe to overwrite pllsid_parallel. */
    scan_id->s.pllsid_parallel.result_type = local_result_type;
    scan_id->s.pllsid_parallel.manager = local_manager;
    scan_id->s.pllsid_parallel.trace_storage = nullptr;

    scan_id->type = S_PARALLEL_LIST_SCAN;

    ASSERT_NO_ERROR_OR_INTERRUPTED ();
    return NO_ERROR;
  }

  int
  scan_start_parallel_list_scan (THREAD_ENTRY *thread_p, SCAN_ID *scan_id)
  {
    scan_id->position = S_ON;
    return NO_ERROR;
  }

  SCAN_CODE
  scan_next_parallel_index_scan (THREAD_ENTRY *thread_p, SCAN_ID *scan_id)
  {
    switch (scan_id->s.pisid.result_type)
      {
      case parallel_scan::RESULT_TYPE::MERGEABLE_LIST:
      {
    using manager_type = parallel_scan::manager
                 < parallel_scan::RESULT_TYPE::MERGEABLE_LIST, parallel_scan::SCAN_TYPE::INDEX >;
    manager_type *manager_p = (manager_type *) scan_id->s.pisid.manager;
    return manager_p->next();
      }

      case parallel_scan::RESULT_TYPE::BUILDVALUE_OPT:
      {
    using manager_type = parallel_scan::manager
                 < parallel_scan::RESULT_TYPE::BUILDVALUE_OPT, parallel_scan::SCAN_TYPE::INDEX >;
    manager_type *manager_p = (manager_type *) scan_id->s.pisid.manager;
    return manager_p->next();
      }

      default:
    /* impossible case */
    assert_release_error (false);
    return S_ERROR;
      }
  }

  int
  scan_reset_scan_block_parallel_index_scan (THREAD_ENTRY *thread_p, SCAN_ID *scan_id)
  {
    scan_id->single_fetched = false;
    scan_id->null_fetched = false;
    scan_id->qualified_block = false;
    scan_id->position = (scan_id->direction == S_FORWARD) ? S_BEFORE : S_AFTER;

    using accumulative_trace_storage = parallel_scan::accumulative_trace_storage;

    switch (scan_id->s.pisid.result_type)
      {
      case parallel_scan::RESULT_TYPE::MERGEABLE_LIST:
      {
    using manager_type = parallel_scan::manager
                 < parallel_scan::RESULT_TYPE::MERGEABLE_LIST, parallel_scan::SCAN_TYPE::INDEX >;
    manager_type *manager_p = (manager_type *) scan_id->s.pisid.manager;

    manager_p->wait_for_workers();
    manager_p->merge_stats();

    if (thread_p->on_trace)
      {
        if (scan_id->s.pisid.trace_storage == nullptr)
          {
        size_t alloc_size = sizeof (accumulative_trace_storage);
        scan_id->s.pisid.trace_storage = (accumulative_trace_storage *) malloc (alloc_size);
        if (scan_id->s.pisid.trace_storage == nullptr)
          {
            return manager_p->reset ();
          }

        scan_id->s.pisid.trace_storage = placement_new ((accumulative_trace_storage *) scan_id->s.pisid.trace_storage,
                         manager_p->get_result_type(), parallel_scan::SCAN_TYPE::INDEX);
        assert (scan_id->s.pisid.trace_storage != nullptr);
          }

        scan_id->s.pisid.trace_storage->add_stats (manager_p->get_trace_handler());
      }

    return manager_p->reset ();
      }

      case parallel_scan::RESULT_TYPE::BUILDVALUE_OPT:
      {
    using manager_type = parallel_scan::manager
                 < parallel_scan::RESULT_TYPE::BUILDVALUE_OPT, parallel_scan::SCAN_TYPE::INDEX >;
    manager_type *manager_p = (manager_type *) scan_id->s.pisid.manager;

    manager_p->wait_for_workers();
    manager_p->merge_stats();

    if (thread_p->on_trace)
      {
        if (scan_id->s.pisid.trace_storage == nullptr)
          {
        size_t alloc_size = sizeof (accumulative_trace_storage);
        scan_id->s.pisid.trace_storage = (accumulative_trace_storage *) malloc (alloc_size);
        if (scan_id->s.pisid.trace_storage == nullptr)
          {
            return manager_p->reset ();
          }

        scan_id->s.pisid.trace_storage = placement_new ((accumulative_trace_storage *) scan_id->s.pisid.trace_storage,
                         manager_p->get_result_type(), parallel_scan::SCAN_TYPE::INDEX);
        assert (scan_id->s.pisid.trace_storage != nullptr);
          }

        scan_id->s.pisid.trace_storage->add_stats (manager_p->get_trace_handler());
      }

    return manager_p->reset ();
      }

      default:
    /* impossible case */
    assert_release_error (false);
    return er_errid ();
      }

    return er_errid ();
  }

  void
  scan_end_parallel_index_scan (THREAD_ENTRY *thread_p, SCAN_ID *scan_id)
  {
    if (scan_id->direction == S_FORWARD)
      {
    scan_id->direction = S_BACKWARD;
      }
    else
      {
    scan_id->direction = S_FORWARD;
      }

    switch (scan_id->s.pisid.result_type)
      {
      case parallel_scan::RESULT_TYPE::MERGEABLE_LIST:
      {
    using manager_type = parallel_scan::manager
                 < parallel_scan::RESULT_TYPE::MERGEABLE_LIST, parallel_scan::SCAN_TYPE::INDEX >;
    manager_type *manager_p = (manager_type *) scan_id->s.pisid.manager;
    manager_p->end();
    break;
      }

      case parallel_scan::RESULT_TYPE::BUILDVALUE_OPT:
      {
    using manager_type = parallel_scan::manager
                 < parallel_scan::RESULT_TYPE::BUILDVALUE_OPT, parallel_scan::SCAN_TYPE::INDEX >;
    manager_type *manager_p = (manager_type *) scan_id->s.pisid.manager;
    manager_p->end();
    break;
      }

      default:
    /* impossible case */
    assert_release_error (false);
    break;
      }
  }

  void
  scan_close_parallel_index_scan (THREAD_ENTRY *thread_p, SCAN_ID *scan_id)
  {
    using accumulative_trace_storage = parallel_scan::accumulative_trace_storage;

    switch (scan_id->s.pisid.result_type)
      {
      case parallel_scan::RESULT_TYPE::MERGEABLE_LIST:
      {
    using manager_type = parallel_scan::manager
                 < parallel_scan::RESULT_TYPE::MERGEABLE_LIST, parallel_scan::SCAN_TYPE::INDEX >;

    manager_type *manager_p = (manager_type *) scan_id->s.pisid.manager;

    if (thread_p->on_trace)
      {
        manager_p->wait_for_workers();
        if (scan_id->s.pisid.trace_storage == nullptr)
          {
        size_t alloc_size = sizeof (accumulative_trace_storage);
        scan_id->s.pisid.trace_storage = (accumulative_trace_storage *) malloc (alloc_size);
        if (scan_id->s.pisid.trace_storage == nullptr)
          {
            manager_p->close();
            break;
          }

        scan_id->s.pisid.trace_storage = placement_new ((accumulative_trace_storage *) scan_id->s.pisid.trace_storage,
                         manager_p->get_result_type(), parallel_scan::SCAN_TYPE::INDEX);
        assert (scan_id->s.pisid.trace_storage != nullptr);
          }

        scan_id->s.pisid.trace_storage->add_stats (manager_p->get_trace_handler());
      }

    manager_p->close();
    break;
      }

      case parallel_scan::RESULT_TYPE::BUILDVALUE_OPT:
      {
    using manager_type = parallel_scan::manager
                 < parallel_scan::RESULT_TYPE::BUILDVALUE_OPT, parallel_scan::SCAN_TYPE::INDEX >;

    manager_type *manager_p = (manager_type *) scan_id->s.pisid.manager;

    if (thread_p->on_trace)
      {
        manager_p->wait_for_workers();
        if (scan_id->s.pisid.trace_storage == nullptr)
          {
        size_t alloc_size = sizeof (accumulative_trace_storage);
        scan_id->s.pisid.trace_storage = (accumulative_trace_storage *) malloc (alloc_size);
        if (scan_id->s.pisid.trace_storage == nullptr)
          {
            manager_p->close();
            break;
          }

        scan_id->s.pisid.trace_storage = placement_new ((accumulative_trace_storage *) scan_id->s.pisid.trace_storage,
                         manager_p->get_result_type(), parallel_scan::SCAN_TYPE::INDEX);
        assert (scan_id->s.pisid.trace_storage != nullptr);
          }

        scan_id->s.pisid.trace_storage->add_stats (manager_p->get_trace_handler());
      }

    manager_p->close();
    break;
      }

      default:
    /* impossible case */
    assert_release_error (false);
    break;
      }
  }

  /* open-phase captures needed by scan_try_promote_parallel_index_scan to build the manager; freed on promotion attempt. */
  struct parallel_index_scan_pending
  {
    ACCESS_SPEC_TYPE *spec;
    XASL_NODE *xasl;
    OID class_oid;
    HFID class_hfid;
    QUERY_ID query_id;
  };

  void
  scan_clear_parallel_index_pending (THREAD_ENTRY *thread_p, SCAN_ID *scan_id)
  {
    if (scan_id == nullptr || scan_id->s.isid.parallel_pending == nullptr)
      {
    return;
      }
    db_private_free_and_init (thread_p, scan_id->s.isid.parallel_pending);
  }

  int
  scan_open_parallel_index_scan (THREAD_ENTRY *thread_p, SCAN_ID *scan_id,
                 VAL_DESCR *vd, ACCESS_SPEC_TYPE *spec,
                 OID *class_oid, HFID *class_hfid,
                 XASL_NODE *xasl, QUERY_ID query_id)
  {
    assert (thread_p != nullptr);
    assert (scan_id != nullptr);
    assert (spec != nullptr);
    assert (xasl != nullptr);
    assert (query_id != NULL_QUERY_ID);
    assert (vd != nullptr);

    scan_id->type = S_INDX_SCAN;

    /* clear stale pending from previous open (e.g., partition pruning re-open in qexec_next_scan_block_iterations). */
    scan_clear_parallel_index_pending (thread_p, scan_id);

    if (thread_p->private_heap_id == 0)
      {
    /* not main thread; cannot use parallel index scan */
    return NO_ERROR;
      }

    /* DML reads val_list directly; parallel scan does not populate it the same way */
    if (xasl->type == INSERT_PROC || xasl->type == UPDATE_PROC
    || xasl->type == DELETE_PROC || xasl->type == MERGE_PROC)
      {
    return NO_ERROR;
      }

    if (ACCESS_SPEC_IS_FLAGED (spec, ACCESS_SPEC_FLAG_NO_PARALLEL_SCAN))
      {
    return NO_ERROR;
      }

    /* parent partitioned class only (curent==NULL); per-partition reopens flow through */
    if (spec->curent == nullptr && spec->pruning_type == DB_PARTITIONED_CLASS)
      {
    return NO_ERROR;
      }

    if (xasl->topn_items || XASL_IS_FLAGED (xasl, XASL_TO_BE_CACHED))
      {
    return NO_ERROR;
      }

    assert (spec->num_parallel_threads == -1
        || ACCESS_SPEC_IS_FLAGED (spec, ACCESS_SPEC_FLAG_NUM_PARALLEL_THREADS));

    /* defer worker reservation to start-scan: qexec_intprt_fnc may set need_count_only and skip the scan entirely. */
    parallel_index_scan_pending *pending =
        (parallel_index_scan_pending *) db_private_alloc (thread_p, sizeof (parallel_index_scan_pending));
    if (pending == nullptr)
      {
    er_clear ();
    return NO_ERROR;
      }
    pending->spec = spec;
    pending->xasl = xasl;
    pending->class_oid = *class_oid;
    pending->class_hfid = *class_hfid;
    pending->query_id = query_id;
    scan_id->s.isid.parallel_pending = pending;

    return NO_ERROR;
  }

  int
  scan_try_promote_parallel_index_scan (THREAD_ENTRY *thread_p, SCAN_ID *scan_id)
  {
    parallel_query::worker_manager *worker_manager_p = nullptr;
    int num_parallel_threads;
    int error = NO_ERROR;

    if (scan_id == nullptr || scan_id->s.isid.parallel_pending == nullptr)
      {
    return NO_ERROR;
      }

    parallel_index_scan_pending *pending = (parallel_index_scan_pending *) scan_id->s.isid.parallel_pending;
    ACCESS_SPEC_TYPE *spec = pending->spec;
    XASL_NODE *xasl = pending->xasl;
    OID class_oid = pending->class_oid;
    HFID class_hfid = pending->class_hfid;
    QUERY_ID query_id = pending->query_id;
    VAL_DESCR *vd = scan_id->vd;
    db_private_free_and_init (thread_p, scan_id->s.isid.parallel_pending);

    assert (scan_id->type == S_INDX_SCAN);
    assert (spec != nullptr);
    assert (xasl != nullptr);
    assert (vd != nullptr);

    /* index scan degree set client-side by optimizer; trust spec->num_parallel_threads verbatim. */
    num_parallel_threads = spec->num_parallel_threads;
    if (num_parallel_threads < 2)
      {
    assert (scan_id->type == S_INDX_SCAN);
    return NO_ERROR;
      }

    worker_manager_p = parallel_query::worker_manager::try_reserve_workers (num_parallel_threads);
    if (worker_manager_p == nullptr)
      {
    assert (scan_id->type == S_INDX_SCAN);
    return NO_ERROR;
      }

    /* update to actual reserved workers */
    num_parallel_threads = worker_manager_p->get_reserved_workers ();

    /* local result type: pisid overlaps isid; keep isid intact until open() succeeds. */
    parallel_scan::RESULT_TYPE local_result_type;
    if (ACCESS_SPEC_IS_FLAGED (spec, ACCESS_SPEC_FLAG_MERGEABLE_LIST))
      {
    local_result_type = parallel_scan::RESULT_TYPE::MERGEABLE_LIST;
      }
    else if (ACCESS_SPEC_IS_FLAGED (spec, ACCESS_SPEC_FLAG_BUILDVALUE_OPT))
      {
    local_result_type = parallel_scan::RESULT_TYPE::BUILDVALUE_OPT;
      }
    else
      {
    /* XASL_SNAPSHOT not supported for INDEX scan; fall back before touching pisid union. */
    worker_manager_p->release_workers ();
    assert (scan_id->type == S_INDX_SCAN);
    return NO_ERROR;
      }

    /* Save indx_info from isid.  init_on_main (called from manager::open) needs the BTID. */
    INDX_INFO *saved_indx_info = scan_id->s.isid.indx_info;

    /* pisid is isid superset — promote-fail safe (parallel-only fields live after isid) */
    void *local_manager = nullptr;

    switch (local_result_type)
      {
      case parallel_scan::RESULT_TYPE::MERGEABLE_LIST:
      {
    using manager_type =
        parallel_scan::manager < parallel_scan::RESULT_TYPE::MERGEABLE_LIST, parallel_scan::SCAN_TYPE::INDEX >;

    local_manager = (void *) db_private_alloc (thread_p, sizeof (manager_type));
    if (local_manager == nullptr)
      {
        assert_release_error (er_errid () != NO_ERROR);
        error = er_errid ();
        break;
      }

    local_manager = placement_new ((manager_type *) local_manager,
                       thread_p, query_id, scan_id, xasl,
                       num_parallel_threads, class_hfid, class_oid, vd,
                       false, false, worker_manager_p, nullptr, saved_indx_info);
    assert (local_manager != nullptr);

    error = ((manager_type *) local_manager)->open ();
    if (error != NO_ERROR)
      {
        ((manager_type *) local_manager)->~manager ();
        db_private_free_and_init (thread_p, local_manager);
        worker_manager_p = nullptr;

        error = er_errid ();
      }

    break;
      }

      case parallel_scan::RESULT_TYPE::BUILDVALUE_OPT:
      {
    using manager_type =
        parallel_scan::manager < parallel_scan::RESULT_TYPE::BUILDVALUE_OPT, parallel_scan::SCAN_TYPE::INDEX >;

    local_manager = (void *) db_private_alloc (thread_p, sizeof (manager_type));
    if (local_manager == nullptr)
      {
        assert_release_error (er_errid () != NO_ERROR);
        error = er_errid ();
        break;
      }

    local_manager = placement_new ((manager_type *) local_manager,
                       thread_p, query_id, scan_id, xasl,
                       num_parallel_threads, class_hfid, class_oid, vd,
                       false, false, worker_manager_p, nullptr, saved_indx_info);
    assert (local_manager != nullptr);

    error = ((manager_type *) local_manager)->open ();
    if (error != NO_ERROR)
      {
        ((manager_type *) local_manager)->~manager ();
        db_private_free_and_init (thread_p, local_manager);
        worker_manager_p = nullptr;

        assert_release_error (er_errid () != NO_ERROR);
        error = er_errid ();
      }

    break;
      }

      default:
    /* impossible case */
    assert_release_error (false);
    error = er_errid ();
    break;
      } /* switch (local_result_type) */

    if (error != NO_ERROR)
      {
    /* cleanup */
    if (worker_manager_p != nullptr)
      {
        worker_manager_p->release_workers ();
        worker_manager_p = nullptr;
      }

    return error;
      }

    /* success: manager open, workers ready — clean up isid resources before overwriting with pisid. */

    /* End heap attr caches while isid is still valid. */
    {
      INDX_SCAN_ID *isidp = &scan_id->s.isid;
      if (isidp->caches_inited)
    {
      if (isidp->range_pred.regu_list != NULL)
        {
          heap_attrinfo_end (thread_p, isidp->range_attrs.attr_cache);
        }
      if (isidp->key_pred.regu_list)
        {
          heap_attrinfo_end (thread_p, isidp->key_attrs.attr_cache);
        }
      heap_attrinfo_end (thread_p, isidp->pred_attrs.attr_cache);
      heap_attrinfo_end (thread_p, isidp->rest_attrs.attr_cache);
      isidp->caches_inited = false;
    }
    }

    /* Free scan-specific resources (bt_attr_ids, oid_list, copy_buf, etc.). */
    scan_close_scan (thread_p, scan_id);
    scan_id->status = S_OPENED; /* reset status; scan_close_scan sets it to S_CLOSED */

    if (scan_id->s.isid.indx_cov.list_id != NULL)
      {
    if (scan_id->s.isid.indx_cov.list_id->type_list.type_cnt > 0)
      {
        qfile_close_list (thread_p, scan_id->s.isid.indx_cov.list_id);
        qfile_destroy_list (thread_p, scan_id->s.isid.indx_cov.list_id);
      }
      }

    /* keep trace_storage across promotes — it accumulates per-partition worker stats */
    scan_id->s.pisid.result_type = local_result_type;
    scan_id->s.pisid.manager = local_manager;

    scan_id->type = S_PARALLEL_INDEX_SCAN;

    ASSERT_NO_ERROR_OR_INTERRUPTED ();
    return NO_ERROR;
  }

  int
  scan_start_parallel_index_scan (THREAD_ENTRY *thread_p, SCAN_ID *scan_id)
  {
    scan_id->position = S_ON;
    return NO_ERROR;
  }
}

namespace parallel_scan
{
  template <RESULT_TYPE result_type, SCAN_TYPE ST>
  manager<result_type, ST>::~manager()
  {
    if (m_worker_manager != nullptr)
      {
    m_worker_manager->release_workers ();
    m_worker_manager = nullptr;
      }
    if (m_input_handler != nullptr)
      {
    if constexpr (ST == SCAN_TYPE::INDEX)
      {
        m_input_handler->cleanup_keys (m_thread_p);
      }
    else if constexpr (ST == SCAN_TYPE::LIST)
      {
        m_input_handler->cleanup_on_main (m_thread_p);
      }
    m_input_handler->~input_handler_t();
    db_private_free (m_thread_p, m_input_handler);
    m_input_handler = nullptr;
      }
    if (m_result_handler != nullptr)
      {
    m_result_handler->read_finalize (m_thread_p);
    m_result_handler->~result_handler();
    db_private_free (m_thread_p, m_result_handler);
    m_result_handler = nullptr;
      }
    if (m_vd != nullptr)
      {
    if (m_vd->dbval_cnt > 0)
      {
        for (int i = 0; i < m_vd->dbval_cnt; i++)
          {
        pr_clear_value (&m_vd->dbval_ptr[i]);
          }
        db_private_free (m_thread_p, m_vd->dbval_ptr);
      }
    db_private_free (m_thread_p, m_vd);
    m_vd = nullptr;
      }
  }

  template <RESULT_TYPE result_type, SCAN_TYPE ST>
  int manager<result_type, ST>::open()
  {
    int h;
    VAL_DESCR *new_vd;
    m_query_entry = qmgr_get_query_entry (m_thread_p, m_query_id, m_thread_p->tran_index);
    if (m_query_entry == nullptr)
      {
    return ER_FAILED;
      }
    h = m_query_entry->xasl_id.sha1.h[0]|m_query_entry->xasl_id.sha1.h[1]|m_query_entry->xasl_id.sha1.h[2]|m_query_entry->xasl_id.sha1.h[3]|m_query_entry->xasl_id.sha1.h[4];
    if (h == 0)
      {
    m_uses_xasl_clone = false;

    THREAD_ENTRY *main_thread_p = thread_get_main_thread (m_thread_p);
    if (main_thread_p->xasl_unpack_info_ptr)
      {
        /* use unpack info ptr for execute. */
      }
    else
      {
        assert (false);
        return ER_FAILED;
      }
      }
    else
      {
    m_uses_xasl_clone = true;
      }
    new_vd = (VAL_DESCR *) db_private_alloc (m_thread_p, sizeof (VAL_DESCR));
    if (new_vd == nullptr)
      {
    er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_OUT_OF_VIRTUAL_MEMORY, 1, sizeof (VAL_DESCR));
    return ER_FAILED;
      }
    memcpy (new_vd, m_orig_vd, sizeof (VAL_DESCR));
    if (m_orig_vd->dbval_cnt > 0)
      {
    new_vd->dbval_ptr = (DB_VALUE *) db_private_alloc (m_thread_p, sizeof (DB_VALUE) * m_orig_vd->dbval_cnt);
    if (new_vd->dbval_ptr == nullptr)
      {
        db_private_free (m_thread_p, new_vd);
        er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_OUT_OF_VIRTUAL_MEMORY, 1,
            sizeof (DB_VALUE) * m_orig_vd->dbval_cnt);
        return ER_FAILED;
      }
    for (int i = 0; i < m_orig_vd->dbval_cnt; i++)
      {
        pr_clone_value (&m_orig_vd->dbval_ptr[i], &new_vd->dbval_ptr[i]);
      }
      }
    m_vd = new_vd;
    m_input_handler = (input_handler_t *) db_private_alloc (m_thread_p, sizeof (input_handler_t));
    if (m_input_handler == nullptr)
      {
    er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_OUT_OF_VIRTUAL_MEMORY, 1, sizeof (input_handler_t));
    return ER_FAILED;
      }
    m_input_handler = placement_new ((input_handler_t *) m_input_handler, &m_interrupt, &m_err_messages);

    {
      int init_err;
      if constexpr (ST == SCAN_TYPE::LIST)
    {
      init_err = m_input_handler->init_on_main (m_thread_p, m_list_id, m_parallelism);
    }
      else if constexpr (ST == SCAN_TYPE::INDEX)
    {
      init_err = m_input_handler->init_on_main (m_thread_p, m_indx_info, m_scan_id, m_vd, m_parallelism);
    }
      else
    {
      init_err = m_input_handler->init_on_main (m_thread_p, m_hfid, m_parallelism);
    }
      if (init_err != NO_ERROR)
    {
      m_input_handler->~input_handler_t ();
      db_private_free_and_init (m_thread_p, m_input_handler);
      return ER_FAILED;
    }
    }

    if constexpr (result_type == RESULT_TYPE::MERGEABLE_LIST)
      {
    m_result_handler = (result_handler<RESULT_TYPE::MERGEABLE_LIST> *) db_private_alloc (m_thread_p,
               sizeof (result_handler<RESULT_TYPE::MERGEABLE_LIST>));
    if (m_result_handler == nullptr)
      {
        er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_OUT_OF_VIRTUAL_MEMORY, 1,
            sizeof (result_handler<RESULT_TYPE::MERGEABLE_LIST>));
        return ER_FAILED;
      }
    if (m_xasl->type == BUILDLIST_PROC && m_xasl->proc.buildlist.g_agg_list != NULL &&
        !m_xasl->proc.buildlist.g_agg_domains_resolved)
      {
        m_g_agg_domain_resolve_need = true;
      }
    m_result_handler = placement_new ((result_handler<RESULT_TYPE::MERGEABLE_LIST> *) m_result_handler, m_query_id,
                      &m_interrupt, &m_err_messages, m_parallelism, m_g_agg_domain_resolve_need, m_xasl);
      }
    else if constexpr (result_type == RESULT_TYPE::XASL_SNAPSHOT)
      {
    m_result_handler = (result_handler<RESULT_TYPE::XASL_SNAPSHOT> *) db_private_alloc (m_thread_p,
               sizeof (result_handler<RESULT_TYPE::XASL_SNAPSHOT>));
    if (m_result_handler == nullptr)
      {
        er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_OUT_OF_VIRTUAL_MEMORY, 1,
            sizeof (result_handler<RESULT_TYPE::XASL_SNAPSHOT>));
        return ER_FAILED;
      }
    m_result_handler = placement_new ((result_handler<RESULT_TYPE::XASL_SNAPSHOT> *) m_result_handler, m_query_id,
                      &m_interrupt, &m_err_messages, m_parallelism, m_g_agg_domain_resolve_need, m_xasl);
      }
    else if constexpr (result_type == RESULT_TYPE::BUILDVALUE_OPT)
      {
    m_result_handler = (result_handler<RESULT_TYPE::BUILDVALUE_OPT> *) db_private_alloc (m_thread_p,
               sizeof (result_handler<RESULT_TYPE::BUILDVALUE_OPT>));
    if (m_result_handler == nullptr)
      {
        er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_OUT_OF_VIRTUAL_MEMORY, 1,
            sizeof (result_handler<RESULT_TYPE::BUILDVALUE_OPT>));
        return ER_FAILED;
      }
    m_result_handler = placement_new ((result_handler<RESULT_TYPE::BUILDVALUE_OPT> *) m_result_handler, m_query_id,
                      &m_interrupt, &m_err_messages, m_parallelism, m_xasl->proc.buildvalue.agg_list);
      }
    else
      {
    assert (false);
    return ER_FAILED;
      }
    m_on_trace = m_thread_p->on_trace;
    if (m_thread_p->m_px_orig_thread_entry == NULL)
      {
    m_thread_p->m_px_orig_thread_entry = m_thread_p;
      }
    if (m_on_trace)
      {
    if (m_thread_p->m_px_orig_thread_entry != m_thread_p)
      {
        /* this is child thread, so we need to use px_stats */
        if (m_thread_p->m_uses_px_stats)
          {
        /* already initialized */
        m_px_stats_initialized_by_me = false;
          }
        else
          {
        /* not initialized - cannot be happened */
        assert (false);
        perfmon_initialize_parallel_stats (m_thread_p);
        m_px_stats_initialized_by_me = true;
          }
      }
    else
      {
        /* this is main thread */
        if (m_thread_p->m_uses_px_stats)
          {
        /* already initialized */
        m_px_stats_initialized_by_me = false;
          }
        else
          {
        /* not initialized */
        perfmon_initialize_parallel_stats (m_thread_p);
        m_thread_p->m_uses_px_stats = false;
        m_px_stats_initialized_by_me = true;
          }
      }
    m_trace_handler.m_trace_storage_for_sibling_xasl.set_main_xasl_tree (m_xasl);
      }
    m_result_handler_read_initialized = false;
    m_task_started = false;
    m_interrupt.clear();

    return NO_ERROR;
  }

  template <RESULT_TYPE result_type, SCAN_TYPE ST>
  int manager<result_type, ST>::start_tasks()
  {
    for (int i = 0; i < m_parallelism; i++)
      {
    task<result_type, ST> *task_p = (task<result_type, ST> *) malloc (sizeof (task<result_type, ST>));
    if (task_p == nullptr)
      {
        er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_OUT_OF_VIRTUAL_MEMORY, 1, sizeof (task<result_type, ST>));
        return ER_FAILED;
      }
    trace_handler *trace_handler_p = m_on_trace ? &m_trace_handler : nullptr;
    task_p = placement_new ((task<result_type, ST> *) task_p, m_thread_p, m_query_entry, m_result_handler,
                m_input_handler, &m_interrupt, &m_err_messages, m_vd, trace_handler_p, m_worker_manager, m_xasl->header.id, m_hfid,
                m_cls_oid, m_is_fixed,
                m_is_grouped, m_uses_xasl_clone, m_xasl, &m_join_info);
    m_worker_manager->push_task (task_p);
      }
    m_task_started = true;
    return NO_ERROR;
  }

  template <RESULT_TYPE result_type, SCAN_TYPE ST>
  SCAN_CODE manager<result_type, ST>::next()
  {
    SCAN_CODE scan_code = S_SUCCESS;
    int err_code = NO_ERROR;

    if constexpr (result_type == RESULT_TYPE::MERGEABLE_LIST)
      {
    XASL_NODE *xptr;
    for (xptr = m_xasl; xptr != nullptr; xptr = xptr->scan_ptr)
      {
        if (xptr->val_list == nullptr)
          {
        continue;
          }
        QPROC_DB_VALUE_LIST valp = xptr->val_list->valp;
        for (int i=0; i<xptr->val_list->val_cnt; i++)
          {
        pr_clear_value (valp->val);
        valp = valp->next;
          }
      }
      }

    if (unlikely (!m_task_started))
      {
    if constexpr (result_type == RESULT_TYPE::MERGEABLE_LIST || result_type == RESULT_TYPE::BUILDVALUE_OPT)
      {
        if (m_xasl->scan_ptr)
          {
        m_join_info.capture_join_info (m_xasl);
        for (XASL_NODE *xptr = m_xasl->scan_ptr; xptr; xptr=xptr->scan_ptr)
          {
            if (xptr->spec_list && xptr->spec_list->type == TARGET_LIST)
              {
            scan_end_scan (m_thread_p, &xptr->spec_list->s_id);
              }
          }
          }
      }
    err_code = start_tasks();
    if (err_code != NO_ERROR)
      {
        return S_ERROR;
      }
      }

    if (m_result_handler_read_initialized == false)
      {
    m_result_handler->read_initialize (m_thread_p);
    m_result_handler_read_initialized = true;
      }

    if constexpr (result_type == RESULT_TYPE::MERGEABLE_LIST)
      {
    scan_code = m_result_handler->read (m_thread_p, m_xasl->list_id);
    if (scan_code == S_ERROR)
      {
        if (m_interrupt.get_code() == parallel_query::interrupt::interrupt_code::NO_INTERRUPT)
          {
        m_interrupt.set_code (parallel_query::interrupt::interrupt_code::ERROR_INTERRUPTED_FROM_MAIN_THREAD);
          }
        if (m_worker_manager != nullptr)
          {
        m_worker_manager->release_workers ();
        m_worker_manager = nullptr;
          }
      }

    if (m_xasl->scan_ptr)
      {
        m_join_info.apply_join_info (m_xasl);
      }

    XASL_NODE *xptr = m_xasl;

    for (xptr = m_xasl; xptr != nullptr; xptr = xptr->scan_ptr)
      {
        if (xptr->val_list == nullptr)
          {
        continue;
          }
        std::vector<DB_VALUE> dbval_container (xptr->val_list->val_cnt);
        QPROC_DB_VALUE_LIST valp = xptr->val_list->valp;
        for (int i = 0; i < xptr->val_list->val_cnt; i++)
          {
        pr_clone_value (valp->val, &dbval_container[i]);
        valp = valp->next;
          }

        HL_HEAPID heap_id = db_change_private_heap (m_thread_p, 0);
        valp = xptr->val_list->valp;
        for (int i = 0; i < xptr->val_list->val_cnt; i++)
          {
        pr_clear_value (valp->val);
        valp = valp->next;
          }
        db_change_private_heap (m_thread_p, heap_id);
        valp = xptr->val_list->valp;
        for (int i=0; i<xptr->val_list->val_cnt; i++)
          {
        pr_clone_value (&dbval_container[i], valp->val);
        pr_clear_value (&dbval_container[i]);
        valp = valp->next;
          }
      }

    fetch_val_list (m_thread_p, m_xasl->outptr_list->valptrp, m_vd, nullptr, nullptr, NULL, true);
    if (m_g_agg_domain_resolve_need)
      {
        qexec_resolve_domains_for_aggregation_for_parallel_heap_scan_g_agg (m_thread_p, m_xasl, m_vd,
        &m_xasl->proc.buildlist.g_agg_domains_resolved);
      }
      }
    else if constexpr (result_type == RESULT_TYPE::XASL_SNAPSHOT)
      {
    scan_code = m_result_handler->read (m_thread_p, m_xasl->val_list);
      }
    else if constexpr (result_type == RESULT_TYPE::BUILDVALUE_OPT)
      {
    scan_code = m_result_handler->read (m_thread_p, m_xasl->proc.buildvalue.agg_list);
    if (m_xasl->scan_ptr)
      {
        m_join_info.apply_join_info (m_xasl);
      }
      }
    else
      {
    assert (false);
    return S_ERROR;
      }

    if (unlikely (scan_code == S_ERROR))
      {
    if (m_interrupt.get_code() == parallel_query::interrupt::interrupt_code::NO_INTERRUPT)
      {
        m_interrupt.set_code (parallel_query::interrupt::interrupt_code::ERROR_INTERRUPTED_FROM_MAIN_THREAD);
      }
    if (m_worker_manager != nullptr)
      {
        m_worker_manager->release_workers ();
        m_worker_manager = nullptr;
      }
      }

    if (unlikely (m_interrupt.get_code() != parallel_query::interrupt::interrupt_code::NO_INTERRUPT))
      {
    switch (m_interrupt.get_code())
      {
      case parallel_query::interrupt::interrupt_code::ERROR_INTERRUPTED_FROM_MAIN_THREAD:
      case parallel_query::interrupt::interrupt_code::USER_INTERRUPTED_FROM_MAIN_THREAD:
        break;
      case parallel_query::interrupt::interrupt_code::ERROR_INTERRUPTED_FROM_WORKER_THREAD:
      case parallel_query::interrupt::interrupt_code::USER_INTERRUPTED_FROM_WORKER_THREAD:
      {
        /* drain workers so every err_messages.push_back is visible before reading [0] */
        if (m_worker_manager != nullptr)
          {
        m_worker_manager->wait_workers ();
          }
        std::lock_guard<std::mutex> lock (m_err_messages.m_mutex);
        if (!m_err_messages.m_error_messages.empty ())
          {
        cuberr::context::get_thread_local_error().swap (*m_err_messages.m_error_messages[0]);
          }
        return S_ERROR;
      }
      break;
      case parallel_query::interrupt::interrupt_code::JOB_ENDED:
      {
        return S_END;
      }
      break;
      default:
        break;
      }
      }
    return scan_code;
  }

  template <RESULT_TYPE result_type, SCAN_TYPE ST>
  void manager<result_type, ST>::wait_for_workers ()
  {
    /* signal JOB_ENDED + drain workers; idempotent */
    if (m_interrupt.get_code() != parallel_query::interrupt::interrupt_code::JOB_ENDED)
      {
    m_interrupt.set_code (parallel_query::interrupt::interrupt_code::JOB_ENDED);
      }
    if (m_worker_manager != nullptr)
      {
    m_worker_manager->wait_workers ();
      }
  }

  template <RESULT_TYPE result_type, SCAN_TYPE ST>
  int manager<result_type, ST>::reset ()
  {
    int err_code = NO_ERROR;

    /* callers must drain workers via wait_for_workers() before merge_stats / add_stats; reset is finalize-only */
    m_result_handler->read_finalize (m_thread_p);

    /* Clean up input handler */
    if (m_input_handler != nullptr)
      {
    if constexpr (ST == SCAN_TYPE::LIST)
      {
        m_input_handler->cleanup_on_main (m_thread_p);
      }
    else if constexpr (ST == SCAN_TYPE::INDEX)
      {
        m_input_handler->cleanup_keys (m_thread_p);
      }
    m_input_handler->~input_handler_t ();
    db_private_free (m_thread_p, m_input_handler);
    m_input_handler = nullptr;
      }

    /* Clean up result handler */
    if (m_result_handler != nullptr)
      {
    m_result_handler->~result_handler ();
    db_private_free (m_thread_p, m_result_handler);
    m_result_handler = nullptr;
      }

    /* Clean up previous value descriptor */
    if (m_vd != nullptr)
      {
    if (m_vd->dbval_cnt > 0)
      {
        for (int i = 0; i < m_vd->dbval_cnt; i++)
          {
        pr_clear_value (&m_vd->dbval_ptr[i]);
          }
        db_private_free (m_thread_p, m_vd->dbval_ptr);
      }
    db_private_free (m_thread_p, m_vd);
    m_vd = nullptr;
      }

    m_trace_handler.clear();

    /* Open and initialize */
    err_code = open ();
    if (err_code != NO_ERROR)
      {
    if (m_worker_manager != nullptr)
      {
        m_worker_manager->release_workers ();
        m_worker_manager = nullptr;
      }
    return err_code;
      }

    /* Finalize setup */
    if constexpr (ST == SCAN_TYPE::LIST)
      {
    m_scan_id->s.pllsid_parallel.manager = this;
      }
    else if constexpr (ST == SCAN_TYPE::INDEX)
      {
    m_scan_id->s.pisid.manager = this;
      }
    else
      {
    m_scan_id->s.phsid.manager = this;
      }
    return NO_ERROR;
  }

  template <RESULT_TYPE result_type, SCAN_TYPE ST>
  int manager<result_type, ST>::merge_stats()
  {
    int error = NO_ERROR;

    if (m_on_trace)
      {
    if (m_thread_p->m_px_orig_thread_entry != m_thread_p)
      {
        /* child thread */
        if (m_px_stats_initialized_by_me)
          {
        perfmon_destroy_parallel_stats (m_thread_p);
        assert (false);
        error = ER_FAILED;
          }
      }
    else
      {
        /* main thread */
        if (m_px_stats_initialized_by_me)
          {
        perfmon_destroy_parallel_stats (m_thread_p);
          }
      }

    m_trace_handler.merge_stats (m_thread_p, &m_scan_id->scan_stats);
      }

    return error;
  }

  template <RESULT_TYPE result_type, SCAN_TYPE ST>
  int manager<result_type, ST>::end()
  {
    int err_code = NO_ERROR;
    if (m_interrupt.get_code() != parallel_query::interrupt::interrupt_code::JOB_ENDED)
      {
    m_interrupt.set_code (parallel_query::interrupt::interrupt_code::JOB_ENDED);
      }
    if (m_worker_manager != nullptr)
      {
    m_worker_manager->release_workers ();
    m_worker_manager = nullptr;
      }
    err_code = merge_stats();
    m_result_handler->read_finalize (m_thread_p);
    return err_code;
  }

  template <RESULT_TYPE result_type, SCAN_TYPE ST>
  int manager<result_type, ST>::close()
  {
    THREAD_ENTRY *thread_p = m_thread_p;
    if constexpr (ST == SCAN_TYPE::LIST)
      {
    m_scan_id->s.pllsid_parallel.manager = nullptr;
      }
    else if constexpr (ST == SCAN_TYPE::INDEX)
      {
    m_scan_id->s.pisid.manager = nullptr;
      }
    else
      {
    m_scan_id->s.phsid.manager = nullptr;
      }
    this->~manager();
    db_private_free (thread_p, this);
    return NO_ERROR;
  }

  /* Explicit template instantiations */
  template class manager<RESULT_TYPE::MERGEABLE_LIST, SCAN_TYPE::HEAP>;
  template class manager<RESULT_TYPE::XASL_SNAPSHOT, SCAN_TYPE::HEAP>;
  template class manager<RESULT_TYPE::BUILDVALUE_OPT, SCAN_TYPE::HEAP>;

  template class manager<RESULT_TYPE::MERGEABLE_LIST, SCAN_TYPE::LIST>;
  template class manager<RESULT_TYPE::BUILDVALUE_OPT, SCAN_TYPE::LIST>;

  template class manager<RESULT_TYPE::MERGEABLE_LIST, SCAN_TYPE::INDEX>;
  template class manager<RESULT_TYPE::BUILDVALUE_OPT, SCAN_TYPE::INDEX>;
}