Pipeline Executor
The Pipeline Executor is the core query execution engine that orchestrates parallel processing of filter-based query pipelines. It uses a sophisticated dual-pool work-stealing architecture to efficiently distribute work across threads, with lock-free coordination via atomic operations.
Purpose
When the pipeline builder creates Jobs (collections of PipelineSteps), the executor must:
- Distribute work efficiently across multiple threads
- Parallelize expensive jobs by allowing multiple workers to execute different steps simultaneously
- Prioritize heavy workloads so they don’t block the queue
- Coordinate without locks using atomic compare-and-swap operations
- Balance throughput and latency with separate main and reserve worker pools
High-Level Architecture
╔═══════════════════════════════════════════════════════════════════════════════╗
║ PIPELINE EXECUTOR ║
║ ║
║ ┌────────────────┐ ║
║ │ Client submits │ executor.submit(job) ║
║ │ Job │ ─────────────────────┐ ║
║ └────────────────┘ │ ║
║ ▼ ║
║ ┌─────────────────────┐ ║
║ │ job_tx (MPMC) │ ║
║ │ Unbounded channel │ ║
║ └─────────┬───────────┘ ║
║ │ ║
║ ┌───────────────────┴──────────────────┐ ║
║ │ │ ║
║ ▼ ▼ ║
║ ╔═══════════════════════╗ ╔═══════════════════════╗ ║
║ ║ MAIN WORKER POOL ║ ║ RESERVE WORKER POOL ║ ║
║ ║ (85% of threads) ║ ║ (15% of threads) ║ ║
║ ╚═══════════════════════╝ ╚═══════════════════════╝ ║
║ │ │ ║
║ │ 1. Insert JobHandle │ ║
║ │ into JobBoard │ ║
║ │ │ ║
║ │ 2. Send wake signal ────────────────▶│ ║
║ │ to reserve workers │ ║
║ │ │ ║
║ │ 3. Start executing job │ ║
║ │ via job.get_next() │ ║
║ │ │ ║
║ ▼ ▼ ║
║ ┌─────────────────────┐ ┌─────────────────────┐ ║
║ │ CAS claim step 0 │ │ Wait for wake signal│ ║
║ │ Execute filters │ │ Pop heaviest job │ ║
║ │ Forward batch │ │ CAS claim step N │ ║
║ └─────────────────────┘ └─────────────────────┘ ║
║ │ │ ║
║ └──────────────┐ ┌────────────┘ ║
║ ▼ ▼ ║
║ ┌──────────────────────────┐ ║
║ │ JOBBOARD (SkipSet) │ ║
║ │ Sorted by cost (asc) │ ║
║ │ [Job1, Job3, Job7] │ ║
║ └──────────────────────────┘ ║
╚═══════════════════════════════════════════════════════════════════════════════╝
Key Design Decisions:
- Dual-pool architecture separates job intake (main) from heavy workload handling (reserve)
- Lock-free JobBoard uses SkipSet for concurrent priority queue operations
- CAS-based coordination ensures exactly-once step execution without mutexes
- Work stealing allows reserve workers to assist with expensive jobs
Core Components
1. PipelineExecutor
The main coordinator struct:
pub struct PipelineExecutor {
job_tx: Sender<Arc<Job>>, // Channel to send jobs to main workers
wake_tx: Sender<()>, // Channel to wake reserve workers
job_board: Arc<SkipSet<JobHandle>>, // Shared priority queue of active jobs
seq: Arc<AtomicU64>, // Monotonic sequence number generator
main_pool: ThreadPool, // Pool of main workers (85%)
reserve_pool: ThreadPool, // Pool of reserve workers (15%)
}
Initialization Flow:
PipelineExecutor::new(total_threads: usize)
│
├─▶ Step 1: Calculate pool sizes
│ split_threads(total) → (main_count, reserve_count)
│ Example: 16 threads → (14 main, 2 reserve)
│
├─▶ Step 2: Create communication channels
│ • job_tx/job_rx: Unbounded MPMC for job submission
│ • wake_tx/wake_rx: Unbounded MPMC for wake signals
│
├─▶ Step 3: Create shared data structures
│ • JobBoard: Arc<SkipSet<JobHandle>> (sorted by cost)
│ • Sequence counter: Arc<AtomicU64::new(0)>
│
├─▶ Step 4: Spawn main worker pool
│ main_pool = ThreadPool::new(main_count)
│ for each worker:
│ Clone: job_rx, board, seq, wake_tx
│ Execute: run_main_worker(job_rx, board, seq, wake_tx)
│
└─▶ Step 5: Spawn reserve worker pool
reserve_pool = ThreadPool::new(reserve_count)
for each worker:
Clone: wake_rx, board
Execute: run_reserve_worker(wake_rx, board)
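A minimal sketch of how `new` could wire these five steps together, assuming crossbeam's `unbounded` channels and a `ThreadPool` with a `new(size)`/`spawn(closure)`-style API (the real `crate::pool::scheduler::ThreadPool` interface may differ):

```rust
use std::sync::{atomic::AtomicU64, Arc};
use crossbeam::channel::unbounded;
use crossbeam_skiplist::SkipSet;

impl PipelineExecutor {
    pub fn new(total_threads: usize) -> Self {
        // Step 1: pool sizes
        let (main_count, reserve_count) = split_threads(total_threads);
        // Step 2: communication channels
        let (job_tx, job_rx) = unbounded::<Arc<Job>>();
        let (wake_tx, wake_rx) = unbounded::<()>();
        // Step 3: shared data structures
        let job_board = Arc::new(SkipSet::<JobHandle>::new());
        let seq = Arc::new(AtomicU64::new(0));
        // Step 4: main workers pull jobs, post them, and start executing
        let main_pool = ThreadPool::new(main_count);
        for _ in 0..main_count {
            let (rx, board, s, wake) =
                (job_rx.clone(), Arc::clone(&job_board), Arc::clone(&seq), wake_tx.clone());
            main_pool.spawn(move || run_main_worker(rx, board, s, wake));
        }
        // Step 5: reserve workers sleep until woken, then steal
        let reserve_pool = ThreadPool::new(reserve_count);
        for _ in 0..reserve_count {
            let (rx, board) = (wake_rx.clone(), Arc::clone(&job_board));
            reserve_pool.spawn(move || run_reserve_worker(rx, board));
        }
        Self { job_tx, wake_tx, job_board, seq, main_pool, reserve_pool }
    }
}
```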
2. JobHandle
A wrapper that adds ordering semantics to Jobs for the JobBoard priority queue:
struct JobHandle {
seq: u64, // Monotonic sequence number (submission order)
job: Arc<Job>, // Reference-counted job
}
Ordering Implementation:
impl Ord for JobHandle {
fn cmp(&self, other: &Self) -> Ordering {
match self.cost().cmp(&other.cost()) {
Ordering::Equal => other.seq.cmp(&self.seq), // Tie-breaker: lower seq sorts last
ord => ord, // Primary: by cost
}
}
}
Priority Rules:
- Primary: Higher cost = higher priority
- Secondary: Lower sequence number = higher priority (FIFO for same cost)
Example Ordering:
Job A: cost=10, seq=1 │
Job B: cost=10, seq=2 │ Same cost → compare seq
Job C: cost=5, seq=3 ├─ A > B > C > D
Job D: cost=1, seq=4 │
SkipSet internal order: [D, C, B, A]
pop_back() returns: A (highest priority)
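`SkipSet` keys on the full `Ord` trait family, so `JobHandle` also needs consistent `PartialOrd`/`PartialEq`/`Eq` impls; a sketch that simply delegates to `cmp`:

```rust
impl PartialOrd for JobHandle {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

impl PartialEq for JobHandle {
    fn eq(&self, other: &Self) -> bool {
        self.cmp(other) == std::cmp::Ordering::Equal
    }
}

impl Eq for JobHandle {}
```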
3. Thread Pool Split Strategy
fn split_threads(total: usize) -> (usize, usize) {
let mut main = (total * 85 + 99) / 100; // Round up to 85%
// Ensure at least 1 main and 1 reserve
if main == 0 { main = 1; }
if main >= total { main = total - 1; }
let mut reserve = total - main;
if reserve == 0 {
reserve = 1;
if main > 1 { main -= 1; }
}
(main, reserve)
}
Split Table:
| Total Threads | Main Workers | Reserve Workers | Ratio |
|---|---|---|---|
| 2 | 1 | 1 | 50/50 (minimum) |
| 4 | 3 | 1 | 75/25 |
| 8 | 7 | 1 | 87.5/12.5 |
| 16 | 14 | 2 | 87.5/12.5 |
| 32 | 28 | 4 | 87.5/12.5 |
| 100 | 85 | 15 | 85/15 (target) |
Rationale:
- Main workers (85%) process the steady stream of incoming jobs
- Reserve workers (15%) provide burst capacity for expensive, high-cost jobs
- Balance between throughput (main) and latency (reserve)
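The table doubles as a test fixture; a minimal inline check of `split_threads` against those rows:

```rust
#[test]
fn split_threads_matches_table() {
    assert_eq!(split_threads(2), (1, 1));
    assert_eq!(split_threads(4), (3, 1));
    assert_eq!(split_threads(8), (7, 1));
    assert_eq!(split_threads(16), (14, 2));
    assert_eq!(split_threads(32), (28, 4));
    assert_eq!(split_threads(100), (85, 15));
}
```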
Detailed Execution Flow
Job Submission
Client Code
│
├─▶ executor.submit(job)
│
▼
PipelineExecutor::submit(&self, job: Job)
│
├─▶ Arc::new(job) (wrap in Arc for sharing)
│
└─▶ job_tx.send(Arc<Job>)
│
└─▶ Unbounded MPMC channel
└─▶ Any main worker can receive
Properties:
- Non-blocking: Sender never blocks (unbounded channel)
- Shared ownership: Arc allows multiple workers to hold references
- Best-effort: If channel closed (executor shutdown), send fails silently
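In code, `submit` is little more than a wrap-and-send; a sketch matching the properties above:

```rust
impl PipelineExecutor {
    /// Non-blocking, best-effort submission. The send only fails when all
    /// main workers have exited (executor shutdown), and that error is
    /// deliberately ignored.
    pub fn submit(&self, job: Job) {
        let _ = self.job_tx.send(Arc::new(job));
    }
}
```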
Main Worker Loop
Each main worker runs this loop on a dedicated thread:
fn run_main_worker(
job_rx: Receiver<Arc<Job>>,
board: Arc<SkipSet<JobHandle>>,
seq: Arc<AtomicU64>,
wake_tx: Sender<()>,
)
LOOP:
│
├─▶ STEP 1: Block waiting for job
│ job = job_rx.recv()?
│ └─▶ Blocks until job arrives or channel closes
│
├─▶ STEP 2: Generate sequence number
│ seq_num = seq.fetch_add(1, Relaxed)
│ └─▶ Atomic increment, returns old value
│
├─▶ STEP 3: Wrap job in JobHandle
│ handle = JobHandle::new(Arc::clone(&job), seq_num)
│ └─▶ Maintains cost-based ordering
│
├─▶ STEP 4: Insert into JobBoard
│ board.insert(handle)
│ └─▶ SkipSet automatically maintains sorted order
│ O(log n) insertion time
│
├─▶ STEP 5: Wake reserve workers (best-effort)
│ wake_tx.send(())
│ └─▶ Signal that work is available
│
└─▶ STEP 6: Start executing job immediately
job.get_next()
└─▶ CAS claim first available step and execute
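Put together, the loop might look like this (a sketch; the inner `while` assumes the worker keeps calling `get_next()` until the job reports no unclaimed slots):

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use crossbeam::channel::{Receiver, Sender};
use crossbeam_skiplist::SkipSet;

fn run_main_worker(
    job_rx: Receiver<Arc<Job>>,
    board: Arc<SkipSet<JobHandle>>,
    seq: Arc<AtomicU64>,
    wake_tx: Sender<()>,
) {
    // Step 1: recv() blocks; the loop ends when the channel closes.
    while let Ok(job) = job_rx.recv() {
        // Steps 2-3: unique sequence number, then wrap for ordering.
        let seq_num = seq.fetch_add(1, Ordering::Relaxed);
        // Step 4: publish to the JobBoard so reserve workers can steal it.
        board.insert(JobHandle::new(Arc::clone(&job), seq_num));
        // Step 5: best-effort wake; an error just means no reserve workers.
        let _ = wake_tx.send(());
        // Step 6: drive the job until every slot has been claimed.
        while job.next_free_slot.load(Ordering::Relaxed) < job.steps.len() {
            job.get_next();
        }
    }
}
```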
Visual Timeline:
Main Worker Thread
─────────────────────────────────────────────────────────────────────
T0: │ Blocked on job_rx.recv()
│
T1: ├─▶ Job arrives! recv() returns
│
T2: ├─▶ seq.fetch_add(1) → seq=42
│
T3: ├─▶ Create JobHandle { seq:42, job:Arc }
│
T4: ├─▶ board.insert(handle)
│ └─▶ JobBoard: [Job1(cost=3), Job2(cost=5), ThisJob(cost=7)]
│ ▲ Inserted here
│
T5: ├─▶ wake_tx.send(()) ─────────────────────────────┐
│ │
T6: └─▶ job.get_next() │
├─▶ CAS: next_free_slot(0 → 1) ✓ │
├─▶ Execute step[0] │
│ • Apply filters ▼
│ • Send batch to next step Reserve Worker wakes
│
└─▶ Continue to next step...
Reserve Worker Loop
Each reserve worker runs this loop:
fn run_reserve_worker(
wake_rx: Receiver<()>,
board: Arc<SkipSet<JobHandle>>,
)
LOOP:
│
├─▶ STEP 1: Block waiting for wake signal
│ wake_rx.recv()?
│ └─▶ Sleeps until woken by main worker
│
├─▶ STEP 2: Steal heaviest job from JobBoard
│ job = select_heaviest(&board)?
│ │
│ ├─▶ board.pop_back()
│ │ └─▶ Remove and return highest-cost job
│ │ (back of sorted set)
│ │
│ └─▶ Extract Arc<Job> from JobHandle
│
└─▶ STEP 3: Execute job
job.get_next()
└─▶ CAS claim next available step and execute
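A corresponding sketch of the reserve loop, with `select_heaviest` written as a thin wrapper over `pop_back()` (same hedges and imports as the main-worker sketch):

```rust
use std::sync::atomic::Ordering;
use std::sync::Arc;
use crossbeam::channel::Receiver;
use crossbeam_skiplist::SkipSet;

fn select_heaviest(board: &SkipSet<JobHandle>) -> Option<Arc<Job>> {
    // pop_back() removes the entry that sorts last: the highest-cost job.
    board.pop_back().map(|entry| Arc::clone(&entry.value().job))
}

fn run_reserve_worker(wake_rx: Receiver<()>, board: Arc<SkipSet<JobHandle>>) {
    // Step 1: sleep (zero CPU) until a main worker sends a wake signal.
    while wake_rx.recv().is_ok() {
        // Step 2: steal the heaviest job, if any survived the race.
        if let Some(job) = select_heaviest(&board) {
            // Step 3: help execute; the CAS inside get_next() prevents
            // duplicating steps the main worker already claimed.
            while job.next_free_slot.load(Ordering::Relaxed) < job.steps.len() {
                job.get_next();
            }
        }
    }
}
```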
Visual Timeline:
Reserve Worker Thread
─────────────────────────────────────────────────────────────────────
T0: │ Blocked on wake_rx.recv()
│ (sleeping, consuming no CPU)
│
T1: ├─▶ Wake signal arrives! recv() returns
│
T2: ├─▶ board.pop_back()
│ │
│ │ Before: [Job1(cost=3), Job2(cost=5), Job3(cost=7)]
│ │ ▲ pop_back()
│ │ After: [Job1(cost=3), Job2(cost=5)]
│ │
│ └─▶ Returns Job3 (highest cost)
│
T3: └─▶ job.get_next()
├─▶ CAS: next_free_slot(3 → 4) ✓
│ (Main worker already claimed steps 0-2)
│
├─▶ Execute step[3]
│ • Apply filters
│ • Send batch to next step
│
└─▶ Continue to next step...
Key Properties:
- Idle when no work: Blocked on channel, zero CPU usage
- Steals heaviest jobs: Always takes highest-cost job (most steps remaining)
- Can duplicate work: May steal a job that a main worker is still processing (intentional!)
CAS-Based Step Execution
The core coordination mechanism that ensures exactly-once step execution:
Job Structure
pub struct Job {
pub table_name: String,
pub steps: Vec<PipelineStep>, // All pipeline steps
pub cost: usize, // Number of steps (cached)
pub next_free_slot: AtomicUsize, // Index of next unclaimed step
pub id: String, // Random identifier
pub entry_producer: Sender<PipelineBatch>, // Input to first step
}
Job::get_next() Implementation
pub fn get_next(&self) {
let total = self.steps.len();
if total == 0 { return; }
let mut slot = self.next_free_slot.load(Ordering::Relaxed);
loop {
// Check if all steps completed
if slot >= total { return; }
// Attempt to claim this slot
match self.next_free_slot.compare_exchange_weak(
slot, // Expected: current slot
slot + 1, // Desired: next slot
Ordering::AcqRel, // Success ordering
Ordering::Relaxed, // Failure ordering
) {
Ok(_) => {
// SUCCESS: We claimed this slot!
self.steps[slot].execute();
return;
}
Err(current) => {
// FAILURE: Another worker claimed it first
// Update slot to current value and retry
slot = current;
}
}
}
}
CAS Visualization
Scenario: 2 workers competing for steps in a job with 5 steps:
Initial State:
┌──────────────────────────────────────┐
│ Job │
│ steps: [Step0, Step1, Step2, ...] │
│ next_free_slot: AtomicUsize(0) ────┼─── Both workers read this
└──────────────────────────────────────┘
│
┌───────────┴───────────┐
│ │
Worker A Worker B
│ │
│ slot = 0 │ slot = 0
│ │
│ CAS(0 → 1) │ CAS(0 → 1)
│ expect: 0 │ expect: 0
│ update: 1 │ update: 1
│ │
├─▶ SUCCEEDS ✓ ├─▶ FAILS ✗
│ next_free_slot=1 │ (already changed to 1)
│ │
│ Execute step[0] │ slot = 1 (from failure)
│ │
│ │ CAS(1 → 2)
│ │ expect: 1
│ │ update: 2
│ │
│ ├─▶ SUCCEEDS ✓
│ │ next_free_slot=2
│ │
│ │ Execute step[1]
Result:
- Worker A executes step[0]
- Worker B executes step[1]
- No step executed twice
- No step skipped
- Zero locks used
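The claiming pattern is easy to isolate. This self-contained toy (names like `total_steps` are illustrative only) runs two threads against one atomic counter and prints each slot exactly once:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

fn main() {
    let next_free_slot = Arc::new(AtomicUsize::new(0));
    let total_steps = 5;
    let workers: Vec<_> = (0..2)
        .map(|id| {
            let slot_counter = Arc::clone(&next_free_slot);
            thread::spawn(move || loop {
                let slot = slot_counter.load(Ordering::Relaxed);
                if slot >= total_steps {
                    return; // all steps claimed
                }
                // Only one thread can move `slot` to `slot + 1`.
                if slot_counter
                    .compare_exchange_weak(slot, slot + 1, Ordering::AcqRel, Ordering::Relaxed)
                    .is_ok()
                {
                    println!("worker {id} claimed step {slot}");
                }
                // On failure (including spurious weak-CAS failure)
                // the loop simply reloads and retries.
            })
        })
        .collect();
    for w in workers {
        w.join().unwrap();
    }
}
```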
Parallel Work Stealing in Action
Example: Expensive Job with 10 Steps
TIMELINE: Job with cost=10 submitted to executor
T0: Main Worker receives job
┌──────────────────────────────────┐
│ Job │
│ steps: [0,1,2,3,4,5,6,7,8,9] │
│ next_free_slot: 0 │
└──────────────────────────────────┘
│
├─▶ Insert into JobBoard
├─▶ Send wake signal
└─▶ Start executing
T1: Main Worker claims step 0
next_free_slot: 0 → 1 (CAS success)
┌─────────────┐
│ Execute S0 │ Main Worker
└─────────────┘
T2: Reserve Worker wakes up
┌─────────────────────┐
│ pop_back(JobBoard) │ Reserve Worker
│ → Gets same Job! │
└─────────────────────┘
T3: Main Worker claims step 1
next_free_slot: 1 → 2 (CAS success)
┌─────────────┐
│ Execute S1 │ Main Worker
└─────────────┘
T4: Reserve Worker claims step 2
next_free_slot: 2 → 3 (CAS success)
┌─────────────┐
│ Execute S2 │ Reserve Worker
└─────────────┘
T5: Main Worker claims step 3
Reserve Worker claims step 4
(Both succeed - different slots)
┌─────────────┐ ┌─────────────┐
│ Execute S3 │ │ Execute S4 │
└─────────────┘ └─────────────┘
Main Worker Reserve Worker
T6-T9: Workers continue racing
Both workers call get_next() repeatedly
CAS ensures each step claimed by exactly one worker
T10: All steps completed
next_free_slot: 10 (>= steps.len())
Both workers return from get_next()
Performance Improvement:
- Serial execution (1 worker): 10 time units
- Parallel execution (2 workers): ~5-6 time units
- Speedup: ~1.8x (nearly 2x with perfect work distribution)
JobBoard Dynamics
The JobBoard is a concurrent priority queue implemented as SkipSet<JobHandle>:
SkipSet Properties
╔═══════════════════════════════════════════════════════════════╗
║ SkipSet Structure ║
║ ║
║ Level 3: [ A ───────────────────────▶ D ] ║
║ │ │ ║
║ Level 2: [ A ─┴──▶ B ────────▶ C ────────▶ D ] ║
║ │ │ │ ║
║ Level 1: [ A ───▶ B ─┴─▶ C ──▶ D ─┴─▶ E ──▶ F ─┴─▶ G ] ║
║ ▲ ▲ ║
║ front() back() ║
║ (lowest cost) (highest cost) ║
╚═══════════════════════════════════════════════════════════════╝
- Sorted order: Maintained automatically by Ord implementation
- Concurrent operations: Lock-free insert/remove
- Logarithmic time: O(log n) insert, O(log n) pop
- pop_back(): Removes and returns highest-cost job
JobBoard Lifecycle Example
State 0: Empty
JobBoard: []
─── Main Worker receives Job A (cost=5, seq=0) ───
State 1:
JobBoard: [A(5,0)]
└─▶ A inserted via board.insert()
─── Main Worker receives Job B (cost=3, seq=1) ───
State 2:
JobBoard: [B(3,1), A(5,0)]
▲ ▲
lower higher
cost cost
─── Reserve Worker pops heaviest ───
State 3:
handle = board.pop_back() // Returns A(5,0)
JobBoard: [B(3,1)]
─── Main Worker receives Job C (cost=7, seq=2) ───
State 4:
JobBoard: [B(3,1), C(7,2)]
▲
highest
─── Reserve Worker pops heaviest ───
State 5:
handle = board.pop_back() // Returns C(7,2)
JobBoard: [B(3,1)]
─── Job B completes, removed from active set ───
State 6:
JobBoard: []
Important: Jobs may still be executing after being removed from the JobBoard (workers hold Arc references)
Channel Communication
Job Channel (MPMC Unbounded)
job_tx (Sender) ────▶ CHANNEL ────▶ job_rx (Receiver)
▲ │
│ ├─▶ Main Worker 1
│ ├─▶ Main Worker 2
│ ├─▶ Main Worker 3
│ └─▶ ...
executor.submit()
Properties:
- Unbounded: Never blocks sender
- MPMC: Multiple producers (clients), multiple consumers (main workers)
- Fairness: Workers compete for jobs (no starvation)
Wake Channel (MPMC Unbounded)
wake_tx (Sender) ────▶ CHANNEL ────▶ wake_rx (Receiver)
▲ │
│ ├─▶ Reserve Worker 1
│ ├─▶ Reserve Worker 2
│ └─▶ ...
│
Main Workers
send on every job
Properties:
- Best-effort: if the channel is closed (executor shutdown), the send error is ignored; wake signals are never required for correctness
- Coalescing: several pending signals can be serviced by a single wake; spurious or extra wakes are harmless
- Lightweight: the payload is just a unit () value
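One way to realize the coalescing property: a woken reserve worker can drain any signal backlog before sleeping again (a sketch of the idea, not necessarily the shipped behavior):

```rust
// After handling a wake, discard signals that piled up while we were busy;
// one pass over the JobBoard already covered the work they announced.
while wake_rx.try_recv().is_ok() {}
```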
Memory Ordering and Atomics
Sequence Number Generation
seq.fetch_add(1, Ordering::Relaxed)
Relaxed ordering is safe because:
- Only used for ordering tie-breaker (not for synchronization)
- Each thread gets a unique value (atomicity guaranteed)
- No happens-before relationships required
CAS in Job::get_next()
compare_exchange_weak(
expected,
new,
Ordering::AcqRel, // Success ordering
Ordering::Relaxed, // Failure ordering
)
AcqRel on success:
- Acquire: See all writes from previous worker that claimed earlier slot
- Release: Publish step execution completion to future workers
Relaxed on failure:
- Just need to reload current value and retry
- No synchronization needed for failed CAS
Complete Example: Two Workers, Four Steps
Job submitted with 4 steps: [S0, S1, S2, S3]
Workers: Main Worker (MW), Reserve Worker (RW)
═══════════════════════════════════════════════════════════════════
T0: Job arrives at MW
MW: job_rx.recv() → Job{steps:[S0,S1,S2,S3], next_free_slot:0}
T1: MW creates JobHandle
handle = JobHandle { seq: 42, job: Arc<Job> }   // cost() = 4
T2: MW inserts into JobBoard
JobBoard.insert(handle)
JobBoard: [handle]
T3: MW sends wake signal
wake_tx.send(()) → Queued in wake channel
T4: MW starts executing
MW: job.get_next()
slot = 0
CAS(0 → 1) ✓ Success!
State: next_free_slot = 1
JobBoard: [handle]
MW executing S0
RW still asleep
T5: RW wakes up
RW: wake_rx.recv() → Got signal!
T6: RW steals job from JobBoard
RW: board.pop_back()
→ Returns handle
→ JobBoard now empty: []
RW: job = handle.into_job()
→ RW now has Arc<Job>
T7: MW finishes S0, claims S1
MW: job.get_next()
slot = 1
CAS(1 → 2) ✓ Success!
State: next_free_slot = 2
MW executing S1
T8: RW claims S2
RW: job.get_next()
slot = 2
CAS(2 → 3) ✓ Success!
State: next_free_slot = 3
MW executing S1
RW executing S2
T9: MW finishes S1, claims S3
MW: job.get_next()
slot = 3
CAS(3 → 4) ✓ Success!
State: next_free_slot = 4
MW executing S3
RW executing S2
T10: RW finishes S2, tries to claim S4
RW: job.get_next()
slot = 4
if 4 >= 4: return ← Job complete!
RW: Returns, goes back to waiting
T11: MW finishes S3, tries to claim the next step
MW: job.get_next()
slot = 4
if 4 >= 4: return ← Job complete!
MW: Returns, waits for next job
═══════════════════════════════════════════════════════════════════
FINAL STATE:
- All 4 steps executed exactly once
- MW executed: S0, S1, S3
- RW executed: S2
- Total time: ~75% of serial execution (MW ran 3 of the 4 step-times on the critical path)
Performance Characteristics
Time Complexity
| Operation | Complexity | Notes |
|---|---|---|
| submit(job) | O(1) | Channel send |
| JobBoard insert | O(log n) | SkipSet insert |
| JobBoard pop_back | O(log n) | SkipSet remove |
| CAS claim step | O(1) | Atomic operation |
| Step execution | O(m) | m = rows to filter |
Space Complexity
- JobBoard: O(n) where n = active jobs
- Channels: O(k) where k = queued jobs/signals
- **Arc<Job>:** shared, reference-counted job ownership
- Per-worker stack: O(1) fixed size
Scalability
Amdahl’s Law Applied:
- Jobs with few steps (cost < 5): Limited parallelism benefit
- Jobs with many steps (cost > 10): Near-linear speedup with 2+ workers
- Bottleneck: Serial portion = job submission + JobBoard operations
Measured Speedup (example):
- 1 worker: 100% (baseline)
- 2 workers: ~180% (1.8x speedup for high-cost jobs)
- 4 workers: ~320% (3.2x speedup for high-cost jobs)
- 8 workers: ~560% (5.6x speedup for high-cost jobs)
Diminishing returns due to:
- CAS contention on next_free_slot
- Channel synchronization overhead
- JobBoard lock-free coordination cost
Limitations and Trade-offs
1. Fixed 85/15 Split
Limitation: Ratio hardcoded, not adaptive
Impact: Suboptimal for workloads with many expensive jobs or many cheap jobs
Future: Dynamic adjustment based on queue depth
2. Unbounded Job Queue
Limitation: job_tx channel is unbounded
Impact: Memory can grow without bound under fast submissions
Mitigation: A bounded channel with backpressure in the future
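For reference, the bounded variant is a one-line change at construction time (sketch; the capacity of 1024 is an arbitrary illustration):

```rust
// Backpressure alternative: submit() would block once 1024 jobs are
// queued, capping memory at the cost of blocking producers.
let (job_tx, job_rx) = crossbeam::channel::bounded::<Arc<Job>>(1024);
```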
3. Best-Effort Wake Signals
Limitation: Wake signals can be dropped or coalesced
Impact: Reserve workers may not wake immediately
Acceptable: Reserve workers are an optimization, not required for correctness
4. No Step Ordering Guarantees
Limitation: Steps execute in arbitrary order (CAS race)
Impact: Cannot rely on step N completing before step N+1 starts
Design: Intentional - steps must be independent (connected via channels)
5. Work Duplication Risk
Limitation: Main and reserve workers can simultaneously work on the same job
Impact: CAS ensures no duplicate step execution, but both workers contend
Trade-off: Parallelism benefit outweighs contention cost for high-cost jobs
Integration with Pipeline Builder
Pipeline → Executor Flow
SQL Query
│
▼
Query Planner
│
▼
QueryPlan { tables, filters, ... }
│
▼
Pipeline Builder
│
├─▶ Group filters by column
├─▶ Create PipelineSteps
├─▶ Wire crossbeam channels
└─▶ Build Job
│
▼
Job {
table_name: "users",
steps: Vec<PipelineStep>, // One per column filter
cost: 5, // Number of steps
next_free_slot: AtomicUsize(0),
id: "pipe-xyz...",
entry_producer: Sender<Batch>,
}
│
▼
executor.submit(job)
│
▼
[Execution flow from earlier sections]
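End to end, client usage reduces to a few lines (sketch; `build_job` stands in for whatever entry point the pipeline builder actually exposes):

```rust
let executor = PipelineExecutor::new(16);     // 14 main + 2 reserve workers
let job = build_job(&query_plan);             // hypothetical builder call
executor.submit(job);                         // non-blocking hand-off
```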
Module Location
- Source: `src/executor.rs`
- Dependencies:
  - `crate::pipeline::Job` - Job struct definition
  - `crate::pool::scheduler::ThreadPool` - Worker pool implementation
  - `crossbeam::channel` - MPMC channels
  - `crossbeam_skiplist::SkipSet` - Concurrent sorted set
- Tests: `src/executor.rs` (inline unit tests)
Related Documentation
- Pipeline Builder - Creates Jobs consumed by executor
- Thread Pool Scheduler - ThreadPool implementation details
- Query Planner - Generates QueryPlan
- Architecture Overview - System-wide context