feat: query parquet files still in staging #1199
Conversation
Walkthrough

The changes enhance staging data processing in the query module by introducing an asynchronous method, `get_staging_execution_plan`, on `StandardTableProvider`.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant C as Client
    participant STP as StandardTableProvider
    participant SA as StagingArea
    participant M as MemoryTable
    participant PP as ParquetPhysicalPlan
    C->>STP: Call get_staging_execution_plan(filters, projection, etc.)
    STP->>SA: Retrieve staging stream
    SA-->>STP: Return staging records
    STP->>M: Build reversed memory table
    M-->>STP: Reversed memory table
    STP->>PP: Create parquet physical plan
    PP-->>STP: Return execution plan
    STP->>C: Respond with execution plan
```
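The diagram shows the overall flow; one step it glosses over, and which the reviews below return to repeatedly, is how staged files are spread across CPU cores. The following is a minimal sketch in plain Rust (standard library only, with a stand-in `PartitionedFile` struct rather than DataFusion's type) of what that round-robin partitioning might look like:

```rust
use std::path::PathBuf;

/// Stand-in for DataFusion's `PartitionedFile`: just a path and its size.
#[allow(dead_code)]
struct PartitionedFile {
    path: String,
    size: u64,
}

/// Round-robin staged files into `target_partitions` buckets so each
/// core scans a roughly equal share; files that vanished (e.g. moved
/// to object store mid-query) are skipped.
fn partition_round_robin(files: Vec<PathBuf>, target_partitions: usize) -> Vec<Vec<PartitionedFile>> {
    let mut buckets: Vec<Vec<PartitionedFile>> = (0..target_partitions).map(|_| Vec::new()).collect();
    for (index, path) in files.into_iter().enumerate() {
        let Ok(meta) = path.metadata() else { continue };
        buckets[index % target_partitions].push(PartitionedFile {
            path: path.display().to_string(),
            size: meta.len(),
        });
    }
    buckets
}

fn main() {
    let staged = vec![PathBuf::from("a.parquet"), PathBuf::from("b.parquet")];
    let buckets = partition_round_robin(staged, 4);
    println!("{} buckets", buckets.len());
}
```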
Actionable comments posted: 0
🧹 Nitpick comments (1)
src/query/stream_schema_provider.rs (1)
225-267: Well-implemented staging data handling with good error management.

The implementation:
- Gracefully handles missing streams and file metadata errors
- Efficiently partitions files across CPU cores
- Uses reversed memory table for optimized querying
However, consider adding the following improvements:
- Add a size limit or cleanup mechanism for the staging area to prevent unbounded memory growth
- Consider implementing batch processing for large staging areas
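To make the size-limit suggestion above concrete, here is a minimal sketch, assuming a hypothetical `MAX_STAGING_BYTES` cap that is not part of the PR, of how a size check over a staging directory could look:

```rust
use std::fs;
use std::io;
use std::path::Path;

/// Hypothetical cap, not part of the PR; shown only to illustrate the
/// reviewer's suggestion of bounding staging growth.
const MAX_STAGING_BYTES: u64 = 1 << 30; // 1 GiB

/// Sum the sizes of parquet files under a staging directory and report
/// whether they exceed the cap, so a caller could trigger cleanup or
/// fall back to reading the files in smaller batches.
fn staging_exceeds_cap(dir: &Path) -> io::Result<bool> {
    let mut total = 0u64;
    for entry in fs::read_dir(dir)? {
        let path = entry?.path();
        if path.extension().map_or(false, |ext| ext == "parquet") {
            total += fs::metadata(&path)?.len();
        }
    }
    Ok(total > MAX_STAGING_BYTES)
}

fn main() -> io::Result<()> {
    println!("over cap: {}", staging_exceeds_cap(Path::new("."))?);
    Ok(())
}
```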
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
src/query/stream_schema_provider.rs (4 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (10)
- GitHub Check: Build Default x86_64-pc-windows-msvc
- GitHub Check: Build Default aarch64-apple-darwin
- GitHub Check: Build Default x86_64-apple-darwin
- GitHub Check: Build Default aarch64-unknown-linux-gnu
- GitHub Check: Build Kafka aarch64-apple-darwin
- GitHub Check: Quest Smoke and Load Tests for Distributed deployments
- GitHub Check: Build Default x86_64-unknown-linux-gnu
- GitHub Check: Build Kafka x86_64-unknown-linux-gnu
- GitHub Check: Quest Smoke and Load Tests for Standalone deployments
- GitHub Check: coverage
🔇 Additional comments (2)
src/query/stream_schema_provider.rs (2)
19-70: LGTM! Import changes are well-organized.

The new imports are necessary for the added functionality and are properly organized by crate.
488-498: LGTM! Scan method changes properly integrate staging data.

The changes effectively integrate staging data querying while maintaining backward compatibility and following existing error handling patterns.
Actionable comments posted: 1
🧹 Nitpick comments (1)
src/query/stream_schema_provider.rs (1)
256-258: Consider adding retry logic for parquet file access.

There's a potential race condition where parquet files might be moved to object store between listing and access. The comment acknowledges this, but we should consider implementing retry logic.
Consider implementing a retry mechanism with exponential backoff for file access operations.
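A minimal sketch of such a retry helper, standard library only; `metadata_with_retry` and its parameters are hypothetical names rather than anything in the PR, and an async caller would swap the blocking sleep for `tokio::time::sleep(...).await`:

```rust
use std::fs::Metadata;
use std::path::Path;
use std::thread::sleep;
use std::time::Duration;

/// Retry a metadata lookup with exponential backoff, giving up after
/// `max_attempts` failures and returning the last error.
fn metadata_with_retry(path: &Path, max_attempts: u32) -> std::io::Result<Metadata> {
    let mut delay = Duration::from_millis(10);
    let mut attempt = 1;
    loop {
        match path.metadata() {
            Ok(meta) => return Ok(meta),
            Err(err) if attempt >= max_attempts => return Err(err),
            Err(_) => {
                // Back off before the next attempt: 10ms, 20ms, 40ms, ...
                sleep(delay);
                delay *= 2;
                attempt += 1;
            }
        }
    }
}

fn main() {
    match metadata_with_retry(Path::new("missing.parquet"), 3) {
        Ok(meta) => println!("size: {}", meta.len()),
        Err(err) => println!("gave up: {err}"),
    }
}
```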
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
src/query/stream_schema_provider.rs (4 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (10)
- GitHub Check: Build Default x86_64-pc-windows-msvc
- GitHub Check: Build Default aarch64-apple-darwin
- GitHub Check: Build Default x86_64-apple-darwin
- GitHub Check: Build Kafka aarch64-apple-darwin
- GitHub Check: Build Default aarch64-unknown-linux-gnu
- GitHub Check: Build Kafka x86_64-unknown-linux-gnu
- GitHub Check: Quest Smoke and Load Tests for Distributed deployments
- GitHub Check: Build Default x86_64-unknown-linux-gnu
- GitHub Check: Quest Smoke and Load Tests for Standalone deployments
- GitHub Check: coverage
🔇 Additional comments (2)
src/query/stream_schema_provider.rs (2)
19-70: LGTM! Well-organized imports.

The imports are logically grouped and include all necessary dependencies for the new staging query functionality.
493-502: LGTM! Clean integration of staging execution plan.

The integration of `get_staging_execution_plan` into the scan method is well-implemented and maintains consistent error handling.
Actionable comments posted: 1
🧹 Nitpick comments (1)
src/query/stream_schema_provider.rs (1)
241-250: Extract partitioning logic into a separate function.

The partitioning logic could be reused elsewhere and would benefit from being a separate function. Consider extracting it:
```diff
+fn partition_files<T>(files: impl Iterator<Item = T>, target_partition: usize) -> Vec<Vec<T>> {
+    let mut partitioned = Vec::from_iter((0..target_partition).map(|_| Vec::new()));
+    for (index, file) in files.enumerate() {
+        partitioned[index % target_partition].push(file);
+    }
+    partitioned
+}

 // In get_staging_execution_plan:
-let target_partition = num_cpus::get();
-let mut partitioned_files = Vec::from_iter((0..target_partition).map(|_| Vec::new()));
-for (index, file_path) in staging.parquet_files().into_iter().enumerate() {
-    let Ok(file_meta) = file_path.metadata() else {
-        continue;
-    };
-    let file = PartitionedFile::new(file_path.display().to_string(), file_meta.len());
-    partitioned_files[index % target_partition].push(file)
-}
+let partitioned_files = partition_files(
+    staging.parquet_files().into_iter().filter_map(|file_path| {
+        file_path.metadata().ok().map(|meta| {
+            PartitionedFile::new(file_path.display().to_string(), meta.len())
+        })
+    }),
+    num_cpus::get(),
+);
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
src/query/stream_schema_provider.rs (8 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (10)
- GitHub Check: Build Default x86_64-pc-windows-msvc
- GitHub Check: Build Default aarch64-apple-darwin
- GitHub Check: Build Default x86_64-apple-darwin
- GitHub Check: Build Kafka aarch64-apple-darwin
- GitHub Check: Build Default aarch64-unknown-linux-gnu
- GitHub Check: Quest Smoke and Load Tests for Standalone deployments
- GitHub Check: Build Kafka x86_64-unknown-linux-gnu
- GitHub Check: coverage
- GitHub Check: Quest Smoke and Load Tests for Distributed deployments
- GitHub Check: Build Default x86_64-unknown-linux-gnu
🔇 Additional comments (2)
src/query/stream_schema_provider.rs (2)
235-235: Add error handling for `recordbatches_cloned`.

The `recordbatches_cloned` call could potentially fail, but the error is not being handled.
230-232: LGTM! Graceful error handling.

The early return with `Ok(())` when the stream is not found is a clean approach that prevents unnecessary processing.
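For readers unfamiliar with the pattern, here is a self-contained sketch of that early-return shape, using stand-in types (a unit `Staging` struct and a `String` error) in place of the real staging handle and `DataFusionError`:

```rust
/// Stand-in for the staging handle; the real code holds record batches
/// and parquet file paths.
struct Staging;

/// Stand-in lookup; in the PR this consults the staging area for the
/// stream. Here it always misses, to exercise the early return.
fn get_staging(stream_name: &str) -> Option<Staging> {
    let _ = stream_name;
    None
}

/// The early-return shape the review praises: when nothing is staged
/// for the stream, planning is simply a no-op, so return Ok(()).
fn get_staging_execution_plan(stream_name: &str) -> Result<(), String> {
    let Some(_staging) = get_staging(stream_name) else {
        return Ok(());
    };
    // ... build the reversed memory table and parquet plan here ...
    Ok(())
}

fn main() {
    assert_eq!(get_staging_execution_plan("demo"), Ok(()));
    println!("missing stream handled gracefully");
}
```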
@de-sh in case of multiple parquet files present in staging,
Actionable comments posted: 1
♻️ Duplicate comments (1)
src/query/stream_schema_provider.rs (1)
236-236: ⚠️ Potential issue: Add error handling for `recordbatches_cloned`.

The `recordbatches_cloned` call could potentially fail, but the error is not being handled. Apply this diff:
```diff
-let records = staging.recordbatches_cloned(&self.schema);
+let records = staging.recordbatches_cloned(&self.schema)
+    .map_err(|e| DataFusionError::External(Box::new(e)))?;
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
src/query/stream_schema_provider.rs (8 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (10)
- GitHub Check: Build Default x86_64-pc-windows-msvc
- GitHub Check: Build Default aarch64-apple-darwin
- GitHub Check: Build Default x86_64-apple-darwin
- GitHub Check: Build Default aarch64-unknown-linux-gnu
- GitHub Check: Build Kafka aarch64-apple-darwin
- GitHub Check: Build Default x86_64-unknown-linux-gnu
- GitHub Check: Quest Smoke and Load Tests for Standalone deployments
- GitHub Check: Quest Smoke and Load Tests for Distributed deployments
- GitHub Check: Build Kafka x86_64-unknown-linux-gnu
- GitHub Check: coverage
src/query/stream_schema_provider.rs
Outdated
```rust
for (index, file_path) in staging.parquet_files().into_iter().enumerate() {
    let Ok(file_meta) = file_path.metadata() else {
        continue;
    };
    let file = PartitionedFile::new(file_path.display().to_string(), file_meta.len());
    partitioned_files[index % target_partition].push(file)
}
```
💡 Verification agent
🧩 Analysis chain
Consider sorting parquet files by timestamp.
Based on the user feedback about query results not including records from the most recently created parquet file, the files should be sorted by timestamp before partitioning to ensure consistent query results.
Let's verify if there's a timestamp field in the parquet files that we can use for sorting:
🏁 Script executed:
```bash
#!/bin/bash
# Description: Check for timestamp fields in parquet files
# Look for timestamp-related field definitions in the code
rg -A 5 'timestamp|time|_at' --type rust
```
Length of output: 71541
Action: Sort Parquet Files by Timestamp
It appears that the current loop in `src/query/stream_schema_provider.rs` (lines 245–251) enumerates the parquet files without any ordering. Given the user feedback about recent records being missed, we recommend sorting these files by their creation or modification timestamp before partitioning. Note that elsewhere in the repository (e.g., through usage of `DEFAULT_TIMESTAMP_KEY` and timestamp fields in the schema), there is clear support for time-based ordering.

- Location: `src/query/stream_schema_provider.rs`, lines 245–251.
- Issue: The parquet files are partitioned using their original order (via `enumerate()` and modulo assignment), which may inadvertently skip the most recently created file.
- Recommendation: Sort the parquet files, using a timestamp extracted from file metadata (e.g., by calling `file_path.metadata()?.modified()`) or an equivalent field, prior to applying the partitioning logic. Make sure to handle potential errors when retrieving the timestamp, as sketched below.
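A minimal sketch of that recommendation, standard library only; `sort_newest_first` is a hypothetical helper, and the real code would feed the sorted list into the partitioning step:

```rust
use std::fs;
use std::path::PathBuf;
use std::time::SystemTime;

/// Sort staged parquet files newest-first by filesystem modification
/// time before partitioning, so the most recently written file is not
/// dropped. Files whose metadata cannot be read sort last.
fn sort_newest_first(mut files: Vec<PathBuf>) -> Vec<PathBuf> {
    files.sort_by_key(|path| {
        // Reverse flips the order so larger (newer) timestamps come
        // first; unreadable metadata maps to UNIX_EPOCH and sorts last.
        std::cmp::Reverse(
            fs::metadata(path)
                .and_then(|meta| meta.modified())
                .unwrap_or(SystemTime::UNIX_EPOCH),
        )
    });
    files
}

fn main() {
    let files = vec![PathBuf::from("old.parquet"), PathBuf::from("new.parquet")];
    for file in sort_newest_first(files) {
        println!("{}", file.display());
    }
}
```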
Actionable comments posted: 0
♻️ Duplicate comments (2)
src/query/stream_schema_provider.rs (2)
236-236: 🛠️ Refactor suggestion: Add error handling for `recordbatches_cloned`.

This call may fail, and the code currently ignores that possibility.

```diff
-let records = staging.recordbatches_cloned(&self.schema);
+let records = staging.recordbatches_cloned(&self.schema)
+    .map_err(|e| DataFusionError::External(Box::new(e)))?;
```
242-245: 🛠️ Refactor suggestion: Sort by modification time rather than filename, if possible.

Relying on reversed filename order might skip the newest file, per user feedback.

```diff
-    parquet_files.sort_by(|a, b| a.cmp(b).reverse());
+    // Example approach using modification times:
+    parquet_files.sort_by_key(|f| f.metadata().and_then(|m| m.modified()).ok());
+    parquet_files.reverse();
```
🧹 Nitpick comments (1)
src/query/stream_schema_provider.rs (1)
221-230: Fix minor spelling in documentation.

"exection" should be spelled "execution."

```diff
-    /// Create an exection plan over the records in arrows...
+    /// Create an execution plan over the records in arrows...
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
src/query/stream_schema_provider.rs (8 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (10)
- GitHub Check: Build Default x86_64-pc-windows-msvc
- GitHub Check: Build Default aarch64-apple-darwin
- GitHub Check: Build Default x86_64-apple-darwin
- GitHub Check: Build Default aarch64-unknown-linux-gnu
- GitHub Check: Build Kafka aarch64-apple-darwin
- GitHub Check: Quest Smoke and Load Tests for Distributed deployments
- GitHub Check: Build Default x86_64-unknown-linux-gnu
- GitHub Check: Build Kafka x86_64-unknown-linux-gnu
- GitHub Check: coverage
- GitHub Check: Quest Smoke and Load Tests for Standalone deployments
🔇 Additional comments (13)
src/query/stream_schema_provider.rs (13)
19-20: No issues found. Imports look consistent with the rest of the file.

26-26: No issues found. The addition of `Session` appears appropriate for upcoming usage.

28-28: No issues found. The `Precision` import is relevant for column statistics usage.

40-43: No issues found. These new imports (`BinaryExpr`, `Operator`, etc.) match the DataFusion expressions used below.

64-65: No issues found. Inclusion of `Mode` and `STREAM_EXISTS` aligns with the code's logic.

231-233: Consider returning an error or logging a warning when the stream doesn't exist. Currently, this block silently returns `Ok(())`. Confirm whether ignoring a missing staging stream is intended.

246-255: No issues found. Skipping inaccessible files aligns with the ephemeral nature of staging.

257-272: No issues found. This logic correctly calls `create_parquet_physical_plan` to handle any staged parquet files.

328-328: No issues found. Changing the parameter type to `Vec<File>` is consistent with the updated references.

332-332: No issues found. Storing column statistics in a `HashMap` remains a clear approach.

340-340: No issues found. Deconstructing `File` matches the rest of the changes in this refactor.

407-412: No issues found. Adjusting the signature to return `Vec<File>` aligns with the type refactor.

898-898: No issues found. The reference to `Column` is consistent with the updated imports.
Signed-off-by: Devdutt Shenoi <[email protected]>
LGTM
Fixes #XXXX.
Description
Currently, parquet files that are still lying around in staging are not queryable; this PR fixes that.
This PR has:
Summary by CodeRabbit

New Features
- Parquet files still in staging are now included in query results.

Refactor
- Staging data handling in the query module was restructured around the new `get_staging_execution_plan` method.