CUBRID Parallel Query — Intra-Query Parallelism Across Heap Scan, Hash Join, and Query Execute

A relational executor turns a query plan into a stream of tuples. When the plan or its inputs are large, doing that on one CPU leaves cores idle and stretches latency. Intra-query parallelism runs several copies of one of the plan’s operators (or its leaves) concurrently and recombines the results so the consumer above sees the same output a serial plan would have produced.

The founding statement is Graefe’s Encapsulation of Parallelism in the Volcano Query Processing System (SIGMOD 1990). Graefe observed that the serial iterator (open/next/close) is a sufficient contract for parallelism if you add one new operator: the exchange. Exchange owns the queue between producer threads and consumer threads, the partitioning policy that maps inputs to producers, and the flow control. Four flavours fall out — Gather (N→1), Scatter (1→N hash- or range-partitioned), Repartition (N→N with a new function), Replicate (1→N broadcast). Most engines pick a subset.

Three concepts inside that framework matter. Degree of parallelism (DOP) is the integer N bounded by hardware (cores), software (a global thread budget), and data (no point starting eight workers to scan four pages). Partitioning is block-range (N contiguous regions of a heap), hash (by join key into N buckets), or round-robin; the choice constrains what the exchange above can do — hash is partition-aware so a join above can run per partition, block-range is not. Scheduling is long-lived workers (Postgres BackgroundWorker, Oracle PX servers) versus short-lived tasks recycled across operators (DuckDB, CUBRID).

Two refinements close the loop. The first is the shared-build pattern for parallel hash join (Anatomy of a Database System, Red Book Ch. 4): build the inner side once into a shared structure, then let N probe workers read it concurrently. CUBRID realises a weaker form in which the shared structure is a partition pool, not a shared hash table. The second is the two-phase parallel sort (Database Systems: The Complete Book §15.4): N workers each sort a slice, then the sorted slices are merged. CUBRID’s parallel sort is exactly slice-and-merge.

Where the layer sits fixes its responsibilities. Above: qexec_execute_mainblock, scan_next_scan, qexec_hash_join, sort_listfile_internal ask for tuples in the same domain (DB_VALUE, XASL_NODE, QFILE_LIST_ID) the serial path produces, so the consumer cannot tell the difference. Below: access methods expose page-at-a-time interfaces; the parallel layer must coordinate fixings, latches, and pinning across N threads without violating the serial latch protocol. The thread pool, error context, perf monitor, and interrupt machinery all have to fan out and fan back in on completion. The interesting part of CUBRID’s design is exactly that fan-out / fan-in plumbing.

Every modern DBMS that scales beyond one core picks an implementation strategy. The variants are consistent at the abstract level (Graefe exchange everywhere) but differ in threading model, scheduling, and coverage.

PostgreSQL uses parallel-aware nodes plus a BackgroundWorker pool. The planner emits Gather / Gather Merge at the top; below it, each parallel-eligible operator is its own Parallel* node (ParallelSeqScan, ParallelIndexScan, ParallelBitmapHeapScan, ParallelHashJoin, ParallelAppend, ParallelAggregate). The leader launches up to N short-lived worker processes per Gather; coordination is via shared memory. DOP is capped by max_parallel_workers_per_gather, and a scan is considered for parallelism only once the table reaches min_parallel_table_scan_size. Threading is process-per-worker.

Oracle uses parallel execution slaves drawn from a shared pool of long-lived PX servers. The plan carries Px Block Iterator / Px Partition Iterator plus Px Send/Px Receive pairs. DOP is bounded by parallel_max_servers and tuned per query by Auto DOP since 11gR2. Coverage is the broadest of any mainstream engine — scan, join, sort, group-by, top-N, parallel DML, parallel index build. Shared-build is realised by broadcast distribution or hash-hash distribution.

SQL Server uses parallel-aware operators on top of the SQLOS user-mode scheduler. DOP is bounded by max degree of parallelism and MAXDOP. SQLOS keeps context-switch cost low and lets the engine over-commit threads.

MySQL/InnoDB historically had no intra-query parallelism. From 8.0.14 InnoDB ships parallel read — multi-threaded clustered-index scan used for CHECK TABLE, SELECT COUNT(*), some DDL paths. The SQL layer above remains single-threaded; parallel join, sort, and aggregation are not in mainline as of 8.4.

DuckDB embraces the morsel-driven model (Leis et al., SIGMOD 2014). The scheduler partitions sources into morsels, assigns them to threads from a fixed-size pool, and operators chain via push-based emission of data chunks.

CUBRID picks a different point: per-operator parallel orchestrators on top of a shared, fixed-size worker pool. No global exchange operator, no parallel-aware planner output. Three operators — heap scan, hash join, top-level query execute — each ship their own manager class that decides at open / execute time whether to go parallel, partitions its work, reserves N workers from the global pool, pushes N tasks, and joins. The pool — one cubthread::worker_pool named “parallel-query” — is shared. Coverage today: parallel sequential heap scan, parallel hash join (Grace-style two-phase build/probe), parallel uncorrelated subqueries (BUILDLIST_PROC / BUILDVALUE_PROC / UNION_PROC / HASHJOIN_PROC / MERGELIST_PROC), parallel slice-and-merge sort. Trade-off: less coverage than Postgres or Oracle (no parallel index scan, no parallel aggregate above heap scan, no parallel append) for a much thinner coordination layer — three orchestrators, one pool, one DOP function, one S_PARALLEL_HEAP_SCAN arm.

The parallel_query namespace and the global worker pool

Everything lives under three sibling namespaces (parallel_query, parallel_query_execute, parallel_heap_scan). The anchor is a single class — worker_manager_global — a Meyers-singleton that owns the OS-thread pool every parallel feature shares. It carries cubthread::worker_pool_type *m_worker_pool, std::once_flag m_init_flag, std::atomic<int> m_available, and an int m_capacity; the public API is init / destroy, and the internal API is try_reserve_workers / release_workers / push_task.

Initialisation hooks into the project-wide worker-pool registry through REGISTER_WORKERPOOL(parallel_query, ...) — every named pool in cubthread is created at server start by walking that registry, and the parallel-query pool is one entry sized at PRM_ID_MAX_PARALLEL_WORKERS. Inside init(), a std::call_once guard reads the parameter and either disables the feature (max_parallel_workers < 2) or creates the pool via thread_create_worker_pool(max_parallel_workers, 1, "parallel-query", thread_get_entry_manager()). The pool is constant-sized (no autoscaling), single-purpose (task_max_count = 1 per worker), and named so CPU dumps attribute to it.

The interesting field is m_available: a single atomic counter of workers not currently reserved. Every operator reserves before pushing tasks (decrementing by N); on release it increments back. The reservation is the engine-wide admission control:

// worker_manager_global::try_reserve_workers — src/query/parallel/px_worker_manager_global.cpp
int worker_manager_global::try_reserve_workers (const int num_workers)
{
  int requested = MIN (num_workers, PRM_MAX_PARALLELISM);
  const int min_degree = (requested == 1) ? 1 : 2;
  int available = m_available.load ();
  while (true) {
    if (available < min_degree) return 0;
    int reserved = (requested <= available) ? requested : available;
    if (m_available.compare_exchange_weak (available, available - reserved))
      return reserved;
    std::this_thread::yield (); // CAS lost, retry
  }
}

Two design choices stand out. First, the CAS grants partial reservations: if asked for 8 and only 5 are available, it returns 5; the operator must decide whether 5 is enough or fall back. Second, the minimum degree is 2 for “parallel execution” (heap scan, hash join, sort) — 1 worker is treated as 0, because main + 1 worker = serial-like plus overhead — but 1 for “parallel subquery” (uncorrelated aptr on a worker while main keeps going).
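
The reservation semantics described above can be modelled in a few lines. This is a minimal sketch of the counter only: the class name reservation_counter and the helper should_run_parallel are hypothetical (not CUBRID symbols), and the PRM_MAX_PARALLELISM cap and the yield on a lost CAS are left out for brevity.

```cpp
#include <algorithm>
#include <atomic>
#include <cassert>

// Hypothetical stand-in for the global pool: only the admission counter.
class reservation_counter
{
  public:
    explicit reservation_counter (int capacity) : m_available (capacity)
    {
      assert (capacity >= 0);
    }

    // Grants up to `requested` workers and returns the granted count.
    // Returns 0 when fewer than the minimum degree are free. A request for
    // exactly 1 models the subquery case, whose minimum degree is 1.
    int try_reserve (int requested)
    {
      const int min_degree = (requested == 1) ? 1 : 2;
      int available = m_available.load ();
      while (true)
        {
          if (available < min_degree)
            {
              return 0;
            }
          const int reserved = std::min (requested, available);
          // On failure compare_exchange_weak reloads `available`, so the
          // next iteration re-evaluates the grant against the fresh value.
          if (m_available.compare_exchange_weak (available, available - reserved))
            {
              return reserved;
            }
        }
    }

    void release (int n)
    {
      m_available.fetch_add (n);
    }

    int available () const
    {
      return m_available.load ();
    }

  private:
    std::atomic<int> m_available;
};

// Caller-side policy (an assumption for illustration): accept any partial
// grant of at least 2 workers, otherwise stay on the serial path.
inline bool should_run_parallel (int granted)
{
  return granted >= 2;
}
```

With five free workers, try_reserve (8) is granted 5, a following try_reserve (2) is granted 0, and a request for exactly 1 succeeds whenever at least one worker is free.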

flowchart TB
    subgraph "process-wide"
        REG["REGISTER_WORKERPOOL(parallel_query, ...)"]
        POOL["cubthread::worker_pool 'parallel-query'<br/>capacity = PRM_ID_MAX_PARALLEL_WORKERS"]
        AVAIL["std::atomic&lt;int&gt; m_available"]
        REG --> POOL --> AVAIL
    end
    subgraph "per-operator"
        WMP1["worker_manager (heap scan)"]
        WMP2["worker_manager (hash join)"]
        WMP3["worker_manager (query exec)"]
        WMP4["worker_manager (sort)"]
    end
    AVAIL -. "try_reserve_workers(N)" .-> WMP1
    AVAIL -. " " .-> WMP2
    AVAIL -. " " .-> WMP3
    AVAIL -. " " .-> WMP4
    WMP1 -. "release_workers" .-> AVAIL
    WMP2 -. " " .-> AVAIL
    WMP3 -. " " .-> AVAIL
    WMP4 -. " " .-> AVAIL
    POOL --> TASKQ["task queue (cubthread)"]
    WMP1 -- "push_task" --> TASKQ
    WMP2 -- " " --> TASKQ
    WMP3 -- " " --> TASKQ
    WMP4 -- " " --> TASKQ
    TASKQ --> W1[worker 1]
    TASKQ --> W2[worker 2]
    TASKQ --> Wn[worker N]

compute_parallel_degree — log2 of page-count over threshold

The DOP is decided by a single function, compute_parallel_degree (parallel_type type, UINT64 num_pages, int hint_degree). Of the four parallel_type values (HEAP_SCAN, HASH_JOIN, SORT, SUBQUERY), the first three each get their own page-count threshold, sourced from PRM_ID_PARALLEL_HEAP_SCAN_PAGE_THRESHOLD, PRM_ID_PARALLEL_HASH_JOIN_PAGE_THRESHOLD, and PRM_ID_PARALLEL_SORT_PAGE_THRESHOLD respectively. For those types the function returns 0 (disabled) when num_pages < page_threshold. Otherwise:

// compute_parallel_degree — src/query/parallel/px_parallel.cpp
UINT64 x = num_pages / page_threshold;
auto_degree = (63 - __builtin_clzll (x)) + start_degree; // log2(x) + 2
return MIN (auto_degree, (UINT32) parallelism);

The formula is floor(log2(num_pages / threshold)) + 2 (63 - __builtin_clzll (x) is the index of the highest set bit of x), capped by the global parallelism parameter and the machine’s core count. Each doubling of input above the threshold buys one more worker; on an 8-core machine with default thresholds the DOP saturates near the core count well before pages run out. The SUBQUERY case always returns 1, because the subquery executor does main + 1-worker fan-out per uncorrelated aptr, not N-way fan-out per aptr. Hint handling: hint_degree == -1 → auto-compute; >= 2 → clamp to system_core_count and use; 0 or 1 → disable.
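
As a worked example, assume a hypothetical threshold of 1,000 pages (the real defaults are not stated here): 8,000 pages give x = 8 and floor(log2(8)) = 3, so the auto degree is 3 + 2 = 5; 64,000 pages give 6 + 2 = 8. The sketch below restates that arithmetic as a standalone function. The name sketch_parallel_degree, its parameter list, and the relative order of the hint and threshold checks are assumptions made for illustration, not the CUBRID implementation.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>

// floor(log2(x)) for x >= 1, written portably instead of __builtin_clzll.
inline int floor_log2 (std::uint64_t x)
{
  int result = 0;
  while (x > 1)
    {
      x >>= 1;
      ++result;
    }
  return result;
}

// Hypothetical restatement of the DOP heuristic.
//   hint_degree < 0  : auto-compute from the page count
//   hint_degree >= 2 : use the hint, clamped to the core count
//   hint_degree 0, 1 : parallelism disabled
inline int sketch_parallel_degree (std::uint64_t num_pages, std::uint64_t page_threshold,
                                   int max_parallelism, int core_count, int hint_degree)
{
  if (hint_degree == 0 || hint_degree == 1)
    {
      return 0;
    }
  // Assumption: the threshold gate applies before the hint is honoured.
  if (page_threshold == 0 || num_pages < page_threshold)
    {
      return 0;
    }
  if (hint_degree >= 2)
    {
      return std::min (hint_degree, core_count);
    }
  const int start_degree = 2;
  const int auto_degree = floor_log2 (num_pages / page_threshold) + start_degree;
  return std::min (auto_degree, std::min (max_parallelism, core_count));
}
```

Under these assumptions the degree grows by one per doubling of the input and flattens at the smaller of the parallelism cap and the core count.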

worker_manager — per-operator reservation handle

Between the global pool and the operator orchestrator sits a small RAII handle (parallel_query::worker_manager) carrying std::atomic<int> m_active_tasks and int m_reserved_workers. Lifecycle: try_reserve_workers(N) wraps the global CAS (failure → nullptr, operator falls back to serial); push_task bumps m_active_tasks and pushes onto the global pool (the task must call pop_task() on retire); wait_workers busy-yields (std::this_thread::yield()) until m_active_tasks is 0; release_workers calls wait_workers first, then returns the reserved workers to the global pool. The yield-loop is intentional: workers are typically still doing useful work, so the parent should not block on a condvar.

Shared task plumbing — callable_task, interrupt, err_messages_with_lock

callable_task adapts std::function<void(cubthread::entry &)> into cubthread::task<entry> plus a retire functor. Most heap-scan and hash-join tasks subclass cubthread::entry_task directly; the sort module uses callable_task because its bodies are convenient std::bind expressions. On retire the task delegates back to its parent worker_manager via pop_task(), then runs the user retire functor (typically delete this).

interrupt is a single shared atomic enum that propagates “why we’re stopping” between threads. The seven states (NO_INTERRUPT, USER_INTERRUPTED_FROM_MAIN_THREAD, USER_INTERRUPTED_FROM_WORKER_THREAD, ERROR_INTERRUPTED_FROM_MAIN_THREAD, ERROR_INTERRUPTED_FROM_WORKER_THREAD, INST_NUM_SATISFIED, JOB_ENDED) capture both the direction (so the receiver knows whether to swap a remote er_message in or just break) and the reason (user vs error vs limit-satisfied, so the result handler can return S_END vs S_ERROR). Every long-running worker loop checks m_interrupt->get_code() once per iteration.
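
The following sketch shows one way such a shared flag can be polled once per loop iteration and mapped to a scan outcome. The seven enumerator names come from the text above; the class interrupt_flag, the first-writer-wins policy, the scan_outcome type, and the mapping of user interrupts to an error outcome are assumptions for illustration.

```cpp
#include <atomic>
#include <cassert>

enum class interrupt_code
{
  NO_INTERRUPT,
  USER_INTERRUPTED_FROM_MAIN_THREAD,
  USER_INTERRUPTED_FROM_WORKER_THREAD,
  ERROR_INTERRUPTED_FROM_MAIN_THREAD,
  ERROR_INTERRUPTED_FROM_WORKER_THREAD,
  INST_NUM_SATISFIED,
  JOB_ENDED
};

// Hypothetical two-valued outcome standing in for S_END / S_ERROR.
enum class scan_outcome { END, ERROR };

class interrupt_flag
{
  public:
    // Assumption: the first reason wins; later calls leave it untouched.
    bool set_code (interrupt_code code)
    {
      interrupt_code expected = interrupt_code::NO_INTERRUPT;
      return m_code.compare_exchange_strong (expected, code);
    }

    interrupt_code get_code () const
    {
      return m_code.load ();
    }

  private:
    std::atomic<interrupt_code> m_code { interrupt_code::NO_INTERRUPT };
};

// Worker-style loop: checks the flag once per iteration and raises
// INST_NUM_SATISFIED itself after producing `limit` items.
inline int process_until_interrupted (interrupt_flag &flag, int n_items, int limit)
{
  int processed = 0;
  for (int i = 0; i < n_items; ++i)
    {
      if (flag.get_code () != interrupt_code::NO_INTERRUPT)
        {
          break;
        }
      ++processed;
      if (processed == limit)
        {
          flag.set_code (interrupt_code::INST_NUM_SATISFIED);
        }
    }
  return processed;
}

// Maps the stop reason to what the result handler reports.
inline scan_outcome outcome_for (interrupt_code code)
{
  switch (code)
    {
    case interrupt_code::NO_INTERRUPT:
    case interrupt_code::INST_NUM_SATISFIED:
    case interrupt_code::JOB_ENDED:
      return scan_outcome::END;
    default:
      return scan_outcome::ERROR;
    }
}
```

Because the flag is shared, a second worker entering process_until_interrupted after the limit was reached stops before processing anything.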

err_messages_with_lock carries a vector of cuberr::er_message. Workers that hit an error swap their thread-local error context into the vector under a mutex; the main thread, on join, picks the first non-ER_INTERRUPTED message and swaps it back into its own error context. The companion atomic_instnum handles LIMIT N: any worker that pushes the N-th tuple fans an INST_NUM_SATISFIED interrupt to its peers.
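
A compact sketch of both helpers, under stated assumptions: er_message_sketch is a simplified stand-in for cuberr::er_message, ER_INTERRUPTED_PLACEHOLDER is a made-up value rather than CUBRID's real error code, and the class and method names are hypothetical.

```cpp
#include <atomic>
#include <cassert>
#include <mutex>
#include <string>
#include <utility>
#include <vector>

// Placeholder value; the real ER_INTERRUPTED code is not assumed here.
constexpr int ER_INTERRUPTED_PLACEHOLDER = -1;

struct er_message_sketch
{
  int err_id;
  std::string text;
};

class err_messages_sketch
{
  public:
    // Called by a worker that hit an error.
    void push (er_message_sketch msg)
    {
      std::lock_guard<std::mutex> guard (m_mutex);
      m_messages.push_back (std::move (msg));
    }

    // Called by the main thread on join: moves out the first message that
    // is not a plain "interrupted" error. Returns false if there is none.
    bool take_first_real_error (er_message_sketch &out)
    {
      std::lock_guard<std::mutex> guard (m_mutex);
      for (er_message_sketch &msg : m_messages)
        {
          if (msg.err_id != ER_INTERRUPTED_PLACEHOLDER)
            {
              out = std::move (msg);
              return true;
            }
        }
      return false;
    }

  private:
    std::mutex m_mutex;
    std::vector<er_message_sketch> m_messages;
};

class instnum_counter
{
  public:
    explicit instnum_counter (long limit) : m_limit (limit) {}

    // Called after a worker writes one tuple. Returns true only for the
    // worker whose tuple is exactly the limit-th one, so a single worker
    // raises the INST_NUM_SATISFIED interrupt.
    bool satisfied_after_insert ()
    {
      return m_count.fetch_add (1) + 1 == m_limit;
    }

  private:
    const long m_limit;
    std::atomic<long> m_count { 0 };
};
```

The fetch_add return value gives each tuple a unique ordinal, which is what lets exactly one worker detect that it produced the N-th row.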

The parallel heap scan replaces the serial heap-scan iterator (scan_open_heap_scan / scan_next_scan on a HEAP_SCAN_ID) with a parallel one (scan_open_parallel_heap_scan / scan_next_parallel_heap_scan on a PARALLEL_HEAP_SCAN_ID). The scan manager dispatches via the standard SCAN_TYPE switch — S_PARALLEL_HEAP_SCAN is one arm alongside S_HEAP_SCAN, S_INDX_SCAN, etc.

scan_open_parallel_heap_scan is the entry point. The optimiser sets ACCESS_SPEC_FLAG_NUM_PARALLEL_THREADS when parallel is plausible on a TARGET_CLASS access-spec; the function decides whether to go parallel now. It starts by setting scan_id->type = S_HEAP_SCAN (the serial fallback, assumed until every check passes), then checks five gates, any of which keeps the scan serial: (1) system classes — too small and read mostly by engine internals; (2) MVCC-disabled classes (catalog, transient) — the parallel scan assumes per-worker visibility check is identical to serial; (3) select-lock-needed (serializable / FOR UPDATE) — parallel scan does not propagate row locks; (4) private_heap_id == 0, which excludes non-main-thread scans (no nested parallel); (5) empty HFID. After the gates, it calls file_get_num_user_pages and compute_parallel_degree(parallel_type::HEAP_SCAN, ...) for the page-count threshold, then worker_manager::try_reserve_workers for the global reservation. On any failure the function falls back to serial by leaving scan_id->type at S_HEAP_SCAN. On success it picks one of the three result types (MERGEABLE_LIST / BUILDVALUE_OPT / XASL_SNAPSHOT), constructs a manager<RESULT_TYPE> with placement new, calls its open(), and sets scan_id->type = S_PARALLEL_HEAP_SCAN. Note that the MERGEABLE_LIST flag is unset when xasl->topn_items or XASL_TO_BE_CACHED is set — those features need stable addresses across the private-heap shuttle, so the worker output must go through XASL_SNAPSHOT instead.

The three result types capture three caller shapes. MERGEABLE_LIST is the common case: each worker writes a QFILE_LIST_ID; main thread merges on read. XASL_SNAPSHOT is row-by-row — used when the parent wants tuples one at a time without intermediate list files (e.g., XASL_TO_BE_CACHED or topn_items). BUILDVALUE_OPT is the aggregation fast path — each worker builds a partial aggregate and main merges partials at end. The result type is encoded in PARALLEL_HEAP_SCAN_ID and template-specialises the manager<RESULT_TYPE> / task<RESULT_TYPE> classes (whose public surface is open, start_tasks, next, reset, merge_stats, end, close).

Partitioning: heap-file sectors via ftab_set

The partitioning unit is the heap-file sector. The heap manager keeps a per-file FILE_FTAB_COLLECTOR of sectors owning data pages; input_handler_ftabs::init_on_main calls file_get_all_data_sectors on the main thread, copies the result into m_ftab_set (via ftab_set::convert), then split (n_sets) into N evenly-sized slices. Page-level coordination after that is wholly per-worker — no shared bitmap, no work-stealing.
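
The even split can be illustrated with a small generic helper. This is a sketch, not ftab_set::split itself: the function name split_evenly is hypothetical, sectors are represented by plain integers, and handing the remainder to the leading slices is an assumption about how "evenly-sized" is resolved.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Splits `items` into n_sets contiguous slices whose sizes differ by at
// most one. The first (size % n_sets) slices receive the extra element.
template <typename T>
std::vector<std::vector<T>> split_evenly (const std::vector<T> &items, std::size_t n_sets)
{
  assert (n_sets > 0);
  std::vector<std::vector<T>> slices (n_sets);
  const std::size_t base = items.size () / n_sets;
  const std::size_t extra = items.size () % n_sets;
  std::size_t pos = 0;
  for (std::size_t i = 0; i < n_sets; ++i)
    {
      const std::size_t len = base + (i < extra ? 1 : 0);
      slices[i].assign (items.begin () + pos, items.begin () + pos + len);
      pos += len;
    }
  return slices;
}
```

Ten sectors split four ways come out as slices of 3, 3, 2, and 2 sectors, and every sector lands in exactly one slice, which is why no shared bitmap is needed afterwards.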

Each worker, on initialize(), claims one slice via fetch_add on m_splited_ftab_set_idx. Per-worker state lives in static thread_local members on input_handler_ftabs (m_tl_scan_cache, m_tl_old_page_watcher, m_tl_ftab_set, m_tl_vpid, m_tl_pgoffset, m_tl_ftab). The page-fix watcher pattern (alternating m_tl_old_page_watcher and m_tl_scan_cache->page_watcher) implements page-at-a-time scan with the buffer pool’s ordered-fix protocol — pages are unfixed only after the next page is fixed. input_handler_ftabs::get_next_vpid_with_fix is the worker hot path: pop the next sector, walk its 64-bit page_bitmap finding data pages, ordered-fix each with OLD_PAGE_MAYBE_DEALLOCATED (the race against deallocation between bitmap-build and page-fix is expected — silently skip).
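
The slice claim and the bitmap walk can be sketched as follows. The struct sector_sketch, the helper names, and the page-id formula (sector id times 64 plus bit offset) are assumptions for illustration; the real hot path also fixes each page and skips pages that were deallocated in the meantime, which is not modelled here.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>
#include <vector>

// Assumption: one sector covers 64 pages, matching the 64-bit bitmap.
constexpr int PAGES_PER_SECTOR = 64;

struct sector_sketch
{
  std::int32_t sectid;
  std::uint64_t page_bitmap;   // bit i set => page i of the sector holds data
};

// Each worker claims a distinct slice index with one fetch_add.
// Returns -1 when every slice has already been claimed.
inline int claim_slice (std::atomic<int> &next_idx, int n_slices)
{
  const int idx = next_idx.fetch_add (1);
  return (idx < n_slices) ? idx : -1;
}

// Expands one sector into the ids of its data pages, in ascending order.
inline std::vector<std::int64_t> data_pages_of (const sector_sketch &sector)
{
  std::vector<std::int64_t> pages;
  for (int bit = 0; bit < PAGES_PER_SECTOR; ++bit)
    {
      if ((sector.page_bitmap >> bit) & 1u)
        {
          pages.push_back (static_cast<std::int64_t> (sector.sectid) * PAGES_PER_SECTOR + bit);
        }
    }
  return pages;
}
```

Since fetch_add hands out each index once, two workers can never scan the same slice, and a late worker that finds no slice left simply has nothing to do.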

Tasks: per-worker XASL clone and per-page slot iteration

Every worker runs its own parallel_heap_scan::task<RESULT_TYPE> on the worker pool. execute is three phases: initialize (on failure, swap the error message into m_err_messages and set interrupt_code::ERROR_INTERRUPTED_FROM_WORKER_THREAD), loop, finalize.

initialize copies the parent’s connection / transaction context, clones the XASL tree (via xcache_find_xasl_id_for_execute when the cache holds a clonable XASL or stx_map_stream_to_xasl when not), opens a per-worker HEAP_SCAN_ID on the cloned spec, opens any nested list / index scans needed by deeper XASL levels (subqueries, joins above), and sets up the input handler’s sector slice and result handler’s writer state. m_px_orig_thread_entry is set to the parent pointer — the engine-wide hook for “is this a parallel worker?”. Code that needs to behave differently in a worker (perfmon, log retry, lock-wait timeouts) checks this.

loop per outer iteration: check m_interrupt->get_code(), check logtb_is_interrupted_tran, ask the input handler for the next VPID, and on success drive the slot iterator through every qualified slot on that page. For each qualified slot evaluate if_pred; if the XASL has a scan_ptr (a nested level — list or index scan participating in a join above), recurse through qexec_execute_scan_ptr per qualified outer row before calling result_handler_p->write(...). The result-type specialisation chooses between m_xasl->outptr_list (mergeable list / buildvalue) and m_xasl->val_list (xasl snapshot). After each row, clear_xasl_dptr_list clears per-row dynamic sub-XASL state. Errors set ERROR_INTERRUPTED_FROM_WORKER_THREAD; user interrupts set USER_INTERRUPTED_FROM_WORKER_THREAD.

finalize is the inverse: write-finalises the result handler, finalises the input handler and slot iterator, records join info, cleans cloned XASL state, and either retires the XASL clone (xcache_retire_clone) or frees the unpacked tree.

While workers fan out, the parent’s scan_next_parallel_heap_scan keeps calling manager->next(). The first call lazily kicks the workers (start_tasks()); subsequent calls delegate to the result handler’s read(). Mergeable-list reads from m_xasl->list_id and re-clones values across the private-heap boundary via fetch_val_list; xasl-snapshot reads directly into m_xasl->val_list; buildvalue-opt reads into m_xasl->proc.buildvalue.agg_list. Interrupt fan-in is at the end: if any worker raised an error, swap its er_message into the main thread’s context and return S_ERROR.

The MERGEABLE_LIST path is the most intricate: the worker writes into a private-heap-allocated QFILE_LIST_ID but the reader must hand values back through the main thread’s private heap. The shuttle is pr_clone_value calls between two db_change_private_heap boundaries — clone out of worker heap, clear worker-side, switch heap, clone into main-side. This is the reason MERGEABLE_LIST cannot be combined with topn_items or XASL_TO_BE_CACHED — those features need stable addresses across the boundary.

sequenceDiagram
    participant Main as Main thread<br/>(scan_next_scan)
    participant Mgr as parallel_heap_scan::<br/>manager&lt;result_type&gt;
    participant InH as input_handler_ftabs<br/>(thread_local sectors)
    participant W1 as Worker 1
    participant Wn as Worker N
    participant ResH as result_handler

    Main->>Mgr: scan_open_parallel_heap_scan
    Mgr->>InH: init_on_main (split sectors)
    Mgr->>Mgr: try_reserve_workers (N)
    Main->>Mgr: scan_next_parallel_heap_scan (1st)
    Mgr->>Mgr: start_tasks() — push N tasks
    par fan-out
        Mgr-->>W1: push_task (clone XASL)
        Mgr-->>Wn: push_task (clone XASL)
    end
    W1->>InH: claim slice
    Wn->>InH: claim slice
    loop per VPID
        W1->>InH: get_next_vpid_with_fix
        W1->>W1: slot iterator + if_pred + write
        Wn->>InH: get_next_vpid_with_fix
        Wn->>Wn: slot iterator + if_pred + write
    end
    W1-->>ResH: writer_result_p / aggregate
    Wn-->>ResH: writer_result_p / aggregate
    Main->>ResH: read (merge)
    Main->>Main: fetch_val_list (private-heap shuttle)
    Main-->>Mgr: SCAN_CODE per call
    Main->>Mgr: scan_close — release_workers, free

The parallel hash join sits inside qexec_hash_join — in the new HASHJOIN_STATUS_PARALLEL arm of the status switch. The serial path is the same as the non-parallel hash join (see cubrid-hash-join.md): manager init, empty-side check, partition decision, then single-pass classic build/probe or partitioned build/probe. The parallel arm replaces the serial partitioned build/probe with a worker-fanned version when partitioning is chosen and the per-partition page-count is large enough.

hjoin_try_parallel runs after hjoin_try_partition has decided the join needs partitioning. It computes min_page_cnt = min(outer_list_id->page_cnt, inner_list_id->page_cnt), calls compute_parallel_degree(parallel_type::HASH_JOIN, min_page_cnt, manager->num_parallel_threads), returns HASHJOIN_STATUS_PARTITION (serial fallback) on degree < 2, then clamps num_parallel_threads down to manager->context_cnt (no point in more workers than partitions) and calls worker_manager::try_reserve_workers. On success it stores the handle on manager->px_worker_manager and returns HASHJOIN_STATUS_PARALLEL; the dispatcher in qexec_hash_join then calls parallel_query::hash_join::execute_partitions(*thread_p, &manager).

build_partitions is phase 1: call hjoin_init_shared_split_info, then for both outer and inner relations: qfile_collect_list_sector_info to enumerate data sectors, push N split_task instances onto the operator’s task_manager, task_manager.join() to barrier. Each split_task reads its assigned chunk of the input list, hashes by the join key, and writes into per-partition output files (using shared membuf claims to coordinate access).

execute_partitions is phase 2 + 3: push N join_task instances (each builds an in-memory hash for one partition’s inner side and probes the corresponding outer-side partition); join; then merge per-partition list ids via hjoin_merge_qlist.
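
The three phases (split by join key, per-partition build and probe, merge) can be sketched in memory as below. This is a single-threaded illustration of the Grace-style scheme, not the CUBRID code: row_sketch and all function names are hypothetical, rows are in-memory structs instead of list files, and each loop iteration over a partition stands in for one split_task or join_task.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <unordered_map>
#include <utility>
#include <vector>

struct row_sketch
{
  int key;
  int payload;
};

using partition_set = std::vector<std::vector<row_sketch>>;
using joined_rows = std::vector<std::pair<int, int>>;   // (outer payload, inner payload)

// Phase 1: route every row to the partition chosen by its join-key hash.
inline partition_set split_by_key (const std::vector<row_sketch> &rows, std::size_t n_partitions)
{
  assert (n_partitions > 0);
  partition_set parts (n_partitions);
  for (const row_sketch &r : rows)
    {
      parts[std::hash<int> {} (r.key) % n_partitions].push_back (r);
    }
  return parts;
}

// Phase 2: build a hash table on the inner partition, probe with the outer.
inline joined_rows join_partition (const std::vector<row_sketch> &outer,
                                   const std::vector<row_sketch> &inner)
{
  std::unordered_multimap<int, int> table;
  for (const row_sketch &r : inner)
    {
      table.emplace (r.key, r.payload);
    }
  joined_rows out;
  for (const row_sketch &r : outer)
    {
      auto range = table.equal_range (r.key);
      for (auto it = range.first; it != range.second; ++it)
        {
          out.emplace_back (r.payload, it->second);
        }
    }
  return out;
}

// Phase 3: concatenate the per-partition results.
inline joined_rows partitioned_hash_join (const std::vector<row_sketch> &outer,
                                          const std::vector<row_sketch> &inner,
                                          std::size_t n_partitions)
{
  const partition_set outer_parts = split_by_key (outer, n_partitions);
  const partition_set inner_parts = split_by_key (inner, n_partitions);
  joined_rows result;
  for (std::size_t p = 0; p < n_partitions; ++p)
    {
      joined_rows part = join_partition (outer_parts[p], inner_parts[p]);
      result.insert (result.end (), part.begin (), part.end ());
    }
  return result;
}
```

Because both sides use the same hash function and partition count, matching keys always meet in the same partition, so the per-partition joins are independent and can run on separate workers.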

split_task and join_task both inherit from base_task : cubthread::entry_task and share a task_manager that wraps push/wait/join on top of the operator-level worker_manager. The task_manager’s push_task increments an active-task counter under a mutex before calling m_worker_manager->push_task; end_task decrements, calls pop_task, and signals m_all_tasks_done_cv.notify_all on hitting zero; join waits on the condvar then m_worker_manager->wait_workers().

The hash-join task_manager differs from the heap-scan pattern in two ways: it uses a condvar wait instead of a yield-loop because the parent has nothing useful to do during the wait (unlike the heap-scan parent, which can be reading results in parallel with worker writes); and it has an explicit handle_error that swaps a worker’s er_message into the main thread’s cuberr::context directly — bypassing the err_messages_with_lock indirection — because the hash join only surfaces one error and wants it on the main thread immediately.

task_execution_guard — RAII for thread-context fan-out

Every worker starts by impersonating the parent thread. The task_execution_guard constructor sets m_thread_ref.m_px_orig_thread_entry, conn_entry, tran_index, and on_trace from the main thread’s entry, then calls push_resource_tracks(). The destructor clears conn_entry and on_trace and calls pop_resource_tracks(). Without this every worker would either run with no transaction context (tripping every assertion in the access-method layer) or with a stale context from a previous task.
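
A minimal RAII sketch of that impersonation. The struct thread_entry_sketch is a simplified stand-in for cubthread::entry, the integer resource_track_depth stands in for push_resource_tracks() / pop_resource_tracks(), and the guard's class name is hypothetical. Following the description above, the destructor clears only conn_entry and on_trace.

```cpp
#include <cassert>

// Simplified stand-in for the fields of a thread entry that the guard touches.
struct thread_entry_sketch
{
  thread_entry_sketch *m_px_orig_thread_entry = nullptr;
  const void *conn_entry = nullptr;
  int tran_index = -1;
  bool on_trace = false;
  int resource_track_depth = 0;   // models push/pop_resource_tracks
};

class task_execution_guard_sketch
{
  public:
    task_execution_guard_sketch (thread_entry_sketch &worker, thread_entry_sketch &parent)
      : m_worker (worker)
    {
      // Borrow the parent's identity for the duration of the task.
      m_worker.m_px_orig_thread_entry = &parent;
      m_worker.conn_entry = parent.conn_entry;
      m_worker.tran_index = parent.tran_index;
      m_worker.on_trace = parent.on_trace;
      ++m_worker.resource_track_depth;
    }

    ~task_execution_guard_sketch ()
    {
      // Runs on every exit path, including early returns and exceptions.
      m_worker.conn_entry = nullptr;
      m_worker.on_trace = false;
      --m_worker.resource_track_depth;
    }

    task_execution_guard_sketch (const task_execution_guard_sketch &) = delete;
    task_execution_guard_sketch &operator= (const task_execution_guard_sketch &) = delete;

  private:
    thread_entry_sketch &m_worker;
};
```

Placing the guard as the first local in a task body means the borrowed context is dropped before the pool recycles the worker for an unrelated task.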

spawn_manager — per-thread XASL substructure cloning

Hash join uses a thread-local spawner that clones join-time substructures (val_descr, during-join predicate, outer/inner regu lists) on demand per worker. Each get_* call (get_val_descr, get_during_join_pred, get_outer_regu_list_pred, get_inner_regu_list_pred) lazily allocates and caches the cloned substructure on the worker’s db_private_alloc heap, then returns the cached pointer on subsequent calls. This is the hash-join analogue of the heap-scan task’s clone_xasl(), but at finer granularity: only the substructures each worker actually needs.
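
The lazy clone-and-cache behaviour of the get_* calls can be sketched with a single getter. The types val_descr_sketch and spawner_sketch are hypothetical, the clone is a plain copy rather than a deep XASL copy, and ordinary heap allocation stands in for db_private_alloc.

```cpp
#include <cassert>
#include <memory>
#include <string>

// Stand-in for a join-time substructure shared by all workers.
struct val_descr_sketch
{
  std::string contents;
};

// One instance per worker (for example, held in thread_local storage).
class spawner_sketch
{
  public:
    explicit spawner_sketch (const val_descr_sketch &shared) : m_shared (shared) {}

    // Clones on first use, then returns the cached clone on later calls.
    val_descr_sketch *get_val_descr ()
    {
      if (!m_val_descr)
        {
          m_val_descr.reset (new val_descr_sketch (m_shared));
          ++m_clone_count;
        }
      return m_val_descr.get ();
    }

    int clone_count () const
    {
      return m_clone_count;
    }

  private:
    const val_descr_sketch &m_shared;
    std::unique_ptr<val_descr_sketch> m_val_descr;
    int m_clone_count = 0;
};
```

A worker that never evaluates a during-join predicate would, under this pattern, never pay for cloning it.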

flowchart TB
    QHJ["qexec_hash_join (status switch)"]
    HTP["hjoin_try_parallel:<br/>compute_parallel_degree(HASH_JOIN)<br/>· try_reserve_workers"]
    QHJ -->|HASHJOIN_STATUS_TRY| HTP
    HTP -->|< 2 workers| Single[serial: hjoin_execute]
    HTP -->|>= 2 workers| EP[parallel_query::hash_join::execute_partitions]
    QHJ -->|HASHJOIN_STATUS_PARTITION| BP1[hjoin_execute_partitions serial]
    EP --> Phase1[build_partitions]
    Phase1 --> SplitO["N x split_task (outer)"]
    SplitO --> JoinO[task_manager::join]
    JoinO --> SplitI["N x split_task (inner)"]
    SplitI --> JoinI[task_manager::join]
    JoinI --> Phase2[execute_partitions phase 2]
    Phase2 --> JT["N x join_task (build+probe per partition)"]
    JT --> JoinJ[task_manager::join]
    JoinJ --> MergeR["hjoin_merge_qlist per partition"]
    MergeR --> Done[xasl-&gt;list_id]

The third orchestrator runs whole XASL sub-trees in parallel. Its canonical use is uncorrelated subqueries in the aptr_list of certain proc types — sub-queries that the executor would otherwise run sequentially as a prelude to the main block (UNION_PROC is a classic example, where each branch is independent).

make_parallel_query_executor_recursively — wiring on the main thread

The C-callable entry wires up the parallel structure before qexec_execute_mainblock is called. It early-outs on !xcache_uses_clones() (parallel query execute requires clonable XASL), sets thread_p->m_px_orig_thread_entry = thread_p, optionally initialises perf-monitor parallel stats, then walks the XASL tree twice via cubxasl::iterate_xasl_tree. The first walk counts uncorrelated aptr nodes per parallel-eligible XASL type; the second walk attaches a query_executor at every parallel-eligible XASL node with at least two non-link aptrs — the first such node gets a root executor (m_is_root_executor == true, owns the queue and worker pool); every subsequent node gets a child executor that shares the root’s queue and worker pool.

The parallel-eligible XASL types are those whose aptr_list typically contains independent subqueries: BUILDLIST_PROC / BUILDVALUE_PROC (a SELECT producing a list or scalar), UNION_PROC / INTERSECTION_PROC / DIFFERENCE_PROC (set ops over independent branches), HASHJOIN_PROC (whose sub-build sides may themselves be independent), and MERGELIST_PROC (the parallel union-all of list files used by partition-pruning queries). The entire query tree therefore shares one job queue and one set of workers, and parallelism is amortised across nested parallel-eligible operators.

query_executor::run_jobs — single shared queue, work-loop on main

run_jobs ensures exactly one parallel-task is on the global pool (lazy-spawning a single parallel_query_execute::task on first call), has the main thread run the pre-popped first job (stashed by add_job as m_job to avoid an empty queue on small fan-outs), and work-steals from the queue until m_join_context.get_running_jobs() == 0. Joining is a condvar wait on join_context::join_jobs. On root, after join, the function pushes a sentinel (push_last()), releases workers, performs interrupt fan-in (same pattern as heap scan), and merges perf stats.

The design choice that matters: the main thread also participates as a worker. After scheduling one task on the worker pool, main starts popping jobs from the queue and running them. Worker(s) and main race on try_pop; whichever sees a job runs it. This is work-stealing at the granularity of XASL sub-trees — and it has the nice side-effect that even when the worker pool is starved (no workers available, worker_manager returned 0), main still runs all the jobs serially through the same code path.

The join_context is a tiny condvar-based barrier: add_running_jobs / sub_running_jobs increment / decrement under a mutex (the latter notifies on hitting zero); join_jobs waits until zero. The actual job runs in execute_job_internal, which clones the XASL state, swaps the worker thread’s context (conn_entry, tran_index, on_trace, m_px_orig_thread_entry) to point at the parent, runs qexec_execute_mainblock, copies the sub-query’s list_id out, and restores.

Job queue: lock-free MPMC ring (thread_safe_queue)

The job queue is a textbook bounded MPMC ring with sequence numbers per slot. Each slot carries data, std::atomic<uint64_t> sequence, and std::atomic<bool> ready. Fast paths (try_push_fast / try_pop_fast) execute the canonical Vyukov sequence: load position, CAS the slot’s sequence (pos → pos + m_capacity), write data, release-store ready = true, fetch_add the position. Slow paths take a mutex and wait on a condvar for space / work, with interrupt checks per iteration. Two refinements specific to parallel-query: push_completed is a sticky shutdown flag, and reset_queue is the wraparound handler that runs when the absolute position counter approaches UINT64_MAX — paranoid guard, practically never fires. Deeper backstory in cubrid-thread-worker-pool.md.
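
For reference, here is a sketch of the classic Vyukov bounded MPMC queue that this design is modelled on. It is the well-known published algorithm, not CUBRID's thread_safe_queue: it claims a slot by CAS on the position counter and has no ready flag, no slow path, and no shutdown or wraparound handling. The class name bounded_mpmc_queue and the power-of-two capacity requirement belong to this sketch.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

template <typename T>
class bounded_mpmc_queue
{
    struct slot
    {
      std::atomic<std::uint64_t> sequence;
      T data;
    };

  public:
    // Capacity must be a power of two so that `pos & mask` selects a slot.
    explicit bounded_mpmc_queue (std::size_t capacity)
      : m_slots (capacity), m_mask (capacity - 1)
    {
      assert (capacity >= 2 && (capacity & (capacity - 1)) == 0);
      for (std::size_t i = 0; i < capacity; ++i)
        {
          m_slots[i].sequence.store (i, std::memory_order_relaxed);
        }
    }

    bool try_push (const T &value)
    {
      std::uint64_t pos = m_enqueue_pos.load (std::memory_order_relaxed);
      while (true)
        {
          slot &s = m_slots[pos & m_mask];
          const std::uint64_t seq = s.sequence.load (std::memory_order_acquire);
          const std::int64_t diff = static_cast<std::int64_t> (seq) - static_cast<std::int64_t> (pos);
          if (diff == 0)
            {
              // Slot is free for this position: claim it by advancing the position.
              if (m_enqueue_pos.compare_exchange_weak (pos, pos + 1, std::memory_order_relaxed))
                {
                  s.data = value;
                  s.sequence.store (pos + 1, std::memory_order_release);   // publish to consumers
                  return true;
                }
            }
          else if (diff < 0)
            {
              return false;   // queue is full
            }
          else
            {
              pos = m_enqueue_pos.load (std::memory_order_relaxed);   // another producer moved on
            }
        }
    }

    bool try_pop (T &out)
    {
      std::uint64_t pos = m_dequeue_pos.load (std::memory_order_relaxed);
      while (true)
        {
          slot &s = m_slots[pos & m_mask];
          const std::uint64_t seq = s.sequence.load (std::memory_order_acquire);
          const std::int64_t diff = static_cast<std::int64_t> (seq) - static_cast<std::int64_t> (pos + 1);
          if (diff == 0)
            {
              if (m_dequeue_pos.compare_exchange_weak (pos, pos + 1, std::memory_order_relaxed))
                {
                  out = s.data;
                  // Mark the slot free for the producer one lap ahead.
                  s.sequence.store (pos + m_mask + 1, std::memory_order_release);
                  return true;
                }
            }
          else if (diff < 0)
            {
              return false;   // queue is empty
            }
          else
            {
              pos = m_dequeue_pos.load (std::memory_order_relaxed);
            }
        }
    }

  private:
    std::vector<slot> m_slots;
    const std::size_t m_mask;
    std::atomic<std::uint64_t> m_enqueue_pos { 0 };
    std::atomic<std::uint64_t> m_dequeue_pos { 0 };
};
```

The per-slot sequence number is what lets producers and consumers agree on slot ownership without a lock: a slot is writable when its sequence equals the enqueue position and readable when it equals that position plus one.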

sequenceDiagram
    participant Main as Main thread
    participant QE as query_executor
    participant Q as thread_safe_queue&lt;job&gt;
    participant Pool as parallel-query pool
    participant W as Worker

    Main->>QE: make_parallel_query_executor_recursively
    QE->>QE: walk XASL, attach px_executor
    Main->>QE: add_job (job_1) — stash in m_job
    Main->>QE: add_job (job_2..n)
    QE->>Q: push job_2..n
    Main->>QE: run_jobs()
    QE->>Pool: push_task (query_task)
    Pool->>W: dispatch
    Main->>QE: execute_job_internal (m_job)  /* main as worker */
    par main races worker on Q
        Main->>Q: try_pop -> job_k
        Main->>QE: execute_job_internal (job_k)
        W->>Q: pop -> job_m
        W->>QE: execute_job_internal (job_m)
    end
    QE->>QE: join_context.join_jobs()
    Main->>Pool: release_workers()
    Main-->>QE: aggregated stats + interrupt fan-in

The parallel sort is the oldest of the four — its DSL of macros in px_sort.h predates the cubthread::entry_task / worker_manager plumbing and integrates by pushing parallel_query::callable_task instances directly. The pattern is slice-and-merge: split the input temp file into N sub-files, sort each in a worker, then merge. The two driver macros are SORT_EXECUTE_PARALLEL(num, px_sort_param, function) (allocates N callable_task instances bound to function with each &px_sort_param[i] and pushes onto sort_param->px_worker_manager) and SORT_WAIT_PARALLEL(parallel_num, sort_param, px_sort_param) (waits on a condvar under sort_param->px_mtx, rechecking every px_sort_param[i].px_status until none is PX_PROGRESS, then calls wait_workers()).

The DOP gate is sort_check_parallelism, which uses the shared compute_parallel_degree(parallel_type::SORT, ...). There are two parallel-sort callers: SORT_ORDER_BY (the executor’s ORDER BY sort over a temp file, with the per-query parallelism hint) and SORT_INDEX_LEAF (the parallel index-leaf builder for B+Tree construction, with no per-query hint). On a successful gate the function calls try_reserve_workers and returns the actual reserved count; on any failure it returns 1 so the caller falls back to single-threaded sort.

The status enum (PX_PROGRESS, PX_DONE, PX_ERR_FAILED) is the parallel-sort-specific equivalent of interrupt_code. The unique part of parallel sort is that the merge phase is itself parallel: sort_merge_nruns_parallel runs log2(N)-tournament-style merge tasks on the worker pool until a single sorted output remains.
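
The tournament shape of the merge phase can be illustrated with in-memory runs. This sketch is not sort_merge_nruns_parallel: it merges vectors rather than temp-file runs, uses std::async instead of the parallel-query worker pool, and the names sorted_run, merge_two and merge_runs_tournament are hypothetical. Each round merges neighbouring runs pairwise and concurrently, so N runs collapse to one in about log2(N) rounds.

```cpp
#include <algorithm>
#include <cstddef>
#include <future>
#include <iterator>
#include <utility>
#include <vector>

using sorted_run = std::vector<int>;

// Merge two sorted runs into one sorted run.
inline sorted_run merge_two(const sorted_run &a, const sorted_run &b) {
  sorted_run out;
  out.reserve(a.size() + b.size());
  std::merge(a.begin(), a.end(), b.begin(), b.end(), std::back_inserter(out));
  return out;
}

// Repeatedly merge neighbouring runs in parallel until at most one remains.
// Returns the number of rounds performed.
inline int merge_runs_tournament(std::vector<sorted_run> &runs) {
  int rounds = 0;
  while (runs.size() > 1) {
    std::vector<std::future<sorted_run>> pending;
    for (std::size_t i = 0; i + 1 < runs.size(); i += 2)
      pending.push_back(std::async(std::launch::async, [&runs, i] {
        return merge_two(runs[i], runs[i + 1]);  // runs is read-only here
      }));
    std::vector<sorted_run> next;
    for (std::future<sorted_run> &f : pending) next.push_back(f.get());
    if (runs.size() % 2 == 1)  // an odd run out advances unmerged
      next.push_back(std::move(runs.back()));
    runs = std::move(next);
    ++rounds;
  }
  return rounds;
}
```

Four runs need two rounds and five runs need three, since the unpaired run is carried forward to the next round.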

Stable symbols, grouped by subsystem. The position-hint table at the end maps the most-cited symbols to (file, line) as of this document's updated: date.

Foundation — src/query/parallel/

  • parallel_query::compute_parallel_degree — DOP heuristic.
  • parallel_query::parallel_type — HEAP_SCAN, HASH_JOIN, SORT, SUBQUERY.
  • parallel_query::worker_manager — per-operator handle. try_reserve_workers, release_workers, wait_workers, push_task, pop_task, get_reserved_workers.
  • parallel_query::worker_manager_global — Meyers singleton. Owns m_worker_pool (the named parallel-query pool) and std::atomic<int> m_available. init, destroy, plus internal try_reserve_workers, release_workers, push_task.
  • parallel_query::interrupt — seven-state atomic enum.
  • parallel_query::atomic_instnum — is_instnum_satisfies_after_1tuple_insert.
  • parallel_query::err_messages_with_lock — move_top_error_message_to_this.
  • parallel_query::callable_task — cubthread::task<entry> adapter for std::function / std::bind bodies (used by the sort macros).
  • parallel_query::ftab_set — convert, split (n_sets), get_next, append, move_from, clear.
  • parallel_query::thread_safe_queue<T> — Vyukov MPMC ring. push, pop, try_push, try_pop, push_last, is_empty, is_full, size, capacity, reset_queue.
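
For readers unfamiliar with the Vyukov bounded MPMC ring named in the last entry, here is a minimal sketch of the published algorithm. It is not the CUBRID thread_safe_queue<T>: bounded_mpmc_ring is a hypothetical name, only the non-blocking try_push / try_pop pair is shown, and the capacity is assumed to be a power of two. Each cell carries a sequence number that tells producers and consumers whose turn it is.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <utility>
#include <vector>

template <typename T>
class bounded_mpmc_ring {
 public:
  explicit bounded_mpmc_ring(std::size_t capacity)
      : cells_(capacity), mask_(capacity - 1) {
    assert(capacity >= 2 && (capacity & (capacity - 1)) == 0);
    for (std::size_t i = 0; i < capacity; ++i)
      cells_[i].seq.store(i, std::memory_order_relaxed);
  }

  // Returns false when the ring is full.
  bool try_push(T value) {
    std::size_t pos = enqueue_pos_.load(std::memory_order_relaxed);
    for (;;) {
      cell &c = cells_[pos & mask_];
      std::size_t seq = c.seq.load(std::memory_order_acquire);
      std::intptr_t diff = static_cast<std::intptr_t>(seq) -
                           static_cast<std::intptr_t>(pos);
      if (diff == 0) {  // cell is free for this position: try to claim it
        if (enqueue_pos_.compare_exchange_weak(pos, pos + 1,
                                               std::memory_order_relaxed)) {
          c.data = std::move(value);
          c.seq.store(pos + 1, std::memory_order_release);  // now poppable
          return true;
        }
      } else if (diff < 0) {
        return false;  // cell still holds an unconsumed item: full
      } else {
        pos = enqueue_pos_.load(std::memory_order_relaxed);  // lost a race
      }
    }
  }

  // Returns false when the ring is empty.
  bool try_pop(T &out) {
    std::size_t pos = dequeue_pos_.load(std::memory_order_relaxed);
    for (;;) {
      cell &c = cells_[pos & mask_];
      std::size_t seq = c.seq.load(std::memory_order_acquire);
      std::intptr_t diff = static_cast<std::intptr_t>(seq) -
                           static_cast<std::intptr_t>(pos + 1);
      if (diff == 0) {  // cell was filled for this position: try to claim it
        if (dequeue_pos_.compare_exchange_weak(pos, pos + 1,
                                               std::memory_order_relaxed)) {
          out = std::move(c.data);
          // Mark the cell free for the producer one lap ahead.
          c.seq.store(pos + mask_ + 1, std::memory_order_release);
          return true;
        }
      } else if (diff < 0) {
        return false;  // nothing published here yet: empty
      } else {
        pos = dequeue_pos_.load(std::memory_order_relaxed);  // lost a race
      }
    }
  }

 private:
  struct cell {
    std::atomic<std::size_t> seq;
    T data;
  };
  std::vector<cell> cells_;
  const std::size_t mask_;
  std::atomic<std::size_t> enqueue_pos_{0};
  std::atomic<std::size_t> dequeue_pos_{0};
};
```

The design avoids a global lock: a producer and a consumer contend only on their own position counter, and the per-cell sequence number doubles as the full/empty signal.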

Heap scan — src/query/parallel/px_heap_scan/

  • parallel_heap_scan::manager<RESULT_TYPE> — top-level per-scan orchestrator. open, start_tasks, next, reset, merge_stats, end, close. Templated on RESULT_TYPE::{MERGEABLE_LIST, XASL_SNAPSHOT, BUILDVALUE_OPT}.
  • parallel_heap_scan::task<RESULT_TYPE> — cubthread::entry_task body. execute, retire, initialize, finalize, clone_xasl, loop.
  • parallel_heap_scan::input_handler_ftabs — sector-slice dispatcher. init_on_main, initialize, finalize, get_next_vpid_with_fix. Thread-local page watchers.
  • parallel_heap_scan::result_handler<RESULT_TYPE> — read_initialize, read, read_finalize, write_initialize, write, write_finalize.
  • parallel_heap_scan::slot_iterator — initialize, finalize, set_page, next_qualified_slot_with_peek.
  • parallel_heap_scan::join_info — capture_join_info, record_join_info, apply_join_info, get_scan_info.
  • parallel_heap_scan::trace_handler, accumulative_trace_storage — perf counter aggregation.
  • C entry points: scan_open_parallel_heap_scan, scan_start_parallel_heap_scan, scan_next_parallel_heap_scan, scan_reset_scan_block_parallel_heap_scan, scan_end_parallel_heap_scan, scan_close_parallel_heap_scan. Checker: scan_check_parallel_heap_scan_possible.

Hash join — src/query/parallel/px_hash_join/

  • parallel_query::hash_join::build_partitions — phase 1 (split outer, then split inner).
  • parallel_query::hash_join::execute_partitions — phase 2 + 3 (per-partition build/probe, then merge).
  • parallel_query::hash_join::task_manager — push_task, end_task, join, handle_error, notify_stop, check_interrupt, clear_interrupt.
  • parallel_query::hash_join::task_execution_guard — RAII context fan-out.
  • parallel_query::hash_join::base_task, split_task, join_task — task hierarchy.
  • parallel_query::hash_join::spawn_manager — TLS substructure cloner. get_val_descr, get_during_join_pred, get_outer_regu_list_pred, get_inner_regu_list_pred, get_instance, destroy_instance.
  • C-side in query_hash_join.c: hjoin_try_parallel (gate), qexec_hash_join’s HASHJOIN_STATUS_PARALLEL arm, hjoin_init_shared_split_info, hjoin_clear_shared_split_info, hjoin_trace_*, hjoin_merge_qlist.
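
The three phases listed above (split both inputs, build and probe per partition, merge) can be illustrated with a small in-memory join. This is a sketch under stated assumptions, not the CUBRID code path: rows are (key, payload) pairs, partitions are plain vectors rather than list files, std::async replaces the task manager, and split_by_key, join_partition and partitioned_hash_join are hypothetical names.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <functional>
#include <future>
#include <string>
#include <tuple>
#include <unordered_map>
#include <utility>
#include <vector>

using row = std::pair<int, std::string>;  // (join key, payload)
using joined_row = std::tuple<int, std::string, std::string>;

// Phase 1: route every row to partition hash(key) % n.
inline std::vector<std::vector<row>> split_by_key(const std::vector<row> &in,
                                                  std::size_t n) {
  std::vector<std::vector<row>> parts(n);
  for (const row &r : in) parts[std::hash<int>{}(r.first) % n].push_back(r);
  return parts;
}

// Phase 2: build a hash table on the inner partition, probe it with the outer.
inline std::vector<joined_row> join_partition(const std::vector<row> &outer,
                                              const std::vector<row> &inner) {
  std::unordered_multimap<int, std::string> table(inner.begin(), inner.end());
  std::vector<joined_row> out;
  for (const row &o : outer) {
    auto range = table.equal_range(o.first);
    for (auto it = range.first; it != range.second; ++it)
      out.emplace_back(o.first, o.second, it->second);
  }
  return out;
}

// Phases 1-3: split both sides, join partitions concurrently, merge results.
inline std::vector<joined_row> partitioned_hash_join(
    const std::vector<row> &outer, const std::vector<row> &inner,
    std::size_t n) {
  assert(n > 0);
  auto outer_parts = split_by_key(outer, n);
  auto inner_parts = split_by_key(inner, n);
  std::vector<std::future<std::vector<joined_row>>> tasks;
  for (std::size_t i = 0; i < n; ++i)
    tasks.push_back(std::async(std::launch::async,
                               [&outer_parts, &inner_parts, i] {
                                 return join_partition(outer_parts[i],
                                                       inner_parts[i]);
                               }));
  std::vector<joined_row> result;
  for (auto &t : tasks) {
    std::vector<joined_row> part = t.get();
    result.insert(result.end(), part.begin(), part.end());
  }
  std::sort(result.begin(), result.end());  // only to make the demo deterministic
  return result;
}
```

Because equal keys always hash to the same partition index on both sides, each partition can be joined independently with no shared hash table, which is the weaker shared-build form described in the introduction.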

Query execute — src/query/parallel/px_query_execute/

  • parallel_query_execute::query_executor — top-level (root + child constructors). add_job, run_jobs, get_parallelism, get_stats.
  • parallel_query_execute::task — cubthread::entry_task body. execute, retire, init, get_job, execute_job, end.
  • parallel_query_execute::execute_job_internal — clones XASL state, swaps thread context, calls qexec_execute_mainblock, copies list_id out, restores.
  • parallel_query_execute::job — (xasl_node*, xasl_state*, join_context*, trace_context*) queue payload.
  • parallel_query_execute::join_context — running-job barrier (add_running_jobs, sub_running_jobs, get_running_jobs, join_jobs).
  • parallel_query_execute::trace_context — per-job perf-stat vector.
  • make_parallel_query_executor_recursively — wires xasl_p->px_executor for every parallel-eligible XASL node.
  • Checker: check_parallel_subquery_possible.
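
The running-job barrier and the "main thread as worker" pattern can be sketched together. The method names add_running_jobs, sub_running_jobs and join_jobs are taken from the list above, but their semantics here are an assumption; join_barrier, job_queue, drain, run_jobs and sum_in_parallel are hypothetical, a mutex-guarded deque replaces the lock-free queue, and std::thread replaces the worker pool. The sketch also skips the detail that the first job is stashed and run directly by the main thread.

```cpp
#include <atomic>
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Counts jobs that have been queued but not yet finished.
class join_barrier {
 public:
  void add_running_jobs(int n) {
    std::lock_guard<std::mutex> lock(mtx_);
    running_ += n;
  }
  void sub_running_jobs() {
    std::lock_guard<std::mutex> lock(mtx_);
    if (--running_ == 0) cv_.notify_all();
  }
  void join_jobs() {  // block until every counted job has finished
    std::unique_lock<std::mutex> lock(mtx_);
    cv_.wait(lock, [this] { return running_ == 0; });
  }

 private:
  std::mutex mtx_;
  std::condition_variable cv_;
  int running_ = 0;
};

class job_queue {
 public:
  void push(std::function<void()> job) {
    std::lock_guard<std::mutex> lock(mtx_);
    jobs_.push_back(std::move(job));
  }
  bool try_pop(std::function<void()> &job) {
    std::lock_guard<std::mutex> lock(mtx_);
    if (jobs_.empty()) return false;
    job = std::move(jobs_.front());
    jobs_.pop_front();
    return true;
  }

 private:
  std::mutex mtx_;
  std::deque<std::function<void()>> jobs_;
};

// Shared loop for main and helpers: run jobs until the queue is empty.
inline void drain(job_queue &q, join_barrier &barrier) {
  std::function<void()> job;
  while (q.try_pop(job)) {
    job();
    barrier.sub_running_jobs();
  }
}

inline void run_jobs(std::vector<std::function<void()>> jobs, int helpers) {
  join_barrier barrier;
  job_queue q;
  barrier.add_running_jobs(static_cast<int>(jobs.size()));
  for (auto &j : jobs) q.push(std::move(j));
  std::vector<std::thread> pool;
  for (int i = 0; i < helpers; ++i)
    pool.emplace_back(drain, std::ref(q), std::ref(barrier));
  drain(q, barrier);    // the main thread races the helpers on the queue
  barrier.join_jobs();  // then waits only for jobs still running elsewhere
  for (std::thread &t : pool) t.join();
}

// Demo: n jobs, each adding its index to a shared counter.
inline int sum_in_parallel(int n, int helpers) {
  std::atomic<int> sum{0};
  std::vector<std::function<void()>> jobs;
  for (int i = 1; i <= n; ++i) jobs.push_back([&sum, i] { sum += i; });
  run_jobs(std::move(jobs), helpers);
  return sum.load();
}
```

Because the main thread drains the queue before it blocks, the barrier wait covers only the tail of jobs that helpers picked up last, and the code still completes with zero helpers.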

Sort — src/query/parallel/px_sort.{h,c} and src/storage/external_sort.c

  • Macros: SORT_IS_PARALLEL, SORT_EXECUTE_PARALLEL, SORT_WAIT_PARALLEL.
  • Status enum: PX_ERR_FAILED, PX_DONE, PX_PROGRESS. Type enum: PARALLEL_TYPE::{PX_SINGLE, PX_MAIN_IN_PARALLEL, PX_THREAD_IN_PARALLEL}.
  • Functions: sort_check_parallelism, sort_start_parallelism, sort_end_parallelism, sort_listfile_execute, sort_copy_sort_param, sort_copy_sort_info, sort_split_input_temp_file, sort_merge_run_for_parallel, sort_merge_nruns, sort_put_result_for_parallel, sort_merge_nruns_parallel, sort_split_last_run, sort_put_result_from_tmpfile.

Cross-module integration

  • src/query/scan_manager.c — every public scan entry has an S_PARALLEL_HEAP_SCAN switch arm that forwards to the C wrappers in px_heap_scan.cpp (scan_start_scan, scan_reset_scan_block, scan_end_scan, scan_close_scan, scan_next_scan_local).
  • src/query/query_hash_join.c — qexec_hash_join adds a HASHJOIN_STATUS_PARALLEL arm calling parallel_query::hash_join::execute_partitions. The status is decided by hjoin_try_parallel.
  • src/storage/external_sort.c — sort_listfile_internal calls sort_check_parallelism and dispatches via the macros when the returned degree is ≥ 2.
  • src/thread/thread_worker_pool*.{cpp,hpp} — the worker_pool template the global parallel-query pool instantiates.
  • THREAD_ENTRY fields used: m_px_orig_thread_entry (parent pointer), m_uses_px_stats, m_px_stats (per-thread perf buffer), m_px_stats_mutex, m_px_lock_mutex (locks the cloned XASL during heap-scan task initialise/finalise).

| symbol | file | line |
| --- | --- | --- |
| compute_parallel_degree | src/query/parallel/px_parallel.cpp | 36 |
| worker_manager::try_reserve_workers | src/query/parallel/px_worker_manager.cpp | 49 |
| worker_manager::wait_workers | src/query/parallel/px_worker_manager.cpp | 97 |
| worker_manager_global::init | src/query/parallel/px_worker_manager_global.cpp | 53 |
| worker_manager_global::try_reserve_workers | src/query/parallel/px_worker_manager_global.cpp | 97 |
| REGISTER_WORKERPOOL(parallel_query) | src/query/parallel/px_worker_manager_global.cpp | 48 |
| interrupt::interrupt_code | src/query/parallel/px_interrupt.hpp | 31 |
| err_messages_with_lock::move_top_error_message_to_this | src/query/parallel/px_interrupt.hpp | 100 |
| ftab_set::split | src/query/parallel/px_ftab_set.hpp | 106 |
| thread_safe_queue<T>::try_push_fast | src/query/parallel/px_thread_safe_queue.cpp | 172 |
| thread_safe_queue<T>::try_pop_fast | src/query/parallel/px_thread_safe_queue.cpp | 221 |
| scan_open_parallel_heap_scan | px_heap_scan/px_heap_scan.cpp | 349 |
| scan_next_parallel_heap_scan | px_heap_scan/px_heap_scan.cpp | 44 |
| scan_close_parallel_heap_scan | px_heap_scan/px_heap_scan.cpp | 242 |
| parallel_heap_scan::manager<>::open | px_heap_scan/px_heap_scan.cpp | 632 |
| parallel_heap_scan::manager<>::start_tasks | px_heap_scan/px_heap_scan.cpp | 795 |
| parallel_heap_scan::manager<>::next | px_heap_scan/px_heap_scan.cpp | 817 |
| parallel_heap_scan::task<>::execute | px_heap_scan/px_heap_scan_task.cpp | 44 |
| parallel_heap_scan::task<>::initialize | px_heap_scan/px_heap_scan_task.cpp | 71 |
| parallel_heap_scan::task<>::clone_xasl | px_heap_scan/px_heap_scan_task.cpp | 439 |
| parallel_heap_scan::task<>::loop | px_heap_scan/px_heap_scan_task.cpp | 510 |
| input_handler_ftabs::init_on_main | px_heap_scan/px_heap_scan_input_handler_ftabs.cpp | 60 |
| input_handler_ftabs::get_next_vpid_with_fix | px_heap_scan/px_heap_scan_input_handler_ftabs.cpp | 87 |
| hash_join::build_partitions | px_hash_join/px_hash_join.cpp | 43 |
| hash_join::execute_partitions | px_hash_join/px_hash_join.cpp | 157 |
| hash_join::task_manager::push_task | px_hash_join/px_hash_join_task_manager.cpp | 58 |
| hash_join::task_manager::join | px_hash_join/px_hash_join_task_manager.cpp | 81 |
| hash_join::task_execution_guard | px_hash_join/px_hash_join_task_manager.hpp | 105 |
| hash_join::spawn_manager::spawn | px_hash_join/px_hash_join_spawn_manager.hpp | 85 |
| make_parallel_query_executor_recursively | px_query_execute/px_query_executor.cpp | 279 |
| query_executor::run_jobs | px_query_execute/px_query_executor.cpp | 115 |
| execute_job_internal | px_query_execute/px_query_task.cpp | 91 |
| join_context::join_jobs | px_query_execute/px_query_job.hpp | 70 |
| hjoin_try_parallel | src/query/query_hash_join.c | 1965 |
| qexec_hash_join HASHJOIN_STATUS_PARALLEL arm | src/query/query_hash_join.c | 230 |
| sort_check_parallelism | src/storage/external_sort.c | 4936 |
| SORT_EXECUTE_PARALLEL / SORT_WAIT_PARALLEL | src/query/parallel/px_sort.h | 41, 52 |

  • parallel_heap_scan::manager is templated on RESULT_TYPE, but the C API is non-templated and the runtime dispatch uses a result_type field on PARALLEL_HEAP_SCAN_ID. Adding a fourth result type requires five touches: the enum, the manager instantiation, the task instantiation, the runtime switches in scan_next_parallel_heap_scan / scan_reset_scan_block_parallel_heap_scan / scan_close_parallel_heap_scan, and the result-type selection in scan_open_parallel_heap_scan.
  • The DOP heuristic auto_degree = log2(num_pages / page_threshold) + 2 is documented nowhere except the comment in compute_parallel_degree; it is not a Postgres-style linear ramp. Because the ramp is logarithmic, the DOP grows by only one for each doubling of num_pages and saturates at parallelism, so it is conservative for huge scans and aggressive for medium scans.
  • The five “no parallel” gates inside scan_open_parallel_heap_scan (system class, MVCC-disabled, select-lock-needed, private_heap_id == 0, partitioned-class) are checked once at open. The comment “DB_PARTITION_CLASS will be parallel-heap-scanned, not DB_PARTITIONED_CLASS” implies the right unit is the leaf partition, not the parent.
  • make_parallel_query_executor_recursively early-outs on !xcache_uses_clones() — parallel query execute requires clonable XASL. Heap scan also benefits but its clone_xasl has both branches (cache and non-cache via stx_map_stream_to_xasl), so it can function without the cache; the query executor cannot.
  • The hash-join task_manager uses a condvar wait but the heap-scan worker_manager uses a yield-loop wait. The heap-scan main thread is not idle during fan-out — it is reading results in parallel with worker writes — so it cannot block. The hash-join phase 1 main thread is idle and benefits from a real condvar. The query-executor main thread participates as a worker and waits on join_context::join_jobs only after exhausting the queue.
  • The interrupt enum has seven states (not binary on/off) because the parallel layer needs both direction and reason.
  • THREAD_ENTRY::m_px_lock_mutex is unusual — a per-thread mutex whose only callers are other threads (the main thread’s m_px_lock_mutex is locked by workers in clone_xasl() and finalize()). It serialises XASL-clone allocation / retirement against the main thread’s xasl-cache operations. It is a parent-side lock for child-thread access, not a per-thread self-lock.
  • m_px_orig_thread_entry is the engine-wide hook for “is this a parallel worker?”. Code that needs to behave differently in a worker (perfmon, log-acquisition retry, lock-wait timeouts) checks this and uses the parent’s context. New cross-cutting features (e.g., a per-query memory limit) should plug into this same hook rather than threading a new field through every worker boundary.
  • Adaptive DOP. compute_parallel_degree runs once at scan open / hash-join admission / sort start; the DOP cannot be adjusted afterwards. If contention or page-buffer pressure spikes mid-query, the engine cannot scale back. Postgres has the same limitation; Oracle’s “in-memory parallel execution” has dynamic re-sharding.
  • Parallel index scan. The catalogue lists S_INDX_SCAN with no parallel sibling. Adding an S_PARALLEL_INDX_SCAN would require partitioning the B+Tree leaf level and a per-worker BTREE_SCAN clone with shared cursor state.
  • Parallel aggregation above heap scan. BUILDVALUE_OPT partials are merged at the main thread’s read() — parallel but not at the operator level. For high-cardinality GROUP BY the merge becomes the bottleneck. A parallel merge (tournament tree of partial aggregates, or hash-partitioned merge) is a known literature win but not implemented.
  • Nested parallelism. Each orchestrator reserves from the same pool; if a hash join calls into a heap scan that also wants to go parallel, the inner call sees a depleted pool and silently falls back to serial. The private_heap_id == 0 gate is the explicit guard. Whether to allow controlled nesting (e.g., reserve fewer workers per level) is an open choice.
  • Memory accounting. Each worker uses db_private_alloc for scratch — XASL clones, cloned val_descr, partial aggregates — but the budget is per-thread, not per-query. A query with high parallelism multiplies its memory footprint by N. There is no per-query memory limit at the parallel level today.
  • Source files (under /data/hgryoo/references/cubrid/): src/query/parallel/px_parallel.{hpp,cpp}, px_worker_manager{,_global}.{hpp,cpp}, px_callable_task.{hpp,cpp}, px_thread_safe_queue.{hpp,cpp}, px_interrupt.hpp, px_ftab_set.hpp, px_sort.{h,c}, px_heap_scan/*.{hpp,cpp}, px_hash_join/*.{hpp,cpp}, px_query_execute/*.{hpp,cpp}, src/thread/thread_worker_pool*.{hpp,cpp}, src/query/scan_manager.c, src/query/query_hash_join.c, src/storage/external_sort.c.
  • Related curated docs: cubrid-scan-manager.md, cubrid-hash-join.md, cubrid-thread-worker-pool.md, cubrid-external-sort.md.
  • Theoretical references: Graefe, Encapsulation of Parallelism in the Volcano Query Processing System (SIGMOD 1990); Graefe, Query Evaluation Techniques for Large Databases (CSUR 1993) §3; Leis et al., Morsel-Driven Parallelism (SIGMOD 2014); Anatomy of a Database System (Red Book Ch. 4) for shared build; Garcia-Molina/Ullman/Widom, Database Systems: The Complete Book §15.4 (two-phase parallel sort); Vyukov, Bounded MPMC queue.