fix: ensure current slot data goes into current slot file and isn't flushed until end of the slot #1238
Conversation
DiskWriter abstraction to handle write to arrows files
Actionable comments posted: 0
🧹 Nitpick comments (4)
src/parseable/streams.rs (2)
135-136: Avoid panicking on file creation to ensure graceful error handling.

Using `.expect("File and RecordBatch both are checked")` will terminate the program on failure. Consider returning a `StagingError` instead of panicking to allow the caller to handle file-creation errors more gracefully.

```diff
-let mut writer = DiskWriter::new(file_path, &record.schema())
-    .expect("File and RecordBatch both are checked");
+let mut writer = match DiskWriter::new(file_path, &record.schema()) {
+    Ok(dw) => dw,
+    Err(e) => {
+        return Err(e);
+    }
+};
```
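A lighter-weight variant, as an editorial aside rather than part of the review: if the enclosing function already returns `Result<_, StagingError>`, the `?` operator propagates the error without the explicit match.

```rust
// Equivalent propagation via `?` (assumes the caller returns
// Result<_, StagingError>, so the error converts implicitly).
let mut writer = DiskWriter::new(file_path, &record.schema())?;
```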
361-365: Consider capturing errors from `DiskWriter::finish()` rather than just draining.

Draining the `disk` HashMap relies on the `Drop` for each `DiskWriter` to finalize writes, meaning any finalization errors are only logged and not propagated. If you need stronger reliability or post-flush checks, consider a synchronous finish call for each writer so that errors can be handled, as sketched below.
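A minimal sketch of the synchronous alternative, assuming `finish()` is changed to return a `Result` (in the PR it returns nothing and only logs):

```rust
// Sketch only: finish each DiskWriter explicitly while draining,
// so finalization errors reach the caller instead of being
// swallowed by Drop. Assumes finish() -> Result<(), StagingError>.
for (_stream, mut writer) in self.disk.drain() {
    writer.finish()?;
}
```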
src/parseable/staging/writer.rs (2)

45-48: Make fields private to preserve encapsulation unless they must be public.

Having `inner` and `path` as public could allow unwanted external manipulation. If only used internally, consider making them private for better maintainability.

```diff
-pub struct DiskWriter {
-    pub inner: StreamWriter<BufWriter<File>>,
-    pub path: PathBuf,
+pub struct DiskWriter {
+    inner: StreamWriter<BufWriter<File>>,
+    path: PathBuf,
 }
```
63-74: Surface rename failures more visibly if needed.

If renaming the file fails, the error is merely logged. If critical, consider re-trying or signaling the caller, as in the sketch below. Otherwise, the silent fallback is acceptable for non-blocking logs.
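For illustration, a bounded retry might look like this (a sketch; the single retry and the `Result` return are assumptions, not part of the PR):

```rust
// Sketch: retry the rename once before surfacing the error to the
// caller, instead of only logging it.
if std::fs::rename(&self.path, &arrow_path).is_err() {
    std::fs::rename(&self.path, &arrow_path).map_err(|err| {
        error!("Couldn't rename file {:?}, error = {err}", self.path);
        err
    })?;
}
```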
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
- src/parseable/staging/writer.rs (2 hunks)
- src/parseable/streams.rs (3 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (10)
- GitHub Check: Build Default aarch64-apple-darwin
- GitHub Check: Build Kafka x86_64-unknown-linux-gnu
- GitHub Check: Build Kafka aarch64-apple-darwin
- GitHub Check: Build Default x86_64-apple-darwin
- GitHub Check: Build Default x86_64-pc-windows-msvc
- GitHub Check: Build Default x86_64-unknown-linux-gnu
- GitHub Check: Build Default aarch64-unknown-linux-gnu
- GitHub Check: Quest Smoke and Load Tests for Standalone deployments
- GitHub Check: coverage
- GitHub Check: Quest Smoke and Load Tests for Distributed deployments
🔇 Additional comments (7)
src/parseable/streams.rs (1)
61-61: Good import of the new `DiskWriter`.

By re-exporting both `DiskWriter` and `Writer` from the same module, you keep the code organized while enabling the new abstraction to coexist with the existing logic.

src/parseable/staging/writer.rs (6)
22-24: Import changes for file handling and buffering look good.

Using `File`, `OpenOptions`, and `BufWriter` aligns well with your new approach for disk-based writes.
33-33: Additional imports for logging, constants, and errors are properly included.

Bringing in `tracing::error`, `ARROW_FILE_EXTENSION`, `adapt_batch`, and `StagingError` is consistent with the extended functionality needed for disk-based writes and improved logging.

Also applies to: 35-35, 37-37
42-43: Replacing the `HashMap<String, StreamWriter<File>>` with `HashMap<String, DiskWriter>` is a solid abstraction step.

This change centralizes the on-disk streaming logic in a dedicated struct, making it clearer to track and finalize writes.
59-61: Delegate writing logic with minimal overhead.

Forwarding to `self.inner.write(rb)` is concise. The direct mapping of the error to `StagingError` is equally straightforward.
77-80: Automatic resource finalization on drop is a useful safeguard.

Calling `finish()` within `Drop` helps ensure streams are properly closed. Just note that any final error remains in logs without the chance to recover.
50-57: ❓ Verification inconclusive

Be cautious with `append(true)` to avoid data collisions in existing files.

Currently, you open files in append mode, which might unintentionally append to a pre-existing file rather than fully replacing it. Verify this matches the intended behavior (e.g., continuing partial batch writes), and consider `truncate(true)` if complete overwrites are desired.

🏁 Script executed:

```bash
#!/bin/bash
# Ensure DiskWriter is only used for newly created files or handle existing data intentionally.
rg -A 3 "DiskWriter::new"
```

Length of output: 325
Attention: Verify File Open Mode Usage in DiskWriter

The current implementation of `DiskWriter::new` in src/parseable/staging/writer.rs (lines 50-57) uses `append(true)` when opening the file. This means new data will be added to any pre-existing content, which can lead to unintended data collisions if a stale file is present. In src/parseable/streams.rs, `DiskWriter::new` is invoked consistently without any additional safeguards around file state.

- Confirm whether the intent is to continuously write to an existing file (e.g., for accumulating partial batch writes).
- If the design requires a fresh file for each operation, consider using `truncate(true)` to replace any existing content instead of appending. The sketch below contrasts the two modes.

Please review the design intent and update accordingly if necessary.
Actionable comments posted: 0
🧹 Nitpick comments (1)
src/parseable/staging/writer.rs (1)
73-85: Consider returning Result from finish method instead of swallowing errors

The `finish` method currently logs errors but doesn't propagate them, which might hide problems and make debugging difficult. While logging is good, callers might benefit from being able to handle or react to these errors.

Additionally, the file renaming operation could fail if the target file already exists - consider adding additional error handling or cleanup to address this edge case.

```diff
-    pub fn finish(&mut self) {
+    pub fn finish(&mut self) -> Result<(), std::io::Error> {
-        if let Err(err) = self.inner.finish() {
-            error!("Couldn't finish arrow file {:?}, error = {err}", self.path);
-            return;
-        }
+        self.inner.finish().map_err(|err| {
+            error!("Couldn't finish arrow file {:?}, error = {err}", self.path);
+            std::io::Error::new(std::io::ErrorKind::Other, err)
+        })?;

         let mut arrow_path = self.path.to_owned();
         arrow_path.set_extension(ARROW_FILE_EXTENSION);

-        if let Err(err) = std::fs::rename(&self.path, &arrow_path) {
-            error!("Couldn't rename file {:?}, error = {err}", self.path);
-        }
+        std::fs::rename(&self.path, &arrow_path).map_err(|err| {
+            error!("Couldn't rename file {:?}, error = {err}", self.path);
+            err
+        })
     }
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
- src/parseable/staging/writer.rs (2 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (10)
- GitHub Check: coverage
- GitHub Check: Quest Smoke and Load Tests for Standalone deployments
- GitHub Check: Quest Smoke and Load Tests for Distributed deployments
- GitHub Check: Build Default x86_64-pc-windows-msvc
- GitHub Check: Build Default aarch64-unknown-linux-gnu
- GitHub Check: Build Default x86_64-apple-darwin
- GitHub Check: Build Default aarch64-apple-darwin
- GitHub Check: Build Kafka x86_64-unknown-linux-gnu
- GitHub Check: Build Kafka aarch64-apple-darwin
- GitHub Check: Build Default x86_64-unknown-linux-gnu
🔇 Additional comments (6)
src/parseable/staging/writer.rs (6)
48-51: Well-designed DiskWriter struct with appropriate fields

The `DiskWriter` struct is well-designed with two essential fields:

- `inner`: a buffered stream writer for performance optimization
- `path`: the file path stored for later operations like renaming

This encapsulation properly separates concerns and provides a cleaner API than directly using `StreamWriter<File>`.
55-66: Good initialization pattern with proper error handling

The `try_new` function follows best practices by:

- Converting the input path to a standardized type
- Setting the temporary extension before file creation
- Using appropriate OpenOptions for file creation
- Properly propagating errors with the correct error type
- Using buffered writing for performance

The builder pattern for OpenOptions is used correctly and the function returns a well-structured Result.
68-71: Simple and effective write method

This wrapper method correctly maps errors to the appropriate domain-specific error type, maintaining a clean abstraction boundary.
88-92: Good use of Drop trait for resource cleanup

Implementing `Drop` ensures that files are properly finalized even if clients forget to call `finish()` explicitly. This helps prevent resource leaks and incomplete files.

Note that if you modify `finish()` to return a `Result` as suggested, the `drop` implementation would still need to swallow errors since `drop` cannot return values; see the sketch below.
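A sketch of what that would look like, assuming `finish()` is changed to return a `Result` as suggested above:

```rust
impl Drop for DiskWriter {
    fn drop(&mut self) {
        // Drop cannot return a value, so the best it can do with a
        // Result-returning finish() is log the error and move on.
        if let Err(err) = self.finish() {
            error!("Couldn't finalize {:?} during drop, error = {err}", self.path);
        }
    }
}
```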
22-38: Clean import organization with appropriate dependencies

The imports have been properly organized and include all necessary components for file handling, buffering, error logging, and file extensions. The reorganization of imports from the crate helps maintain a clean structure.
45-46: Good abstraction update in Writer struct

Changing from `HashMap<String, StreamWriter<File>>` to `HashMap<String, DiskWriter>` improves the code by encapsulating file handling logic and providing a cleaner API. This change is consistent with the refactoring goal.
Actionable comments posted: 1
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
- src/parseable/staging/writer.rs (2 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (10)
- GitHub Check: Build Default aarch64-unknown-linux-gnu
- GitHub Check: Build Default x86_64-unknown-linux-gnu
- GitHub Check: Build Default x86_64-apple-darwin
- GitHub Check: Build Default aarch64-apple-darwin
- GitHub Check: Build Default x86_64-pc-windows-msvc
- GitHub Check: Build Kafka aarch64-apple-darwin
- GitHub Check: Build Kafka x86_64-unknown-linux-gnu
- GitHub Check: Quest Smoke and Load Tests for Distributed deployments
- GitHub Check: Quest Smoke and Load Tests for Standalone deployments
- GitHub Check: coverage
🔇 Additional comments (6)
src/parseable/staging/writer.rs (6)
22-24: New imports support buffered file operations

These imports are necessary for the new `DiskWriter` struct's file handling capabilities, providing access to file operations, buffering, and path manipulation.
33-33: Good addition of error logging

The `tracing::error` import is appropriately added for logging file operation errors during the cleanup phase in the `Drop` implementation.
35-38: Well-organized imports for file extensions

Nice reorganization of imports to bring in the necessary file extension constants that will be used for file naming in the `DiskWriter` implementation.
45-45: Updated disk field to use new DiskWriter abstraction

The change from `StreamWriter<File>` to `DiskWriter` for the disk field is appropriate, reflecting the shift to a more structured approach for file handling.
48-51: Well-designed DiskWriter struct

Good design for the new struct that wraps the `StreamWriter` with a buffered writer and keeps track of the file path, enabling proper file management.
53-72: Clean implementation of DiskWriter creation and writing

The implementation is well-structured with good error handling. The `try_new` method appropriately sets up files with the temporary extension, and the `write` method correctly maps errors to the appropriate type.
Actionable comments posted: 0
♻️ Duplicate comments (2)
src/parseable/staging/writer.rs (2)
74-93: ⚠️ Potential issue

Incomplete error handling in Drop implementation

While the code correctly handles file renaming and checks for existing files, it doesn't clean up partial files if `finish()` fails. This could leave corrupted `.part` files in the filesystem.

Add cleanup for the part file when `finish()` fails:

```diff
 fn drop(&mut self) {
     if let Err(err) = self.inner.finish() {
         error!("Couldn't finish arrow file {:?}, error = {err}", self.path);
+        // Clean up the corrupted part file
+        if let Err(clean_err) = std::fs::remove_file(&self.path) {
+            error!("Failed to clean up corrupted part file {:?}, error = {clean_err}", self.path);
+        } else {
+            warn!("Removed corrupted part file {:?}", self.path);
+        }
         return;
     }

     let mut arrow_path = self.path.to_owned();
     arrow_path.set_extension(ARROW_FILE_EXTENSION);

     if arrow_path.exists() {
         warn!("File {arrow_path:?} exists and will be overwritten");
     }

     if let Err(err) = std::fs::rename(&self.path, &arrow_path) {
         error!("Couldn't rename file {:?}, error = {err}", self.path);
+        // Try to clean up the part file on rename failure
+        if let Err(clean_err) = std::fs::remove_file(&self.path) {
+            error!("Failed to clean up part file after rename failure {:?}, error = {clean_err}", self.path);
+        }
     }
 }
```
89-91: 🛠️ Refactor suggestion

Rename error handling could be improved

When renaming fails, the error is logged but the partial file is left behind, potentially causing confusion or disk space issues.

```diff
 if let Err(err) = std::fs::rename(&self.path, &arrow_path) {
     error!("Couldn't rename file {:?}, error = {err}", self.path);
+    // Clean up the part file on rename failure to avoid leaving partial files
+    if let Err(clean_err) = std::fs::remove_file(&self.path) {
+        error!("Failed to clean up part file after rename failure {:?}, error = {clean_err}", self.path);
+    } else {
+        warn!("Removed part file {:?} after rename failure", self.path);
+    }
 }
```
🧹 Nitpick comments (2)
src/parseable/staging/writer.rs (2)
53-66: Well-structured initialization method

The `try_new` method properly handles file creation with appropriate options and error handling. One suggestion would be to add a check for an existing `.part` file before opening.

```diff
 pub fn try_new(path: impl Into<PathBuf>, schema: &Schema) -> Result<Self, StagingError> {
     let mut path = path.into();
     path.set_extension(PART_FILE_EXTENSION);
+
+    // Check if a part file already exists - this could indicate a previous failure
+    if path.exists() {
+        warn!("Found existing part file {:?}, it will be overwritten", path);
+    }

     let file = OpenOptions::new()
         .write(true)
         .truncate(true)
         .create(true)
         .open(&path)?;

     let inner = StreamWriter::try_new_buffered(file, schema)?;

     Ok(Self { inner, path })
 }
```
85-87: Target file existence check is good but handling could be improved

The code correctly checks if the target file exists before renaming, but only logs a warning without offering special handling like backup or unique naming.

Consider implementing a more robust strategy for handling existing files:

```diff
 if arrow_path.exists() {
     warn!("File {arrow_path:?} exists and will be overwritten");
+    // Option 1: Create a backup of the existing file
+    let backup_path = arrow_path.with_extension(format!("{}.bak", ARROW_FILE_EXTENSION));
+    if let Err(err) = std::fs::rename(&arrow_path, &backup_path) {
+        error!("Failed to backup existing file {:?}, error = {err}", arrow_path);
+        // Continue with overwrite
+    } else {
+        warn!("Created backup of existing file at {:?}", backup_path);
+    }
+
+    // Option 2: Use a unique name by appending timestamp
+    // let now = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_secs();
+    // arrow_path = arrow_path.with_extension(format!("{}.{}", ARROW_FILE_EXTENSION, now));
 }
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
- src/parseable/staging/writer.rs (2 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (10)
- GitHub Check: Quest Smoke and Load Tests for Standalone deployments
- GitHub Check: Quest Smoke and Load Tests for Distributed deployments
- GitHub Check: Build Default x86_64-unknown-linux-gnu
- GitHub Check: Build Default x86_64-apple-darwin
- GitHub Check: Build Kafka aarch64-apple-darwin
- GitHub Check: Build Kafka x86_64-unknown-linux-gnu
- GitHub Check: Build Default x86_64-pc-windows-msvc
- GitHub Check: Build Default aarch64-apple-darwin
- GitHub Check: Build Default aarch64-unknown-linux-gnu
- GitHub Check: coverage
🔇 Additional comments (5)
src/parseable/staging/writer.rs (5)
22-24: Import additions align with new DiskWriter design

The added imports for `File`, `OpenOptions`, `BufWriter`, and `PathBuf` support the new `DiskWriter` implementation which encapsulates file I/O operations.
35-38: Good organization of imports

The reorganized imports improve clarity by grouping related items and explicitly importing the needed constants.
45-45: HashMap type change reflects new abstraction

The change from `StreamWriter<File>` to `DiskWriter` properly encapsulates the Arrow file writing functionality.
48-51: Good encapsulation in DiskWriter struct

The `DiskWriter` struct effectively encapsulates both the writer and file path, facilitating proper cleanup on drop.
68-71: Straightforward write implementation

The `write` method is concise and maps errors appropriately to the `StagingError` type.
Actionable comments posted: 0
🧹 Nitpick comments (1)
src/utils/time.rs (1)
265-282: The granularity_range method implementation looks good.

This method correctly calculates the time range for a given timestamp based on the specified data granularity. It ensures that seconds and nanoseconds are set to 0, and properly computes the start and end times of the containing block.

However, consider adding validation for the `data_granularity` parameter to handle the edge case of 0 (the `u32` type already rules out negative values).

```diff
 pub fn granularity_range(timestamp: DateTime<Utc>, data_granularity: u32) -> Self {
+    assert!(data_granularity > 0, "data_granularity must be positive");
     let time = timestamp
         .time()
         .with_second(0)
         .and_then(|time| time.with_nanosecond(0))
         .expect("Within expected time range");
     let timestamp = timestamp.with_time(time).unwrap();
     let block_n = timestamp.minute() / data_granularity;
     let block_start = block_n * data_granularity;
     let start = timestamp
         .with_minute(block_start)
         .expect("Within minute range");
     let end = start + TimeDelta::minutes(data_granularity as i64);

     Self { start, end }
 }
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
- src/utils/time.rs (3 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (10)
- GitHub Check: Build Default aarch64-unknown-linux-gnu
- GitHub Check: Build Kafka x86_64-unknown-linux-gnu
- GitHub Check: Build Default aarch64-apple-darwin
- GitHub Check: Build Default x86_64-unknown-linux-gnu
- GitHub Check: Build Default x86_64-apple-darwin
- GitHub Check: Build Default x86_64-pc-windows-msvc
- GitHub Check: Build Kafka aarch64-apple-darwin
- GitHub Check: coverage
- GitHub Check: Quest Smoke and Load Tests for Distributed deployments
- GitHub Check: Quest Smoke and Load Tests for Standalone deployments
🔇 Additional comments (3)
src/utils/time.rs (3)
284-287: LGTM! The contains method is well-implemented.

This method correctly checks if a timestamp falls within the time range using a half-open interval [start, end), which is the standard approach for time range checks.
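A quick illustration of the half-open semantics (a sketch, assuming `contains` takes the timestamp by value):

```rust
use chrono::{TimeZone, Utc};

// Assuming `range` covers [10:30, 10:45) as in the example above:
// the start is included, the end is not.
assert!(range.contains(Utc.with_ymd_and_hms(2024, 1, 1, 10, 30, 0).unwrap()));
assert!(!range.contains(Utc.with_ymd_and_hms(2024, 1, 1, 10, 45, 0).unwrap()));
```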
349-349: LGTM! Import addition is necessary.

The addition of `TimeZone` to the imports is necessary for the new test methods that use `Utc.with_ymd_and_hms()`.
540-616: Comprehensive test coverage for granularity_range.

These tests thoroughly validate the `granularity_range` method across different granularity values (1, 5, 15, 30 minutes) and edge cases. The test cases provide good coverage for boundary conditions and hour transitions.
looks good
Fixes #1240
Summary by CodeRabbit
New Features

- Enhanced `TimeRange` functionality with methods for generating granular time ranges and checking timestamp containment.

Refactor

- Introduced a `DiskWriter` abstraction to handle writes to Arrow files.