Skip to content

File px_heap_scan_result_handler.cpp

File List > cubrid > src > query > parallel > px_heap_scan > px_heap_scan_result_handler.cpp

Go to the documentation of this file

/*
 *
 * Copyright 2016 CUBRID Corporation
 *
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 *
 */

/*
 * px_heap_scan_result_handler.cpp
 */

#include "px_heap_scan_result_handler.hpp"
#include "error_code.h"
#include "error_manager.h"
#include "memory_alloc.h"
#include "object_primitive.h"
#include "porting.h"
#include "query_opfunc.h"
#include "list_file.h"
#include "dbtype_def.h"
#include "object_representation.h"
#include <chrono>
#include "dbtype.h"
#include "fetch.h"
#include "query_aggregate.hpp"
#include "xasl_aggregate.hpp"

// XXX: SHOULD BE THE LAST INCLUDE HEADER
#include "memory_wrapper.hpp"

namespace parallel_heap_scan
{
  template <RESULT_TYPE result_type>
  thread_local typename result_handler<result_type>::tls result_handler<result_type>::tl;

  thread_local AGGREGATE_TYPE *result_handler<RESULT_TYPE::BUILDVALUE_OPT>::tl_agg_p;
  thread_local OUTPTR_LIST *result_handler<RESULT_TYPE::BUILDVALUE_OPT>::tl_outptr_list_p;
  thread_local VAL_DESCR *result_handler<RESULT_TYPE::BUILDVALUE_OPT>::tl_vd;
  thread_local xasl_node *result_handler<RESULT_TYPE::BUILDVALUE_OPT>::tl_xasl_p;
  thread_local QFILE_TUPLE_RECORD result_handler<RESULT_TYPE::BUILDVALUE_OPT>::tl_tpl_buf;
  thread_local OR_BUF result_handler<RESULT_TYPE::BUILDVALUE_OPT>::tl_or_buf;

  int update_domains_on_type_list_by_val_list (THREAD_ENTRY *thread_p, QFILE_LIST_ID *list_id_p, VAL_LIST *val_list_p)
  {
    assert (thread_p != nullptr);
    assert (list_id_p != nullptr);
    assert (val_list_p != nullptr);
    int i;
    QPROC_DB_VALUE_LIST valp = val_list_p->valp;
    list_id_p->is_domain_resolved = true;

    for (i=0; i<val_list_p->val_cnt; i++, valp = valp->next)
      {
    assert (i >= 0 && i < val_list_p->val_cnt);
    assert (valp != nullptr);
    assert (valp->val != nullptr);
    assert (i >= 0 && i < list_id_p->type_list.type_cnt);
    if (valp->val->domain.general_info.is_null)
      {
        list_id_p->is_domain_resolved = false;
      }
    else
      {
        list_id_p->type_list.domp[i] = valp->dom;
      }
      }
    return NO_ERROR;
  }


  template <RESULT_TYPE result_type>
  result_handler<result_type>::result_handler (QUERY_ID query_id, interrupt *interrupt_p,
      err_messages_with_lock *err_messages_p, int parallelism, bool g_agg_domain_resolve_need,
      XASL_NODE *orig_xasl_tree_for_domain_resolve)
  {
    m_parallelism = parallelism;
    m_query_id = query_id;
    m_interrupt_p = interrupt_p;
    m_err_messages_p = err_messages_p;
    if constexpr (result_type == RESULT_TYPE::MERGEABLE_LIST)
      {
    m_.orig_xasl = orig_xasl_tree_for_domain_resolve;
    m_.active_results = parallelism;
    m_.is_list_id_domain_resolved = false;
    m_.g_hash_eligible = (bool) orig_xasl_tree_for_domain_resolve->proc.buildlist.g_hash_eligible;
      }
    else if constexpr (result_type == RESULT_TYPE::XASL_SNAPSHOT)
      {
    m_.list_id_headers.resize (parallelism);
    m_.list_id_header_index.store (0);
    m_.current_read_spec = nullptr;
    for (list_id_header &list_id_header : m_.list_id_headers)
      {
        VPID64_t vpid;
        vpid.vpid.pageid = NULL_PAGEID;
        vpid.vpid.volid = NULL_VOLID;
        list_id_header.m_first_vpid.store (vpid);
        list_id_header.m_last_vpid.store (vpid);
        list_id_header.m_list_closed.store (false);
        list_id_header.m_valid.store (false);
        list_id_header.m_list_id_p = nullptr;
        list_id_header.m_type_list.resize (0);
        list_id_header.m_type_cnt = 0;
      }
    m_.read_specs.resize (parallelism);
    for (int i = 0; i < parallelism; i++)
      {
        m_.read_specs[i].list_id_header_p = &m_.list_id_headers[i];
        m_.read_specs[i].read_ended = false;
        m_.read_specs[i].list_scan_id_opened = false;
      }
      }
    else
      {
    assert (false);
      }
  }

  template <RESULT_TYPE result_type>
  void result_handler<result_type>::read_initialize (THREAD_ENTRY *thread_p)
  {
    if constexpr (result_type == RESULT_TYPE::MERGEABLE_LIST)
      {
    /* do nothing */
      }
    else if constexpr (result_type == RESULT_TYPE::XASL_SNAPSHOT)
      {
    tl.tpl_buf.tpl = nullptr;
    tl.tpl_buf.size = 0;
      }
    else
      {
    assert (false);
      }
  }

  template <RESULT_TYPE result_type>
  void result_handler<result_type>::read_finalize (THREAD_ENTRY *thread_p)
  {
    if constexpr (result_type == RESULT_TYPE::MERGEABLE_LIST)
      {
    for (QFILE_LIST_ID *list_id : m_.writer_results)
      {
        if (list_id != nullptr && list_id->type_list.type_cnt > 0)
          {
        qfile_destroy_list (thread_p, list_id);
        QFILE_FREE_AND_INIT_LIST_ID (list_id);
          }
      }
    m_.writer_results.clear();
    for (QFILE_LIST_ID *list_id : m_.hgby_results)
      {
        if (list_id != nullptr && list_id->type_list.type_cnt > 0)
          {
        qfile_destroy_list (thread_p, list_id);
        QFILE_FREE_AND_INIT_LIST_ID (list_id);
          }
      }
    m_.hgby_results.clear();
      }
    else if constexpr (result_type == RESULT_TYPE::XASL_SNAPSHOT)
      {
    for (read_spec &read_spec : m_.read_specs)
      {
        if (read_spec.list_scan_id_opened)
          {
        qfile_close_scan (thread_p, &read_spec.list_scan_id);
          }
      }
    m_.read_specs.clear();
    for (list_id_header &list_id_header : m_.list_id_headers)
      {
        if (list_id_header.m_list_id_p != nullptr)
          {
        assert (list_id_header.m_list_id_p->last_pgptr == nullptr);
        qfile_destroy_list (thread_p, list_id_header.m_list_id_p);
        list_id_header.m_list_id_p = nullptr;
          }
        for (std::atomic<TP_DOMAIN *> *type_list_p : list_id_header.m_type_list)
          {
        delete type_list_p;
          }
        list_id_header.m_type_list.clear ();
        list_id_header.m_type_cnt = 0;
      }
    m_.list_id_headers.clear();
    m_.current_read_spec = nullptr;
    if (tl.tpl_buf.size > 0 && tl.tpl_buf.tpl != nullptr)
      {
        db_private_free_and_init (thread_p, tl.tpl_buf.tpl);
        tl.tpl_buf.size = 0;
      }
      }
    else
      {
    assert (false);
      }
  }

  template <RESULT_TYPE result_type>
  void result_handler<result_type>::write_initialize (THREAD_ENTRY *thread_p, OUTPTR_LIST *outptr_list,
      XASL_NODE *curr_xasl, VAL_DESCR *vd)
  {
    if constexpr (result_type == RESULT_TYPE::MERGEABLE_LIST)
      {
    int size;
    tl.vd = vd;
    {
      std::lock_guard<std::mutex> lock (m_.writer_results_mutex);
      qfile_tuple_value_type_list type_list;
      int err_code = NO_ERROR;
      QFILE_LIST_ID *list_id;
      err_code = qdata_get_valptr_type_list (thread_p, outptr_list, &type_list);
      if (err_code != NO_ERROR)
        {
          m_err_messages_p->move_top_error_message_to_this();
          m_interrupt_p->set_code (parallel_query::interrupt::interrupt_code::ERROR_INTERRUPTED_FROM_WORKER_THREAD);
          /* error occurred, return false to stop the writer */
          return;
        }
      list_id = qfile_open_list (thread_p, &type_list, NULL, m_query_id, QFILE_FLAG_ALL|QFILE_NOT_USE_MEMBUF, NULL );
      if (!list_id)
        {
          m_err_messages_p->move_top_error_message_to_this();
          m_interrupt_p->set_code (parallel_query::interrupt::interrupt_code::ERROR_INTERRUPTED_FROM_WORKER_THREAD);
          /* error occurred, return false to stop the writer */
          return;
        }
      m_.writer_results.push_back (list_id);
      tl.writer_result_p = list_id;
      if (type_list.domp != nullptr)
        {
          db_private_free_and_init (thread_p, type_list.domp);
        }
    }
    size = tl.writer_result_p->type_list.type_cnt * DB_SIZEOF (DB_VALUE *);
    tl.writer_result_p->tpl_descr.f_valp = (DB_VALUE **) malloc (size);
    if (tl.writer_result_p->tpl_descr.f_valp == NULL)
      {
        er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_OUT_OF_VIRTUAL_MEMORY, 1, size);
        m_err_messages_p->move_top_error_message_to_this();
        m_interrupt_p->set_code (parallel_query::interrupt::interrupt_code::ERROR_INTERRUPTED_FROM_WORKER_THREAD);
        /* error occurred, return false to stop the writer */
        return;
      }
    size = tl.writer_result_p->type_list.type_cnt * sizeof (bool);
    tl.tpl_buf.tpl = (char *) db_private_alloc (thread_p, DB_PAGESIZE);
    if (tl.tpl_buf.tpl == nullptr)
      {
        m_err_messages_p->move_top_error_message_to_this();
        m_interrupt_p->set_code (parallel_query::interrupt::interrupt_code::ERROR_INTERRUPTED_FROM_WORKER_THREAD);
        return;
      }
    tl.tpl_buf.size = DB_PAGESIZE;
    int total_val_cnt = 0;
    for (XASL_NODE *xasl = m_.orig_xasl; xasl != nullptr; xasl = xasl->scan_ptr)
      {
        total_val_cnt += xasl->val_list->val_cnt;
      }
    tl.dbvals_for_domain_resolve.resize (total_val_cnt);
    for (DB_VALUE &dbval : tl.dbvals_for_domain_resolve)
      {
        dbval.domain.general_info.is_null = 1;
      }
    tl.val_list_domain_resolved = false;
    tl.xasl = curr_xasl;
    tl.agg_hash_state = HS_NONE;
    tl.g_agg_domains_resolved = TRUE;
    if (m_.g_hash_eligible)
      {
        if (qexec_alloc_agg_hash_context_buildlist_xasl (thread_p, curr_xasl, vd->xasl_state, true) != NO_ERROR)
          {
        m_err_messages_p->move_top_error_message_to_this();
        m_interrupt_p->set_code (parallel_query::interrupt::interrupt_code::ERROR_INTERRUPTED_FROM_WORKER_THREAD);
        return;
          }
        tl.agg_hash_state = HS_ACCEPT_ALL;
        tl.g_agg_domains_resolved = FALSE;
      }
      }
    else if constexpr (result_type == RESULT_TYPE::XASL_SNAPSHOT)
      {
    int index;
    tl.tpl_buf.tpl = (char *)db_private_alloc (thread_p, DB_PAGESIZE);
    if (tl.tpl_buf.tpl == nullptr)
      {
        m_err_messages_p->move_top_error_message_to_this();
        m_interrupt_p->set_code (parallel_query::interrupt::interrupt_code::ERROR_INTERRUPTED_FROM_WORKER_THREAD);
        return;
      }
    tl.tpl_buf.size = DB_PAGESIZE;
    index = m_.list_id_header_index.fetch_add (1, std::memory_order_acq_rel);
    tl.list_id_header_p = &m_.list_id_headers[index];
      }
    else
      {
    assert (false);
      }
  }

  template <RESULT_TYPE result_type>
  void result_handler<result_type>::write_finalize (THREAD_ENTRY *thread_p)
  {
    if constexpr (result_type == RESULT_TYPE::MERGEABLE_LIST)
      {
    AGGREGATE_HASH_CONTEXT *context = tl.xasl->proc.buildlist.agg_hash_context;
    bool hash_aggregate_append = m_.g_hash_eligible;
    if (hash_aggregate_append)
      {
        if (qdata_save_agg_htable_to_list (thread_p, context->hash_table, tl.writer_result_p,
                           context->part_list_id, context->temp_dbval_array) != NO_ERROR)
          {
        m_err_messages_p->move_top_error_message_to_this();
        m_interrupt_p->set_code (parallel_query::interrupt::interrupt_code::ERROR_INTERRUPTED_FROM_WORKER_THREAD);
        /* for prevent data corruption at m_.hgby_results;
         * context->part_list_id will be destroyed in thread's qexec_clear_xasl */
        hash_aggregate_append = false;
          }
        if (context->part_list_id != NULL)
          {
        qfile_close_list (thread_p, context->part_list_id);
          }
      }
    qfile_close_list (thread_p, tl.writer_result_p);

    assert (tl.writer_result_p->last_pgptr == nullptr);
    if (tl.writer_result_p != nullptr && tl.writer_result_p->tpl_descr.f_valp != nullptr)
      {
        free_and_init (tl.writer_result_p->tpl_descr.f_valp);
      }
    tl.writer_result_p = nullptr;
    if (tl.tpl_buf.tpl != nullptr)
      {
        db_private_free (thread_p, tl.tpl_buf.tpl);
        tl.tpl_buf.tpl = nullptr;
      }
    tl.vd = nullptr;
    {
      std::lock_guard<std::mutex> lock (m_result_mutex);

      HL_HEAPID heap_id = db_change_private_heap (thread_p, 0);
      XASL_NODE *xptr = m_.orig_xasl;
      int i = 0;
      for (; xptr != nullptr; xptr = xptr->scan_ptr)
        {
          QPROC_DB_VALUE_LIST orig_valp = xptr->val_list->valp;
          int end = i + xptr->val_list->val_cnt;
          for (; i < end; i++)
        {
          if (orig_valp->val->domain.general_info.is_null && !tl.dbvals_for_domain_resolve[i].domain.general_info.is_null)
            {
              pr_clone_value (&tl.dbvals_for_domain_resolve[i], orig_valp->val);
            }
          orig_valp = orig_valp->next;
        }
        }

      db_change_private_heap (thread_p, heap_id);
      for (DB_VALUE &dbval : tl.dbvals_for_domain_resolve)
        {
          pr_clear_value (&dbval);
        }
      tl.dbvals_for_domain_resolve.clear();

      if (hash_aggregate_append)
        {
          m_.hgby_results.push_back (context->part_list_id);
          context->part_list_id = NULL;
        }

      m_.active_results--;
      if (m_.active_results == 0)
        {
          m_result_cv.notify_all();
        }
    }
      }
    else if constexpr (result_type == RESULT_TYPE::XASL_SNAPSHOT)
      {
    VPID64_t last_vpid;
    if (tl.tpl_buf.size > 0 && tl.tpl_buf.tpl != nullptr)
      {
        db_private_free_and_init (thread_p, tl.tpl_buf.tpl);
        tl.tpl_buf.size = 0;
      }
    assert (tl.list_id_header_p != nullptr);
    if (tl.list_id_header_p->m_list_id_p != nullptr)
      {
        qfile_close_list (thread_p, tl.list_id_header_p->m_list_id_p);
        for (int i = 0; i < tl.list_id_header_p->m_type_cnt; i++)
          {
        tl.list_id_header_p->m_type_list[i]->store ((TP_DOMAIN *)tl.list_id_header_p->m_list_id_p->type_list.domp[i],
            std::memory_order_release);
          }
        last_vpid.vpid = tl.list_id_header_p->m_list_id_p->last_vpid;
        tl.list_id_header_p->m_last_vpid.store (last_vpid, std::memory_order_release);
        if (VPID_EQ (&tl.list_id_header_p->m_list_id_p->last_vpid, &tl.list_id_header_p->m_list_id_p->first_vpid))
          {
        tl.list_id_header_p->m_first_vpid.store (last_vpid, std::memory_order_release);
          }
        tl.list_id_header_p->m_valid.store (true, std::memory_order_release);
      }
    tl.list_id_header_p->m_list_closed.store (true, std::memory_order_release);
    m_result_cv.notify_all ();
    tl.list_id_header_p = nullptr;
      }
    else
      {
    assert (false);
      }
  }

  template <RESULT_TYPE result_type>
  void result_handler<result_type>::get_valid_read_spec ()
  {
    if constexpr (result_type == RESULT_TYPE::MERGEABLE_LIST)
      {
    return;
      }
    else if constexpr (result_type == RESULT_TYPE::XASL_SNAPSHOT)
      {
    bool found = false;
    read_spec *read_spec_p;
    list_id_header *list_id_header_p;
    int ended_count;
    VPID first_vpid, last_vpid;
    VPID next_vpid;
    do
      {
        found = false;
        ended_count = 0;
        for (int i = 0; i < m_parallelism; i++)
          {
        read_spec_p = &m_.read_specs[i];
        if (!read_spec_p->read_ended)
          {
            list_id_header_p = read_spec_p->list_id_header_p;
            if (list_id_header_p->m_valid.load (std::memory_order_acquire))
              {
            if (list_id_header_p->m_list_closed.load (std::memory_order_acquire))
              {
                m_.current_read_spec = read_spec_p;
                found = true;
                break;
              }
            else
              {
                first_vpid = list_id_header_p->m_first_vpid.load (std::memory_order_acquire).vpid;
                last_vpid = list_id_header_p->m_last_vpid.load (std::memory_order_acquire).vpid;
                if (!VPID_EQ (&first_vpid, &last_vpid))
                  {
                if (read_spec_p->list_scan_id_opened == false)
                  {
                    m_.current_read_spec = read_spec_p;
                    found = true;
                    break;
                  }
                else
                  {
                    QFILE_GET_NEXT_VPID (&next_vpid, read_spec_p->list_scan_id.curr_pgptr);
                    if (next_vpid.pageid == last_vpid.pageid && next_vpid.volid == last_vpid.volid)
                      {
                    found = false;
                    continue;
                      }
                    else
                      {
                    m_.current_read_spec = read_spec_p;
                    found = true;
                    break;
                      }
                  }
                  }
              }
              }
            else if (list_id_header_p->m_list_closed.load (std::memory_order_acquire))
              {
            ended_count++;
              }
          }
        else
          {
            ended_count++;
          }
          }
        if (ended_count == m_parallelism)
          {
        found = true;
        break;
          }
        if (!found)
          {
        std::unique_lock<std::mutex> lock (m_result_mutex);
        m_result_cv.wait_for (lock, std::chrono::microseconds (50));
          }
      }
    while (!found);
      }
    else
      {
    assert (false);
      }
  }

  void merge_list_ids (THREAD_ENTRY *thread_p, QFILE_LIST_ID *dest, std::vector<QFILE_LIST_ID *> &lists)
  {
    QFILE_LIST_ID *tmp_merged_list = nullptr;
    for (QFILE_LIST_ID *list_id : lists)
      {
    assert (list_id != nullptr);
    assert (list_id->last_pgptr == nullptr);
    if (list_id->tuple_cnt > 0)
      {
        if (tmp_merged_list == nullptr)
          {
        tmp_merged_list = list_id;
          }
        else
          {
        qfile_connect_list (thread_p, tmp_merged_list, list_id);
          }
      }
    else
      {
        qfile_destroy_list (thread_p, list_id);
        QFILE_FREE_AND_INIT_LIST_ID (list_id);
      }
    list_id = nullptr;
      }
    lists.clear();
    if (tmp_merged_list != nullptr)
      {
    if (dest->tuple_cnt > 0)
      {
        if (dest->last_pgptr != nullptr)
          {
        qfile_close_list (thread_p, dest);
          }
        qfile_connect_list (thread_p, dest, tmp_merged_list);
      }
    else
      {
        if (dest->type_list.type_cnt > 0)
          {
        qfile_destroy_list (thread_p, dest);
          }
        qfile_copy_list_id (dest, tmp_merged_list, true, QFILE_MOVE_DEPENDENT);
        QFILE_FREE_AND_INIT_LIST_ID (tmp_merged_list);
      }
      }
  }

  template <RESULT_TYPE result_type>
  SCAN_CODE result_handler<result_type>::read (THREAD_ENTRY *thread_p, read_dest_type *dest)
  {
    if constexpr (result_type == RESULT_TYPE::MERGEABLE_LIST)
      {
    {
      std::unique_lock<std::mutex> lock (m_result_mutex);
      if (m_.active_results != 0)
        {
          while (m_.active_results != 0)
        {
          m_result_cv.wait_for (lock, std::chrono::microseconds (50));
          if (m_interrupt_p->get_code() != parallel_query::interrupt::interrupt_code::NO_INTERRUPT)
            {
              return S_ERROR;
            }
        }
        }
    }

    if (m_interrupt_p->get_code() != parallel_query::interrupt::interrupt_code::NO_INTERRUPT)
      {
        return S_ERROR;
      }

    merge_list_ids (thread_p, dest, m_.writer_results);

    if (m_.g_hash_eligible)
      {
        BUILDLIST_PROC_NODE *buildlist_proc = &m_.orig_xasl->proc.buildlist;
        merge_list_ids (thread_p, buildlist_proc->agg_hash_context->part_list_id, m_.hgby_results);
        /* Using HS_REJECT_ALL to force 'hash: partial' in trace during hash group by with part list IDs.
         * Refer to gstats in qdump_print_stats_text(). */
        m_.orig_xasl->groupby_stats.groupby_hash = HS_REJECT_ALL;
      }

    return S_END;
      }
    else if constexpr (result_type == RESULT_TYPE::XASL_SNAPSHOT)
      {
    SCAN_CODE scan_code = S_SUCCESS;
    bool should_retry = false;
    QFILE_LIST_SCAN_ID *list_scan_id_p;
    QFILE_LIST_ID *list_id_p;
    list_id_header *list_id_header_p;
    VPID first_vpid, last_vpid;
    bool list_closed;
    VPID next_vpid;
    int err_code;
    TP_DOMAIN *domain_p;
    OR_BUF iterator, buf;
    QFILE_TUPLE_VALUE_FLAG flag;
    QPROC_DB_VALUE_LIST val_list_iterator;
    int val_list_index;

    do
      {
        should_retry = false;
        if (m_.current_read_spec == nullptr)
          {
        get_valid_read_spec ();
        if (m_.current_read_spec == nullptr)
          {
            return S_END;
          }
          }
        list_scan_id_p = &m_.current_read_spec->list_scan_id;
        list_id_p = m_.current_read_spec->list_id_header_p->m_list_id_p;
        list_id_header_p = m_.current_read_spec->list_id_header_p;
        assert (m_.current_read_spec != nullptr && m_.current_read_spec->list_id_header_p != nullptr
            && m_.current_read_spec->list_id_header_p->m_valid.load (std::memory_order_relaxed));

        first_vpid = list_id_header_p->m_first_vpid.load (std::memory_order_acquire).vpid;
        last_vpid = list_id_header_p->m_last_vpid.load (std::memory_order_acquire).vpid;
        list_closed = list_id_header_p->m_list_closed.load (std::memory_order_acquire);
        assert (first_vpid.pageid != NULL_PAGEID && last_vpid.pageid != NULL_PAGEID);

        if (unlikely (list_id_p == nullptr))
          {
        m_.current_read_spec->read_ended = true;
        list_id_header_p->m_valid.store (false, std::memory_order_release);
        m_.current_read_spec = nullptr;
        should_retry = true;
        continue;
          }

        if (unlikely (m_.current_read_spec->list_scan_id_opened == false))
          {
        qfile_open_list_scan (list_id_p, list_scan_id_p);
        m_.current_read_spec->list_scan_id_opened = true;
          }
        if (unlikely (!list_closed && list_scan_id_p->position == S_ON))
          {
        if (list_scan_id_p->curr_tplno >= QFILE_GET_TUPLE_COUNT (list_scan_id_p->curr_pgptr) - 1)
          {
            QFILE_GET_NEXT_VPID (&next_vpid, list_scan_id_p->curr_pgptr);
            if (next_vpid.pageid == NULL_PAGEID)
              {
            /* end of list */
              }
            else
              {
            if (next_vpid.pageid == last_vpid.pageid && next_vpid.volid == last_vpid.volid)
              {
                /* next page is in write-phase */
                should_retry = true;
                continue;
              }
              }
          }
          }

        scan_code = qfile_scan_list_next (thread_p, list_scan_id_p, &tl.tpl_buf, PEEK);
        if (unlikely (!VPID_EQ (&list_scan_id_p->curr_vpid, &first_vpid)))
          {
        VPID64_t vpid;
        vpid.vpid = list_scan_id_p->curr_vpid;
        list_id_header_p->m_first_vpid.store (vpid, std::memory_order_release);
        first_vpid = list_scan_id_p->curr_vpid;
          }

        if (unlikely (scan_code != S_SUCCESS))
          {
        if (scan_code == S_ERROR)
          {
            m_err_messages_p->move_top_error_message_to_this();
            m_interrupt_p->set_code (parallel_query::interrupt::interrupt_code::ERROR_INTERRUPTED_FROM_WORKER_THREAD);
            return S_ERROR;
          }
        else
          {
            m_.current_read_spec->read_ended = true;
            list_id_header_p->m_valid.store (false, std::memory_order_release);
            m_.current_read_spec = nullptr;
            should_retry = true;
            continue;
          }
          }

        or_init (&iterator, tl.tpl_buf.tpl, QFILE_GET_TUPLE_LENGTH (tl.tpl_buf.tpl));
        or_advance (&iterator, QFILE_TUPLE_LENGTH_SIZE);

        for (val_list_iterator = dest->valp, val_list_index = 0; val_list_iterator
         && val_list_index < dest->val_cnt; val_list_iterator = val_list_iterator->next, val_list_index++)
          {
        qfile_locate_tuple_next_value (&iterator, &buf, &flag);
        pr_clear_value (val_list_iterator->val);
        if (flag == V_UNBOUND)
          {
            db_make_null (val_list_iterator->val);
            continue;
          }
        domain_p = (TP_DOMAIN *)list_id_header_p->m_type_list[val_list_index]->load (std::memory_order_acquire);
        err_code = domain_p->type->data_readval (&buf, val_list_iterator->val, domain_p, -1, false, NULL, 0);
        if (err_code != NO_ERROR)
          {
            return S_ERROR;
          }
          }
        return S_SUCCESS;
      }
    while (should_retry);

    return S_SUCCESS;
      }
    else
      {
    assert (false);
    return S_ERROR;
      }
  }

  template <RESULT_TYPE result_type>
  bool result_handler<result_type>::write (THREAD_ENTRY *thread_p, write_dest_type *src)
  {
    if constexpr (result_type == RESULT_TYPE::MERGEABLE_LIST)
      {
    int err_code = NO_ERROR;
    QPROC_TPLDESCR_STATUS status;

    OUTPTR_LIST *input = (OUTPTR_LIST *)src;

    prefetch (tl.writer_result_p, PREFETCH_WRITE, PREFETCH_CACHE_L1);

    status = qdata_generate_tuple_desc_for_valptr_list (thread_p, input, tl.vd, & (tl.writer_result_p->tpl_descr));

    if (unlikely (!m_.is_list_id_domain_resolved))
      {
        qfile_update_domains_on_type_list (thread_p, tl.writer_result_p, input);
        m_.is_list_id_domain_resolved = tl.writer_result_p->is_domain_resolved;
      }
    if (unlikely (!tl.val_list_domain_resolved))
      {
        XASL_NODE *xptr = tl.xasl;
        int i = 0;
        tl.val_list_domain_resolved = true;

        for (; xptr != nullptr; xptr = xptr->scan_ptr)
          {
        QPROC_DB_VALUE_LIST valp = xptr->val_list->valp;
        int end = i + xptr->val_list->val_cnt;
        for (; i < end; i++)
          {
            if (tl.dbvals_for_domain_resolve[i].domain.general_info.is_null)
              {
            if (!valp->val->domain.general_info.is_null)
              {
                pr_clone_value (valp->val, &tl.dbvals_for_domain_resolve[i]);
              }
            else
              {
                tl.val_list_domain_resolved = false;
              }
              }
            valp = valp->next;
          }
          }
      }

    if (likely (status == QPROC_TPLDESCR_SUCCESS))
      {
        bool output_tuple = true;
        if (tl.agg_hash_state == HS_ACCEPT_ALL)
          {
        if (unlikely (!tl.g_agg_domains_resolved))
          {
            if (qexec_resolve_domains_for_aggregation_for_parallel_heap_scan_g_agg (thread_p, tl.xasl, tl.vd,
            &tl.g_agg_domains_resolved) != NO_ERROR)
              {
            m_err_messages_p->move_top_error_message_to_this();
            m_interrupt_p->set_code (parallel_query::interrupt::interrupt_code::ERROR_INTERRUPTED_FROM_WORKER_THREAD);
            return false;
              }
          }
        if (qexec_hash_gby_agg_tuple_public (thread_p, tl.xasl, tl.vd->xasl_state, &tl.tpl_buf,
                             & (tl.writer_result_p->tpl_descr), tl.writer_result_p, &output_tuple) != NO_ERROR)
          {
            m_err_messages_p->move_top_error_message_to_this();
            m_interrupt_p->set_code (parallel_query::interrupt::interrupt_code::ERROR_INTERRUPTED_FROM_WORKER_THREAD);
            return false;
          }
        tl.agg_hash_state = tl.xasl->proc.buildlist.agg_hash_context->state;
          }
        if (output_tuple)
          {
        if (unlikely (qfile_generate_tuple_into_list (thread_p, tl.writer_result_p, T_NORMAL) != NO_ERROR))
          {
            m_err_messages_p->move_top_error_message_to_this();
            m_interrupt_p->set_code (parallel_query::interrupt::interrupt_code::ERROR_INTERRUPTED_FROM_WORKER_THREAD);
            return false;
          }
          }
      }
    else if (unlikely (status == QPROC_TPLDESCR_FAILURE))
      {
        m_err_messages_p->move_top_error_message_to_this();
        m_interrupt_p->set_code (parallel_query::interrupt::interrupt_code::ERROR_INTERRUPTED_FROM_WORKER_THREAD);
        return false;
      }
    else if (unlikely (status == QPROC_TPLDESCR_RETRY_SET_TYPE || status == QPROC_TPLDESCR_RETRY_BIG_REC))
      {
        err_code = qdata_copy_valptr_list_to_tuple (thread_p, input, tl.vd, &tl.tpl_buf);
        if (err_code != NO_ERROR)
          {
        m_err_messages_p->move_top_error_message_to_this();
        m_interrupt_p->set_code (parallel_query::interrupt::interrupt_code::ERROR_INTERRUPTED_FROM_WORKER_THREAD);
        return false;
          }
        err_code = qfile_add_tuple_to_list (thread_p, tl.writer_result_p, tl.tpl_buf.tpl);
        if (err_code != NO_ERROR)
          {
        m_err_messages_p->move_top_error_message_to_this();
        m_interrupt_p->set_code (parallel_query::interrupt::interrupt_code::ERROR_INTERRUPTED_FROM_WORKER_THREAD);
        return false;
          }
      }
    return true;
      }
    else if constexpr (result_type == RESULT_TYPE::XASL_SNAPSHOT)
      {
    int err_code;
    VPID old_last_vpid;
    QFILE_LIST_ID *list_id_p;
    parallel_heap_scan::list_id_header *tl_list_id_header = tl.list_id_header_p;
    QFILE_TUPLE_RECORD &tl_tpl_buf = tl.tpl_buf;
    VAL_LIST *input = src;

    if (unlikely (tl_list_id_header->m_list_id_p == nullptr))
      {
        QFILE_TUPLE_VALUE_TYPE_LIST type_list;
        err_code = qdata_get_val_list_type_list (thread_p, input, &type_list);
        if (err_code != NO_ERROR)
          {
        m_err_messages_p->move_top_error_message_to_this();
        m_interrupt_p->set_code (parallel_query::interrupt::interrupt_code::ERROR_INTERRUPTED_FROM_WORKER_THREAD);
        return false;
          }
        tl_list_id_header->m_list_id_p = qfile_open_list (thread_p, &type_list, NULL, m_query_id,
                         QFILE_FLAG_ALL, NULL);
        if (tl_list_id_header->m_list_id_p == nullptr)
          {
        m_err_messages_p->move_top_error_message_to_this();
        m_interrupt_p->set_code (parallel_query::interrupt::interrupt_code::ERROR_INTERRUPTED_FROM_WORKER_THREAD);
        return false;
          }
        tl_list_id_header->m_type_cnt = type_list.type_cnt;
        tl_list_id_header->m_type_list.resize (type_list.type_cnt);
        for (int i = 0; i < type_list.type_cnt; i++)
          {
        tl_list_id_header->m_type_list[i] = new std::atomic<TP_DOMAIN *>();
        tl_list_id_header->m_type_list[i]->store ((TP_DOMAIN *)type_list.domp[i], std::memory_order_release);
          }
        if (type_list.domp != nullptr)
          {
        free (type_list.domp);
          }
      }
    list_id_p = tl_list_id_header->m_list_id_p;
    err_code = qdata_copy_val_list_to_tuple (thread_p, input, &tl_tpl_buf);
    prefetch (list_id_p, PREFETCH_WRITE, PREFETCH_CACHE_L1);
    if (unlikely (err_code != NO_ERROR))
      {
        m_err_messages_p->move_top_error_message_to_this();
        m_interrupt_p->set_code (parallel_query::interrupt::interrupt_code::ERROR_INTERRUPTED_FROM_WORKER_THREAD);
        return false;
      }
    old_last_vpid = tl_list_id_header->m_list_id_p->last_vpid;
    err_code = qfile_add_tuple_to_list (thread_p, tl_list_id_header->m_list_id_p, tl_tpl_buf.tpl);
    if (unlikely (err_code != NO_ERROR))
      {
        m_err_messages_p->move_top_error_message_to_this();
        m_interrupt_p->set_code (parallel_query::interrupt::interrupt_code::ERROR_INTERRUPTED_FROM_WORKER_THREAD);
        return false;
      }
    if (unlikely (!tl_list_id_header->m_list_id_p->is_domain_resolved))
      {
        (void) update_domains_on_type_list_by_val_list (thread_p, tl_list_id_header->m_list_id_p, input);
        for (int i = 0; i < tl_list_id_header->m_type_cnt; i++)
          {
        tl_list_id_header->m_type_list[i]->store ((TP_DOMAIN *)tl_list_id_header->m_list_id_p->type_list.domp[i],
            std::memory_order_release);
          }
      }
    if (unlikely (!VPID_EQ (&old_last_vpid, &tl_list_id_header->m_list_id_p->last_vpid)
              && old_last_vpid.pageid != NULL_PAGEID))
      {
        VPID64_t vpid;
        /* last vpid changed, send it to reader */
        if (tl_list_id_header->m_first_vpid.load (std::memory_order_acquire).vpid.pageid == NULL_PAGEID)
          {
        vpid.vpid = tl_list_id_header->m_list_id_p->first_vpid;
        tl_list_id_header->m_first_vpid.store (vpid, std::memory_order_release);
          }
        vpid.vpid = tl_list_id_header->m_list_id_p->last_vpid;
        tl_list_id_header->m_last_vpid.store (vpid, std::memory_order_release);
        tl_list_id_header->m_valid.store (true, std::memory_order_release);
        m_result_cv.notify_all ();
      }

    return true;
      }
    else
      {
    assert (false);
    return false;
      }
  }

  result_handler<RESULT_TYPE::BUILDVALUE_OPT>::result_handler (QUERY_ID query_id, interrupt *interrupt_p,
      err_messages_with_lock *err_messages_p, int parallelism, AGGREGATE_TYPE *orig_agg_list)
  {
    m_parallelism = parallelism;
    m_result_completed = 0;
    m_query_id = query_id;
    m_interrupt_p = interrupt_p;
    m_err_messages_p = err_messages_p;
    m_orig_agg_list = orig_agg_list;
  }

  void result_handler<RESULT_TYPE::BUILDVALUE_OPT>::read_initialize (THREAD_ENTRY *thread_p)
  {
    for (AGGREGATE_TYPE *orig_agg_p = m_orig_agg_list; orig_agg_p != NULL; orig_agg_p = orig_agg_p->next)
      {
    if (orig_agg_p->function == PT_COUNT_STAR || orig_agg_p->function == PT_COUNT)
      {
        orig_agg_p->accumulator_domain.value_dom = &tp_Bigint_domain;
        orig_agg_p->accumulator_domain.value2_dom = &tp_Null_domain;
      }
    if (orig_agg_p->list_id != nullptr)
      {
        qfile_close_list (thread_p, orig_agg_p->list_id);
      }
      }
  }

  SCAN_CODE result_handler<RESULT_TYPE::BUILDVALUE_OPT>::read (THREAD_ENTRY *thread_p, AGGREGATE_TYPE *dest)
  {
    std::unique_lock<std::mutex> lock (m_result_mutex);
    while (m_result_completed < m_parallelism)
      {
    m_result_cv.wait_for (lock, std::chrono::microseconds (50));
    if (m_interrupt_p->get_code() != parallel_query::interrupt::interrupt_code::NO_INTERRUPT)
      {
        return S_ERROR;
      }
      }
    for (AGGREGATE_TYPE *orig_agg_p = m_orig_agg_list; orig_agg_p != NULL; orig_agg_p = orig_agg_p->next)
      {
    if (orig_agg_p->function == PT_COUNT_STAR)
      {
      }
    else if (orig_agg_p->option == Q_DISTINCT
         && orig_agg_p->function != PT_MIN && orig_agg_p->function != PT_MAX)
      {
      }
    else if (orig_agg_p->function == PT_COUNT)
      {
        db_make_bigint (orig_agg_p->accumulator.value, (INT64) orig_agg_p->accumulator.curr_cnt);
      }
    else
      {
        DB_VALUE tmp;
        if (!DB_IS_NULL (orig_agg_p->accumulator.value))
          {
        db_make_null (&tmp);
        if (pr_clone_value (orig_agg_p->accumulator.value, &tmp) != NO_ERROR)
          {
            return S_ERROR;
          }
        HL_HEAPID save_heap = db_change_private_heap (thread_p, 0);
        pr_clear_value (orig_agg_p->accumulator.value);
        db_change_private_heap (thread_p, save_heap);
        * (orig_agg_p->accumulator.value) = tmp;
          }
        if (orig_agg_p->accumulator.value2 != NULL && !DB_IS_NULL (orig_agg_p->accumulator.value2))
          {
        db_make_null (&tmp);
        if (pr_clone_value (orig_agg_p->accumulator.value2, &tmp) != NO_ERROR)
          {
            return S_ERROR;
          }
        HL_HEAPID save_heap = db_change_private_heap (thread_p, 0);
        pr_clear_value (orig_agg_p->accumulator.value2);
        db_change_private_heap (thread_p, save_heap);
        * (orig_agg_p->accumulator.value2) = tmp;
          }
      }
      }
    return S_END;
  }

  void result_handler<RESULT_TYPE::BUILDVALUE_OPT>::read_finalize (THREAD_ENTRY *thread_p)
  {

  }

  void result_handler<RESULT_TYPE::BUILDVALUE_OPT>::write_initialize (THREAD_ENTRY *thread_p, OUTPTR_LIST *outptr_list,
      write_dest_type *agg_p, VAL_DESCR *vd, xasl_node *xasl_p)
  {
    tl_outptr_list_p = outptr_list;
    tl_agg_p = agg_p;
    tl_vd = vd;
    tl_xasl_p = xasl_p;
    tl_tpl_buf.tpl = (char *)db_private_alloc (thread_p, DB_PAGESIZE);
    tl_tpl_buf.size = DB_PAGESIZE;
    tl_xasl_p->proc.buildvalue.agg_domains_resolved = 0;
    for (AGGREGATE_TYPE *agg_node = tl_xasl_p->proc.buildvalue.agg_list; agg_node != NULL; agg_node = agg_node->next)
      {
    if (agg_node->function == PT_COUNT_STAR)
      {
        agg_node->accumulator.curr_cnt = 0;
      }
    else if (agg_node->option == Q_DISTINCT
         && agg_node->function != PT_MIN && agg_node->function != PT_MAX)
      {
        int ls_flag = QFILE_FLAG_DISTINCT | QFILE_NOT_USE_MEMBUF;
        QFILE_TUPLE_VALUE_TYPE_LIST type_list;
        type_list.type_cnt = 1;
        type_list.domp = (TP_DOMAIN **) db_private_alloc (thread_p, sizeof (TP_DOMAIN *));
        if (type_list.domp == NULL)
          {
        m_err_messages_p->move_top_error_message_to_this ();
        m_interrupt_p->set_code (parallel_query::interrupt::interrupt_code::ERROR_INTERRUPTED_FROM_WORKER_THREAD);
        return;
          }
        type_list.domp[0] = agg_node->operands->value.domain;
        agg_node->list_id = qfile_open_list (thread_p, &type_list, NULL, m_query_id, ls_flag, agg_node->list_id);
        db_private_free_and_init (thread_p, type_list.domp);
        if (agg_node->list_id == nullptr)
          {
        m_err_messages_p->move_top_error_message_to_this ();
        m_interrupt_p->set_code (parallel_query::interrupt::interrupt_code::ERROR_INTERRUPTED_FROM_WORKER_THREAD);
        return;
          }
      }
    else
      {
        /* Non-DISTINCT: init curr_cnt for all types.
         * value/value2 initialization happens on first row in write() via curr_cnt < 1 check. */
        agg_node->accumulator.curr_cnt = 0;
      }
      }

  }

  bool result_handler<RESULT_TYPE::BUILDVALUE_OPT>::write (THREAD_ENTRY *thread_p)
  {
    if (!tl_xasl_p->proc.buildvalue.agg_domains_resolved)
      {
    if (qexec_resolve_domains_for_aggregation_for_parallel_heap_scan_buildvalue_proc (thread_p, tl_xasl_p, tl_vd,
        &tl_xasl_p->proc.buildvalue.agg_domains_resolved) != NO_ERROR)
      {
        return false;
      }
      }
    for (AGGREGATE_TYPE *agg_node = tl_xasl_p->proc.buildvalue.agg_list; agg_node != NULL; agg_node = agg_node->next)
      {
    AGGREGATE_ACCUMULATOR *acc = &agg_node->accumulator;
    AGGREGATE_ACCUMULATOR_DOMAIN *acc_dom = &agg_node->accumulator_domain;

    if (agg_node->function == PT_COUNT_STAR)
      {
        acc->curr_cnt++;
        continue;
      }

    DB_VALUE *db_value_p;
    if (agg_node->operands->value.type == TYPE_CONSTANT)
      {
        db_value_p = agg_node->operands->value.value.dbvalptr;
      }
    else
      {
        int err_code = fetch_peek_dbval (thread_p, &agg_node->operands->value, tl_vd, NULL, NULL,
                         tl_tpl_buf.tpl, &db_value_p);
        if (err_code != NO_ERROR)
          {
        return false;
          }
      }

    if (DB_IS_NULL (db_value_p))
      {
        continue;
      }

    if (agg_node->option == Q_DISTINCT
        && agg_node->function != PT_MIN && agg_node->function != PT_MAX)
      {
        for (REGU_VARIABLE_LIST operand = agg_node->operands; operand != NULL; operand = operand->next)
          {
        DB_VALUE *op_val_p;
        if (operand == agg_node->operands)
          {
            op_val_p = db_value_p;
          }
        else if (operand->value.type == TYPE_CONSTANT)
          {
            op_val_p = operand->value.value.dbvalptr;
          }
        else
          {
            int err_code = fetch_peek_dbval (thread_p, &operand->value, tl_vd, NULL, NULL,
                             tl_tpl_buf.tpl, &op_val_p);
            if (err_code != NO_ERROR)
              {
            return false;
              }
          }
        if (DB_IS_NULL (op_val_p))
          {
            continue;
          }
        DB_TYPE dbval_type = DB_VALUE_DOMAIN_TYPE (op_val_p);
        const PR_TYPE *pr_type_p = pr_type_from_id (dbval_type);
        int dbval_size = pr_data_writeval_disk_size (op_val_p);
        if (dbval_size > tl_tpl_buf.size)
          {
            char *new_tpl = (char *) db_private_realloc (thread_p, tl_tpl_buf.tpl, dbval_size);
            if (new_tpl == nullptr)
              {
            return false;
              }
            tl_tpl_buf.tpl = new_tpl;
            tl_tpl_buf.size = dbval_size;
          }
        or_init (&tl_or_buf, tl_tpl_buf.tpl, dbval_size);
        pr_type_p->data_writeval (&tl_or_buf, op_val_p);
        if (qfile_add_item_to_list (thread_p, tl_tpl_buf.tpl, dbval_size, agg_node->list_id) != NO_ERROR)
          {
            return false;
          }
          }
      }
    else
      {
        switch (agg_node->function)
          {
          case PT_COUNT:
        acc->curr_cnt++;
        break;

          case PT_MIN:
          {
        int coll_id = acc_dom->value_dom->collation_id;
        if (acc->curr_cnt < 1
            || acc_dom->value_dom->type->cmpval (acc->value, db_value_p, 1, 1, NULL, coll_id) > 0)
          {
            DB_TYPE type = DB_VALUE_DOMAIN_TYPE (db_value_p);
            pr_clear_value (acc->value);
            if (TP_DOMAIN_TYPE (acc_dom->value_dom) != type)
              {
            if (db_value_coerce (db_value_p, acc->value, acc_dom->value_dom) != NO_ERROR)
              {
                return false;
              }
              }
            else
              {
            if (pr_clone_value (db_value_p, acc->value) != NO_ERROR)
              {
                return false;
              }
              }
          }
        acc->curr_cnt++;
          }
          break;

          case PT_MAX:
          {
        int coll_id = acc_dom->value_dom->collation_id;
        if (acc->curr_cnt < 1
            || acc_dom->value_dom->type->cmpval (acc->value, db_value_p, 1, 1, NULL, coll_id) < 0)
          {
            DB_TYPE type = DB_VALUE_DOMAIN_TYPE (db_value_p);
            pr_clear_value (acc->value);
            if (TP_DOMAIN_TYPE (acc_dom->value_dom) != type)
              {
            if (db_value_coerce (db_value_p, acc->value, acc_dom->value_dom) != NO_ERROR)
              {
                return false;
              }
              }
            else
              {
            if (pr_clone_value (db_value_p, acc->value) != NO_ERROR)
              {
                return false;
              }
              }
          }
        acc->curr_cnt++;
          }
          break;

          case PT_SUM:
          case PT_AVG:
        if (acc->curr_cnt < 1)
          {
            DB_TYPE type = DB_VALUE_DOMAIN_TYPE (db_value_p);
            pr_clear_value (acc->value);
            if (TP_DOMAIN_TYPE (acc_dom->value_dom) != type)
              {
            if (db_value_coerce (db_value_p, acc->value, acc_dom->value_dom) != NO_ERROR)
              {
                return false;
              }
              }
            else
              {
            if (pr_clone_value (db_value_p, acc->value) != NO_ERROR)
              {
                return false;
              }
              }
          }
        else
          {
            if (qdata_add_dbval (acc->value, db_value_p, acc->value, acc_dom->value_dom) != NO_ERROR)
              {
            return false;
              }
          }
        acc->curr_cnt++;
        break;

          case PT_STDDEV:
          case PT_STDDEV_POP:
          case PT_STDDEV_SAMP:
          case PT_VARIANCE:
          case PT_VAR_POP:
          case PT_VAR_SAMP:
          {
        DB_VALUE coerced, squared;
        db_make_null (&coerced);
        db_make_null (&squared);

        if (tp_value_coerce (db_value_p, &coerced, acc_dom->value_dom) != DOMAIN_COMPATIBLE)
          {
            pr_clear_value (&coerced);
            return false;
          }

        if (qdata_multiply_dbval (&coerced, &coerced, &squared, acc_dom->value2_dom) != NO_ERROR)
          {
            pr_clear_value (&coerced);
            return false;
          }

        if (acc->curr_cnt < 1)
          {
            pr_clear_value (acc->value);
            pr_clear_value (acc->value2);
            acc_dom->value_dom->type->setval (acc->value, &coerced, true);
            acc_dom->value2_dom->type->setval (acc->value2, &squared, true);
          }
        else
          {
            if (qdata_add_dbval (acc->value, &coerced, acc->value, acc_dom->value_dom) != NO_ERROR)
              {
            pr_clear_value (&coerced);
            pr_clear_value (&squared);
            return false;
              }
            if (qdata_add_dbval (acc->value2, &squared, acc->value2, acc_dom->value2_dom) != NO_ERROR)
              {
            pr_clear_value (&coerced);
            pr_clear_value (&squared);
            return false;
              }
          }

        pr_clear_value (&coerced);
        pr_clear_value (&squared);
        acc->curr_cnt++;
          }
          break;

          default:
        assert (false);
        return false;
          }
      }
      }
    return true;
  }
  void result_handler<RESULT_TYPE::BUILDVALUE_OPT>::write_finalize (THREAD_ENTRY *thread_p)
  {
    {
      std::lock_guard<std::mutex> lock (writer_results_mutex);
      AGGREGATE_TYPE *orig_agg_p, *cur_agg_p = tl_xasl_p->proc.buildvalue.agg_list;
      for (orig_agg_p = m_orig_agg_list; orig_agg_p != NULL; orig_agg_p = orig_agg_p->next)
    {
      if (m_interrupt_p->get_code () != parallel_query::interrupt::interrupt_code::NO_INTERRUPT)
        {
          for (; cur_agg_p != NULL; cur_agg_p = cur_agg_p->next)
        {
          if (cur_agg_p->option == Q_DISTINCT
              && cur_agg_p->function != PT_MIN && cur_agg_p->function != PT_MAX
              && cur_agg_p->list_id != nullptr)
            {
              qfile_close_list (thread_p, cur_agg_p->list_id);
              qfile_destroy_list (thread_p, cur_agg_p->list_id);
            }
          if (cur_agg_p->accumulator.value != NULL)
            {
              pr_clear_value (cur_agg_p->accumulator.value);
            }
          if (cur_agg_p->accumulator.value2 != NULL)
            {
              pr_clear_value (cur_agg_p->accumulator.value2);
            }
        }
          break;
        }
      if (orig_agg_p->function == PT_COUNT_STAR)
        {
          orig_agg_p->accumulator.curr_cnt += cur_agg_p->accumulator.curr_cnt;
          cur_agg_p->accumulator.curr_cnt = 0;
          cur_agg_p = cur_agg_p->next;
          continue;
        }

      if (orig_agg_p->option == Q_DISTINCT
          && orig_agg_p->function != PT_MIN && orig_agg_p->function != PT_MAX)
        {
          qfile_close_list (thread_p, cur_agg_p->list_id);
          if (cur_agg_p->list_id->tuple_cnt == 0)
        {
          qfile_destroy_list (thread_p, cur_agg_p->list_id);
          cur_agg_p = cur_agg_p->next;
          continue;
        }

          if (orig_agg_p->list_id->tuple_cnt > 0)
        {
          QFILE_LIST_ID *list_id_p = (QFILE_LIST_ID *) malloc (sizeof (QFILE_LIST_ID));
          if (list_id_p == nullptr)
            {
              er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_OUT_OF_VIRTUAL_MEMORY, 1,
                  (size_t) sizeof (QFILE_LIST_ID));
              m_err_messages_p->move_top_error_message_to_this ();
              m_interrupt_p->set_code (parallel_query::interrupt::interrupt_code::ERROR_INTERRUPTED_FROM_WORKER_THREAD);
              qfile_destroy_list (thread_p, cur_agg_p->list_id);
              cur_agg_p = cur_agg_p->next;
              continue;
            }
          qfile_copy_list_id (list_id_p, cur_agg_p->list_id, false, QFILE_PROHIBIT_DEPENDENT);
          qfile_connect_list (thread_p, orig_agg_p->list_id, list_id_p);
          qfile_clear_list_id (cur_agg_p->list_id);
          cur_agg_p = cur_agg_p->next;
          continue;
        }
          else if (orig_agg_p->list_id->type_list.type_cnt > 0)
        {
          qfile_clear_list_id (orig_agg_p->list_id);
        }
          else
        {
          QFILE_CLEAR_LIST_ID (orig_agg_p->list_id);
        }

          qfile_copy_list_id (orig_agg_p->list_id, cur_agg_p->list_id, false, QFILE_PROHIBIT_DEPENDENT);
          qfile_clear_list_id (cur_agg_p->list_id);
        }
      else if (orig_agg_p->function == PT_COUNT)
        {
          orig_agg_p->accumulator.curr_cnt += cur_agg_p->accumulator.curr_cnt;
          cur_agg_p->accumulator.curr_cnt = 0;
        }
      else
        {
          if (orig_agg_p->accumulator_domain.value_dom == NULL && cur_agg_p->accumulator_domain.value_dom != NULL)
        {
          orig_agg_p->accumulator_domain.value_dom = cur_agg_p->accumulator_domain.value_dom;
          orig_agg_p->accumulator_domain.value2_dom = cur_agg_p->accumulator_domain.value2_dom;
        }

          HL_HEAPID prev_heap_id = db_change_private_heap (thread_p, 0);
          int err = qdata_aggregate_accumulator_to_accumulator (thread_p, &orig_agg_p->accumulator,
            &orig_agg_p->accumulator_domain, orig_agg_p->function,
            orig_agg_p->domain, &cur_agg_p->accumulator);
          db_change_private_heap (thread_p, prev_heap_id);
          if (err != NO_ERROR)
        {
          m_err_messages_p->move_top_error_message_to_this ();
          m_interrupt_p->set_code (parallel_query::interrupt::interrupt_code::ERROR_INTERRUPTED_FROM_WORKER_THREAD);
        }
          /* cur_agg_p accumulator cleanup is handled by qexec_clear_xasl on the cloned XASL. */
        }
      cur_agg_p = cur_agg_p->next;
    }
    }

    {
      std::lock_guard<std::mutex> lock (m_result_mutex);
      m_result_completed++;
      m_result_cv.notify_all ();
    }
    tl_outptr_list_p = nullptr;
    tl_agg_p = nullptr;
    tl_vd = nullptr;
    tl_xasl_p = nullptr;
    if (tl_tpl_buf.tpl != nullptr)
      {
    db_private_free (thread_p, tl_tpl_buf.tpl);
    tl_tpl_buf.tpl = nullptr;
      }
    tl_tpl_buf.size = 0;
  }


// Explicit template instantiations
  template class result_handler<RESULT_TYPE::MERGEABLE_LIST>;
  template class result_handler<RESULT_TYPE::XASL_SNAPSHOT>;
  template class result_handler<RESULT_TYPE::BUILDVALUE_OPT>;
}