Skip to content

File px_scan_task.cpp

File List > cubrid > src > query > parallel > px_scan > px_scan_task.cpp

Go to the documentation of this file

/*
 *
 * Copyright 2016 CUBRID Corporation
 *
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 *
 */

/*
 * px_scan_task.cpp - derived from cubthread::entry_task
 */

#include "px_scan_task.hpp"
#include "error_code.h"
#include "error_manager.h"
#include "heap_file.h"
#include "storage_common.h"
#include "xasl.h"
#include "xasl_cache.h"
#include "xasl_iteration.hpp"
#include "query_executor.h"
#include "stream_to_xasl.h"
#include "xasl_unpack_info.hpp"
#include "memoize.hpp"
#include "scan_manager.h"
#include "partition_sr.h"

// XXX: SHOULD BE THE LAST INCLUDE HEADER
#include "memory_wrapper.hpp"

namespace parallel_scan
{
  template <RESULT_TYPE result_type, SCAN_TYPE ST>
  void task<result_type, ST>::execute (cubthread::entry &thread_ref)
  {
    int err_code;
    err_code = initialize (thread_ref);
    if (err_code != NO_ERROR)
      {
    m_err_messages->move_top_error_message_to_this();
    m_interrupt->set_code (parallel_query::interrupt::interrupt_code::ERROR_INTERRUPTED_FROM_WORKER_THREAD);
    return;
      }
    loop (thread_ref);
    finalize (thread_ref);
  }

  template <RESULT_TYPE result_type, SCAN_TYPE ST>
  void task<result_type, ST>::retire()
  {
    m_worker_manager->pop_task();
    /* paired with malloc + placement_new in manager::start_tasks() */
    this->~task ();
    free (this);
  }

  template <RESULT_TYPE result_type, SCAN_TYPE ST>
  task<result_type, ST>::~task()
  {
  }

  template <RESULT_TYPE result_type, SCAN_TYPE ST>
  int task<result_type, ST>::initialize (cubthread::entry &thread_ref)
  {
    int err_code = NO_ERROR;
    HEAP_SCAN_ID *hsidp;
    access_spec_node *spec;
    CLS_SPEC_TYPE *cls;
    int level;
    xasl_node *xptr;
    ACCESS_SPEC_TYPE *spec_ptr;
    bool fixed_scan = false;
    bool partition_pruned = false;

    thread_ref.m_px_orig_thread_entry = m_parent_thread_p;
    thread_ref.conn_entry = m_parent_thread_p->conn_entry;
    thread_ref.tran_index = m_parent_thread_p->tran_index;
    thread_ref.on_trace = m_parent_thread_p->on_trace;

    if (thread_ref.on_trace)
      {
    perfmon_initialize_parallel_stats (&thread_ref);
    tsc_getticks (&m_start_tick);
      }

    err_code = clone_xasl (thread_ref);
    if (err_code != NO_ERROR)
      {
    return err_code;
      }
    if constexpr (ST != SCAN_TYPE::LIST)
      {
    hsidp = &m_scan_id->s.hsid;
      }
    m_scan_id->vd = m_vd;
    spec = m_xasl->spec_list;
    cls = &spec->s.cls_node;
    m_xasl->curr_spec = m_xasl->spec_list;

    for (xptr = m_xasl, level = 0; xptr != NULL; xptr = xptr->scan_ptr, level++)
      {
    spec_ptr = xptr->spec_list;
    if (level == 0)
      {
        if constexpr (ST == SCAN_TYPE::LIST)
          {
        LIST_SPEC_TYPE *list_node = &spec->s.list_node;
        err_code = scan_open_list_scan (&thread_ref, m_scan_id,
                        false, spec->single_fetch, spec->s_dbval,
                        m_xasl->val_list, m_vd,
                        m_input_handler->get_list_id (),
                        list_node->list_regu_list_pred, spec->where_pred,
                        list_node->list_regu_list_rest, list_node->list_regu_list_build,
                        list_node->list_regu_list_probe,
                        list_node->hash_list_scan_yn, false);
        if (err_code != NO_ERROR)
          {
            return err_code;
          }
        err_code = scan_start_scan (&thread_ref, m_scan_id);
          }
        else if constexpr (ST == SCAN_TYPE::INDEX)
          {
        /* clone_xasl restores parent compile-time BTID from XASL stream; override with partition BTID */
        if (spec->indexptr != nullptr && m_input_handler != nullptr)
          {
            INDX_INFO *part_indx_info = m_input_handler->get_indx_info ();
            if (part_indx_info != nullptr)
              {
            BTID_COPY (&spec->indexptr->btid, &part_indx_info->btid);
              }
          }
        bool iscan_oid_order = m_scan_id->s.isid.iscan_oid_order;
        err_code = scan_open_index_scan (&thread_ref, m_scan_id, false, S_SELECT,
                         m_is_fixed, m_is_grouped, spec->single_fetch, spec->s_dbval,
                         m_xasl->val_list, m_vd, spec->indexptr, &m_cls_oid, &m_hfid,
                         cls->cls_regu_list_key, spec->where_key,
                         cls->cls_regu_list_pred, spec->where_pred,
                         cls->cls_regu_list_rest, spec->where_range,
                         cls->cls_regu_list_range, cls->cls_output_val_list,
                         cls->cls_regu_val_list, cls->num_attrs_key,
                         cls->attrids_key, cls->cache_key,
                         cls->num_attrs_pred, cls->attrids_pred, cls->cache_pred,
                         cls->num_attrs_rest, cls->attrids_rest, cls->cache_rest,
                         cls->num_attrs_range, cls->attrids_range, cls->cache_range,
                         iscan_oid_order, m_query_entry->query_id,
                         ACCESS_SPEC_IS_FLAGED (spec, ACCESS_SPEC_FLAG_ONLY_MIN_MAX_SCAN));
        if (err_code != NO_ERROR)
          {
            return err_code;
          }
        /* scan_start_scan initializes scan caches and attr info required by slot_iterator_index for leaf page processing. */
        err_code = scan_start_scan (&thread_ref, m_scan_id);
          }
        else
          {
        scan_open_heap_scan (&thread_ref, m_scan_id, false, S_SELECT,
                     m_is_fixed, m_is_grouped, spec->single_fetch, spec->s_dbval,
                     m_xasl->val_list, m_vd, &m_cls_oid, &m_hfid,
                     cls->cls_regu_list_pred, spec->where_pred, cls->cls_regu_list_rest,
                     cls->num_attrs_pred, cls->attrids_pred, cls->cache_pred,
                     cls->num_attrs_rest, cls->attrids_rest, cls->cache_rest,
                     S_HEAP_SCAN, cls->cache_reserved, cls->cls_regu_list_reserved);
        err_code = scan_start_scan (&thread_ref, m_scan_id);
          }
      }
    else
      {
        if constexpr (result_type == RESULT_TYPE::MERGEABLE_LIST || result_type == RESULT_TYPE::BUILDVALUE_OPT)
          {
        scan_info scan_info = m_join_info->get_scan_info (xptr->header.id);
        ACCESS_SPEC_TYPE *specp = xptr->curr_spec? xptr->curr_spec : xptr->spec_list;
        xptr->curr_spec = xptr->spec_list;
        if (spec_ptr->type == TARGET_CLASS && IS_ANY_INDEX_ACCESS (spec_ptr->access)
            && qfile_is_sort_list_covered (xptr->after_iscan_list, xptr->orderby_list))
          {
            spec_ptr->grouped_scan = false;
          }

        if (specp->type == TARGET_LIST)
          {
            assert_release_error (scan_info.list_id != NULL);
            if (er_errid() != NO_ERROR)
              {
            return er_errid();
              }
            err_code =
                scan_open_list_scan (&thread_ref, &specp->s_id, specp->s_id.grouped, specp->single_fetch, specp->s_dbval,xptr->val_list,
                         m_vd,scan_info.list_id, specp->s.list_node.list_regu_list_pred,specp->where_pred,
                         specp->s.list_node.list_regu_list_rest,specp->s.list_node.list_regu_list_build, specp->s.list_node.list_regu_list_probe,
                         specp->s.list_node.hash_list_scan_yn,true);
            if (err_code != NO_ERROR)
              {
            return err_code;
              }
          }
        else if (specp->type == TARGET_CLASS)
          {
            if (xptr->scan_ptr == NULL)
              {
            fixed_scan = true;
              }

            if (thread_ref.on_trace && HFID_EQ (&xptr->curr_spec->s.cls_node.hfid, &scan_info.hfid) == false)
              {
            err_code = partition_prune_spec (&thread_ref, m_vd, xptr->curr_spec);
            if (err_code != NO_ERROR)
              {
                return err_code;
              }
            /* prune partition stats */
            for (PARTITION_SPEC_TYPE *part_spec = xptr->curr_spec->parts; part_spec != NULL; part_spec = part_spec->next)
              {
                if (HFID_EQ (&part_spec->hfid, &scan_info.hfid))
                  {
                specp->s_id.partition_stats = &part_spec->scan_stats;
                partition_pruned = true;
                break;
                  }
              }
              }

            switch (specp->access)
              {
              case ACCESS_METHOD_SEQUENTIAL:
              {
            err_code = scan_open_heap_scan (&thread_ref, &specp->s_id, false,
                            S_SELECT, fixed_scan, specp->s_id.grouped,
                            specp->single_fetch, specp->s_dbval, xptr->val_list, m_vd,
                            &scan_info.oid, &scan_info.hfid, specp->s.cls_node.cls_regu_list_pred, specp->where_pred,
                            specp->s.cls_node.cls_regu_list_rest, specp->s.cls_node.num_attrs_pred,
                            specp->s.cls_node.attrids_pred, specp->s.cls_node.cache_pred,
                            specp->s.cls_node.num_attrs_rest, specp->s.cls_node.attrids_rest,
                            specp->s.cls_node.cache_rest, S_HEAP_SCAN, specp->s.cls_node.cache_reserved,
                            specp->s.cls_node.cls_regu_list_reserved);
            if (err_code != NO_ERROR)
              {
                return err_code;
              }
              }
              break;
              case ACCESS_METHOD_INDEX:
              {
            QUERY_ID query_id = m_query_entry->query_id;
            bool iscan_oid_order = specp->s_id.s.isid.iscan_oid_order;
            specp->indexptr->btid = scan_info.btid;
            err_code =
                scan_open_index_scan (&thread_ref, &specp->s_id, false,
                              S_SELECT, fixed_scan, specp->s_id.grouped,
                              specp->single_fetch, specp->s_dbval, xptr->val_list, m_vd,
                              specp->indexptr, &scan_info.oid, &scan_info.hfid, specp->s.cls_node.cls_regu_list_key,
                              specp->where_key, specp->s.cls_node.cls_regu_list_pred, specp->where_pred,
                              specp->s.cls_node.cls_regu_list_rest, specp->where_range,
                              specp->s.cls_node.cls_regu_list_range, specp->s.cls_node.cls_output_val_list,
                              specp->s.cls_node.cls_regu_val_list, specp->s.cls_node.num_attrs_key,
                              specp->s.cls_node.attrids_key, specp->s.cls_node.cache_key,
                              specp->s.cls_node.num_attrs_pred, specp->s.cls_node.attrids_pred,
                              specp->s.cls_node.cache_pred, specp->s.cls_node.num_attrs_rest,
                              specp->s.cls_node.attrids_rest, specp->s.cls_node.cache_rest,
                              specp->s.cls_node.num_attrs_range, specp->s.cls_node.attrids_range,
                              specp->s.cls_node.cache_range, iscan_oid_order, query_id,
                              ACCESS_SPEC_IS_FLAGED (specp, ACCESS_SPEC_FLAG_ONLY_MIN_MAX_SCAN));
            if (err_code != NO_ERROR)
              {
                return S_ERROR;
              }
              }
              break;
              case ACCESS_METHOD_SEQUENTIAL_RECORD_INFO:
              case ACCESS_METHOD_SEQUENTIAL_SAMPLING_SCAN:
              case ACCESS_METHOD_SEQUENTIAL_PAGE_SCAN:
              case ACCESS_METHOD_JSON_TABLE:
              case ACCESS_METHOD_SCHEMA:
              case ACCESS_METHOD_INDEX_KEY_INFO:
              case ACCESS_METHOD_INDEX_NODE_INFO:
              default:
            assert (false);
            break;
              }
          }
        else
          {
            assert_release_error (false);
            return ER_FAILED;
          }

        err_code = scan_start_scan (&thread_ref, &specp->s_id);
        if (err_code != NO_ERROR)
          {
            return err_code;
          }

        err_code = new_memoize_storage (&thread_ref, xptr);
        if (err_code != NO_ERROR)
          {
            return err_code;
          }

        if (thread_ref.on_trace && partition_pruned)
          {
            specp->s_id.partition_stats->covered_index = specp->s_id.scan_stats.covered_index;
            specp->s_id.partition_stats->multi_range_opt = specp->s_id.scan_stats.multi_range_opt;
            specp->s_id.partition_stats->index_skip_scan = specp->s_id.scan_stats.index_skip_scan;
            specp->s_id.partition_stats->loose_index_scan = specp->s_id.scan_stats.loose_index_scan;
            specp->s_id.partition_stats->noscan = specp->s_id.scan_stats.noscan;

            /* SCAN_STATS for DB_PARTITION_CLASS does not support AGL (Aggregate Lookup Optimization). */
            specp->s_id.partition_stats->agl = NULL;
            partition_pruned = false;
          }
          }
      }
      }

    if (err_code != NO_ERROR)
      {
    return err_code;
      }

    if (level > 1)
      {
    m_scan_func_ptr = (UINTPTR *)db_private_alloc (&thread_ref, level * sizeof (UINTPTR));
    for (int i = 0; i < level; i++)
      {
        m_scan_func_ptr[i] = (UINTPTR)NULL;
      }
      }
    else
      {
    m_scan_func_ptr = nullptr;
      }

    m_slot_iterator.initialize (&thread_ref, m_scan_id, m_vd);
    if constexpr (ST == SCAN_TYPE::INDEX)
      {
    m_slot_iterator.set_input_handler (m_input_handler);
    m_input_handler->initialize (&thread_ref, nullptr, m_scan_id);
      }
    else if constexpr (ST == SCAN_TYPE::LIST)
      {
    m_input_handler->initialize (&thread_ref, nullptr, m_scan_id);
      }
    else
      {
    m_input_handler->initialize (&thread_ref, &hsidp->hfid, m_scan_id);
      }
    if constexpr (result_type == RESULT_TYPE::BUILDVALUE_OPT)
      {
    m_result_handler->write_initialize (&thread_ref, m_xasl->outptr_list, m_xasl->proc.buildvalue.agg_list, m_vd, m_xasl);
      }
    else
      {
    m_result_handler->write_initialize (&thread_ref, m_xasl->outptr_list, m_xasl, m_vd);
      }
    if (er_errid () != NO_ERROR)
      {
    return er_errid ();
      }
    return NO_ERROR;
  }

  template <RESULT_TYPE result_type, SCAN_TYPE ST>
  int task<result_type, ST>::finalize (cubthread::entry &thread_ref)
  {
    THREAD_ENTRY *main_thread_p = thread_get_main_thread (m_parent_thread_p);
    xasl_node *xptr;
    if constexpr (result_type == RESULT_TYPE::MERGEABLE_LIST || result_type == RESULT_TYPE::BUILDVALUE_OPT)
      {
    if (m_scan_func_ptr != nullptr)
      {
        db_private_free_and_init (&thread_ref, m_scan_func_ptr);
      }
      }

    if (thread_ref.on_trace)
      {
    TSC_TICKS end_tick;
    TSCTIMEVAL tv_diff;
    struct timeval elapsed_time = {0, 0};
    tsc_getticks (&end_tick);
    tsc_elapsed_time_usec (&tv_diff, end_tick, m_start_tick);
    TSC_ADD_TIMEVAL (elapsed_time, tv_diff);
    if constexpr (result_type == RESULT_TYPE::MERGEABLE_LIST || result_type == RESULT_TYPE::BUILDVALUE_OPT)
      {
        m_trace_handler->m_trace_storage_for_sibling_xasl.merge_xasl_tree (m_xasl);
      }
    m_trace_handler->add_trace (perfmon_get_from_statistic (&thread_ref, PSTAT_PB_NUM_FETCHES),
                    perfmon_get_from_statistic (&thread_ref, PSTAT_PB_NUM_IOREADS),
                    perfmon_get_from_statistic (&thread_ref,PSTAT_PB_PAGE_FIX_ACQUIRE_TIME_10USEC),
                    m_scan_id,
                    elapsed_time);
    perfmon_destroy_parallel_stats (&thread_ref);
      }
    m_result_handler->write_finalize (&thread_ref);
    m_input_handler->finalize (&thread_ref);
    m_slot_iterator.finalize (&thread_ref);

    if constexpr (result_type == RESULT_TYPE::MERGEABLE_LIST || result_type == RESULT_TYPE::BUILDVALUE_OPT)
      {
    for (xptr = m_xasl; xptr != NULL; xptr = xptr->scan_ptr)
      {
        if (xptr->spec_list->type == TARGET_CLASS && xptr->spec_list->parts != NULL)
          {
        xptr->spec_list->curent = NULL;

        /* init btid */
        if (xptr->spec_list->indexptr)
          {
            BTID_COPY (&xptr->spec_list->indexptr->btid, &xptr->spec_list->btid);
          }
          }

        m_join_info->record_join_info (xptr->header.id, xptr);

      }

      }
    else if constexpr (result_type == RESULT_TYPE::XASL_SNAPSHOT)
      {
    scan_end_scan (&thread_ref, m_scan_id);
    scan_close_scan (&thread_ref, m_scan_id);
      }

    for (int i = 0; i < m_vd->dbval_cnt; i++)
      {
    pr_clear_value (&m_vd->dbval_ptr[i]);
      }

    db_private_free (&thread_ref, m_vd->dbval_ptr);
    db_private_free (&thread_ref, m_xasl_state);
    qexec_clear_xasl (&thread_ref, m_xasl, true, false);

    pthread_mutex_lock (&main_thread_p->m_px_lock_mutex);
    if (m_uses_xasl_clone)
      {
    xcache_retire_clone (&thread_ref, m_xasl_cache_entry, &m_xasl_clone);
    xcache_unfix (&thread_ref, m_xasl_cache_entry);
      }
    else
      {
    if (m_xasl_unpack_info)
      {
        /* free the XASL tree */
        free_xasl_unpack_info (&thread_ref, m_xasl_unpack_info);
      }
      }
    pthread_mutex_unlock (&main_thread_p->m_px_lock_mutex);

    return NO_ERROR;
  }

  static void clear_xasl_dptr_node (THREAD_ENTRY *thread_p, XASL_NODE *xaslp, bool uses_clones)
  {
    if (uses_clones)
      {
    if (XASL_IS_FLAGED (xaslp, XASL_DECACHE_CLONE))
      {
        xaslp->status = XASL_CLEARED;
      }
    else
      {
        /* The values allocated during execution will be cleared and the xasl is reused. */
        xaslp->status = XASL_INITIALIZED;
      }
      }
    else
      {
    xaslp->status = XASL_CLEARED;
      }
    if (xaslp->list_id->tuple_cnt > 0)
      {
    qfile_truncate_list (thread_p, xaslp->list_id);
      }
    if (xaslp->single_tuple)
      {
    QPROC_DB_VALUE_LIST value_list;
    int i;
    for (value_list = xaslp->single_tuple->valp, i = 0; i < xaslp->single_tuple->val_cnt;
         value_list = value_list->next, i++)
      {
        pr_clear_value (value_list->val);
      }
      }
  }

  /* walk the full scan_ptr chain; mainline qexec_clear_scan_all_lists does the same. */
  static void clear_xasl_dptr_list (THREAD_ENTRY *thread_p, XASL_NODE *xasl, bool uses_clones)
  {
    for (XASL_NODE *scan_xasl = xasl; scan_xasl != nullptr; scan_xasl = scan_xasl->scan_ptr)
      {
    for (XASL_NODE *xaslp = scan_xasl->dptr_list; xaslp != nullptr; xaslp = xaslp->next)
      {
        clear_xasl_dptr_node (thread_p, xaslp, uses_clones);
      }
      }
  }

  template <RESULT_TYPE result_type, SCAN_TYPE ST>
  int task<result_type, ST>::clone_xasl (cubthread::entry &thread_ref)
  {
    THREAD_ENTRY *main_thread_p = thread_get_main_thread (m_parent_thread_p);
    int err_code = NO_ERROR;
    int i;

    if (m_uses_xasl_clone)
      {
    pthread_mutex_lock (&main_thread_p->m_px_lock_mutex);
    err_code = xcache_find_xasl_id_for_execute (&thread_ref, &m_query_entry->xasl_id, &m_xasl_cache_entry, &m_xasl_clone);
    if (err_code != NO_ERROR)
      {
        pthread_mutex_unlock (&main_thread_p->m_px_lock_mutex);
        return err_code;
      }
    m_xasl = xasl_find_by_id (m_xasl_clone.xasl, m_xasl_id);
    if (m_xasl == nullptr)
      {
        pthread_mutex_unlock (&main_thread_p->m_px_lock_mutex);
        er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_QPROC_INVALID_XASLNODE, 0);
        return ER_FAILED;
      }
    pthread_mutex_unlock (&main_thread_p->m_px_lock_mutex);
      }
    else
      {
    pthread_mutex_lock (&main_thread_p->m_px_lock_mutex);
    err_code = stx_map_stream_to_xasl (&thread_ref, &m_xasl_tree, false, main_thread_p->xasl_unpack_info_ptr->packed_xasl,
                       main_thread_p->xasl_unpack_info_ptr->packed_size, &m_xasl_unpack_info);
    if (err_code != NO_ERROR)
      {
        pthread_mutex_unlock (&main_thread_p->m_px_lock_mutex);
        return err_code;
      }
    m_xasl = xasl_find_by_id (m_xasl_tree, m_xasl_id);
    if (m_xasl == nullptr)
      {
        pthread_mutex_unlock (&main_thread_p->m_px_lock_mutex);
        er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_QPROC_INVALID_XASLNODE, 0);
        return ER_FAILED;
      }
    pthread_mutex_unlock (&main_thread_p->m_px_lock_mutex);
      }

    m_scan_id = &m_xasl->spec_list->s_id;

    m_xasl_state = (xasl_state *) db_private_alloc (&thread_ref, sizeof (xasl_state));
    if (m_xasl_state == nullptr)
      {
    er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_OUT_OF_VIRTUAL_MEMORY, 0);
    return ER_FAILED;
      }
    m_xasl_state->qp_xasl_line = m_orig_vd->xasl_state->qp_xasl_line;
    m_xasl_state->query_id = m_orig_vd->xasl_state->query_id;
    m_vd = &m_xasl_state->vd;
    memcpy (m_vd, m_orig_vd, sizeof (val_descr));
    m_vd->xasl_state = m_xasl_state;
    if (m_orig_vd->dbval_cnt > 0)
      {
    m_vd->dbval_ptr = (DB_VALUE *) db_private_alloc (&thread_ref, sizeof (DB_VALUE) * m_orig_vd->dbval_cnt);
    if (m_vd->dbval_ptr == nullptr)
      {
        er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_OUT_OF_VIRTUAL_MEMORY, 0);
        db_private_free_and_init (&thread_ref, m_xasl_state);
        m_vd = nullptr;
        return ER_FAILED;
      }
    for (i = 0; i < m_orig_vd->dbval_cnt; i++)
      {
        pr_clone_value (&m_orig_vd->dbval_ptr[i], &m_vd->dbval_ptr[i]);
      }
      }
    return NO_ERROR;
  }

  /* Shared OID-drain helper: leaf-path + late-joiner. Returns S_END on completion, S_ERROR with stop=true on terminal failure. */
  template <RESULT_TYPE result_type, SCAN_TYPE ST>
  SCAN_CODE task<result_type, ST>::drain_slot_oids (cubthread::entry &thread_ref, bool &stop)
  {
    SCAN_CODE scan_code, xs_scan;
    bool uses_clones = xcache_uses_clones ();
    DB_LOGICAL ev_res;
    result_handler<result_type> *result_handler_p = m_result_handler;

    while (!stop)
      {
    scan_code = m_slot_iterator.next_qualified_slot_with_peek (&thread_ref);
    if (scan_code == S_END)
      {
        return S_END;
      }
    if (scan_code == S_ERROR)
      {
        m_err_messages->move_top_error_message_to_this();
        m_interrupt->set_code (parallel_query::interrupt::interrupt_code::ERROR_INTERRUPTED_FROM_WORKER_THREAD);
        stop = true;
        return S_ERROR;
      }

    if (m_xasl->if_pred)
      {
        ev_res = eval_pred (&thread_ref, m_xasl->if_pred, m_vd, NULL);
        if (ev_res != V_TRUE)
          {
        clear_xasl_dptr_list (&thread_ref, m_xasl, uses_clones);
        if (ev_res == V_FALSE || ev_res == V_UNKNOWN)
          {
            continue;
          }
        else
          {
            m_err_messages->move_top_error_message_to_this();
            m_interrupt->set_code (parallel_query::interrupt::interrupt_code::ERROR_INTERRUPTED_FROM_WORKER_THREAD);
            stop = true;
            return S_ERROR;
          }
          }
      }
    if constexpr (result_type == RESULT_TYPE::MERGEABLE_LIST || result_type == RESULT_TYPE::BUILDVALUE_OPT)
      {
        if (m_xasl->scan_ptr)
          {
        m_xasl->curr_spec->s_id.qualified_block = true;

        /* handle the scan procedure */
        m_xasl->scan_ptr->next_scan_on = false;
        if (scan_reset_scan_block (&thread_ref, &m_xasl->scan_ptr->curr_spec->s_id) == S_ERROR)
          {
            m_err_messages->move_top_error_message_to_this();
            m_interrupt->set_code (parallel_query::interrupt::interrupt_code::ERROR_INTERRUPTED_FROM_WORKER_THREAD);
            stop = true;
            return S_ERROR;
          }

        m_xasl->next_scan_on = true;
        if (m_xasl->scan_ptr->memoize_storage)
          {
            m_xasl->scan_ptr->memoize_storage->set_key_changed ();
          }
        while ((xs_scan = qexec_execute_scan_ptr (&thread_ref, m_xasl->scan_ptr, m_xasl_state, m_scan_func_ptr)) == S_SUCCESS)
          {
            if constexpr (result_type == RESULT_TYPE::MERGEABLE_LIST)
              {
            result_handler_p->write (&thread_ref, m_xasl->outptr_list);
              }
            else if constexpr (result_type == RESULT_TYPE::BUILDVALUE_OPT)
              {
            result_handler_p->write (&thread_ref);
              }
          }
        if (xs_scan == S_ERROR)
          {
            m_err_messages->move_top_error_message_to_this();
            m_interrupt->set_code (parallel_query::interrupt::interrupt_code::ERROR_INTERRUPTED_FROM_WORKER_THREAD);
            stop = true;
            return S_ERROR;
          }
        m_xasl->next_scan_on = false;
          }
        else
          {
        if constexpr (result_type == RESULT_TYPE::MERGEABLE_LIST)
          {
            result_handler_p->write (&thread_ref, m_xasl->outptr_list);
          }
        else if constexpr (result_type == RESULT_TYPE::BUILDVALUE_OPT)
          {
            result_handler_p->write (&thread_ref);
          }
          }
      }
    else if constexpr (result_type == RESULT_TYPE::XASL_SNAPSHOT)
      {
        result_handler_p->write (&thread_ref, m_xasl->val_list);
      }

    /* dptrs reaching here are per-row re-evaluated correlated subqueries; checker blocks join-type and IN-clause variants. clear all. */

    clear_xasl_dptr_list (&thread_ref, m_xasl, uses_clones);
      }
    return S_END;
  }

  template <RESULT_TYPE result_type, SCAN_TYPE ST>
  void task<result_type, ST>::loop (cubthread::entry &thread_ref)
  {
    SCAN_CODE scan_code;
    VPID vpid;
    int err_code;
    bool stop = false;
    bool is_interrupt;
    bool dummy = false;
    int set_page_err;
    PAGE_PTR list_page = nullptr;
    QMGR_TEMP_FILE *list_tfile = nullptr;
    PAGE_PTR index_page = nullptr;

    INT16 index_slot_hint = NULL_SLOTID;
    int index_range_idx = -1;

    /* abort path: signal_no_more_leaves before leave_worker; else wait_or_help_overflow stalls. S_END disarms via handler=nullptr. */
    struct worker_scope_guard
    {
      input_handler_t *handler;
      ~worker_scope_guard ()
      {
    if constexpr (ST == SCAN_TYPE::INDEX)
      {
        if (handler != nullptr)
          {
        handler->signal_no_more_leaves ();
        handler->leave_worker ();
          }
      }
      }
    };
    worker_scope_guard worker_guard;
    worker_guard.handler = nullptr;
    if constexpr (ST == SCAN_TYPE::INDEX)
      {
    m_input_handler->enter_worker ();
    worker_guard.handler = m_input_handler;
      }

    while (!stop)
      {
    if (m_interrupt->get_code() != parallel_query::interrupt::interrupt_code::NO_INTERRUPT)
      {
        break;
      }
    is_interrupt= logtb_get_check_interrupt (&thread_ref)
              && logtb_is_interrupted_tran (&thread_ref, true, &dummy, thread_ref.tran_index);
    if (is_interrupt)
      {
        if (m_interrupt->get_code() == parallel_query::interrupt::interrupt_code::NO_INTERRUPT)
          {
        er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_INTERRUPTED, 0);
        m_err_messages->move_top_error_message_to_this();
        m_interrupt->set_code (parallel_query::interrupt::interrupt_code::USER_INTERRUPTED_FROM_WORKER_THREAD);
          }
        break;
      }
    /* LIST/INDEX: handler hands fixed page to iterator. HEAP: vpid; page lives in scan_cache->page_watcher. */

    if constexpr (ST == SCAN_TYPE::LIST)
      {
        list_page = nullptr;
        list_tfile = nullptr;
        scan_code = m_input_handler->get_next_page_with_fix (&thread_ref, list_page, list_tfile);
      }
    else if constexpr (ST == SCAN_TYPE::INDEX)
      {
        index_page = nullptr;
        index_slot_hint = NULL_SLOTID;
        index_range_idx = -1;
        scan_code = m_input_handler->get_next_page_with_fix (&thread_ref, m_scan_id, index_page, &index_slot_hint,
            &index_range_idx);
      }
    else
      {
        scan_code = m_input_handler->get_next_vpid_with_fix (&thread_ref, &vpid);
      }
    if (scan_code == S_END)
      {
        if constexpr (ST == SCAN_TYPE::INDEX)
          {
        /* drain_late_joiner_chains calls leave_worker itself; disarm the RAII guard first. */
        worker_guard.handler = nullptr;
        drain_late_joiner_chains (thread_ref, stop);
          }
        m_xasl->curr_spec->s_id.position = S_AFTER;
        break;
      }
    if (scan_code == S_ERROR)
      {
        if (m_interrupt->get_code() == parallel_query::interrupt::interrupt_code::NO_INTERRUPT)
          {
        err_code = m_err_messages->move_top_error_message_to_this();
        if (err_code == ER_INTERRUPTED)
          {
            m_interrupt->set_code (parallel_query::interrupt::interrupt_code::USER_INTERRUPTED_FROM_WORKER_THREAD);
          }
        else
          {
            m_interrupt->set_code (parallel_query::interrupt::interrupt_code::ERROR_INTERRUPTED_FROM_WORKER_THREAD);
          }
          }
        break;
      }

    if constexpr (ST == SCAN_TYPE::LIST)
      {
        set_page_err = m_slot_iterator.set_page (&thread_ref, list_page, list_tfile);
      }
    else if constexpr (ST == SCAN_TYPE::INDEX)
      {
        /* refresh slot_iterator's range_idx only on descent (>= 0); -1 sentinel = chain-walk, leave local cursor alone. */
        if (index_range_idx >= 0)
          {
        m_slot_iterator.set_range_idx (index_range_idx);
          }
        set_page_err = m_slot_iterator.set_page (&thread_ref, index_page, index_slot_hint);
      }
    else
      {
        set_page_err = m_slot_iterator.set_page (&thread_ref, &vpid);
      }
    if (set_page_err != NO_ERROR)
      {
        if (m_interrupt->get_code() == parallel_query::interrupt::interrupt_code::NO_INTERRUPT)
          {
        err_code = m_err_messages->move_top_error_message_to_this();
        if (err_code == ER_INTERRUPTED)
          {
            m_interrupt->set_code (parallel_query::interrupt::interrupt_code::USER_INTERRUPTED_FROM_WORKER_THREAD);
          }
        else
          {
            m_interrupt->set_code (parallel_query::interrupt::interrupt_code::ERROR_INTERRUPTED_FROM_WORKER_THREAD);
          }
          }
        break;
      }
    /* drain_slot_oids returns S_END at completion or S_ERROR with stop=true on terminal failure. */
    (void) drain_slot_oids (thread_ref, stop);
      }
  }

  /* INDEX-only: after leaf supply exhausted, signal no-more-leaves, leave worker count, then help drain remaining shared overflow chains until the pool quiesces. */
  template <RESULT_TYPE result_type, SCAN_TYPE ST>
  void task<result_type, ST>::drain_late_joiner_chains (cubthread::entry &thread_ref, bool &stop)
  {
    if constexpr (ST == SCAN_TYPE::INDEX)
      {
    /* Order: signal_no_more_leaves before leave_worker so waiters seeing active==0 also see no_more_leaves. */
    m_input_handler->signal_no_more_leaves ();
    m_input_handler->leave_worker ();
    while (!stop)
      {
        PAGE_PTR ovf_page = nullptr;
        DB_VALUE ovf_local_key;
        bool ovf_local_clear_key = false;
        int ovf_range = -1;
        int ovf_slot_idx = -1;
        db_make_null (&ovf_local_key);
        SCAN_CODE help = m_input_handler->wait_or_help_overflow (&thread_ref, ovf_page,
                 &ovf_local_key, &ovf_local_clear_key, ovf_range, ovf_slot_idx);
        if (help == S_END)
          {
        break;
          }
        if (help == S_ERROR)
          {
        if (m_interrupt->get_code() == parallel_query::interrupt::interrupt_code::NO_INTERRUPT)
          {
            m_err_messages->move_top_error_message_to_this();
            m_interrupt->set_code (parallel_query::interrupt::interrupt_code::ERROR_INTERRUPTED_FROM_WORKER_THREAD);
          }
        stop = true;
        break;
          }
        /* Ownership transfer: on S_SUCCESS, set_overflow_page adopts ovf_local_key body. */
        int sp_err = m_slot_iterator.set_overflow_page (&thread_ref, ovf_page, &ovf_local_key,
             ovf_local_clear_key, ovf_range, ovf_slot_idx);
        if (sp_err != NO_ERROR)
          {
        if (m_interrupt->get_code() == parallel_query::interrupt::interrupt_code::NO_INTERRUPT)
          {
            m_err_messages->move_top_error_message_to_this();
            m_interrupt->set_code (parallel_query::interrupt::interrupt_code::ERROR_INTERRUPTED_FROM_WORKER_THREAD);
          }
        stop = true;
        break;
          }
        /* drain_slot_oids returns S_END at completion or S_ERROR with stop=true on terminal failure. */
        (void) drain_slot_oids (thread_ref, stop);
      }
      }
  }

  /* Explicit template instantiations */
  template class task<RESULT_TYPE::MERGEABLE_LIST, SCAN_TYPE::HEAP>;
  template class task<RESULT_TYPE::XASL_SNAPSHOT, SCAN_TYPE::HEAP>;
  template class task<RESULT_TYPE::BUILDVALUE_OPT, SCAN_TYPE::HEAP>;

  template class task<RESULT_TYPE::MERGEABLE_LIST, SCAN_TYPE::LIST>;
  template class task<RESULT_TYPE::BUILDVALUE_OPT, SCAN_TYPE::LIST>;

  template class task<RESULT_TYPE::MERGEABLE_LIST, SCAN_TYPE::INDEX>;
  template class task<RESULT_TYPE::BUILDVALUE_OPT, SCAN_TYPE::INDEX>;
}