CUBRID Parallel Query — Intra-Query Parallelism Across Heap Scan, Hash Join, and Query Execute
Contents:
- Theoretical Background
- Common DBMS Design
- CUBRID’s Approach
  - The parallel_query namespace and the global worker pool
  - compute_parallel_degree — log2 of page-count over threshold
  - worker_manager — per-operator reservation handle
  - Shared task plumbing — callable_task, interrupt, err_messages_with_lock
  - Parallel heap scan
  - Parallel hash join
  - Parallel query execute
  - Parallel sort
- Source Walkthrough
- Cross-check Notes
- Open Questions
- Sources
Theoretical Background
A relational executor turns a query plan into a stream of tuples. When the plan or its inputs are large, doing that on one CPU leaves cores idle and stretches latency. Intra-query parallelism runs several copies of one of the plan’s operators (or its leaves) concurrently and recombines the results so the consumer above sees the same output a serial plan would have produced.
The founding statement is Graefe’s Encapsulation of Parallelism
in the Volcano Query Processing System (SIGMOD 1990). Graefe
observed that the serial iterator (open/next/close) is a
sufficient contract for parallelism if you add one new operator:
the exchange. Exchange owns the queue between producer threads
and consumer threads, the partitioning policy that maps inputs to
producers, and the flow control. Four flavours fall out — Gather
(N→1), Scatter (1→N hash- or range-partitioned), Repartition
(N→N with a new function), Replicate (1→N broadcast). Most
engines pick a subset.
Three concepts inside that framework matter. Degree of
parallelism (DOP) is the integer N bounded by hardware (cores),
software (a global thread budget), and data (no point starting
eight workers to scan four pages). Partitioning is block-range
(N contiguous regions of a heap), hash (by join key into N
buckets), or round-robin; the choice constrains what the exchange
above can do — hash is partition-aware so a join above can run
per partition, block-range is not. Scheduling is long-lived
workers (Postgres BackgroundWorker, Oracle PX servers) versus
short-lived tasks recycled across operators (DuckDB, CUBRID).
Two refinements close the loop. The shared-build pattern (Anatomy of a Database System, Red Book Ch. 4) for parallel hash join: build the inner once into a shared structure, let N probe workers read concurrently. CUBRID realises a weaker form — the shared structure is a partition pool, not a shared hash table. The two-phase parallel sort (Database Systems: The Complete Book §15.4): N workers each sort a slice, then merge. CUBRID’s parallel sort is exactly slice-and-merge.
Where the layer sits fixes its responsibilities. Above:
qexec_execute_mainblock, scan_next_scan, qexec_hash_join,
sort_listfile_internal ask for tuples in the same domain
(DB_VALUE, XASL_NODE, QFILE_LIST_ID) the serial path
produces, so the consumer cannot tell the difference. Below:
access methods expose page-at-a-time interfaces; the parallel
layer must coordinate fixings, latches, and pinning across N
threads without violating the serial latch protocol. The thread
pool, error context, perf monitor, and interrupt machinery all
have to fan out and fan back in on completion. The interesting
part of CUBRID’s design is exactly that fan-out / fan-in plumbing.
Common DBMS Design
Every modern DBMS that scales beyond one core picks an implementation strategy. The variants are consistent at the abstract level (Graefe exchange everywhere) but differ in threading model, scheduling, and coverage.
PostgreSQL uses parallel-aware nodes plus a
BackgroundWorker pool. The planner emits Gather / Gather Merge at the top; below it, each parallel-eligible operator is
its own Parallel* node (ParallelSeqScan, ParallelIndexScan,
ParallelBitmapHeapScan, ParallelHashJoin, ParallelAppend,
ParallelAggregate). The leader forks N short-lived processes per
gather; coordination is via shared memory. DOP is set by
max_parallel_workers_per_gather, clipped by
min_parallel_table_scan_size. Threading is process-per-worker.
Oracle uses parallel execution slaves drawn from a shared
pool of long-lived PX servers. The plan carries Px Block Iterator / Px Partition Iterator plus Px Send/Px Receive
pairs. DOP is bounded by parallel_max_servers and tuned per
query by Auto DOP since 11gR2. Coverage is the broadest of any
mainstream engine — scan, join, sort, group-by, top-N, parallel
DML, parallel index build. Shared-build is realised by broadcast
distribution or hash-hash distribution.
SQL Server uses parallel-aware operators on top of the SQLOS
user-mode scheduler. DOP is bounded by the server-wide max degree
of parallelism setting and per-query MAXDOP hints. SQLOS keeps
context-switch cost low and lets the engine over-commit threads.
MySQL/InnoDB historically had no intra-query parallelism. From
8.0.14 InnoDB ships parallel read — multi-threaded clustered-
index scan used for CHECK TABLE, SELECT COUNT(*), some DDL
paths. The SQL layer above remains single-threaded; parallel join,
sort, and aggregation are not in mainline as of 8.4.
DuckDB embraces the morsel-driven model (Leis et al., SIGMOD 2014). The scheduler partitions sources into morsels, assigns them to threads from a fixed-size pool, and operators chain via push-based emission of data chunks.
CUBRID picks a fourth point: per-operator parallel
orchestrators on top of a shared, fixed-size worker pool. No
global exchange operator, no parallel-aware planner output. Three
operators — heap scan, hash join, top-level query execute — each
ship their own manager class that decides at open / execute
time whether to go parallel, partitions its work, reserves N
workers from the global pool, pushes N tasks, and joins. The pool
— one cubthread::worker_pool named “parallel-query” — is
shared. Coverage today: parallel sequential heap scan, parallel
hash join (Grace-style two-phase build/probe), parallel
uncorrelated subqueries (BUILDLIST_PROC / BUILDVALUE_PROC /
UNION_PROC / HASHJOIN_PROC / MERGELIST_PROC), parallel
slice-and-merge sort. Trade-off: less coverage than Postgres or
Oracle (no parallel index scan, no parallel aggregate above heap
scan, no parallel append) for a much thinner coordination layer —
three orchestrators, one pool, one DOP function, one
S_PARALLEL_HEAP_SCAN arm.
CUBRID’s Approach
The parallel_query namespace and the global worker pool
Everything lives under three sibling namespaces (parallel_query,
parallel_query_execute, parallel_heap_scan). The anchor is a
single class — worker_manager_global — a Meyers-singleton that
owns the OS-thread pool every parallel feature shares. It carries
cubthread::worker_pool_type *m_worker_pool, std::once_flag m_init_flag, std::atomic<int> m_available, and an int m_capacity; the public API is init / destroy, and the
internal API is try_reserve_workers / release_workers /
push_task.
Initialisation hooks into the project-wide worker-pool registry
through REGISTER_WORKERPOOL(parallel_query, ...) — every named
pool in cubthread is created at server start by walking that
registry, and the parallel-query pool is one entry sized at
PRM_ID_MAX_PARALLEL_WORKERS. Inside init(), a std::call_once
guard reads the parameter and either disables the feature
(max_parallel_workers < 2) or creates the pool via
thread_create_worker_pool(max_parallel_workers, 1, "parallel-query", thread_get_entry_manager()). The pool is
constant-sized (no autoscaling), single-purpose (task_max_count = 1 per worker), and named so CPU dumps attribute to it.
The interesting field is m_available: a single atomic counter of
workers not currently reserved. Every operator reserves before
pushing tasks (decrementing by N); on release it increments back.
The reservation is the engine-wide admission control:
```cpp
// worker_manager_global::try_reserve_workers — src/query/parallel/px_worker_manager_global.cpp
int worker_manager_global::try_reserve_workers (const int num_workers)
{
  int requested = MIN (num_workers, PRM_MAX_PARALLELISM);
  const int min_degree = (requested == 1) ? 1 : 2;
  int available = m_available.load ();
  while (true)
    {
      if (available < min_degree)
        return 0;
      int reserved = (requested <= available) ? requested : available;
      if (m_available.compare_exchange_weak (available, available - reserved))
        return reserved;
      std::this_thread::yield (); // CAS lost, retry
    }
}
```
Two design choices stand out. First, the CAS grants partial reservations: if asked for 8 and only 5 are available, it returns 5; the operator must decide whether 5 is enough or fall back. Second, the minimum degree is 2 for “parallel execution” (heap scan, hash join, sort) — 1 worker is treated as 0, because main + 1 worker = serial-like plus overhead — but 1 for “parallel subquery” (uncorrelated aptr on a worker while main keeps going).
```mermaid
flowchart TB
subgraph "process-wide"
REG["REGISTER_WORKERPOOL(parallel_query, ...)"]
POOL["cubthread::worker_pool 'parallel-query'<br/>capacity = PRM_ID_MAX_PARALLEL_WORKERS"]
AVAIL["std::atomic<int> m_available"]
REG --> POOL --> AVAIL
end
subgraph "per-operator"
WMP1["worker_manager (heap scan)"]
WMP2["worker_manager (hash join)"]
WMP3["worker_manager (query exec)"]
WMP4["worker_manager (sort)"]
end
AVAIL -. "try_reserve_workers(N)" .-> WMP1
AVAIL -. " " .-> WMP2
AVAIL -. " " .-> WMP3
AVAIL -. " " .-> WMP4
WMP1 -. "release_workers" .-> AVAIL
WMP2 -. " " .-> AVAIL
WMP3 -. " " .-> AVAIL
WMP4 -. " " .-> AVAIL
POOL --> TASKQ["task queue (cubthread)"]
WMP1 -- "push_task" --> TASKQ
WMP2 -- " " --> TASKQ
WMP3 -- " " --> TASKQ
WMP4 -- " " --> TASKQ
TASKQ --> W1[worker 1]
TASKQ --> W2[worker 2]
TASKQ --> Wn[worker N]
```
compute_parallel_degree — log2 of page-count over threshold
The DOP is decided by a single function compute_parallel_degree (parallel_type type, UINT64 num_pages, int hint_degree). The four
parallel_type values — HEAP_SCAN, HASH_JOIN, SORT,
SUBQUERY — each get their own page-count threshold sourced from
PRM_ID_PARALLEL_HEAP_SCAN_PAGE_THRESHOLD,
PRM_ID_PARALLEL_HASH_JOIN_PAGE_THRESHOLD, and
PRM_ID_PARALLEL_SORT_PAGE_THRESHOLD. The function returns 0
(disabled) when num_pages < page_threshold. Otherwise:
```cpp
// compute_parallel_degree — src/query/parallel/px_parallel.cpp
UINT64 x = num_pages / page_threshold;
auto_degree = (63 - __builtin_clzll (x)) + start_degree; // log2(x) + 2
return MIN (auto_degree, (UINT32) parallelism);
```
The formula is log2(num_pages / threshold) + 2, capped by the
global parallelism parameter and the machine’s core count. Each
doubling of input above the threshold buys one more worker; on an
8-core machine with default thresholds the DOP saturates near the
core count well before pages run out. The SUBQUERY case always
returns 1, because the subquery executor does main + 1-worker
fan-out per uncorrelated aptr, not N-way fan-out per aptr. Hint
handling: hint_degree == -1 → auto-compute; >= 2 → clamp to
system_core_count and use; 0 or 1 → disable.
worker_manager — per-operator reservation handle
Between the global pool and the operator orchestrator sits a small
RAII handle (parallel_query::worker_manager) carrying
std::atomic<int> m_active_tasks and int m_reserved_workers.
Lifecycle: try_reserve_workers(N) wraps the global CAS
(failure → nullptr, operator falls back to serial); push_task
bumps m_active_tasks and pushes onto the global pool (the task
must call pop_task() on retire); wait_workers busy-yields
(std::this_thread::yield()) until m_active_tasks is 0;
release_workers calls wait_workers first, then returns the
reserved workers to the global pool. The yield-loop is
intentional: workers are typically still doing useful work, so
the parent should not block on a condvar.
Shared task plumbing — callable_task, interrupt, err_messages_with_lock
callable_task adapts
std::function<void(cubthread::entry &)> into
cubthread::task<entry> plus a retire functor. Most heap-scan
and hash-join tasks subclass cubthread::entry_task directly;
the sort module uses callable_task because its bodies are
convenient std::bind expressions. On retire the task delegates
back to its parent worker_manager via pop_task(), then runs
the user retire functor (typically delete this).
interrupt is a single shared atomic enum that propagates
“why we’re stopping” between threads. The seven states
(NO_INTERRUPT, USER_INTERRUPTED_FROM_MAIN_THREAD,
USER_INTERRUPTED_FROM_WORKER_THREAD,
ERROR_INTERRUPTED_FROM_MAIN_THREAD,
ERROR_INTERRUPTED_FROM_WORKER_THREAD, INST_NUM_SATISFIED,
JOB_ENDED) capture both the direction (so the receiver knows
whether to swap a remote er_message in or just break) and the
reason (user vs error vs limit-satisfied, so the result handler
can return S_END vs S_ERROR). Every long-running worker loop
checks m_interrupt->get_code() once per iteration.
err_messages_with_lock carries a vector of
cuberr::er_message. Workers that hit an error swap their
thread-local error context into the vector under a mutex; the
main thread, on join, picks the first non-ER_INTERRUPTED
message and swaps it back into its own error context. The
companion atomic_instnum handles LIMIT N: any worker that
pushes the N-th tuple fans an INST_NUM_SATISFIED interrupt to
its peers.
Parallel heap scan
This replaces the serial heap-scan iterator (scan_open_heap_scan
/ scan_next_scan on a HEAP_SCAN_ID) with a parallel one
(scan_open_parallel_heap_scan / scan_next_parallel_heap_scan
on a PARALLEL_HEAP_SCAN_ID). The scan manager dispatches via
the standard SCAN_TYPE switch — S_PARALLEL_HEAP_SCAN is one
arm alongside S_HEAP_SCAN, S_INDX_SCAN, etc.
Open: decide and reserve
scan_open_parallel_heap_scan is the entry. The optimiser sets
ACCESS_SPEC_FLAG_NUM_PARALLEL_THREADS when parallel is plausible
on a TARGET_CLASS access-spec; the function decides whether to
go parallel now. It starts by setting scan_id->type = S_HEAP_SCAN (the serial fallback), then checks five gates: (1)
system classes — too small, too read-by-internals; (2)
MVCC-disabled classes (catalog, transient) — the parallel scan
assumes per-worker visibility check is identical to serial;
(3) select-lock-needed (serializable / FOR UPDATE) — parallel
scan does not propagate row locks; (4) private_heap_id == 0
excludes non-main-thread scans (no nested parallel); (5) empty
HFID. After the gates, it calls file_get_num_user_pages and
compute_parallel_degree(parallel_type::HEAP_SCAN, ...) for the
page-count threshold; then worker_manager::try_reserve_workers
for the global reservation. On any failure the function falls
back to serial by leaving scan_id->type at S_HEAP_SCAN. On
success it picks one of the three result types
(MERGEABLE_LIST / BUILDVALUE_OPT / XASL_SNAPSHOT),
placement-new’s manager<RESULT_TYPE>, calls its open(), and
sets scan_id->type = S_PARALLEL_HEAP_SCAN. Note the
MERGEABLE_LIST flag is unset when xasl->topn_items or
XASL_TO_BE_CACHED is set — those features need stable addresses
across the private-heap shuttle, so the worker output must go
through XASL_SNAPSHOT instead.
The three result types capture three caller shapes.
MERGEABLE_LIST is the common case: each worker writes a
QFILE_LIST_ID; main thread merges on read. XASL_SNAPSHOT is
row-by-row — used when the parent wants tuples one at a time
without intermediate list files (e.g., XASL_TO_BE_CACHED or
topn_items). BUILDVALUE_OPT is the aggregation fast path —
each worker builds a partial aggregate and main merges partials
at end. The result type is encoded in PARALLEL_HEAP_SCAN_ID and
template-specialises the manager<RESULT_TYPE> /
task<RESULT_TYPE> classes (whose public surface is open,
start_tasks, next, reset, merge_stats, end, close).
Partitioning: heap-file sectors via ftab_set
The partitioning unit is the heap-file sector. The heap manager
keeps a per-file FILE_FTAB_COLLECTOR of sectors owning data
pages; input_handler_ftabs::init_on_main calls
file_get_all_data_sectors on the main thread, copies the result
into m_ftab_set (via ftab_set::convert), then split (n_sets)
into N evenly-sized slices. Page-level coordination after that is
wholly per-worker — no shared bitmap, no work-stealing.
Each worker, on initialize(), claims one slice via fetch_add
on m_splited_ftab_set_idx. Per-worker state lives in static
thread_local members on input_handler_ftabs (m_tl_scan_cache,
m_tl_old_page_watcher, m_tl_ftab_set, m_tl_vpid,
m_tl_pgoffset, m_tl_ftab). The page-fix watcher pattern
(alternating m_tl_old_page_watcher and
m_tl_scan_cache->page_watcher) implements page-at-a-time scan
with the buffer pool’s ordered-fix protocol — pages are unfixed
only after the next page is fixed.
input_handler_ftabs::get_next_vpid_with_fix is the worker hot
path: pop the next sector, walk its 64-bit page_bitmap finding
data pages, ordered-fix each with OLD_PAGE_MAYBE_DEALLOCATED
(the race against deallocation between bitmap-build and page-fix
is expected — silently skip).
Tasks: per-worker XASL clone and per-page slot iteration
Every worker runs its own parallel_heap_scan::task<RESULT_TYPE>
on the worker pool. execute is three phases: initialize (on
failure, swap the error message into m_err_messages and set
interrupt_code::ERROR_INTERRUPTED_FROM_WORKER_THREAD), loop,
finalize.
initialize copies the parent’s connection / transaction context,
clones the XASL tree (via xcache_find_xasl_id_for_execute when
the cache holds a clonable XASL or stx_map_stream_to_xasl when
not), opens a per-worker HEAP_SCAN_ID on the cloned spec, opens
any nested list / index scans needed by deeper XASL levels
(subqueries, joins above), and sets up the input handler’s sector
slice and result handler’s writer state. m_px_orig_thread_entry
is set to the parent pointer — the engine-wide hook for “is this a
parallel worker?”. Code that needs to behave differently in a
worker (perfmon, log retry, lock-wait timeouts) checks this.
loop per outer iteration: check m_interrupt->get_code(), check
logtb_is_interrupted_tran, ask the input handler for the next
VPID, and on success drive the slot iterator through every
qualified slot on that page. For each qualified slot evaluate
if_pred; if the XASL has a scan_ptr (a nested level — list or
index scan participating in a join above), recurse through
qexec_execute_scan_ptr per qualified outer row before calling
result_handler_p->write(...). The result-type specialisation
chooses between m_xasl->outptr_list (mergeable list /
buildvalue) and m_xasl->val_list (xasl snapshot). After each row,
clear_xasl_dptr_list clears per-row dynamic sub-XASL state.
Errors set ERROR_INTERRUPTED_FROM_WORKER_THREAD; user interrupts
set USER_INTERRUPTED_FROM_WORKER_THREAD.
finalize is the inverse: write-finalises the result handler,
finalises the input handler and slot iterator, records join info,
cleans cloned XASL state, and either retires the XASL clone
(xcache_retire_clone) or frees the unpacked tree.
Read: the main thread’s next()
While workers fan out, the parent’s
scan_next_parallel_heap_scan keeps calling manager->next().
The first call lazily kicks the workers (start_tasks());
subsequent calls delegate to the result handler’s read().
Mergeable-list reads from m_xasl->list_id and re-clones values
across the private-heap boundary via fetch_val_list;
xasl-snapshot reads directly into m_xasl->val_list;
buildvalue-opt reads into m_xasl->proc.buildvalue.agg_list.
Interrupt fan-in is at the end: if any worker raised an error,
swap its er_message into the main thread’s context and return
S_ERROR.
The MERGEABLE_LIST path is the most intricate: the worker writes
into a private-heap-allocated QFILE_LIST_ID but the reader must
hand values back through the main thread’s private heap. The
shuttle is pr_clone_value calls between two
db_change_private_heap boundaries — clone out of worker heap,
clear worker-side, switch heap, clone into main-side. This is the
reason MERGEABLE_LIST cannot be combined with topn_items or
XASL_TO_BE_CACHED — those features need stable addresses across
the boundary.
```mermaid
sequenceDiagram
participant Main as Main thread<br/>(scan_next_scan)
participant Mgr as parallel_heap_scan::<br/>manager<result_type>
participant InH as input_handler_ftabs<br/>(thread_local sectors)
participant W1 as Worker 1
participant Wn as Worker N
participant ResH as result_handler
Main->>Mgr: scan_open_parallel_heap_scan
Mgr->>InH: init_on_main (split sectors)
Mgr->>Mgr: try_reserve_workers (N)
Main->>Mgr: scan_next_parallel_heap_scan (1st)
Mgr->>Mgr: start_tasks() — push N tasks
par fan-out
Mgr-->>W1: push_task (clone XASL)
Mgr-->>Wn: push_task (clone XASL)
end
W1->>InH: claim slice
Wn->>InH: claim slice
loop per VPID
W1->>InH: get_next_vpid_with_fix
W1->>W1: slot iterator + if_pred + write
Wn->>InH: get_next_vpid_with_fix
Wn->>Wn: slot iterator + if_pred + write
end
W1-->>ResH: writer_result_p / aggregate
Wn-->>ResH: writer_result_p / aggregate
Main->>ResH: read (merge)
Main->>Main: fetch_val_list (private-heap shuttle)
Main-->>Mgr: SCAN_CODE per call
Main->>Mgr: scan_close — release_workers, free
```
Parallel hash join
The parallel hash join sits inside qexec_hash_join — in the new
HASHJOIN_STATUS_PARALLEL arm of the status switch. The serial
path is the same as the non-parallel hash join (see
cubrid-hash-join.md): manager init, empty-side check, partition
decision, then single-pass classic build/probe or partitioned
build/probe. The parallel arm replaces the serial partitioned
build/probe with a worker-fanned version when partitioning is
chosen and the per-partition page-count is large enough.
hjoin_try_parallel — gate and reserve
hjoin_try_parallel runs after hjoin_try_partition has decided
the join needs partitioning. It computes
min_page_cnt = min(outer_list_id->page_cnt, inner_list_id->page_cnt), calls
compute_parallel_degree(parallel_type::HASH_JOIN, min_page_cnt, manager->num_parallel_threads), returns
HASHJOIN_STATUS_PARTITION (serial fallback) on degree < 2, then
clamps num_parallel_threads down to manager->context_cnt
(no point in more workers than partitions) and calls
worker_manager::try_reserve_workers. On success it stores the
handle on manager->px_worker_manager and returns
HASHJOIN_STATUS_PARALLEL; the dispatcher in qexec_hash_join
then calls
parallel_query::hash_join::execute_partitions(*thread_p, &manager).
build_partitions and execute_partitions
build_partitions is phase 1: call hjoin_init_shared_split_info,
then for both outer and inner relations:
qfile_collect_list_sector_info to enumerate data sectors, push
N split_task instances onto the operator’s task_manager,
task_manager.join() to barrier. Each split_task reads its
assigned chunk of the input list, hashes by the join key, and
writes into per-partition output files (using shared membuf claims
to coordinate access).
execute_partitions is phase 2 + 3: push N join_task instances
(each builds an in-memory hash for one partition’s inner side and
probes the corresponding outer-side partition); join; then merge
per-partition list ids via hjoin_merge_qlist.
split_task and join_task both inherit from base_task : cubthread::entry_task and share a task_manager that wraps
push/wait/join on top of the operator-level worker_manager. The
task_manager’s push_task increments an active-task counter
under a mutex before calling m_worker_manager->push_task;
end_task decrements, calls pop_task, and signals
m_all_tasks_done_cv.notify_all on hitting zero; join waits on
the condvar then m_worker_manager->wait_workers().
The hash-join task_manager differs from the heap-scan pattern in
two ways: it uses a condvar wait instead of a yield-loop because
the parent has nothing useful to do during the wait (unlike the
heap-scan parent, which can be reading results in parallel with
worker writes); and it has an explicit handle_error that swaps a
worker’s er_message into the main thread’s cuberr::context
directly — bypassing the err_messages_with_lock indirection —
because the hash join only surfaces one error and wants it on the
main thread immediately.
task_execution_guard — RAII for thread-context fan-out
Every worker starts by impersonating the parent thread. The
task_execution_guard constructor sets
m_thread_ref.m_px_orig_thread_entry, conn_entry, tran_index,
and on_trace from the main thread’s entry, then calls
push_resource_tracks(). The destructor clears conn_entry and
on_trace and calls pop_resource_tracks(). Without this every
worker would either run with no transaction context (tripping
every assertion in the access-method layer) or with a stale
context from a previous task.
spawn_manager — per-thread XASL substructure cloning
Hash join uses a thread-local spawner that clones join-time
substructures (val_descr, during-join predicate, outer/inner
regu lists) on demand per worker. Each get_* call (get_val_descr,
get_during_join_pred, get_outer_regu_list_pred,
get_inner_regu_list_pred) lazily allocates and caches the cloned
substructure on the worker’s db_private_alloc heap, then returns
the cached pointer on subsequent calls. This is the hash-join
analogue of the heap-scan task’s clone_xasl(), but at finer
granularity: only the substructures each worker actually needs.
```mermaid
flowchart TB
QHJ["qexec_hash_join (status switch)"]
HTP["hjoin_try_parallel:<br/>compute_parallel_degree(HASH_JOIN)<br/>· try_reserve_workers"]
QHJ -->|HASHJOIN_STATUS_TRY| HTP
HTP -->|< 2 workers| Single[serial: hjoin_execute]
HTP -->|>= 2 workers| EP[parallel_query::hash_join::execute_partitions]
QHJ -->|HASHJOIN_STATUS_PARTITION| BP1[hjoin_execute_partitions serial]
EP --> Phase1[build_partitions]
Phase1 --> SplitO["N x split_task (outer)"]
SplitO --> JoinO[task_manager::join]
JoinO --> SplitI["N x split_task (inner)"]
SplitI --> JoinI[task_manager::join]
JoinI --> Phase2[execute_partitions phase 2]
Phase2 --> JT["N x join_task (build+probe per partition)"]
JT --> JoinJ[task_manager::join]
JoinJ --> MergeR["hjoin_merge_qlist per partition"]
MergeR --> Done[xasl->list_id]
```
Parallel query execute
The third orchestrator runs whole XASL sub-trees in parallel. Its
canonical use is uncorrelated subqueries in the aptr_list of
certain proc types — sub-queries that the executor would otherwise
run sequentially as a prelude to the main block (UNION_PROC is a
classic example, where each branch is independent).
make_parallel_query_executor_recursively — wiring on the main thread
The C-callable entry wires up the parallel structure before
qexec_execute_mainblock is called. It early-outs on
!xcache_uses_clones() (parallel query execute requires clonable
XASL), sets thread_p->m_px_orig_thread_entry = thread_p,
optionally initialises perf-monitor parallel stats, then walks the
XASL tree twice via cubxasl::iterate_xasl_tree. The first walk
counts uncorrelated aptr nodes per parallel-eligible XASL type;
the second walk attaches a query_executor at every
parallel-eligible XASL node with at least two non-link aptrs — the
first such node gets a root executor (m_is_root_executor == true, owns the queue and worker pool); every subsequent node
gets a child executor that shares the root’s queue and worker
pool.
The parallel-eligible XASL types are those whose aptr_list
typically contains independent subqueries: BUILDLIST_PROC /
BUILDVALUE_PROC (a SELECT producing a list or scalar),
UNION_PROC / INTERSECTION_PROC / DIFFERENCE_PROC (set ops
over independent branches), HASHJOIN_PROC (whose sub-build
sides may themselves be independent), and MERGELIST_PROC (the
parallel union-all of list files used by partition-pruning
queries). The entire query tree therefore shares one job queue
and one set of workers, and parallelism is amortised across
nested parallel-eligible operators.
query_executor::run_jobs — single shared queue, work-loop on main
run_jobs ensures exactly one parallel task is on the global pool
(lazy-spawning a single parallel_query_execute::task on first
call), has the main thread run the pre-popped first job (stashed
by add_job as m_job to avoid an empty queue on small
fan-outs), and work-steals from the queue until
m_join_context.get_running_jobs() == 0. Joining is a condvar
wait on join_context::join_jobs. On root, after join, the
function pushes a sentinel (push_last()), releases workers,
performs interrupt fan-in (same pattern as heap scan), and merges
perf stats.
The design choice that matters: the main thread also
participates as a worker. After scheduling one task on the
worker pool, main starts popping jobs from the queue and running
them. Worker(s) and main race on try_pop; whichever sees a job
runs it. This is work-stealing at the granularity of XASL
sub-trees — and it has the nice side-effect that even when the
worker pool is starved (no workers available, worker_manager
returned 0), main still runs all the jobs serially through the
same code path.
The join_context is a tiny condvar-based barrier:
add_running_jobs / sub_running_jobs increment / decrement
under a mutex (the latter notifies on hitting zero); join_jobs
waits until zero. The actual job runs in execute_job_internal,
which clones the XASL state, swaps the worker thread’s context
(conn_entry, tran_index, on_trace, m_px_orig_thread_entry)
to point at the parent, runs qexec_execute_mainblock, copies
the sub-query’s list_id out, and restores.
Job queue: lock-free MPMC ring (thread_safe_queue)
The job queue is a textbook bounded MPMC ring with sequence
numbers per slot. Each slot carries data, std::atomic<uint64_t> sequence, and std::atomic<bool> ready. Fast paths
(try_push_fast / try_pop_fast) execute the canonical Vyukov
sequence: load position, CAS the slot’s sequence (pos →
pos + m_capacity), write data, release-store ready = true,
fetch_add the position. Slow paths take a mutex and wait on a
condvar for space / work, with interrupt checks per iteration.
Two refinements specific to parallel-query: push_completed is a
sticky shutdown flag, and reset_queue is the wraparound handler
that runs when the absolute position counter approaches
UINT64_MAX — paranoid guard, practically never fires. Deeper
backstory in cubrid-thread-worker-pool.md.
```mermaid
sequenceDiagram
participant Main as Main thread
participant QE as query_executor
participant Q as thread_safe_queue<job>
participant Pool as parallel-query pool
participant W as Worker
Main->>QE: make_parallel_query_executor_recursively
QE->>QE: walk XASL, attach px_executor
Main->>QE: add_job (job_1) — stash in m_job
Main->>QE: add_job (job_2..n)
QE->>Q: push job_2..n
Main->>QE: run_jobs()
QE->>Pool: push_task (query_task)
Pool->>W: dispatch
Main->>QE: execute_job_internal (m_job) /* main as worker */
par main races worker on Q
Main->>Q: try_pop -> job_k
Main->>QE: execute_job_internal (job_k)
W->>Q: pop -> job_m
W->>QE: execute_job_internal (job_m)
end
QE->>QE: join_context.join_jobs()
Main->>Pool: release_workers()
Main-->>QE: aggregated stats + interrupt fan-in
```
Parallel sort
The parallel sort is the oldest of the four — its DSL of macros in
px_sort.h predates the cubthread::entry_task / worker_manager
plumbing and integrates by pushing parallel_query::callable_task
instances directly. The pattern is slice-and-merge: split the
input temp file into N sub-files, sort each in a worker, then
merge. The two driver macros are
SORT_EXECUTE_PARALLEL(num, px_sort_param, function) (allocates N
callable_task instances bound to function with each
&px_sort_param[i] and pushes onto
sort_param->px_worker_manager) and
SORT_WAIT_PARALLEL(parallel_num, sort_param, px_sort_param)
(polls every px_sort_param[i].px_status under
sort_param->px_mtx until all are not PX_PROGRESS, on a condvar,
then wait_workers()).
The DOP gate is sort_check_parallelism, which uses the shared
compute_parallel_degree(parallel_type::SORT, ...). There are two
parallel-sort callers: SORT_ORDER_BY (the executor’s ORDER BY
sort over a temp file, with the per-query parallelism hint) and
SORT_INDEX_LEAF (the parallel index-leaf builder for B+Tree
construction, with no per-query hint). On a successful gate the
function calls try_reserve_workers and returns the actual
reserved count; on any failure it returns 1 so the caller falls
back to single-threaded sort.
The status enum (PX_PROGRESS, PX_DONE, PX_ERR_FAILED) is the
parallel-sort-specific equivalent of interrupt_code. The unique
part of parallel sort is that the merge phase is itself parallel:
sort_merge_nruns_parallel runs tournament-style merge tasks on
the worker pool, roughly log2(N) rounds of pairwise merges, until
a single sorted output remains.
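A minimal stand-in for the slice-and-merge pattern, with plain vectors in place of temp files and `std::thread` in place of the worker pool. `parallel_sort` is a hypothetical name, but the two phases mirror the roles of SORT_EXECUTE_PARALLEL (slice and sort) and sort_merge_nruns_parallel (tournament merge).

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <iterator>
#include <thread>
#include <vector>

// Phase 1: slice the input into dop runs and sort each in its own thread.
// Phase 2: merge runs pairwise, tournament-style, until one run remains
// (ceil(log2 dop) rounds, each round's merges running in parallel).
std::vector<int> parallel_sort (const std::vector<int> &input, int dop)
{
  if (dop < 1)
    dop = 1;

  std::vector<std::vector<int>> runs (dop);
  size_t chunk = (input.size () + dop - 1) / dop;
  std::vector<std::thread> workers;
  for (int i = 0; i < dop; i++)
    {
      size_t lo = std::min (input.size (), i * chunk);
      size_t hi = std::min (input.size (), lo + chunk);
      runs[i].assign (input.begin () + lo, input.begin () + hi);
      workers.emplace_back ([&runs, i] { std::sort (runs[i].begin (), runs[i].end ()); });
    }
  for (auto &w : workers)
    w.join ();

  while (runs.size () > 1)
    {
      std::vector<std::vector<int>> next ((runs.size () + 1) / 2);
      std::vector<std::thread> mergers;
      for (size_t i = 0; i + 1 < runs.size (); i += 2)
        mergers.emplace_back ([&runs, &next, i] {
          std::merge (runs[i].begin (), runs[i].end (),
                      runs[i + 1].begin (), runs[i + 1].end (),
                      std::back_inserter (next[i / 2]));
        });
      if (runs.size () % 2 == 1)
        next.back () = std::move (runs.back ());   // odd run passes through
      for (auto &m : mergers)
        m.join ();
      runs = std::move (next);
    }
  return std::move (runs[0]);
}
```

The real implementation merges temp-file runs with bounded buffers instead of whole in-memory vectors, but the round structure is the same.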
Source Walkthrough
Section titled “Source Walkthrough”
Stable symbols, grouped by subsystem. The position-hint table at
the end maps the most-cited symbols to (file, line) pairs as of
the document’s updated: date.
Foundation — src/query/parallel/
- `parallel_query::compute_parallel_degree` — DOP heuristic.
- `parallel_query::parallel_type` — `HEAP_SCAN`, `HASH_JOIN`, `SORT`, `SUBQUERY`.
- `parallel_query::worker_manager` — per-operator handle. `try_reserve_workers`, `release_workers`, `wait_workers`, `push_task`, `pop_task`, `get_reserved_workers`.
- `parallel_query::worker_manager_global` — Meyers singleton. Owns `m_worker_pool` (the named `parallel-query` pool) and `std::atomic<int> m_available`. `init`, `destroy`, plus internal `try_reserve_workers`, `release_workers`, `push_task`.
- `parallel_query::interrupt` — seven-state atomic enum.
- `parallel_query::atomic_instnum` — `is_instnum_satisfies_after_1tuple_insert`.
- `parallel_query::err_messages_with_lock` — `move_top_error_message_to_this`.
- `parallel_query::callable_task` — `cubthread::task<entry>` adapter for `std::function`/`std::bind` bodies (used by the sort macros).
- `parallel_query::ftab_set` — `convert`, `split (n_sets)`, `get_next`, `append`, `move_from`, `clear`.
- `parallel_query::thread_safe_queue<T>` — Vyukov MPMC ring. `push`, `pop`, `try_push`, `try_pop`, `push_last`, `is_empty`, `is_full`, `size`, `capacity`, `reset_queue`.
Heap scan — src/query/parallel/px_heap_scan/
- `parallel_heap_scan::manager<RESULT_TYPE>` — top-level per-scan orchestrator. `open`, `start_tasks`, `next`, `reset`, `merge_stats`, `end`, `close`. Templated on `RESULT_TYPE::{MERGEABLE_LIST, XASL_SNAPSHOT, BUILDVALUE_OPT}`.
- `parallel_heap_scan::task<RESULT_TYPE>` — `cubthread::entry_task` body. `execute`, `retire`, `initialize`, `finalize`, `clone_xasl`, `loop`.
- `parallel_heap_scan::input_handler_ftabs` — sector-slice dispatcher. `init_on_main`, `initialize`, `finalize`, `get_next_vpid_with_fix`. Thread-local page watchers.
- `parallel_heap_scan::result_handler<RESULT_TYPE>` — `read_initialize`, `read`, `read_finalize`, `write_initialize`, `write`, `write_finalize`.
- `parallel_heap_scan::slot_iterator` — `initialize`, `finalize`, `set_page`, `next_qualified_slot_with_peek`.
- `parallel_heap_scan::join_info` — `capture_join_info`, `record_join_info`, `apply_join_info`, `get_scan_info`.
- `parallel_heap_scan::trace_handler`, `accumulative_trace_storage` — perf counter aggregation.
- C entry points: `scan_open_parallel_heap_scan`, `scan_start_parallel_heap_scan`, `scan_next_parallel_heap_scan`, `scan_reset_scan_block_parallel_heap_scan`, `scan_end_parallel_heap_scan`, `scan_close_parallel_heap_scan`. Checker: `scan_check_parallel_heap_scan_possible`.
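The dispatcher idea behind `get_next_vpid_with_fix` can be reduced to an atomic cursor that workers pull from until the range is exhausted, so slices balance dynamically instead of being pre-assigned. A deliberately simplified, hypothetical stand-in: the real `input_handler_ftabs` walks file-table sector slices and fixes heap pages, not a flat integer range.

```cpp
#include <atomic>
#include <cassert>

// Shared work dispenser: each worker calls get_next_page() in its scan
// loop; a relaxed fetch_add hands out each page exactly once, and -1
// signals exhaustion. Fast workers naturally take more pages.
struct page_dispenser
{
  std::atomic<int> next {0};
  int num_pages;

  explicit page_dispenser (int n) : num_pages (n) {}

  int get_next_page ()
  {
    int p = next.fetch_add (1, std::memory_order_relaxed);
    return p < num_pages ? p : -1;   // -1: scan exhausted
  }
};
```

This pull model is also why the DOP can be smaller than the slice count without waste: a worker that finishes early just pulls the next unit.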
Hash join — src/query/parallel/px_hash_join/
- `parallel_query::hash_join::build_partitions` — phase 1 (split outer, then split inner).
- `parallel_query::hash_join::execute_partitions` — phase 2 + 3 (per-partition build/probe, then merge).
- `parallel_query::hash_join::task_manager` — `push_task`, `end_task`, `join`, `handle_error`, `notify_stop`, `check_interrupt`, `clear_interrupt`.
- `parallel_query::hash_join::task_execution_guard` — RAII context fan-out.
- `parallel_query::hash_join::base_task`, `split_task`, `join_task` — task hierarchy.
- `parallel_query::hash_join::spawn_manager` — TLS substructure cloner. `get_val_descr`, `get_during_join_pred`, `get_outer_regu_list_pred`, `get_inner_regu_list_pred`, `get_instance`, `destroy_instance`.
- C-side in `query_hash_join.c`: `hjoin_try_parallel` (gate), `qexec_hash_join`’s `HASHJOIN_STATUS_PARALLEL` arm, `hjoin_init_shared_split_info`, `hjoin_clear_shared_split_info`, `hjoin_trace_*`, `hjoin_merge_qlist`.
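The three phases above can be sketched with in-memory vectors: hash-partition both inputs, build and probe each partition in its own thread, then concatenate the per-partition outputs (the role `hjoin_merge_qlist` plays). All names and the non-negative-key modulo partitioner are illustrative assumptions, not the CUBRID API.

```cpp
#include <cassert>
#include <thread>
#include <unordered_map>
#include <utility>
#include <vector>

using row = std::pair<int, int>;   // (join key, payload); keys assumed >= 0

// Phase 1: split both sides by key hash. Phases 2+3: independent
// build/probe per partition (one thread each), then merge the outputs.
std::vector<row> partitioned_hash_join (const std::vector<row> &outer,
                                        const std::vector<row> &inner, int nparts)
{
  std::vector<std::vector<row>> outer_p (nparts), inner_p (nparts);
  for (const row &r : outer)
    outer_p[r.first % nparts].push_back (r);
  for (const row &r : inner)
    inner_p[r.first % nparts].push_back (r);

  std::vector<std::vector<row>> out (nparts);
  std::vector<std::thread> tasks;
  for (int p = 0; p < nparts; p++)
    tasks.emplace_back ([&, p] {
      std::unordered_multimap<int, int> ht;          // build side: inner
      for (const row &r : inner_p[p])
        ht.emplace (r.first, r.second);
      for (const row &r : outer_p[p])                 // probe side: outer
        for (auto range = ht.equal_range (r.first); range.first != range.second; ++range.first)
          out[p].push_back ({r.second, range.first->second});   // (outer payload, inner payload)
    });
  for (auto &t : tasks)
    t.join ();

  std::vector<row> result;                           // concatenate partitions
  for (auto &o : out)
    result.insert (result.end (), o.begin (), o.end ());
  return result;
}
```

Because tuples with equal keys land in the same partition, no partition ever needs another partition's hash table, which is what makes the per-partition phase embarrassingly parallel.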
Query execute — src/query/parallel/px_query_execute/
- `parallel_query_execute::query_executor` — top-level (root + child constructors). `add_job`, `run_jobs`, `get_parallelism`, `get_stats`.
- `parallel_query_execute::task` — `cubthread::entry_task` body. `execute`, `retire`, `init`, `get_job`, `execute_job`, `end`.
- `parallel_query_execute::execute_job_internal` — clones XASL state, swaps the thread context, calls `qexec_execute_mainblock`, copies the list_id out, restores.
- `parallel_query_execute::job` — `(xasl_node*, xasl_state*, join_context*, trace_context*)` queue payload.
- `parallel_query_execute::join_context` — running-job barrier (`add_running_jobs`, `sub_running_jobs`, `get_running_jobs`, `join_jobs`).
- `parallel_query_execute::trace_context` — per-job perf-stat vector.
- `make_parallel_query_executor_recursively` — wires `xasl_p->px_executor` for every parallel-eligible XASL node.
- Checker: `check_parallel_subquery_possible`.
Sort — src/query/parallel/px_sort.{h,c} and
src/storage/external_sort.c
- Macros: `SORT_IS_PARALLEL`, `SORT_EXECUTE_PARALLEL`, `SORT_WAIT_PARALLEL`.
- Status enum: `PX_ERR_FAILED`, `PX_DONE`, `PX_PROGRESS`. Type enum: `PARALLEL_TYPE::{PX_SINGLE, PX_MAIN_IN_PARALLEL, PX_THREAD_IN_PARALLEL}`.
- Functions: `sort_check_parallelism`, `sort_start_parallelism`, `sort_end_parallelism`, `sort_listfile_execute`, `sort_copy_sort_param`, `sort_copy_sort_info`, `sort_split_input_temp_file`, `sort_merge_run_for_parallel`, `sort_merge_nruns`, `sort_put_result_for_parallel`, `sort_merge_nruns_parallel`, `sort_split_last_run`, `sort_put_result_from_tmpfile`.
Cross-module integration
- `src/query/scan_manager.c` — every public scan entry has an `S_PARALLEL_HEAP_SCAN` switch arm that forwards to the C wrappers in `px_heap_scan.cpp` (`scan_start_scan`, `scan_reset_scan_block`, `scan_end_scan`, `scan_close_scan`, `scan_next_scan_local`).
- `src/query/query_hash_join.c` — `qexec_hash_join` adds a `HASHJOIN_STATUS_PARALLEL` arm calling `parallel_query::hash_join::execute_partitions`. The status is decided by `hjoin_try_parallel`.
- `src/storage/external_sort.c` — `sort_listfile_internal` calls `sort_check_parallelism` and dispatches via the macros on a result ≥ 2.
- `src/thread/thread_worker_pool*.{cpp,hpp}` — the `worker_pool` template that the global `parallel-query` pool instantiates. `THREAD_ENTRY` fields used: `m_px_orig_thread_entry` (parent pointer), `m_uses_px_stats`, `m_px_stats` (per-thread perf buffer), `m_px_stats_mutex`, `m_px_lock_mutex` (locks the cloned XASL during heap-scan task initialise/finalise).
Position hints (as of updated:)
Section titled “Position hints (as of updated:)”
| symbol | file | line |
|---|---|---|
| compute_parallel_degree | src/query/parallel/px_parallel.cpp | 36 |
| worker_manager::try_reserve_workers | src/query/parallel/px_worker_manager.cpp | 49 |
| worker_manager::wait_workers | src/query/parallel/px_worker_manager.cpp | 97 |
| worker_manager_global::init | src/query/parallel/px_worker_manager_global.cpp | 53 |
| worker_manager_global::try_reserve_workers | src/query/parallel/px_worker_manager_global.cpp | 97 |
| REGISTER_WORKERPOOL(parallel_query) | src/query/parallel/px_worker_manager_global.cpp | 48 |
| interrupt::interrupt_code | src/query/parallel/px_interrupt.hpp | 31 |
| err_messages_with_lock::move_top_error_message_to_this | src/query/parallel/px_interrupt.hpp | 100 |
| ftab_set::split | src/query/parallel/px_ftab_set.hpp | 106 |
| thread_safe_queue<T>::try_push_fast | src/query/parallel/px_thread_safe_queue.cpp | 172 |
| thread_safe_queue<T>::try_pop_fast | src/query/parallel/px_thread_safe_queue.cpp | 221 |
| scan_open_parallel_heap_scan | px_heap_scan/px_heap_scan.cpp | 349 |
| scan_next_parallel_heap_scan | px_heap_scan/px_heap_scan.cpp | 44 |
| scan_close_parallel_heap_scan | px_heap_scan/px_heap_scan.cpp | 242 |
| parallel_heap_scan::manager<>::open | px_heap_scan/px_heap_scan.cpp | 632 |
| parallel_heap_scan::manager<>::start_tasks | px_heap_scan/px_heap_scan.cpp | 795 |
| parallel_heap_scan::manager<>::next | px_heap_scan/px_heap_scan.cpp | 817 |
| parallel_heap_scan::task<>::execute | px_heap_scan/px_heap_scan_task.cpp | 44 |
| parallel_heap_scan::task<>::initialize | px_heap_scan/px_heap_scan_task.cpp | 71 |
| parallel_heap_scan::task<>::clone_xasl | px_heap_scan/px_heap_scan_task.cpp | 439 |
| parallel_heap_scan::task<>::loop | px_heap_scan/px_heap_scan_task.cpp | 510 |
| input_handler_ftabs::init_on_main | px_heap_scan/px_heap_scan_input_handler_ftabs.cpp | 60 |
| input_handler_ftabs::get_next_vpid_with_fix | px_heap_scan/px_heap_scan_input_handler_ftabs.cpp | 87 |
| hash_join::build_partitions | px_hash_join/px_hash_join.cpp | 43 |
| hash_join::execute_partitions | px_hash_join/px_hash_join.cpp | 157 |
| hash_join::task_manager::push_task | px_hash_join/px_hash_join_task_manager.cpp | 58 |
| hash_join::task_manager::join | px_hash_join/px_hash_join_task_manager.cpp | 81 |
| hash_join::task_execution_guard | px_hash_join/px_hash_join_task_manager.hpp | 105 |
| hash_join::spawn_manager::spawn | px_hash_join/px_hash_join_spawn_manager.hpp | 85 |
| make_parallel_query_executor_recursively | px_query_execute/px_query_executor.cpp | 279 |
| query_executor::run_jobs | px_query_execute/px_query_executor.cpp | 115 |
| execute_job_internal | px_query_execute/px_query_task.cpp | 91 |
| join_context::join_jobs | px_query_execute/px_query_job.hpp | 70 |
| hjoin_try_parallel | src/query/query_hash_join.c | 1965 |
| qexec_hash_join HASHJOIN_STATUS_PARALLEL arm | src/query/query_hash_join.c | 230 |
| sort_check_parallelism | src/storage/external_sort.c | 4936 |
| SORT_EXECUTE_PARALLEL / SORT_WAIT_PARALLEL | src/query/parallel/px_sort.h | 41, 52 |
Cross-check Notes
Section titled “Cross-check Notes”
- `parallel_heap_scan::manager` is templated on `RESULT_TYPE`, but the C API is non-templated and the runtime dispatch uses a `result_type` field on `PARALLEL_HEAP_SCAN_ID`. Adding a fourth result type requires five touches: the enum, the `manager` instantiation, the `task` instantiation, the runtime switches in `scan_next_parallel_heap_scan` / `scan_reset_scan_block_parallel_heap_scan` / `scan_close_parallel_heap_scan`, and the result-type selection in `scan_open_parallel_heap_scan`.
- The DOP heuristic `auto_degree = log2(num_pages / page_threshold) + 2` is documented nowhere except the comment in `compute_parallel_degree`; it is not a Postgres-style linear ramp. For very large tables the DOP saturates at `parallelism` long before `num_pages` does — conservative for huge scans, aggressive for medium scans.
- The five “no parallel” gates inside `scan_open_parallel_heap_scan` (system class, MVCC-disabled, select-lock-needed, `private_heap_id == 0`, partitioned class) are checked once at open. The comment “DB_PARTITION_CLASS will be parallel-heap-scanned, not DB_PARTITIONED_CLASS” implies the right unit is the leaf partition, not the parent.
- `make_parallel_query_executor_recursively` early-outs on `!xcache_uses_clones()` — parallel query execute requires clonable XASL. Heap scan also benefits but its `clone_xasl` has both branches (cache and non-cache via `stx_map_stream_to_xasl`), so it can function without the cache; the query executor cannot.
- The hash-join `task_manager` uses a condvar wait but the heap-scan `worker_manager` uses a yield-loop wait. The heap-scan main thread is not idle during fan-out — it is reading results in parallel with worker writes — so it cannot block. The hash-join phase 1 main thread is idle and benefits from a real condvar. The query-executor main thread participates as a worker and waits on `join_context::join_jobs` only after exhausting the queue.
- The interrupt enum has seven states (not a binary on/off) because the parallel layer needs both direction and reason.
- `THREAD_ENTRY::m_px_lock_mutex` is unusual — a per-thread mutex whose only callers are other threads (the main thread’s `m_px_lock_mutex` is locked by workers in `clone_xasl()` and `finalize()`). It serialises XASL-clone allocation / retirement against the main thread’s xasl-cache operations. It is a parent-side lock for child-thread access, not a per-thread self-lock.
- `m_px_orig_thread_entry` is the engine-wide hook for “is this a parallel worker?”. Code that needs to behave differently in a worker (perfmon, log-acquisition retry, lock-wait timeouts) checks this and uses the parent’s context. New cross-cutting features (e.g., a per-query memory limit) should plug into this same hook rather than threading a new field through every worker boundary.
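A hedged reading of the DOP heuristic quoted in the notes above, as a standalone function. The name, the clamp bounds, and the serial fallback below the threshold are assumptions based on this document, not the actual `compute_parallel_degree` signature.

```cpp
#include <algorithm>
#include <cassert>
#include <cmath>

// Below page_threshold the scan stays serial; above it the degree grows
// with log2 of the page ratio and saturates at the configured maximum,
// which is why medium tables ramp up quickly and huge tables plateau.
int compute_parallel_degree_sketch (long num_pages, long page_threshold,
                                    int max_parallelism)
{
  if (num_pages < page_threshold || max_parallelism <= 1)
    return 1;
  int auto_degree = (int) std::log2 ((double) num_pages / page_threshold) + 2;
  return std::clamp (auto_degree, 1, max_parallelism);
}
```

With a threshold of 1000 pages and a maximum of 8, a 4000-page table gets degree 4, while a billion-page table already sits at the cap.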
Open Questions
Section titled “Open Questions”
- Adaptive DOP. `compute_parallel_degree` runs once at scan open / hash-join admission / sort start; the DOP cannot be adjusted afterwards. If contention or page-buffer pressure spikes mid-query, the engine cannot scale back. Postgres has the same limitation; Oracle’s “in-memory parallel execution” has dynamic re-sharding.
- Parallel index scan. The catalogue lists `S_INDX_SCAN` with no parallel sibling. Adding an `S_PARALLEL_INDX_SCAN` would require partitioning the B+Tree leaf level and a per-worker `BTREE_SCAN` clone with shared cursor state.
- Parallel aggregation above heap scan. `BUILDVALUE_OPT` partials are merged at the main thread’s `read()` — parallel, but not at the operator level. For high-cardinality `GROUP BY` the merge becomes the bottleneck. A parallel merge (a tournament tree of partial aggregates, or a hash-partitioned merge) is a known win in the literature but is not implemented.
- Nested parallelism. Each orchestrator reserves from the same pool; if a hash join calls into a heap scan that also wants to go parallel, the inner call sees a depleted pool and silently falls back to serial. The `private_heap_id == 0` gate is the explicit guard. Whether to allow controlled nesting (e.g., reserving fewer workers per level) is an open choice.
- Memory accounting. Each worker uses `db_private_alloc` for scratch — XASL clones, a cloned `val_descr`, partial aggregates — but the budget is per-thread, not per-query. A query with parallelism N multiplies its memory footprint by N. There is no per-query memory limit at the parallel level today.
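The depletion behaviour behind the nesting question can be illustrated with the reservation pattern the `worker_manager_global` notes describe: a single atomic budget that `try_reserve_workers` draws down, handing back fewer workers than asked, or zero, when the pool is contended. Class and method shapes here are illustrative sketches, not the CUBRID declarations.

```cpp
#include <algorithm>
#include <atomic>
#include <cassert>

// Global worker budget: reservations are best-effort. An inner operator
// asking after an outer one has drained the budget gets 0 back and is
// expected to fall back to serial execution.
class worker_budget
{
  public:
    explicit worker_budget (int total) : m_available (total) {}

    // returns how many workers were actually reserved (possibly 0)
    int try_reserve_workers (int wanted)
    {
      int avail = m_available.load ();
      while (avail > 0)
        {
          int take = std::min (wanted, avail);
          if (m_available.compare_exchange_weak (avail, avail - take))
            return take;
          // CAS failure reloads avail; retry against the new value
        }
      return 0;   // depleted: caller runs serial
    }

    void release_workers (int n)
    {
      m_available.fetch_add (n);
    }

  private:
    std::atomic<int> m_available;
};
```

The partial-grant return value is the important design point: callers must size their fan-out to what they were granted, not to what they requested.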
Sources
Section titled “Sources”
- Source files (under `/data/hgryoo/references/cubrid/`): `src/query/parallel/px_parallel.{hpp,cpp}`, `px_worker_manager{,_global}.{hpp,cpp}`, `px_callable_task.{hpp,cpp}`, `px_thread_safe_queue.{hpp,cpp}`, `px_interrupt.hpp`, `px_ftab_set.hpp`, `px_sort.{h,c}`, `px_heap_scan/*.{hpp,cpp}`, `px_hash_join/*.{hpp,cpp}`, `px_query_execute/*.{hpp,cpp}`, `src/thread/thread_worker_pool*.{hpp,cpp}`, `src/query/scan_manager.c`, `src/query/query_hash_join.c`, `src/storage/external_sort.c`.
- Related curated docs: cubrid-scan-manager.md, cubrid-hash-join.md, cubrid-thread-worker-pool.md, cubrid-external-sort.md.
cubrid-scan-manager.md,cubrid-hash-join.md,cubrid-thread-worker-pool.md,cubrid-external-sort.md. - Theoretical references: Graefe, Encapsulation of Parallelism in the Volcano Query Processing System (SIGMOD 1990); Graefe, Query Evaluation Techniques for Large Databases (CSUR 1993) §3; Leis et al., Morsel-Driven Parallelism (SIGMOD 2014); Anatomy of a Database System (Red Book Ch. 4) for shared build; Garcia-Molina/Ullman/Widom, Database Systems: The Complete Book §15.4 (two-phase parallel sort); Vyukov, Bounded MPMC queue.