Streaming Query Flow
What this page is
- A walk-through of the new streaming read path, from command dispatch to shard merges.
- Enough detail to map runtime behavior to the coordinating and shard-side code.
When we stream
- Plain `QUERY` commands, including aggregations, grouping, and time buckets (aggregate queries always use streaming).
- Sequence queries use a specialized streaming path.
- Triggered by the HTTP/TCP/WebSocket `QUERY` handler when the caller asks for streaming (e.g. client-side backpressure, large scans where batching the whole result is impractical).
- Falls back to the existing batch pipeline only for non-aggregate selection queries when streaming is disabled or unavailable.
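The routing rules above can be condensed into a small decision function. This is a purely illustrative sketch; the names (`QueryKind`, `ExecPath`, `choose_path`) are hypothetical and do not appear in the codebase.

```rust
// Hypothetical sketch of the routing rules described above.
#[derive(Clone, Copy, PartialEq, Debug)]
enum QueryKind {
    Selection, // plain QUERY without aggregation
    Aggregate, // aggregations, grouping, time buckets
    Sequence,  // sequence queries
}

#[derive(Clone, Copy, PartialEq, Debug)]
enum ExecPath {
    Streaming,
    SequenceStreaming,
    Batch,
}

fn choose_path(kind: QueryKind, streaming_enabled: bool) -> ExecPath {
    match kind {
        // Aggregate queries always stream; there is no batch fallback.
        QueryKind::Aggregate => ExecPath::Streaming,
        // Sequence queries use their own specialized streaming path.
        QueryKind::Sequence => ExecPath::SequenceStreaming,
        // Only non-aggregate selection queries may fall back to batch.
        QueryKind::Selection if streaming_enabled => ExecPath::Streaming,
        QueryKind::Selection => ExecPath::Batch,
    }
}

fn main() {
    assert_eq!(choose_path(QueryKind::Aggregate, false), ExecPath::Streaming);
    assert_eq!(choose_path(QueryKind::Selection, false), ExecPath::Batch);
    println!("ok");
}
```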
Pipeline overview
- Coordinator (`QueryExecutionPipeline::execute_streaming`)
  - Planner builds a `PlanOutcome` exactly like the batch path (zone picks, per-shard filters, limits).
  - `StreamingShardDispatcher` fans out a `ShardMessage::QueryStream` to every shard, bundling the plan fragments.
- Shard execution (`scan_streaming` → `StreamingScan`)
  - Each shard rebuilds a `QueryPlan`, then initializes a `StreamingContext` (plan, passive snapshot, caches, `FlowContext`, effective limit = `limit + offset`).
  - `FlowBuilders` produces up to two flows: `memtable_flow` wraps the active memtable plus passive buffers via `MemTableQueryRunner::stream`; `segment_flow` calls `build_segment_stream` to launch a background `SegmentQueryRunner` streaming columnar batches.
  - `ShardFlowMerger` fuses those flows. If the command carries an `ORDER BY`, it spawns an ordered heap merge; otherwise it fans in the channels. The result is a `ShardFlowHandle` (receiver + schema + background tasks).
- Coordinator merge & delivery
  - The dispatcher hands the `ShardFlowHandle`s to the merge layer (`StreamMergerKind`).
  - For aggregate queries: `AggregateStreamMerger` collects partial aggregate batches from all shards, merges them by group key, applies `ORDER BY`/`LIMIT`/`OFFSET`, and emits finalized results.
  - `OrderedStreamMerger` uses the flow-level ordered merger to respect `ORDER BY field [DESC]`, honouring `LIMIT`/`OFFSET` at the coordinator (for non-aggregate queries).
  - `UnorderedStreamMerger` forwards batches as they arrive when no ordering is requested (for non-aggregate queries).
  - `QueryBatchStream` wraps the merged receiver. Dropping it aborts all shard/background tasks to avoid leaks.
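The ordered-merge idea used at both levels (shard-side heap merge, coordinator-side `OrderedStreamMerger`) can be sketched as a k-way merge over per-shard sorted streams. This is a minimal illustration with plain vectors standing in for flow channels, not the real `ordered_merger.rs`; the function name and shapes are assumptions.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

// Sketch: each shard yields rows already sorted by the ORDER BY key;
// a min-heap holding one pending row per shard pops the global minimum,
// and OFFSET/LIMIT are applied once at the merge point.
fn ordered_merge(shards: &[Vec<i64>], offset: usize, limit: usize) -> Vec<i64> {
    // Heap entries: Reverse((sort key, shard index, position in shard)).
    let mut heap = BinaryHeap::new();
    for (s, rows) in shards.iter().enumerate() {
        if let Some(&k) = rows.first() {
            heap.push(Reverse((k, s, 0usize)));
        }
    }
    let mut out = Vec::new();
    let mut seen = 0usize;
    while let Some(Reverse((k, s, i))) = heap.pop() {
        if out.len() == limit {
            break;
        }
        if seen >= offset {
            out.push(k);
        }
        seen += 1;
        // Refill from the shard we just consumed, keeping the heap small —
        // analogous to holding one pending batch per shard channel.
        if let Some(&next) = shards[s].get(i + 1) {
            heap.push(Reverse((next, s, i + 1)));
        }
    }
    out
}

fn main() {
    let shards = vec![vec![1, 4, 7], vec![2, 5], vec![3, 6]];
    // Globally sorted order is 1..=7; OFFSET 1 LIMIT 3 yields [2, 3, 4].
    assert_eq!(ordered_merge(&shards, 1, 3), vec![2, 3, 4]);
    println!("ok");
}
```

Keeping only one pending entry per shard in the heap is what lets the merge run over bounded channels without buffering whole shard results.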
Where to look in code
- Coordinator entry: `src/command/handlers/mod.rs`, `query/orchestrator.rs` (`execute_streaming`).
- Dispatch: `src/command/handlers/query/dispatch/streaming.rs`.
- Merge: `src/command/handlers/query/merge/streaming.rs`, `src/command/handlers/query/merge/aggregate_stream.rs`, `query_batch_stream.rs`.
- Shard message + worker: `src/engine/shard/message.rs`, `src/engine/shard/worker.rs`.
- Shard read pipeline: `src/engine/query/streaming/{scan.rs,context.rs,builders.rs,merger.rs}`.
- Flow primitives (channels, batches, ordered merge): `src/engine/core/read/flow/` (notably `context.rs`, `channel.rs`, `ordered_merger.rs`, `shard_pipeline.rs`).
Operational notes
- Aggregate queries always use the streaming path; they cannot fall back to batch execution. Each shard produces partial aggregates via `AggregateOp` that are merged at the coordinator using `AggregateStreamMerger`.
- AVG aggregations preserve sum and count throughout the streaming pipeline, ensuring accurate merging across shards/segments. The average is only finalized at the coordinator when emitting results.
- COUNT UNIQUE aggregations preserve the actual unique values (as JSON array strings) throughout the streaming pipeline, ensuring accurate merging across shards/segments. The count is only finalized at the coordinator when emitting results.
- `StreamingContext` snapshots passive buffers at creation; long-lived streams do not see newer passive flushes until a new stream is opened.
- Flow channels are bounded (default 32k rows per batch) to provide natural backpressure; coordinator-side consumers should `recv` promptly.
- If any shard fails while constructing the stream, the dispatcher surfaces a shard-specific error and aborts the entire streaming request.
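The AVG and COUNT UNIQUE notes above come down to keeping partial state mergeable until the coordinator finalizes it. A minimal sketch, assuming illustrative types (`AvgPartial`, `CountUniquePartial` are not the engine's `AggregateOp`, and a `HashSet` stands in for the JSON array strings on the wire):

```rust
use std::collections::HashSet;

#[derive(Default)]
struct AvgPartial {
    sum: f64,
    count: u64,
}

impl AvgPartial {
    fn update(&mut self, v: f64) {
        self.sum += v;
        self.count += 1;
    }
    // Shard-to-coordinator merge: add sums and counts. Averaging the
    // per-shard averages instead would weight shards incorrectly.
    fn merge(&mut self, other: &AvgPartial) {
        self.sum += other.sum;
        self.count += other.count;
    }
    // Finalized only at the coordinator, when emitting results.
    fn finalize(&self) -> f64 {
        self.sum / self.count as f64
    }
}

#[derive(Default)]
struct CountUniquePartial {
    values: HashSet<String>, // the real pipeline ships these as JSON array strings
}

impl CountUniquePartial {
    fn update(&mut self, v: &str) {
        self.values.insert(v.to_string());
    }
    // Set union across shards: adding per-shard counts instead would
    // double-count values seen on more than one shard.
    fn merge(&mut self, other: &CountUniquePartial) {
        self.values.extend(other.values.iter().cloned());
    }
    fn finalize(&self) -> usize {
        self.values.len()
    }
}

fn main() {
    // Shard A saw [1, 2], shard B saw [6]; global AVG is 3.0, not (1.5 + 6.0) / 2.
    let mut a = AvgPartial::default();
    a.update(1.0);
    a.update(2.0);
    let mut b = AvgPartial::default();
    b.update(6.0);
    a.merge(&b);
    assert_eq!(a.finalize(), 3.0);

    // "y" appears on both shards but counts once after the union.
    let mut u1 = CountUniquePartial::default();
    u1.update("x");
    u1.update("y");
    let mut u2 = CountUniquePartial::default();
    u2.update("y");
    u1.merge(&u2);
    assert_eq!(u1.finalize(), 2);
    println!("ok");
}
```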
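The bounded-channel backpressure behaves like a standard bounded queue: a slow consumer throttles the producer. A minimal sketch using `std::sync::mpsc::sync_channel` (the engine's flow channels are analogous but carry row batches with a much larger bound; the tiny capacity here is only to make the blocking visible):

```rust
use std::sync::mpsc::sync_channel;
use std::thread;

fn main() {
    // Capacity 2 ~ a bounded flow channel: `send` blocks once two
    // "batches" are in flight, until the consumer `recv`s one.
    let (tx, rx) = sync_channel::<u32>(2);
    let producer = thread::spawn(move || {
        for batch in 0..10 {
            // Blocks here whenever the consumer falls two batches behind —
            // this is the natural backpressure the notes above describe.
            tx.send(batch).unwrap();
        }
        // Dropping `tx` closes the channel, ending the consumer loop.
    });
    let mut received = Vec::new();
    while let Ok(batch) = rx.recv() {
        received.push(batch);
    }
    producer.join().unwrap();
    assert_eq!(received, (0..10).collect::<Vec<u32>>());
    println!("ok");
}
```

This is also why the notes say coordinator-side consumers should `recv` promptly: a stalled consumer eventually stalls every shard feeding the merge.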