Skip to content

File px_heap_scan_input_handler_ftabs.cpp

File List > cubrid > src > query > parallel > px_heap_scan > px_heap_scan_input_handler_ftabs.cpp

Go to the documentation of this file

/*
 *
 * Copyright 2016 CUBRID Corporation
 *
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 *
 */

/*
 * px_heap_scan_input_handler_ftabs.cpp
 */


#include "px_heap_scan_input_handler_ftabs.hpp"
#include "error_code.h"
#include "bit.h"

// XXX: SHOULD BE THE LAST INCLUDE HEADER
#include "memory_wrapper.hpp"

namespace parallel_heap_scan
{
  /*
   * Per-thread scan state of input_handler_ftabs. Every worker thread sees its own copy of these variables;
   * they are (re)set by initialize () and released by finalize ().
   */
  /* scan cache of the SCAN_ID given to initialize (); its page_watcher holds the page that is currently fixed */
  thread_local HEAP_SCANCACHE *input_handler_ftabs::m_tl_scan_cache = NULL;
  /* watcher that temporarily keeps the previously fixed page while the next page is being fixed */
  thread_local PGBUF_WATCHER input_handler_ftabs::m_tl_old_page_watcher = {0};
  /* element of m_splited_ftab_set that this thread claimed in initialize () */
  thread_local ftab_set *input_handler_ftabs::m_tl_ftab_set = NULL;
  /* next page to examine; a NULL vpid means that a new sector must be fetched from m_tl_ftab_set */
  thread_local VPID input_handler_ftabs::m_tl_vpid = VPID_INITIALIZER;
  /* offset of m_tl_vpid inside the current sector, used as the bit index into m_tl_ftab.page_bitmap */
  thread_local size_t input_handler_ftabs::m_tl_pgoffset = 0;
  /* sector (vsid + page bitmap) that is currently being walked */
  thread_local FILE_PARTIAL_SECTOR input_handler_ftabs::m_tl_ftab = FILE_PARTIAL_SECTOR_INITIALIZER;


  /*
   * initialize () - set up the calling thread's scan state and claim one of the split sector sets
   *
   * return       : NO_ERROR, or ER_FAILED when every split set has already been claimed
   * thread_p (in): thread entry (not used here)
   * hfid (in)    : heap file identifier handed to the old-page watcher initialization
   * scan_id (in) : scan identifier whose heap scan cache is used by this thread
   */
  int input_handler_ftabs::initialize (THREAD_ENTRY *thread_p, HFID *hfid, SCAN_ID *scan_id)
  {
    HEAP_SCANCACHE *const scan_cache = &scan_id->s.hsid.scan_cache;

    m_tl_scan_cache = scan_cache;
    /* the scan cache must already carry its "initialized" pattern, i.e. open_scan has succeeded */
    assert (scan_cache->debug_initpattern == 12345);

    PGBUF_INIT_WATCHER (&m_tl_old_page_watcher, PGBUF_ORDERED_HEAP_NORMAL, hfid);

    /* each caller takes the next unclaimed slot of the split sets */
    const int slot = m_splited_ftab_set_idx.fetch_add (1);
    const bool slot_in_range = (slot >= 0) && (static_cast<size_t> (slot) < m_splited_ftab_set.size ());
    if (!slot_in_range)
      {
        assert_release (false);
        return ER_FAILED;
      }

    /* start with an empty cursor: no sector and no page selected yet */
    m_tl_ftab_set = &m_splited_ftab_set[slot];
    m_tl_ftab = FILE_PARTIAL_SECTOR_INITIALIZER;
    m_tl_pgoffset = 0;
    m_tl_vpid = VPID_INITIALIZER;

    return NO_ERROR;
  }

  /*
   * init_on_main () - collect the data sectors of the heap file and split them into per-worker sets
   *
   * return          : NO_ERROR, or the error code returned by file_get_all_data_sectors ()
   * thread_p (in)   : thread entry
   * hfid (in)       : heap file identifier; a copy is kept in m_hfid
   * parallelism (in): value passed to ftab_set::split () to produce m_splited_ftab_set
   *
   * NOTE: the sector table stored in the collector is released before returning, on both the success and the
   *       error path.
   */
  int input_handler_ftabs::init_on_main (THREAD_ENTRY *thread_p, HFID hfid, int parallelism)
  {
    FILE_FTAB_COLLECTOR collector;

    /* The cleanup below inspects partsect_ftab even when collecting fails. Start from NULL so that an early
     * failure of the collector call can never make us read (and free) an indeterminate pointer. */
    collector.partsect_ftab = NULL;

    m_hfid = hfid;

    const int error_code = file_get_all_data_sectors (thread_p, &m_hfid.vfid, &collector);
    if (error_code == NO_ERROR)
      {
        m_ftab_set.convert (&collector);
        m_splited_ftab_set = m_ftab_set.split (parallelism);
        /* workers claim the split sets in order, starting from the first one */
        m_splited_ftab_set_idx.store (0);
        /* the unsplit set is no longer needed once the split sets exist */
        m_ftab_set.clear ();
      }

    if (collector.partsect_ftab != NULL)
      {
        db_private_free_and_init (thread_p, collector.partsect_ftab);
      }

    return error_code;
  }

  /*
   * get_next_vpid_with_fix () - advance the calling thread's cursor to the next page whose bit is set in its
   *                             sector bitmaps and fix that page with a read latch
   *
   * return       : S_SUCCESS when a page was fixed (in m_tl_scan_cache->page_watcher) and *vpid was set,
   *                S_END when the thread's sector set is exhausted,
   *                S_ERROR when fixing a page failed with an error that cannot be ignored
   * thread_p (in): thread entry
   * vpid (out)   : identifier of the fixed page; written only when S_SUCCESS is returned
   */
  SCAN_CODE input_handler_ftabs::get_next_vpid_with_fix (THREAD_ENTRY *thread_p, VPID *vpid)
  {
    int error_code = NO_ERROR;

    /* the loop is left only through the return statements inside it */
    bool found = false;
    while (!found)
      {
    /* a NULL cursor means the current sector is finished (or none was started): fetch the next sector */
    if (VPID_ISNULL (&m_tl_vpid))
      {
        m_tl_ftab = m_tl_ftab_set->get_next();
        if (VSID_IS_NULL (&m_tl_ftab.vsid))
          {
        /* no more sectors for this thread; drop the page kept by the old watcher, if any */
        if (m_tl_old_page_watcher.pgptr != NULL)
          {
            pgbuf_ordered_unfix (thread_p, &m_tl_old_page_watcher);
          }
        return S_END;
          }
        /* position the cursor on the first page of the new sector */
        m_tl_pgoffset = 0;
        m_tl_vpid.volid = m_tl_ftab.vsid.volid;
        m_tl_vpid.pageid = SECTOR_FIRST_PAGEID (m_tl_ftab.vsid.sectid);
        /* NOTE(review): the page skipped here is the one identified by m_hfid.vfid (volid, fileid);
         * confirm that this is the page the comment below refers to. */
        if (m_tl_vpid.volid == m_hfid.vfid.volid && m_tl_vpid.pageid == m_hfid.vfid.fileid)
          {
        /* skip heap header page */
        m_tl_pgoffset++;
        m_tl_vpid.pageid++;
          }
      }

    /* walk the remaining pages of the current sector; offset and page id advance together */
    for (; m_tl_pgoffset < DISK_SECTOR_NPAGES; m_tl_pgoffset++, m_tl_vpid.pageid++)
      {
        if (bit64_is_set (m_tl_ftab.page_bitmap, (int) m_tl_pgoffset))
          {
        found = true;

        /* hand the currently fixed page over to the old watcher before fixing the next one */
        if (m_tl_scan_cache->page_watcher.pgptr != NULL)
          {
            pgbuf_replace_watcher (thread_p, &m_tl_scan_cache->page_watcher, &m_tl_old_page_watcher);
          }

        error_code = pgbuf_ordered_fix (thread_p, &m_tl_vpid, OLD_PAGE_MAYBE_DEALLOCATED, PGBUF_LATCH_READ,
                        &m_tl_scan_cache->page_watcher);

        if (m_tl_scan_cache->page_watcher.pgptr == NULL)
          {
            if (error_code != NO_ERROR && error_code != ER_PB_BAD_PAGEID)
              {
            /* non-dealloc error (e.g. ER_INTERRUPTED): propagate */
            m_err_messages_p->move_top_error_message_to_this ();
            m_interrupt_p->set_code (parallel_query::interrupt::interrupt_code::ERROR_INTERRUPTED_FROM_WORKER_THREAD);
            return S_ERROR;
              }
            /* The page was valid when the bitmap was built, but it has been deallocated since then.
             * This is not an error; the page is simply skipped. The 'continue' below lets the for-loop
             * increment move the cursor to the next page of the sector. */
            if (m_tl_old_page_watcher.pgptr != NULL)
              {
            pgbuf_ordered_unfix (thread_p, &m_tl_old_page_watcher);
              }
            er_clear ();
            found = false;
            continue;
          }

        /* the new page is fixed; the previous one is not needed any more */
        if (m_tl_old_page_watcher.pgptr != NULL)
          {
            pgbuf_ordered_unfix (thread_p, &m_tl_old_page_watcher);
          }

        assert (pgbuf_get_page_ptype (thread_p, m_tl_scan_cache->page_watcher.pgptr) == PAGE_HEAP);

        /* a page pointer was obtained, but the fix still reported an error: hand the error to the owner */
        if (error_code != NO_ERROR)
          {
            m_err_messages_p->move_top_error_message_to_this();
            m_interrupt_p->set_code (parallel_query::interrupt::interrupt_code::ERROR_INTERRUPTED_FROM_WORKER_THREAD);
            return S_ERROR;
          }

        /* report the fixed page and leave the cursor on the page after it for the next call */
        *vpid = m_tl_vpid;
        m_tl_pgoffset++;
        m_tl_vpid.pageid++;
        return S_SUCCESS;
          }
      }

    /* the sector has been consumed; a NULL cursor makes the next iteration fetch a new sector */
    if (m_tl_pgoffset >= DISK_SECTOR_NPAGES)
      {
        VPID_SET_NULL (&m_tl_vpid);
      }
      }
    return S_ERROR; /* unreachable */
  }

  /*
   * finalize () - release the pages still fixed by the calling thread and detach it from the scan cache
   *
   * return       : NO_ERROR (always)
   * thread_p (in): thread entry
   *
   * NOTE: safe to call when the thread has no scan cache attached (initialize () was never called on this
   *       thread, or finalize () has already run); in that case only the old-page watcher is inspected.
   */
  int input_handler_ftabs::finalize (THREAD_ENTRY *thread_p)
  {
    if (m_tl_old_page_watcher.pgptr != NULL)
      {
        pgbuf_ordered_unfix (thread_p, &m_tl_old_page_watcher);
      }

    /* m_tl_scan_cache is reset to NULL below, so it must be checked before it is dereferenced */
    if (m_tl_scan_cache != NULL)
      {
        if (m_tl_scan_cache->page_watcher.pgptr != NULL)
          {
            pgbuf_ordered_unfix (thread_p, &m_tl_scan_cache->page_watcher);
          }
        m_tl_scan_cache = NULL;
      }

    m_tl_old_page_watcher.pgptr = NULL;
    return NO_ERROR;
  }
}