File px_scan_index_overflow_chain_pool.cpp¶
File List > cubrid > src > query > parallel > px_scan > index > px_scan_index_overflow_chain_pool.cpp
Go to the documentation of this file
/*
*
* Copyright 2016 CUBRID Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
/* multi-chain shared overflow pool v3: key-publish (Option B), fix-outside-mutex. */
#include "px_scan_index_overflow_chain_pool.hpp"
#include "btree.h"
#include "btree_load.h"
#include "dbtype.h"
#include "error_code.h"
#include "error_manager.h"
#include "memory_alloc.h"
#include "object_primitive.h"
#include "page_buffer.h"
#include "storage_common.h"
// XXX: SHOULD BE THE LAST INCLUDE HEADER
#include "memory_wrapper.hpp"
namespace parallel_index_scan
{
/* clone slot.key on common heap (id 0) so any worker thread can free it. */
int
overflow_chain_pool::clone_key_common_heap (DB_VALUE *dest, const DB_VALUE *src)
{
#if defined (SERVER_MODE)
HL_HEAPID old_heap = db_private_set_heapid_to_thread (NULL, 0);
#endif
int rc = pr_clone_value (src, dest);
#if defined (SERVER_MODE)
(void) db_private_set_heapid_to_thread (NULL, old_heap);
#endif
return rc;
}
/* free slot.key on common heap; cross-thread private-heap free (last-out helper != producer) aborts. */
void
overflow_chain_pool::clear_key_common_heap (DB_VALUE *val)
{
#if defined (SERVER_MODE)
HL_HEAPID old_heap = db_private_set_heapid_to_thread (NULL, 0);
#endif
pr_clear_value (val);
#if defined (SERVER_MODE)
(void) db_private_set_heapid_to_thread (NULL, old_heap);
#endif
}
/* m_overflow_mutex held; clears key, resets all slot fields. */
void
overflow_chain_pool::close_slot_locked (overflow_slot &slot)
{
clear_key_common_heap (&slot.key);
db_make_null (&slot.key);
slot.clear_key = false;
slot.active = false;
VPID_SET_NULL (&slot.cur_vpid);
slot.chain_walked = true;
slot.range_idx = -1;
slot.helpers = 0;
slot.claim_in_flight = false;
m_overflow_cv.notify_all ();
}
/* m_overflow_mutex held; never decrement helpers — exit_help owns the one decrement (else underflow → leak/ABA). */
void
overflow_chain_pool::mark_chain_dead_locked (overflow_slot &slot)
{
assert (slot.active);
assert (slot.helpers > 0);
VPID_SET_NULL (&slot.cur_vpid);
slot.chain_walked = true;
slot.claim_in_flight = false;
m_overflow_cv.notify_all ();
}
/* cap==parallelism + producer/late-joiner time-disjoint => -1 unreachable; -1 is defense-in-depth. */
int
overflow_chain_pool::try_publish (THREAD_ENTRY *thread_p, VPID first_ovf_vpid,
const DB_VALUE *key, int range_idx)
{
std::unique_lock<std::mutex> lock (m_overflow_mutex);
/* producer still owns an active worker slot; another worker may have already called signal_no_more_leaves. */
assert (m_active_workers > 0);
for (int i = 0; i < static_cast<int> (m_overflow_slots.size ()); i++)
{
if (!m_overflow_slots[i].active)
{
overflow_slot &slot = m_overflow_slots[i];
slot.cur_vpid = first_ovf_vpid;
slot.range_idx = range_idx;
slot.helpers = 1; /* producer counts itself */
slot.chain_walked = false;
slot.claim_in_flight = false;
slot.active = true;
/* null before clone (slot-reuse safety). */
db_make_null (&slot.key);
if (clone_key_common_heap (&slot.key, key) != NO_ERROR)
{
/* clone failed; roll back partial key. */
close_slot_locked (slot);
er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_FAILED, 0);
return -1;
}
slot.clear_key = true;
m_overflow_cv.notify_all ();
return i;
}
}
/* invariant break: surface in release logs (er_set), trip debug CI (assert); -1 feeds caller's S_ERROR. */
er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_FAILED, 0);
assert (false && "try_publish invariant: cap == parallelism guarantees a free slot");
return -1;
}
SCAN_CODE
overflow_chain_pool::claim_next (THREAD_ENTRY *thread_p, int slot_idx, PAGE_PTR &out_page, int &out_range_idx)
{
std::unique_lock<std::mutex> lock (m_overflow_mutex);
assert (slot_idx >= 0 && slot_idx < static_cast<int> (m_overflow_slots.size ()));
overflow_slot &slot = m_overflow_slots[slot_idx];
if (!slot.active || VPID_ISNULL (&slot.cur_vpid))
{
return S_END;
}
/* predicate-loop: shared CV wakes spuriously / for other slots — recheck before taking cursor. */
while (slot.claim_in_flight)
{
m_overflow_cv.wait (lock);
if (!slot.active || VPID_ISNULL (&slot.cur_vpid))
{
return S_END;
}
}
VPID claim_vpid = slot.cur_vpid;
slot.claim_in_flight = true;
lock.unlock ();
/* fix outside m_overflow_mutex: no latch held under the mutex → can't form a leaf<->overflow latch cycle. */
PAGE_PTR page = pgbuf_fix (thread_p, &claim_vpid, OLD_PAGE, PGBUF_LATCH_READ, PGBUF_UNCONDITIONAL_LATCH);
if (page == NULL)
{
ASSERT_ERROR ();
lock.lock ();
/* Option B: dead-mark; slot stays active till last-out exit_help closes it — no straggler recycle (ABA guard). */
mark_chain_dead_locked (slot);
return S_ERROR;
}
(void) pgbuf_check_page_ptype (thread_p, page, PAGE_BTREE);
VPID next_vpid;
if (btree_get_next_overflow_vpid (thread_p, page, &next_vpid) != NO_ERROR)
{
ASSERT_ERROR ();
pgbuf_unfix (thread_p, page);
lock.lock ();
/* Option B (see pgbuf_fix path): dead-mark, defer close to last-out exit_help. */
mark_chain_dead_locked (slot);
return S_ERROR;
}
lock.lock ();
slot.cur_vpid = next_vpid;
slot.claim_in_flight = false;
if (VPID_ISNULL (&next_vpid))
{
slot.chain_walked = true;
}
m_overflow_cv.notify_all ();
out_page = page;
out_range_idx = slot.range_idx;
return S_SUCCESS;
}
void
overflow_chain_pool::release_page (THREAD_ENTRY *thread_p, PAGE_PTR page)
{
if (page != NULL)
{
pgbuf_unfix (thread_p, page);
}
}
/* per-slot helpers decrement; last out closes chain via close_slot_locked. */
void
overflow_chain_pool::exit_help (THREAD_ENTRY *thread_p, int slot_idx)
{
std::unique_lock<std::mutex> lock (m_overflow_mutex);
assert (slot_idx >= 0 && slot_idx < static_cast<int> (m_overflow_slots.size ()));
overflow_slot &slot = m_overflow_slots[slot_idx];
if (!slot.active)
{
/* guards a redundant exit_help after last-out close; do NOT remove — stray double-exit underflows helpers. */
return;
}
assert (slot.helpers > 0); /* holds: failing thread stays counted (>=1). */
--slot.helpers;
if (slot.helpers == 0)
{
close_slot_locked (slot);
}
}
SCAN_CODE
overflow_chain_pool::wait_or_help (THREAD_ENTRY *thread_p, PAGE_PTR &out_page,
DB_VALUE *out_local_key, bool *out_local_clear_key,
int &out_range_idx, int &out_slot_idx)
{
assert (out_local_key != nullptr && out_local_clear_key != nullptr);
db_make_null (out_local_key);
*out_local_clear_key = false;
out_slot_idx = -1;
std::unique_lock<std::mutex> lock (m_overflow_mutex);
for (;;)
{
/* termination predicate: no active slots && producer-side drained && no other workers running. */
bool any_active = false;
for (const auto &s : m_overflow_slots)
{
if (s.active)
{
any_active = true;
break;
}
}
if (!any_active && m_no_more_leaves && m_active_workers == 0)
{
return S_END;
}
if (any_active)
{
/* round-robin pick; relaxed counter is best-effort; under-lock slot-scan provides actual synchronization. */
int cap = static_cast<int> (m_overflow_slots.size ());
int base = m_next_chain_to_help.fetch_add (1, std::memory_order_relaxed) % cap;
int picked = -1;
for (int i = 0; i < cap; i++)
{
int idx = (base + i) % cap;
overflow_slot &s = m_overflow_slots[idx];
if (s.active && !s.chain_walked)
{
picked = idx;
/* helpers++ under mutex pins slot across the unlock window. */
s.helpers++;
break;
}
}
if (picked < 0)
{
/* All active slots are chain_walked; wait for last-out to close them. */
m_overflow_cv.wait (lock);
continue;
}
/* Option B: clone key from slot while still under mutex (helpers++ pins slot lifetime). */
overflow_slot &ps = m_overflow_slots[picked];
if (pr_clone_value (&ps.key, out_local_key) != NO_ERROR)
{
ASSERT_ERROR ();
/* clone may leave a partial value; release here — caller won't (out_local_clear_key still false). */
pr_clear_value (out_local_key);
db_make_null (out_local_key);
/* undo the helpers++ */
--ps.helpers;
if (ps.helpers == 0)
{
close_slot_locked (ps);
}
return S_ERROR;
}
*out_local_clear_key = true;
lock.unlock ();
/* claim_next manages its own mutex; fix happens outside m_overflow_mutex (no inversion). */
SCAN_CODE sc = claim_next (thread_p, picked, out_page, out_range_idx);
if (sc == S_SUCCESS)
{
out_slot_idx = picked;
return S_SUCCESS;
}
if (sc == S_ERROR)
{
pr_clear_value (out_local_key);
db_make_null (out_local_key);
*out_local_clear_key = false;
exit_help (thread_p, picked);
return S_ERROR;
}
/* S_END: chain exhausted between pick and claim; clear owned key, release helper, retry. */
pr_clear_value (out_local_key);
db_make_null (out_local_key);
*out_local_clear_key = false;
exit_help (thread_p, picked);
lock.lock ();
continue;
}
/* no active slots but workers still running; wait. */
m_overflow_cv.wait (lock);
}
}
void
overflow_chain_pool::enter_worker ()
{
std::unique_lock<std::mutex> lock (m_overflow_mutex);
++m_active_workers;
}
void
overflow_chain_pool::leave_worker ()
{
std::unique_lock<std::mutex> lock (m_overflow_mutex);
assert (m_active_workers > 0);
--m_active_workers;
if (m_active_workers == 0)
{
m_overflow_cv.notify_all ();
}
}
void
overflow_chain_pool::signal_no_more_leaves ()
{
std::unique_lock<std::mutex> lock (m_overflow_mutex);
m_no_more_leaves = true;
m_overflow_cv.notify_all ();
}
}