Skip to content

File px_scan_input_handler_list.cpp

File List > cubrid > src > query > parallel > px_scan > px_scan_input_handler_list.cpp

Go to the documentation of this file

/*
 *
 * Copyright 2016 CUBRID Corporation
 *
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 *
 */

/*
 * px_scan_input_handler_list.cpp
 */

#include "px_scan_input_handler_list.hpp"
#include "error_code.h"
#include "error_manager.h"
#include "list_file.h"
#include "object_representation.h"  /* OR_GET_INT used by QFILE_GET_TUPLE_COUNT */
#include "query_list.h"
#include "query_manager.h"

// XXX: SHOULD BE THE LAST INCLUDE HEADER
#include "memory_wrapper.hpp"

namespace parallel_scan
{
  thread_local UINT64 input_handler_list::m_tl_bitmap = 0;
  thread_local VSID input_handler_list::m_tl_vsid = {NULL_SECTID, NULL_VOLID};
  thread_local QMGR_TEMP_FILE *input_handler_list::m_tl_current_tfile = nullptr;
  thread_local bool input_handler_list::m_tl_is_membuf_worker = false;
  thread_local int input_handler_list::m_tl_membuf_pageid = 0;

  int
  input_handler_list::init_on_main (THREAD_ENTRY *thread_p, QFILE_LIST_ID *list_id, int parallelism)
  {
    /* Idempotent close before re-open. */
    qfile_close_list_sector_scan (thread_p, &m_sector_scan);

    if (parallelism <= 0 || list_id == nullptr || VPID_ISNULL (&list_id->first_vpid))
      {
    m_list_id = nullptr;
    return NO_ERROR;
      }

    /* Defer publishing m_list_id until open succeeds. */
    int error_code = qfile_open_list_sector_scan (thread_p, list_id, &m_sector_scan);
    if (error_code != NO_ERROR)
      {
    m_list_id = nullptr;
    return error_code;
      }
    m_list_id = list_id;
    (void) parallelism;
    return NO_ERROR;
  }

  int
  input_handler_list::initialize (THREAD_ENTRY *thread_p, HFID *hfid, SCAN_ID *scan_id)
  {
    m_tl_bitmap = 0;
    VSID_SET_NULL (&m_tl_vsid);
    m_tl_current_tfile = nullptr;

    /* first live worker wins membuf — cf. sector_page_iterator (px_hash_join_task_manager.cpp:225) */
    m_tl_is_membuf_worker = false;
    m_tl_membuf_pageid = 0;
    if (m_sector_scan.sector_info.membuf_tfile != nullptr)
      {
    bool expected = false;
    if (m_sector_scan.membuf_claimed.compare_exchange_strong (expected, true, std::memory_order_acq_rel))
      {
        m_tl_is_membuf_worker = true;
      }
      }

    (void) hfid;
    (void) scan_id;
    return NO_ERROR;
  }

  SCAN_CODE
  input_handler_list::get_next_page_with_fix (THREAD_ENTRY *thread_p,
      PAGE_PTR &out_page,
      QMGR_TEMP_FILE *&out_tfile)
  {
    out_page = nullptr;
    out_tfile = nullptr;

    while (true)
      {
    VPID vpid;

    /* Phase 1: membuf owner drains membuf pages (NULL_VOLID, sequential pageid). */
    if (m_tl_is_membuf_worker
        && m_sector_scan.sector_info.membuf_tfile != nullptr
        && m_tl_membuf_pageid <= m_sector_scan.sector_info.membuf_tfile->membuf_last)
      {
        vpid.volid = NULL_VOLID;
        vpid.pageid = m_tl_membuf_pageid++;
        m_tl_current_tfile = m_sector_scan.sector_info.membuf_tfile;
      }
    else
      {
        /* Phase 2: refill bitmap from next sector via dynamic atomic claim. */
        if (m_tl_bitmap == 0)
          {
        int sidx = m_sector_scan.next_sector_index.fetch_add (1, std::memory_order_relaxed);
        if (sidx >= m_sector_scan.sector_info.sector_cnt)
          {
            return S_END;
          }
        m_tl_vsid = m_sector_scan.sector_info.sectors[sidx].vsid;
        m_tl_bitmap = m_sector_scan.sector_info.sectors[sidx].page_bitmap;
        m_tl_current_tfile = (QMGR_TEMP_FILE *) m_sector_scan.sector_info.tfiles[sidx];
        if (m_tl_bitmap == 0)
          {
            continue;   /* defensive: empty sector */
          }
          }

        if (!qfile_sector_bitmap_next_vpid (&m_tl_vsid, &m_tl_bitmap, &vpid))
          {
        continue;   /* defensive: helper false despite non-zero check above */
          }
      }

    /* Single READ-latch fix; transfer ownership to caller on S_SUCCESS. */
    PAGE_PTR page_p = qmgr_get_old_page_read_only (thread_p, &vpid, m_tl_current_tfile);
    if (page_p == nullptr)
      {
        assert_release_error (er_errid () != NO_ERROR);
        return S_ERROR;
      }

    /* Overflow continuations: the start-page owner walks the chain via qfile_assemble_overflow_tuple. */
    if (QFILE_GET_TUPLE_COUNT (page_p) == QFILE_OVERFLOW_TUPLE_COUNT_FLAG)
      {
        qmgr_free_old_page (thread_p, page_p, m_tl_current_tfile);
        continue;
      }

    out_page = page_p;
    out_tfile = m_tl_current_tfile;
    return S_SUCCESS;
      }
  }

  int
  input_handler_list::finalize (THREAD_ENTRY *thread_p)
  {
    m_tl_bitmap = 0;
    VSID_SET_NULL (&m_tl_vsid);
    m_tl_current_tfile = nullptr;
    m_tl_is_membuf_worker = false;
    m_tl_membuf_pageid = 0;
    return NO_ERROR;
  }

  void
  input_handler_list::cleanup_on_main (THREAD_ENTRY *thread_p)
  {
    qfile_close_list_sector_scan (thread_p, &m_sector_scan);
    m_list_id = nullptr;
  }
}