feat: streaming response #1317

Merged
merged 8 commits into parseablehq:main from streaming-response on May 17, 2025

Conversation

Contributor

@nikhilsinhaparseable nikhilsinhaparseable commented May 16, 2025

Use DataFusion's execute_stream function, which sends a streaming response.
Use the query param streaming=true to get the streaming response.
Defaults to false, so Prism and other clients keep working as usual.

The query response (with streaming=true) sends multiple batches one after the other, separated by a newline character.
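A hypothetical client-side sketch of consuming the NDJSON stream (endpoint path, credentials, and request-body fields are assumptions, not taken from this PR; requires reqwest with the json and stream features):

```rust
use futures_util::StreamExt;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = reqwest::Client::new();
    // Assumed endpoint and credentials; adjust to your deployment.
    let mut stream = client
        .post("http://localhost:8000/api/v1/query?streaming=true")
        .basic_auth("admin", Some("admin"))
        .json(&serde_json::json!({
            "query": "SELECT * FROM mystream",
            "startTime": "2025-05-16T00:00:00Z",
            "endTime": "2025-05-16T01:00:00Z"
        }))
        .send()
        .await?
        .bytes_stream();

    // Each batch arrives as one JSON document terminated by a newline.
    // Simplification: this assumes every chunk ends on a line boundary;
    // a robust client would buffer partial lines across chunks.
    while let Some(chunk) = stream.next().await {
        for line in String::from_utf8_lossy(&chunk?).lines() {
            if !line.trim().is_empty() {
                println!("batch: {line}");
            }
        }
    }
    Ok(())
}
```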

Summary by CodeRabbit

  • New Features

    • Introduced streaming query support, allowing clients to receive query results as a stream.
    • Added a new query parameter to enable streaming mode for queries.
  • Refactor

    • Improved separation of query handling for count, streaming, and non-streaming queries.
    • Enhanced modularity by moving logic into specialized helper functions.
    • Updated query response handling to return JSON directly without HTTP-specific wrappers.
  • Bug Fixes

    • Improved error handling for streaming queries, ensuring errors are logged and handled gracefully.
    • Standardized error handling in flight and livetail handlers for consistent error propagation.
  • Chores

    • Updated dependencies to include the latest async-related crate.

Contributor

coderabbitai bot commented May 16, 2025

Walkthrough

The changes introduce support for streaming query execution by refactoring the query handler into specialized async functions for count, non-streaming, and streaming queries. A new streaming flag is added to the Query struct and extracted from requests. The query module's execute functions now return either batch results or streaming results. The response module is updated to return JSON values directly instead of HTTP responses with timing headers. The dependency list is updated to include futures-core for async streaming support. Additional error handling improvements include boxing gRPC Status errors in several modules.
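For orientation, a minimal sketch of the reshaped return type, assuming the either crate's Either (the exact import paths in the PR may differ):

```rust
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::physical_plan::SendableRecordBatchStream;
use either::Either;

// Non-streaming callers get fully materialized batches;
// streaming callers get a record-batch stream to poll.
pub type ExecuteOutput = Either<Vec<RecordBatch>, SendableRecordBatchStream>;
```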

Changes

File(s) Change Summary
Cargo.toml Added futures-core crate dependency (version "0.3.31") alongside other async crates.
src/handlers/http/query.rs Refactored query handler into three async functions for count, non-streaming, and streaming queries. Added streaming: bool field to Query struct with extraction from requests. Implemented streaming response returning NDJSON with optional metadata and timing headers.
src/query/mod.rs Modified execute function and method to support batch or streaming execution controlled by is_streaming flag. Return type changed to Either<Vec<RecordBatch>, SendableRecordBatchStream>. Imported Either and SendableRecordBatchStream.
src/response.rs Removed total_time field from QueryResponse. Renamed to_http method to to_json returning JSON Value instead of HTTP response, removing timing headers and HTTP construction.
src/catalog/mod.rs Simplified error creation in create_manifest by replacing explicit IOError::new with IOError::other. Removed unused import of ErrorKind.
src/handlers/airplane.rs Improved error handling in do_get by mapping errors to gRPC Status variants and unwrapping boxed errors in the returned stream. Added false argument to execute call.
src/handlers/livetail.rs Changed error handling to use boxed Status errors in helper functions and unwrap them before returning. Updated function signatures accordingly.
src/utils/arrow/flight.rs Updated function signatures to return boxed Status errors instead of direct Status. Adjusted error mapping to box errors.

Sequence Diagram(s)

sequenceDiagram
    participant Client
    participant HTTPHandler
    participant QueryModule
    participant ResponseModule

    Client->>HTTPHandler: Send query (with/without streaming flag)
    HTTPHandler->>HTTPHandler: Parse and extract streaming flag
    alt Count Query
        HTTPHandler->>QueryModule: Execute count query
        QueryModule-->>HTTPHandler: Return count result
        HTTPHandler->>ResponseModule: Format count response
        ResponseModule-->>HTTPHandler: JSON value
        HTTPHandler->>Client: Return JSON response
    else Non-Streaming Query
        HTTPHandler->>QueryModule: Execute batch query
        QueryModule-->>HTTPHandler: Return record batches
        HTTPHandler->>ResponseModule: Format batch response
        ResponseModule-->>HTTPHandler: JSON value
        HTTPHandler->>Client: Return JSON response
    else Streaming Query
        HTTPHandler->>QueryModule: Execute streaming query
        QueryModule-->>HTTPHandler: Return record batch stream
        loop For each batch
            HTTPHandler->>ResponseModule: Format batch to JSON
            ResponseModule-->>HTTPHandler: JSON value
            HTTPHandler->>Client: Stream JSON batch
        end
    end

Poem

In the warren of code, a new stream flows,
Queries now hop where the streaming wind blows.
Count, batch, or a river of rows,
Each finds its path as the clever code grows.
With futures and streams, the rabbits delight—
JSON carrots for all, morning, noon, and night!
🥕💻


Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

🧹 Nitpick comments (4)
src/response.rs (1)

34-60: Method name now misleading & header constant unused inside file

to_http no longer creates an HttpResponse; it only returns a serde_json::Value.
Consider renaming it (e.g. to_json) or updating the doc-comments to avoid confusion.

Additionally, TIME_ELAPSED_HEADER is still declared here but not referenced in this module after the refactor.
If the constant is only used by HTTP-layer code (handlers/http/query.rs), move it there to keep responsibilities isolated.

src/query/mod.rs (1)

196-214: Minor memory churn – clone of fields per batch

execute_stream already copies the Vec<String> once; in the HTTP layer each batch clones it again.
If many small batches are emitted this becomes noticeable. Returning Arc<Vec<String>> or just a shared Arc<[String]> avoids the per-batch allocation:

-pub async fn execute_stream(... ) -> Result<(SendableRecordBatchStream, Vec<String>), _> {
+pub async fn execute_stream(... ) -> Result<(SendableRecordBatchStream, Arc<Vec<String>>), _> {
src/handlers/http/query.rs (2)

116-149: handle_count_query repeats field list on every call site – could reuse QueryResponse

The manual JSON building duplicates logic already present in QueryResponse::to_http.
Not blocking, but using the same helper would reduce divergence:

-let response = if query_request.fields {
-    json!({
-        "fields": [column_name],
-        "records": [json!({column_name: count})]
-    })
-} else {
-    serde_json::Value::Array(vec![json!({column_name: count})])
-};
+let response = QueryResponse {
+    records: vec![],           // fake batch converted below
+    fields: vec![column_name.to_owned()],
+    fill_null: false,
+    with_fields: query_request.fields,
+}.to_http()?; // after pushing one synthetic RecordBatch

288-291: URL-param parsing only works for streaming=true/false – accept upper-case?

The web::Query<HashMap<String, bool>> deserializer is case-sensitive; "TRUE" will fail.
Consider normalising manually or documenting the exact expected values.
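A rough sketch of the manual normalisation, assuming the raw parameter is read as a string first (names are illustrative):

```rust
use std::collections::HashMap;

// `query_map` stands in for the URL query parameters parsed as strings.
fn streaming_flag(query_map: &HashMap<String, String>) -> bool {
    query_map
        .get("streaming")
        .map(|v| v.eq_ignore_ascii_case("true"))
        .unwrap_or(false)
}
```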

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
Cache: Disabled due to data retention organization setting
Knowledge Base: Disabled due to data retention organization setting

📥 Commits

Reviewing files that changed from the base of the PR and between 749a16f and 2f00ad5.

⛔ Files ignored due to path filters (1)
  • Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (4)
  • Cargo.toml (1 hunks)
  • src/handlers/http/query.rs (6 hunks)
  • src/query/mod.rs (3 hunks)
  • src/response.rs (2 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (1)
src/handlers/http/query.rs (1)
src/query/mod.rs (4)
  • execute (70-79)
  • execute (171-194)
  • execute_stream (81-90)
  • execute_stream (197-214)
⏰ Context from checks skipped due to timeout of 90000ms (10)
  • GitHub Check: Build Default x86_64-pc-windows-msvc
  • GitHub Check: Build Default aarch64-apple-darwin
  • GitHub Check: Build Default x86_64-apple-darwin
  • GitHub Check: Build Default aarch64-unknown-linux-gnu
  • GitHub Check: Build Kafka aarch64-apple-darwin
  • GitHub Check: Build Default x86_64-unknown-linux-gnu
  • GitHub Check: Quest Smoke and Load Tests for Standalone deployments
  • GitHub Check: Build Kafka x86_64-unknown-linux-gnu
  • GitHub Check: Quest Smoke and Load Tests for Distributed deployments
  • GitHub Check: coverage
🔇 Additional comments (6)
Cargo.toml (1)

124-124: ✅ Verification successful

Potential duplicate crate – align futures-core patch-version with futures to avoid multiple copies

futures already re-exports the same public traits from its own futures-core dependency.
Pulling futures-core = "0.3.31" explicitly while futures = "0.3" (currently 0.3.30) is resolved may lead to two distinct versions of the futures_core crate in the final binary, increasing size and compile time.

Two safer options:

- futures-core = "0.3.31"
+# Option A – rely on transitive dep (simplest)
+# – remove the explicit line above
+
+# Option B – keep explicit but stay semver-loose so Cargo can unify:
+futures-core = "0.3"

If you really need the explicit import (e.g. for feature flags) pin it to "0.3" so Cargo can pick a single patch level.

You can confirm the duplication with:


🏁 Script executed:

#!/bin/bash
cargo tree -e features -i futures-core

Length of output: 136


To verify the resolved versions and detect any version skew between futures and futures-core, let’s inspect your Cargo.toml and Cargo.lock:


🏁 Script executed:

#!/bin/bash
# Check explicit futures and futures-core entries in Cargo.toml
grep -R '^futures\s*=' -n Cargo.toml
grep -R '^futures-core\s*=' -n Cargo.toml

# If a lockfile exists, list all resolved futures-core versions
if [ -f Cargo.lock ]; then
  grep -R '^name = "futures-core"' -A2 -n Cargo.lock
else
  echo "No Cargo.lock found"
fi

Length of output: 327


To confirm that futures and futures-core are both resolving to the same patch, let’s check the resolved version of futures in your lockfile:


🏁 Script executed:

#!/bin/bash
grep -R '^name = "futures"$' -A2 -n Cargo.lock

Length of output: 164


No duplicate futures-core crate—patch versions already aligned

Verified in Cargo.lock that both futures = "0.3" and your explicit futures-core = "0.3.31" resolve to 0.3.31, so Cargo will dedupe them. You can safely:

  • Remove the explicit futures-core = "0.3.31" line if you don’t need its feature flags and rely on the transitive dep.
  • Or keep it as-is (it matches the transitive version, so no duplicate crate will be pulled).
src/query/mod.rs (1)

81-90: Returning a stream captured from another runtime may dead-lock in edge cases

execute_stream spawns query.execute_stream() on QUERY_RUNTIME and then returns the SendableRecordBatchStream to the caller.
The stream is subsequently polled on Actix’s Tokio runtime (not on QUERY_RUNTIME). If DataFusion’s stream internally assumes the same runtime that created it (e.g. for cooperative scheduling or object-store IO) this separation could stall or panic under load.

At minimum, document the assumption or add an integration test that exercises large multi-batch queries over S3/GCS.
An alternative is to not spawn here – the heavy lifting happens when the stream is polled, so the extra spawn gives no benefit and avoids the cross-runtime hop:

- QUERY_RUNTIME
-     .spawn(async move { query.execute_stream(time_partition.as_ref()).await })
-     .await
-     .expect("The Join should have been successful")
+query.execute_stream(time_partition.as_ref()).await
src/handlers/http/query.rs (4)

66-68: streaming flag never accepted from JSON – confirm intended

#[serde(skip)] means JSON bodies can’t enable streaming; only the URL param ?streaming=true is honoured.
If that’s intentional (and it probably is), add a doc-comment so future contributors don’t try to set it in the body.
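A possible wording for that doc-comment (illustrative only; the field and attribute come from this PR):

```rust
/// Stream the results back as NDJSON, one record batch per line.
/// Only settable via the `?streaming=true` URL parameter; because the field
/// is `#[serde(skip)]`, a JSON request body cannot enable it.
#[serde(skip)]
pub streaming: bool,
```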


105-114: Branching order reads fine – but is_logical_plan_count_without_filters runs before the streaming check

So a SELECT COUNT(*) …&streaming=true request will still take the optimized count path and not stream.
That’s likely desirable; just flagging for awareness.


151-174: Content-type for non-streaming path ok

Nothing major; metrics & headers preserved.


355-356: transform_query_for_ingestor forwards streaming flag – is that required?

Ingestor paths currently ignore streaming; passing the flag risks surprising behaviour if ingestors start honouring it later.
If you only need non-streamed results for ingestion, force streaming = false.

coderabbitai[bot]
coderabbitai bot previously approved these changes May 16, 2025
coderabbitai[bot]
coderabbitai bot previously approved these changes May 16, 2025
Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

♻️ Duplicate comments (1)
src/handlers/http/query.rs (1)

304-307: Duplicate error handling should be consistent

This is the same error handling pattern as in the other branch of the conditional - both should be updated consistently.

🧹 Nitpick comments (1)
src/handlers/http/query.rs (1)

261-314: Reduce code duplication in streaming handler

The two branches of the conditional have significant duplication in how they process record batches.

Consider extracting the common record batch handling logic into a separate function:

+ fn create_batch_processor(
+     send_null: bool, 
+     with_fields: bool, 
+     fields: Vec<String>
+ ) -> impl FnMut(Result<RecordBatch, ExecuteError>) -> Result<Bytes, actix_web::Error> {
+     move |batch_result| match batch_result {
+         Ok(batch) => {
+             let response = QueryResponse {
+                 records: vec![batch],
+                 fields: if with_fields { Vec::new() } else { fields.clone() },
+                 fill_null: send_null,
+                 with_fields,
+             }
+             .to_json()
+             .unwrap_or_else(|e| {
+                 error!("Failed to parse record batch into JSON: {}", e);
+                 json!({"error": format!("Failed to serialize batch: {}", e)})
+             });
+             Ok(Bytes::from(format!("{}\n", response)))
+         }
+         Err(e) => Err(actix_web::error::ErrorInternalServerError(e)),
+     }
+ }

Then use this helper in both branches.
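One hypothetical way to wire the helper into the streaming handler, assuming futures::StreamExt is in scope and the stream's item type matches the helper's signature (the local names come from the handler code in this PR):

```rust
use futures::StreamExt;

let body_stream = records_stream
    .map(create_batch_processor(send_null, with_fields, fields.clone()));
// body_stream yields Result<Bytes, actix_web::Error>, ready to hand to
// HttpResponse::Ok().content_type("application/x-ndjson").streaming(body_stream).
```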

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
Cache: Disabled due to data retention organization setting
Knowledge Base: Disabled due to data retention organization setting

📥 Commits

Reviewing files that changed from the base of the PR and between e60a183 and 4731d40.

📒 Files selected for processing (3)
  • src/handlers/airplane.rs (3 hunks)
  • src/handlers/http/query.rs (6 hunks)
  • src/query/mod.rs (4 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
  • src/handlers/airplane.rs
  • src/query/mod.rs
⏰ Context from checks skipped due to timeout of 90000ms (10)
  • GitHub Check: Build Default x86_64-pc-windows-msvc
  • GitHub Check: Build Default aarch64-apple-darwin
  • GitHub Check: Build Default x86_64-apple-darwin
  • GitHub Check: Build Default aarch64-unknown-linux-gnu
  • GitHub Check: Build Kafka aarch64-apple-darwin
  • GitHub Check: Quest Smoke and Load Tests for Standalone deployments
  • GitHub Check: Build Default x86_64-unknown-linux-gnu
  • GitHub Check: Build Kafka x86_64-unknown-linux-gnu
  • GitHub Check: Quest Smoke and Load Tests for Distributed deployments
  • GitHub Check: coverage
🔇 Additional comments (6)
src/handlers/http/query.rs (6)

68-68: Good addition of streaming support to the Query struct

The streaming field properly uses #[serde(skip)] since it's extracted from query parameters rather than the request body.


105-123: Clean refactoring of query execution paths

The main query function now logically branches to specialized handlers for count queries, non-streaming, and streaming queries, improving code organization and readability.


125-172: Well-documented count query optimization

The specialized count query handler is well-documented and appropriately reuses the CountsRequest::get_bin_density method instead of executing the full query plan.


174-219: Well-structured non-streaming query handler

The non-streaming query handler properly validates the response type using the Either pattern matching and correctly handles timing metrics.


317-317: Good use of appropriate MIME type for NDJSON

Using the correct content type application/x-ndjson for newline-delimited JSON responses follows standards and helps clients properly interpret the streaming data.
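A minimal sketch of attaching that content type to a streaming actix-web response (not the PR's exact handler; the error type is std::io::Error here for simplicity):

```rust
use actix_web::{web::Bytes, HttpResponse};
use futures_core::Stream;

fn ndjson_response<S>(body: S) -> HttpResponse
where
    S: Stream<Item = Result<Bytes, std::io::Error>> + 'static,
{
    HttpResponse::Ok()
        .content_type("application/x-ndjson")
        .streaming(body)
}
```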


391-393: Clean implementation of streaming parameter extraction

The implementation properly extracts the streaming parameter from query parameters and defaults to false when not present, ensuring backward compatibility.

Comment on lines 242 to 260
    let (records_stream, fields) = execute(query, table_name, query_request.streaming).await?;
    let records_stream = match records_stream {
        Either::Left(_) => {
            return Err(QueryError::MalformedQuery(
                "Expected stream results, got batch",
            ))
        }
        Either::Right(stream) => stream,
    };
    let fields = fields.clone();
    let total_time = format!("{:?}", time.elapsed());
    let time = time.elapsed().as_secs_f64();

    QUERY_EXECUTE_TIME
-        .with_label_values(&[&table_name])
+        .with_label_values(&[table_name])
        .observe(time);

-    Ok(response)
+    let send_null = query_request.send_null;
+    let with_fields = query_request.fields;

Contributor


💡 Verification agent

🧩 Analysis chain

Verify potential streaming performance characteristics

The streaming implementation looks good, but consider testing with large datasets to verify performance.


🏁 Script executed:

#!/bin/bash
# Check for any related DataFusion stream configuration settings
rg -A 3 -B 3 "stream|batch_size|chunk_size" --type rust

# Look for any existing performance benchmarks
fd -i bench -e rs -e toml

Length of output: 65621


Add streaming performance benchmarks

The DataFusion streaming path in src/handlers/http/query.rs delegates to execute_stream() with batch size controlled by
PARSEABLE.options.execution_batch_size in src/query/mod.rs, but I didn’t find any existing benchmarks under benches/ or anywhere else. To ensure streaming queries hold up on large datasets, please:

  • Create a benchmark suite (for example benches/streaming_performance.rs) using Criterion.rs or Rust’s built-in benchmarking; a minimal sketch follows after this list.
  • Measure end-to-end streaming latency and throughput with large synthetic datasets.
  • Tune execution_batch_size (default 20000) and any other relevant parameters based on your findings.
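A minimal Criterion skeleton for such a suite (file name from the suggestion above; run_streaming_query is a placeholder standing in for a real call into the streaming query path):

```rust
// benches/streaming_performance.rs
use criterion::{criterion_group, criterion_main, Criterion};

// Placeholder workload: drain a synthetic dataset in fixed-size "batches"
// and count the rows seen. Replace with a call into the real streaming path.
fn run_streaming_query(num_rows: usize, batch_size: usize) -> usize {
    (0..num_rows)
        .step_by(batch_size)
        .map(|start| (start + batch_size).min(num_rows) - start)
        .sum()
}

fn streaming_benchmark(c: &mut Criterion) {
    for &batch_size in &[1_000usize, 20_000, 100_000] {
        c.bench_function(&format!("stream_1M_rows_batch_{batch_size}"), |b| {
            b.iter(|| run_streaming_query(1_000_000, batch_size))
        });
    }
}

criterion_group!(benches, streaming_benchmark);
criterion_main!(benches);
```
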
🤖 Prompt for AI Agents
In src/handlers/http/query.rs around lines 242 to 260, add a new benchmark suite
file, for example benches/streaming_performance.rs, using Criterion.rs or Rust's
built-in benchmarking framework. Implement tests that measure end-to-end
streaming latency and throughput on large synthetic datasets to evaluate the
performance of the DataFusion streaming path. Adjust the execution_batch_size
parameter and other relevant settings in src/query/mod.rs based on benchmark
results to optimize streaming performance.

Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

🧹 Nitpick comments (1)
src/handlers/http/query.rs (1)

295-315: Error handling could be more informative

While the error handling is improved from the previous implementation, consider adding more context to error messages to help with debugging streaming issues.

-                actix_web::error::ErrorInternalServerError(e)
+                actix_web::error::ErrorInternalServerError(format!("Failed to serialize batch: {}", e))
-        Err(e) => Err(actix_web::error::ErrorInternalServerError(e)),
+        Err(e) => {
+            error!("Error processing record batch in stream: {}", e);
+            Err(actix_web::error::ErrorInternalServerError(e))
+        },
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
Cache: Disabled due to data retention organization setting
Knowledge Base: Disabled due to data retention organization setting

📥 Commits

Reviewing files that changed from the base of the PR and between 4731d40 and ae92749.

📒 Files selected for processing (1)
  • src/handlers/http/query.rs (6 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (7)
  • GitHub Check: Build Default x86_64-pc-windows-msvc
  • GitHub Check: Build Default aarch64-unknown-linux-gnu
  • GitHub Check: Quest Smoke and Load Tests for Distributed deployments
  • GitHub Check: Build Default x86_64-unknown-linux-gnu
  • GitHub Check: Quest Smoke and Load Tests for Standalone deployments
  • GitHub Check: Build Kafka x86_64-unknown-linux-gnu
  • GitHub Check: coverage
🔇 Additional comments (9)
src/handlers/http/query.rs (9)

56-56: Good use of constant for header name!

Using a constant for the header name improves maintainability and ensures consistency across the codebase.


69-69: Appropriate use of #[serde(skip)] for the streaming flag

The streaming flag is correctly marked with #[serde(skip)] as it's derived from query parameters rather than being part of the serialized JSON body.


107-124: Clean separation of concerns with specialized handlers

Good refactoring by dividing the query handling into specialized functions based on query type and streaming mode. This improves readability and maintainability.


126-173: Well-documented count query handler with optimization

The count query handler is well-documented and properly optimized by using get_bin_density instead of running the full query.


175-220: Non-streaming handler appropriately handles batch results

The non-streaming handler correctly extracts batch results from the Either type and ensures that streaming results aren't accidentally processed in the wrong path.


222-293: Streaming implementation follows best practices

The streaming implementation correctly uses NDJSON format with the proper MIME type and only sends field metadata once as the first chunk when requested.


290-291: Good use of NDJSON content type

Using "application/x-ndjson" for the content type correctly indicates the newline-delimited JSON format to clients.


385-387: Default streaming to false for backward compatibility

Correctly defaults streaming to false when not specified, which maintains backward compatibility with existing clients.


452-452: Properly propagates streaming flag in transform function

The streaming flag is correctly propagated when transforming queries for the ingestor.

@nitisht nitisht merged commit c5aaed0 into parseablehq:main May 17, 2025
14 checks passed
@nikhilsinhaparseable nikhilsinhaparseable deleted the streaming-response branch May 17, 2025 05:47