File px_scan_index_overflow_chain_pool.hpp¶
File List > cubrid > src > query > parallel > px_scan > index > px_scan_index_overflow_chain_pool.hpp
Go to the documentation of this file
/*
*
* Copyright 2016 CUBRID Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
/*
* px_scan_index_overflow_chain_pool.hpp
*/
#ifndef _PX_SCAN_INDEX_OVERFLOW_CHAIN_POOL_HPP_
#define _PX_SCAN_INDEX_OVERFLOW_CHAIN_POOL_HPP_
#include "btree.h"
#include "dbtype.h"
#include "scan_manager.h"
#include "storage_common.h"
#include <atomic>
#include <condition_variable>
#include <mutex>
#include <vector>
namespace parallel_index_scan
{
/* Per-chain shared overflow descriptor; lifetime bounded to [publish, helpers==0 && chain_walked]. */
struct overflow_slot
{
VPID cur_vpid; /* chain cursor; VPID_ISNULL after chain_walked. */
int range_idx; /* owning range. */
int helpers; /* drainers; producer counts itself. helpers==0 + chain_walked => releasable. */
bool chain_walked; /* cur_vpid hit VPID_ISNULL. */
bool active; /* slot in use; gates round-robin pick + termination predicate. */
bool claim_in_flight; /* one in-flight claim at a time; predicate-loop in claim_next. */
DB_VALUE key; /* deep-copied from producer's m_slot_key at publish; slot-owned. */
bool clear_key; /* slot.key owns var-length storage; init() frees pre-existing slots on reinit. */
};
/* (A) Multi-chain shared overflow pool (v2): cap = parallelism slots, helper supply matches chain demand. */
class overflow_chain_pool
{
public:
overflow_chain_pool ()
: m_overflow_slots (),
m_next_chain_to_help (0),
m_active_workers (0),
m_no_more_leaves (false)
{
}
/* main-thread: reset slot vector to cap. */
void init (int parallelism)
{
/* Clear variable-length key storage before overwriting slots (reinit / abort-cleanup safety). */
for (auto &slot : m_overflow_slots)
{
if (slot.clear_key)
{
clear_key_common_heap (&slot.key);
}
}
m_overflow_slots.assign (parallelism, overflow_slot {});
for (auto &slot : m_overflow_slots)
{
VPID_SET_NULL (&slot.cur_vpid);
slot.range_idx = -1;
slot.helpers = 0;
slot.chain_walked = false;
slot.active = false;
slot.claim_in_flight = false;
db_make_null (&slot.key);
slot.clear_key = false;
}
m_next_chain_to_help.store (0, std::memory_order_relaxed);
m_active_workers = 0;
m_no_more_leaves = false;
}
/* returns slot_idx>=0; -1 only on broken cap==parallelism invariant; key deep-copied from producer. */
int try_publish (THREAD_ENTRY *thread_p, VPID first_ovf_vpid,
const DB_VALUE *key, int range_idx);
/* slot_idx mandatory: identifies which chain to advance. */
SCAN_CODE claim_next (THREAD_ENTRY *thread_p, int slot_idx, PAGE_PTR &out_page, int &out_range_idx);
void release_page (THREAD_ENTRY *thread_p, PAGE_PTR page);
/* slot_idx mandatory: decrements helpers on the specific slot. */
void exit_help (THREAD_ENTRY *thread_p, int slot_idx);
/* caller owns out_local_key on S_SUCCESS (per out_local_clear_key); cleared on S_END/S_ERROR. */
SCAN_CODE wait_or_help (THREAD_ENTRY *thread_p, PAGE_PTR &out_page,
DB_VALUE *out_local_key, bool *out_local_clear_key,
int &out_range_idx, int &out_slot_idx);
void enter_worker ();
void leave_worker ();
void signal_no_more_leaves ();
private:
/* m_overflow_mutex held; clears key, resets slot fields, notifies waiters. */
void close_slot_locked (overflow_slot &slot);
/* m_overflow_mutex held; marks chain dead, slot stays occupied (ABA guard) — last-out exit_help closes it. */
void mark_chain_dead_locked (overflow_slot &slot);
/* slot.key cloned by producer, freed by another (last-out) thread; use common heap (id 0). */
static int clone_key_common_heap (DB_VALUE *dest, const DB_VALUE *src);
static void clear_key_common_heap (DB_VALUE *val);
std::mutex m_overflow_mutex;
std::condition_variable m_overflow_cv;
std::vector<overflow_slot> m_overflow_slots; /* size == parallelism; cap = helper supply. */
std::atomic<int> m_next_chain_to_help; /* round-robin cursor; fetch_add(1) % cap. */
/* Late-joiner termination tracking (under m_overflow_mutex) */
int m_active_workers; /* workers currently inside loop body */
bool m_no_more_leaves; /* set when last get_next_page_with_fix returned S_END */
};
}
#endif /* _PX_SCAN_INDEX_OVERFLOW_CHAIN_POOL_HPP_ */