File px_scan_index_overflow_chain_pool.cpp
File List > cubrid > src > query > parallel > px_scan > index > px_scan_index_overflow_chain_pool.cpp
Go to the documentation of this file
/*
*
* Copyright 2016 CUBRID Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
/* px_scan_index_overflow_chain_pool.cpp — multi-chain shared overflow share v2: per-slot active-chains, leaf re-read, round-robin. */
#include "px_scan_index_overflow_chain_pool.hpp"
#include "btree.h"
#include "btree_load.h"
#include "dbtype.h"
#include "error_code.h"
#include "error_manager.h"
#include "object_primitive.h"
#include "page_buffer.h"
#include "slotted_page.h"
#include "storage_common.h"
// XXX: SHOULD BE THE LAST INCLUDE HEADER
#include "memory_wrapper.hpp"
namespace parallel_index_scan
{
/*
 * try_publish () - register a new overflow chain in the first inactive slot so other workers can help walk it.
 *
 *   return: index of the slot that now holds the chain, or -1 when every slot is already active
 *   thread_p(in): thread entry (not used by this function)
 *   first_ovf_vpid(in): first overflow page of the chain; becomes the slot's claim cursor
 *   leaf_vpid(in): leaf page owning the key; stored in the slot for the helpers' leaf re-read
 *   leaf_slot_id(in): slot id of the key record inside that leaf page
 *   range_idx(in): range index handed back to helpers together with each claimed page
 *
 * NOTE: cap == parallelism + producer/late-joiner time-disjoint per worker => the -1 path is unreachable by design;
 *       it is kept as defense-in-depth (er_set + assert) and feeds the sole caller's S_ERROR.
 */
int
overflow_chain_pool::try_publish (THREAD_ENTRY *thread_p, VPID first_ovf_vpid,
                                  VPID leaf_vpid, PGSLOTID leaf_slot_id, int range_idx)
{
  std::lock_guard<std::mutex> guard (m_overflow_mutex);

  /* producer-mode precondition: inside enter_worker, before signal_no_more_leaves. */
  assert (m_active_workers > 0 && !m_no_more_leaves);

  /* linear search for the lowest-numbered inactive slot — O(parallelism), small N. */
  const int slot_count = static_cast<int> (m_overflow_slots.size ());
  int free_idx = -1;
  for (int idx = 0; idx < slot_count && free_idx < 0; ++idx)
    {
      if (!m_overflow_slots[idx].active)
        {
          free_idx = idx;
        }
    }

  if (free_idx < 0)
    {
      /* invariant break: surface in release logs (er_set), trip debug CI (assert); -1 feeds caller's S_ERROR. */
      er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_FAILED, 0);
      assert (false && "try_publish invariant: cap == parallelism guarantees a free slot");
      return -1;
    }

  overflow_slot &target = m_overflow_slots[free_idx];
  target.cur_vpid = first_ovf_vpid;
  target.leaf_vpid = leaf_vpid;
  target.leaf_slot_id = leaf_slot_id;
  target.range_idx = range_idx;
  target.helpers = 1;           /* the producer counts itself as the first helper */
  target.chain_walked = false;
  target.active = true;         /* set last, after the slot is fully populated */

  /* wake every waiter so it can re-scan the slots and pick up the new chain. */
  m_overflow_cv.notify_all ();
  return free_idx;
}
/*
 * claim_next () - hand out the page at a slot's chain cursor and advance the cursor to the next overflow page.
 *
 *   return: S_SUCCESS when a page was claimed,
 *           S_END when the slot is inactive or its cursor is NULL,
 *           S_ERROR when the page cannot be fixed or its next-overflow link cannot be read
 *   thread_p(in): thread entry
 *   slot_idx(in): slot to claim from; must be a valid index (asserted)
 *   out_page(out): page fixed with a read latch; written only on S_SUCCESS
 *   out_range_idx(out): range index stored in the slot; written only on S_SUCCESS
 *
 * NOTE: the whole function, including the page fix, runs with m_overflow_mutex held. On S_ERROR the chain is marked
 *       walked, its cursor is nulled and all waiters are woken. On S_SUCCESS the page is returned still fixed.
 */
SCAN_CODE
overflow_chain_pool::claim_next (THREAD_ENTRY *thread_p, int slot_idx, PAGE_PTR &out_page, int &out_range_idx)
{
  std::unique_lock<std::mutex> lock (m_overflow_mutex);

  assert (slot_idx >= 0 && slot_idx < static_cast<int> (m_overflow_slots.size ()));
  overflow_slot &chain = m_overflow_slots[slot_idx];

  const bool exhausted = !chain.active || VPID_ISNULL (&chain.cur_vpid);
  if (exhausted)
    {
      return S_END;
    }

  /* shared error epilogue: stop handing out pages from this chain and wake the waiters. */
  auto terminate_chain = [this, &chain] ()
  {
    chain.chain_walked = true;
    VPID_SET_NULL (&chain.cur_vpid);
    m_overflow_cv.notify_all ();
  };

  VPID fix_vpid = chain.cur_vpid;
  PAGE_PTR fixed_page = pgbuf_fix (thread_p, &fix_vpid, OLD_PAGE, PGBUF_LATCH_READ, PGBUF_UNCONDITIONAL_LATCH);
  if (fixed_page == NULL)
    {
      ASSERT_ERROR ();
      terminate_chain ();
      return S_ERROR;
    }
  (void) pgbuf_check_page_ptype (thread_p, fixed_page, PAGE_BTREE);

  VPID following_vpid;
  const int link_err = btree_get_next_overflow_vpid (thread_p, fixed_page, &following_vpid);
  if (link_err != NO_ERROR)
    {
      ASSERT_ERROR ();
      pgbuf_unfix (thread_p, fixed_page);
      terminate_chain ();
      return S_ERROR;
    }

  /* advance the cursor; a NULL successor means this was the last page of the chain. */
  chain.cur_vpid = following_vpid;
  if (VPID_ISNULL (&following_vpid))
    {
      chain.chain_walked = true;
      m_overflow_cv.notify_all ();
    }

  out_page = fixed_page;
  out_range_idx = chain.range_idx;
  return S_SUCCESS;
}
/*
 * release_page () - unfix a page; a NULL page is ignored.
 *
 *   thread_p(in): thread entry
 *   page(in): page to unfix, or NULL
 */
void
overflow_chain_pool::release_page (THREAD_ENTRY *thread_p, PAGE_PTR page)
{
  if (page == NULL)
    {
      return;
    }

  pgbuf_unfix (thread_p, page);
}
/*
 * exit_help () - drop one helper from a slot; the last helper out closes the chain unconditionally.
 *
 *   thread_p(in): thread entry (not used by this function)
 *   slot_idx(in): slot being left; must be a valid index of an active slot with helpers > 0 (asserted)
 */
void
overflow_chain_pool::exit_help (THREAD_ENTRY *thread_p, int slot_idx)
{
  std::lock_guard<std::mutex> guard (m_overflow_mutex);

  assert (slot_idx >= 0 && slot_idx < static_cast<int> (m_overflow_slots.size ()));
  overflow_slot &chain = m_overflow_slots[slot_idx];
  assert (chain.active && chain.helpers > 0);

  --chain.helpers;
  if (chain.helpers != 0)
    {
      /* other helpers are still attached to this slot; nothing more to do. */
      return;
    }

  /* helpers==0 closes the chain unconditionally (mirrors v1 error/interrupt clean-close). */
  chain.active = false;
  VPID_SET_NULL (&chain.cur_vpid);
  chain.chain_walked = true;
  chain.range_idx = -1;
  m_overflow_cv.notify_all ();
}
/*
 * wait_or_help () - block until a published overflow chain can be helped, then claim one of its pages together with
 *                   a private copy of the key read from the owning leaf record.
 *
 *   return: S_SUCCESS when a page was claimed,
 *           S_END when no slot is active, m_no_more_leaves is set and m_active_workers is 0,
 *           S_ERROR when the leaf page fix, the leaf record read, the key read or the page claim fails
 *   thread_p(in): thread entry
 *   out_page(out): claimed overflow page; written only on S_SUCCESS (by claim_next)
 *   out_local_key(out): key copied from the leaf record; reset to NULL on entry; must not be nullptr
 *   out_local_clear_key(out): clear flag reported by btree_read_record for out_local_key; reset to false on entry;
 *                             must not be nullptr
 *   out_range_idx(out): range index of the helped slot; written only on S_SUCCESS
 *   out_slot_idx(out): index of the helped slot on S_SUCCESS, -1 otherwise
 *
 * NOTE: on S_SUCCESS the slot's helper count incremented here is NOT released; exit_help () is the function that
 *       decrements it. On every other exit this function has already released the helper count it took and cleared
 *       any key it copied.
 */
SCAN_CODE
overflow_chain_pool::wait_or_help (THREAD_ENTRY *thread_p, PAGE_PTR &out_page,
                                   DB_VALUE *out_local_key, bool *out_local_clear_key,
                                   int &out_range_idx, int &out_slot_idx)
{
  assert (out_local_key != nullptr && out_local_clear_key != nullptr);
  db_make_null (out_local_key);
  *out_local_clear_key = false;
  out_slot_idx = -1;
  BTID_INT *btid_int = m_ranges->get_btid_int ();

  std::unique_lock<std::mutex> lock (m_overflow_mutex);
  for (;;)
    {
      /* termination predicate: no active slots && producer-side drained && no other workers running. */
      bool any_active = false;
      for (const auto &s : m_overflow_slots)
        {
          if (s.active)
            {
              any_active = true;
              break;
            }
        }
      if (!any_active && m_no_more_leaves && m_active_workers == 0)
        {
          return S_END;
        }

      if (any_active)
        {
          /* round-robin pick; relaxed counter is best-effort; under-lock slot-scan provides actual
           * synchronization. any_active implies at least one slot, so cap > 0 here. */
          int cap = static_cast<int> (m_overflow_slots.size ());
          int base = static_cast<int> (m_next_chain_to_help.fetch_add (1, std::memory_order_relaxed) % cap);
          if (base < 0)
            {
              /* '%' keeps the sign of the dividend: if the counter is a signed type and has wrapped around, the
               * remainder is negative and would index m_overflow_slots out of bounds. Fold it back to [0, cap). */
              base += cap;
            }

          int picked = -1;
          VPID re_leaf_vpid;
          PGSLOTID re_slot_id = NULL_SLOTID;
          VPID_SET_NULL (&re_leaf_vpid);
          for (int i = 0; i < cap; i++)
            {
              int idx = (base + i) % cap;
              overflow_slot &s = m_overflow_slots[idx];
              if (s.active && !s.chain_walked)
                {
                  picked = idx;
                  /* helpers++ under mutex pins slot active across the unlocked leaf re-read window. */
                  s.helpers++;
                  re_leaf_vpid = s.leaf_vpid;
                  re_slot_id = s.leaf_slot_id;
                  break;
                }
            }
          if (picked < 0)
            {
              m_overflow_cv.wait (lock);        /* every slot active && chain_walked; wait for last-out to close. */
              continue;
            }

          /* AS1 race-window close: pin leaf S-latch INSIDE m_overflow_mutex while producer's S-hold still covers
           * it; helper's own S then keeps any X-acquirer (split/compactify/vacuum) out. Never S_PROMOTE (C7). */
          PAGE_PTR leaf_page = pgbuf_fix (thread_p, &re_leaf_vpid, OLD_PAGE, PGBUF_LATCH_READ,
                                          PGBUF_UNCONDITIONAL_LATCH);
          if (leaf_page == NULL)
            {
              ASSERT_ERROR ();
              /* the mutex is still held here, so the helper release is done inline instead of through
               * exit_help (), which takes the mutex itself. */
              overflow_slot &s = m_overflow_slots[picked];
              --s.helpers;
              if (s.helpers == 0)
                {
                  s.active = false;
                  VPID_SET_NULL (&s.cur_vpid);
                  s.chain_walked = true;
                  s.range_idx = -1;
                  m_overflow_cv.notify_all ();
                }
              return S_ERROR;
            }

          /* from here on the mutex is released; exit_help () and claim_next () re-acquire it on their own. */
          lock.unlock ();
          (void) pgbuf_check_page_ptype (thread_p, leaf_page, PAGE_BTREE);

          RECDES leaf_rec;
          leaf_rec.data = nullptr;
          leaf_rec.area_size = -1;
          if (spage_get_record (thread_p, leaf_page, re_slot_id, &leaf_rec, PEEK) != S_SUCCESS)
            {
              ASSERT_ERROR ();
              pgbuf_unfix (thread_p, leaf_page);
              exit_help (thread_p, picked);
              return S_ERROR;
            }

          LEAF_REC leaf_rec_info_unused;
          int after_key_offset_unused = 0;
          bool local_clear_key = false;
          int rerr = btree_read_record (thread_p, btid_int, leaf_page, &leaf_rec, out_local_key,
                                        &leaf_rec_info_unused, BTREE_LEAF_NODE,
                                        &local_clear_key, &after_key_offset_unused, COPY, nullptr);
          /* the key was requested with COPY, so the leaf page is released before the result is inspected. */
          pgbuf_unfix (thread_p, leaf_page);
          if (rerr != NO_ERROR)
            {
              ASSERT_ERROR ();
              if (local_clear_key)
                {
                  pr_clear_value (out_local_key);
                }
              exit_help (thread_p, picked);
              return S_ERROR;
            }
          *out_local_clear_key = local_clear_key;

          /* claim drives chain cursor; key already owned by caller. */
          SCAN_CODE sc = claim_next (thread_p, picked, out_page, out_range_idx);
          if (sc == S_SUCCESS)
            {
              out_slot_idx = picked;
              return S_SUCCESS;
            }
          if (sc == S_ERROR)
            {
              if (local_clear_key)
                {
                  pr_clear_value (out_local_key);
                }
              *out_local_clear_key = false;
              exit_help (thread_p, picked);
              return S_ERROR;
            }

          /* sc == S_END — chain exhausted between pick and claim; clear owned key, release helper, retry. */
          if (local_clear_key)
            {
              pr_clear_value (out_local_key);
            }
          *out_local_clear_key = false;
          db_make_null (out_local_key);
          exit_help (thread_p, picked);
          lock.lock ();         /* the loop head expects the mutex to be held */
          continue;
        }

      /* no active slots but workers still running; wait. */
      m_overflow_cv.wait (lock);
    }
}
/*
 * enter_worker () - increment the active-worker count under the pool mutex.
 */
void
overflow_chain_pool::enter_worker ()
{
  std::lock_guard<std::mutex> guard (m_overflow_mutex);
  ++m_active_workers;
}
/*
 * leave_worker () - decrement the active-worker count; when it reaches zero, wake every waiter.
 *
 * NOTE: wait_or_help () tests m_active_workers == 0 in its termination predicate, so the wake-up lets blocked
 *       waiters re-evaluate it.
 */
void
overflow_chain_pool::leave_worker ()
{
  std::lock_guard<std::mutex> guard (m_overflow_mutex);

  assert (m_active_workers > 0);
  --m_active_workers;

  const bool last_worker_left = (m_active_workers == 0);
  if (last_worker_left)
    {
      m_overflow_cv.notify_all ();
    }
}
/*
 * signal_no_more_leaves () - set m_no_more_leaves under the pool mutex and wake every waiter.
 *
 * NOTE: m_no_more_leaves is one of the three conditions of the wait_or_help () termination predicate.
 */
void
overflow_chain_pool::signal_no_more_leaves ()
{
  std::lock_guard<std::mutex> guard (m_overflow_mutex);

  m_no_more_leaves = true;
  m_overflow_cv.notify_all ();
}
}