
feat: merge finish .arrows and convert to .parquet #1200


Merged
merged 11 commits into parseablehq:main from merge-sync on Feb 25, 2025

Conversation

de-sh
Contributor

@de-sh de-sh commented Feb 20, 2025

Fixes #XXXX.

Description

Currently we have two parallel tasks: one finishes .arrows files, and another, running 5s later, compacts them into .parquet. With this PR we do both tasks one after the other, ensuring arrows are not lost due to a missed sync and thus don't have to wait for the next minute.

This PR also parallelizes local sync, ensuring each stream is flushed onto disk and compacted from arrow to parquet in parallel.
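
To make the workflow concrete, here is a minimal sketch of the merged loop, with illustrative names and an assumed 60s interval (not the exact Parseable implementation, whose details live in src/sync.rs): a single timer drives both steps, so every flushed .arrows file is immediately picked up by the conversion that follows it.

use std::time::Duration;
use tokio::{task::JoinSet, time::interval};

// Assumed value; the real constant is defined in src/lib.rs.
const LOCAL_SYNC_INTERVAL: Duration = Duration::from_secs(60);

async fn flush_and_convert() {
    // 1. finish in-memory arrow buffers onto disk as .arrows files
    // 2. immediately compact the finished .arrows files into .parquet
}

async fn local_sync() {
    let mut clock = interval(LOCAL_SYNC_INTERVAL);
    let mut tasks: JoinSet<()> = JoinSet::new();
    loop {
        tokio::select! {
            _ = clock.tick() => {
                // both steps run back-to-back inside a single spawned task
                tasks.spawn(flush_and_convert());
            }
            Some(res) = tasks.join_next(), if !tasks.is_empty() => {
                if let Err(e) = res {
                    eprintln!("flush+conversion task failed: {e}");
                }
            }
        }
    }
}

#[tokio::main(flavor = "multi_thread", worker_threads = 2)]
async fn main() {
    local_sync().await;
}

The JoinSet keeps each tick's work on its own task, so a slow conversion never blocks the next flush.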


This PR has:

  • been tested to ensure log ingestion and log query works.
  • added comments explaining the "why" and the intent of the code wherever it would not be obvious to an unfamiliar reader.
  • added documentation for new or modified features or behaviors.

Summary by CodeRabbit

  • Refactor
    • Improved time interval management for synchronization by redefining constants for better type safety.
    • Streamlined the synchronization workflow with enhanced asynchronous processing for data conversion tasks.
    • Removed redundant constants and updated synchronization methods for clarity and efficiency.
    • Consolidated import statements for better organization and readability.
    • Enhanced error handling and logging during the shutdown process.
    • Removed the flush_all_streams method, optimizing memory management during data processing.
  • Bug Fixes
    • Removed unused constants that may have caused confusion in data granularity handling.

Contributor

coderabbitai bot commented Feb 20, 2025

Walkthrough

The changes refactor time interval constants by replacing raw numerical values with Rust’s Duration type for improved clarity and type safety. In particular, constants in lib.rs have been updated while legacy declarations in storage/mod.rs have been removed. Import statements in related modules have been adjusted to reflect these changes. The synchronization workflow in sync.rs has been simplified by renaming and consolidating functions, along with updating task duration monitoring and removing redundant error handling. The shutdown process has been enhanced with asynchronous task management.

Changes

File(s) and change summary:

  • src/lib.rs, src/storage/mod.rs — Constants refactor: removed STORAGE_CONVERSION_INTERVAL and the old STORAGE_UPLOAD_INTERVAL; redefined STORAGE_UPLOAD_INTERVAL to use Duration; added LOCAL_SYNC_INTERVAL (Duration) and OBJECT_STORE_DATA_GRANULARITY (derived from LOCAL_SYNC_INTERVAL); removed legacy constants from storage/mod.rs.
  • src/parseable/streams.rs, src/query/listing_table_builder.rs — Import adjustments: removed OBJECT_STORE_DATA_GRANULARITY from the import list in streams.rs and consolidated import statements in listing_table_builder.rs, adding the OBJECT_STORE_DATA_GRANULARITY import for consistency with the refactored constants.
  • src/sync.rs — Sync workflow update: removed the run_local_sync function and replaced it with local_sync, simplified error handling for remote conversion, and updated the invocation of task duration monitoring to use the new STORAGE_UPLOAD_INTERVAL directly.
  • src/handlers/http/health_check.rs, src/handlers/http/modal/mod.rs — Shutdown process improvement: updated the shutdown function to use JoinSet for managing asynchronous tasks, replacing the previous stream flushing method; removed the S3 synchronization logic from the init method in modal/mod.rs, affecting data persistence during shutdown.
  • src/parseable/mod.rs — Method removal: removed the flush_all_streams method from the Parseable struct, which was responsible for flushing streams to disk.
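
As a rough illustration of the constants refactor summarized above, the following sketch shows the shape of the new Duration-based constants; the 60s and 30s values are assumptions, while the derivation of OBJECT_STORE_DATA_GRANULARITY follows the review diff shown later in this thread.

use std::time::Duration;

/// Interval at which in-memory arrows are flushed to disk and converted to parquet.
pub const LOCAL_SYNC_INTERVAL: Duration = Duration::from_secs(60); // assumed value

/// Interval at which finished parquet files are uploaded to object storage.
pub const STORAGE_UPLOAD_INTERVAL: Duration = Duration::from_secs(30); // assumed value

/// Data granularity (in minutes) used for object-store prefix generation,
/// derived from the local sync interval (per the review diff).
pub const OBJECT_STORE_DATA_GRANULARITY: u32 = LOCAL_SYNC_INTERVAL.as_secs() as u32 / 60;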

Suggested reviewers

  • nikhilsinhaparseable
  • parmesant

Poem

I'm a bunny with a code delight,
Hopping through intervals, shining bright.
New Durations lead my sync on a spree,
Constants prance in a type-safe marquee.
With each hop, my code sings in pure rabbit glee!
🐇✨


Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

🧹 Nitpick comments (2)
src/lib.rs (1)

66-67: Consider adding more context to the documentation.

While the constant's purpose is documented, it would be helpful to explain why the granularity is calculated by dividing by 60.

-/// Duration used to configure prefix generation.
+/// Duration used to configure prefix generation.
+/// The granularity is calculated in minutes by dividing the sync interval by 60 seconds.
src/sync.rs (1)

74-76: Update the function documentation.

The documentation still references the old STORAGE_CONVERSION_INTERVAL which has been removed.

-/// Flushes arrows onto disk every `LOCAL_SYNC_INTERVAL` seconds, packs arrows into parquet every
-/// `STORAGE_CONVERSION_INTERVAL` secondsand uploads them every `STORAGE_UPLOAD_INTERVAL` seconds.
+/// Flushes arrows onto disk and packs them into parquet files every `LOCAL_SYNC_INTERVAL` seconds,
+/// and uploads them to object store every `STORAGE_UPLOAD_INTERVAL` seconds.
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 5e3577f and 6f322fc.

📒 Files selected for processing (5)
  • src/lib.rs (1 hunks)
  • src/parseable/streams.rs (1 hunks)
  • src/query/listing_table_builder.rs (1 hunks)
  • src/storage/mod.rs (0 hunks)
  • src/sync.rs (5 hunks)
💤 Files with no reviewable changes (1)
  • src/storage/mod.rs
✅ Files skipped from review due to trivial changes (2)
  • src/parseable/streams.rs
  • src/query/listing_table_builder.rs
⏰ Context from checks skipped due to timeout of 90000ms (10)
  • GitHub Check: Build Default x86_64-pc-windows-msvc
  • GitHub Check: Build Default aarch64-apple-darwin
  • GitHub Check: Build Default x86_64-apple-darwin
  • GitHub Check: Build Default aarch64-unknown-linux-gnu
  • GitHub Check: Build Default x86_64-unknown-linux-gnu
  • GitHub Check: Build Kafka aarch64-apple-darwin
  • GitHub Check: Quest Smoke and Load Tests for Standalone deployments
  • GitHub Check: Quest Smoke and Load Tests for Distributed deployments
  • GitHub Check: coverage
  • GitHub Check: Build Kafka x86_64-unknown-linux-gnu
🔇 Additional comments (6)
src/lib.rs (2)

62-64: LGTM! Well-documented constant with improved type safety.

The change from raw integers to Duration type enhances type safety and readability. The documentation clearly explains the purpose of the constant.


69-70: LGTM! Clear documentation and type-safe constant.

The constant is well-documented and uses the Duration type for improved type safety.

src/sync.rs (4)

29-29: LGTM! Clean import of new Duration constants.

The imports align with the new type-safe constants defined in lib.rs.


78-79: LGTM! Simplified synchronization setup.

The code now uses a cleaner approach by combining arrow flushing and parquet conversion into a single local_sync operation.


121-121: LGTM! Direct usage of Duration constant.

Using STORAGE_UPLOAD_INTERVAL directly improves code clarity by removing unnecessary Duration::from_secs() conversion.


171-192: LGTM! Streamlined local sync implementation.

The new implementation successfully merges the .arrows finish and .parquet conversion into a single sequential operation, which aligns with the PR objectives. This change helps prevent potential data loss from synchronization issues.

Key improvements:

  • Clear separation of concerns with distinct logging for each step
  • Sequential execution of flush and conversion operations
  • Proper error handling and monitoring

coderabbitai bot previously approved these changes Feb 20, 2025
Signed-off-by: Devdutt Shenoi <[email protected]>
coderabbitai bot previously approved these changes Feb 21, 2025
Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

🧹 Nitpick comments (1)
src/lib.rs (1)

66-67: Improve documentation and consider type safety.

The documentation could be more descriptive about how this constant affects prefix generation. Additionally, the type conversion from u64 to u32 could potentially truncate large values.

Consider:

  1. Expanding the documentation to explain:
    • What is prefix generation?
    • Why is it needed?
    • Why is it based on LOCAL_SYNC_INTERVAL?
  2. Adding a runtime check to ensure the value fits within u32:
-pub const OBJECT_STORE_DATA_GRANULARITY: u32 = LOCAL_SYNC_INTERVAL.as_secs() as u32 / 60;
+pub const OBJECT_STORE_DATA_GRANULARITY: u32 = {
+    let secs = LOCAL_SYNC_INTERVAL.as_secs();
+    assert!(secs <= u32::MAX as u64 * 60, "LOCAL_SYNC_INTERVAL too large");
+    (secs / 60) as u32
+};
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between e3121ea and 281a1a4.

📒 Files selected for processing (1)
  • src/lib.rs (1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (10)
  • GitHub Check: Build Default x86_64-pc-windows-msvc
  • GitHub Check: Build Default aarch64-apple-darwin
  • GitHub Check: Build Default x86_64-apple-darwin
  • GitHub Check: Build Default aarch64-unknown-linux-gnu
  • GitHub Check: Build Kafka aarch64-apple-darwin
  • GitHub Check: Quest Smoke and Load Tests for Standalone deployments
  • GitHub Check: Build Default x86_64-unknown-linux-gnu
  • GitHub Check: Quest Smoke and Load Tests for Distributed deployments
  • GitHub Check: coverage
  • GitHub Check: Build Kafka x86_64-unknown-linux-gnu
🔇 Additional comments (2)
src/lib.rs (2)

62-64: LGTM! Well-documented constant with clear purpose.

The constant effectively combines the arrows finishing and parquet conversion intervals into a single workflow, which aligns with the PR objectives to prevent data loss from synchronization issues.


69-70: LGTM! Improved type safety with Duration.

The constant has been updated as suggested in past review comments, using Duration type for better type safety and clarity.

Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

🧹 Nitpick comments (3)
src/sync.rs (3)

172-173: Enhance function documentation.

The comment could be more descriptive about the sequential nature of the operations and their importance.

-/// Flush arrows onto disk and convert them into parquet files
+/// Sequentially flushes arrows onto disk and converts them into parquet files.
+/// This ensures data consistency by preventing potential data loss that could occur
+/// with parallel operations.

196-198: Consider enhancing error handling for task completion.

The current error handling only logs the error. Consider implementing a retry mechanism or propagating the error to trigger a recovery process.

 Some(Err(e)) = joinset.join_next(), if !joinset.is_empty() => {
-    error!("Issue joining flush+conversion: {e}")
+    error!("Issue joining flush+conversion: {e}");
+    // Consider implementing a retry mechanism
+    joinset.spawn(flush_and_convert());
 }

273-286: Consider implementing transaction-like behavior.

While the sequential execution is good, consider implementing a transaction-like behavior to ensure data consistency in case of failures.

 async fn flush_and_convert() {
+    // Track the state of operations
+    let mut flush_successful = false;
+
     trace!("Flushing Arrows to disk...");
-    PARSEABLE.flush_all_streams();
+    if let Err(e) = PARSEABLE.flush_all_streams() {
+        error!("Failed to flush streams: {e}");
+        return;
+    }
+    flush_successful = true;

     trace!("Converting Arrow to Parquet... ");
     if let Err(e) = monitor_task_duration("arrow_conversion", Duration::from_secs(30), || async {
         PARSEABLE.streams.prepare_parquet(false)
     })
     .await
     {
         warn!("failed to convert local arrow data to parquet. {e:?}");
+        if flush_successful {
+            // Consider implementing a rollback mechanism
+            warn!("Data was flushed but conversion failed. Manual intervention may be required.");
+        }
     }
 }
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 281a1a4 and 4a2a659.

📒 Files selected for processing (1)
  • src/sync.rs (6 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (10)
  • GitHub Check: Build Kafka aarch64-apple-darwin
  • GitHub Check: Build Kafka x86_64-unknown-linux-gnu
  • GitHub Check: Build Default x86_64-pc-windows-msvc
  • GitHub Check: Build Default aarch64-apple-darwin
  • GitHub Check: Build Default x86_64-apple-darwin
  • GitHub Check: Build Default aarch64-unknown-linux-gnu
  • GitHub Check: Build Default x86_64-unknown-linux-gnu
  • GitHub Check: coverage
  • GitHub Check: Quest Smoke and Load Tests for Standalone deployments
  • GitHub Check: Quest Smoke and Load Tests for Distributed deployments
🔇 Additional comments (3)
src/sync.rs (3)

23-23: LGTM! Good improvements to imports and constant management.

The changes improve code organization by:

  • Using JoinSet for better task management
  • Centralizing interval constants in the crate root

Also applies to: 30-30


75-110: LGTM! Successfully simplified the synchronization workflow.

The changes effectively merge the parallel tasks into a sequential operation while maintaining robust error handling, aligning perfectly with the PR objectives.


122-122: LGTM! Improved interval handling.

Using STORAGE_UPLOAD_INTERVAL directly improves code clarity and maintainability.

coderabbitai bot previously approved these changes Feb 24, 2025
Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

🧹 Nitpick comments (3)
src/sync.rs (3)

75-77: Update outdated documentation comment

The comment still mentions STORAGE_CONVERSION_INTERVAL which has been removed from the codebase. This comment should be updated to reflect the new implementation that combines arrow flushing and parquet conversion in a single workflow.

-/// Flushes arrows onto disk every `LOCAL_SYNC_INTERVAL` seconds, packs arrows into parquet every
-/// `STORAGE_CONVERSION_INTERVAL` secondsand uploads them every `STORAGE_UPLOAD_INTERVAL` seconds.
+/// Flushes arrows onto disk and converts them to parquet every `LOCAL_SYNC_INTERVAL` seconds,
+/// and uploads them to object storage every `STORAGE_UPLOAD_INTERVAL` seconds.

273-286: Consider enhancing the documentation of flush_and_convert

The function successfully combines arrow flushing and parquet conversion, but the documentation could be more detailed about error handling strategy and potential failure modes.

Consider expanding the doc comment to provide more information about:

  • What happens if either the flush or convert operations fail
  • Whether there are any side effects that callers should be aware of
  • How errors are handled and reported
-/// Asynchronously flushes all streams when called, then compacts them into parquet files ready to be pushed onto objectstore
+/// Asynchronously flushes all streams to disk and then compacts them into parquet files ready for object store upload.
+/// 
+/// This function performs both operations sequentially to ensure data integrity. Errors during the conversion 
+/// process are logged but don't interrupt the workflow. If the stream flush fails, subsequent conversion will 
+/// only process previously flushed data. The function is monitored for execution time and warns if it exceeds 
+/// 30 seconds.

279-285: Consider more specific error logging

The error logging for the conversion failure could be more specific to help with debugging. Consider including more context about which streams or files failed during conversion.

-        warn!("failed to convert local arrow data to parquet. {e:?}");
+        warn!("Failed to convert local arrow data to parquet - this may result in missing data in object storage. Error details: {e:?}");
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 4a2a659 and 8219e43.

📒 Files selected for processing (1)
  • src/sync.rs (8 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (10)
  • GitHub Check: Build Kafka aarch64-apple-darwin
  • GitHub Check: Build Kafka x86_64-unknown-linux-gnu
  • GitHub Check: Build Default x86_64-pc-windows-msvc
  • GitHub Check: Build Default aarch64-apple-darwin
  • GitHub Check: Build Default x86_64-apple-darwin
  • GitHub Check: Quest Smoke and Load Tests for Standalone deployments
  • GitHub Check: Build Default aarch64-unknown-linux-gnu
  • GitHub Check: Quest Smoke and Load Tests for Distributed deployments
  • GitHub Check: Build Default x86_64-unknown-linux-gnu
  • GitHub Check: coverage
🔇 Additional comments (4)
src/sync.rs (4)

172-173: Good rename to improve code clarity

Renaming from run_local_sync to local_sync makes the naming more consistent with other functions in the file (like object_store_sync). This improves code readability.


186-188: Good use of constants and JoinSet

The refactoring to use the LOCAL_SYNC_INTERVAL constant improves code maintainability. The addition of JoinSet provides better task management and error handling for the spawned tasks, which helps prevent resource leaks and improves error visibility.


191-194: Successfully merged arrow flushing and parquet conversion

This change successfully addresses the PR objective by spawning a task that performs both arrow flushing and parquet conversion in a single operation, rather than as separate processes with a delay between them. This should prevent the data loss issues described in the PR objectives.


196-198: Good addition of error handling for spawned tasks

The addition of error handling for the JoinSet tasks ensures that any issues during the flush and convert operations are properly logged and don't cause the whole synchronization process to fail silently.

src/sync.rs Outdated
Comment on lines 89 to 90
error!("Error joining remote_sync_handler: {e:?}");
}
Contributor


🛠️ Refactor suggestion

Fix incorrect log message

The error message refers to "remote_sync_handler" but it should refer to "localsync_handler" since it's reporting an error when joining that specific handler.

-                    error!("Error joining remote_sync_handler: {e:?}");
+                    error!("Error joining localsync_handler: {e:?}");

Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

🧹 Nitpick comments (3)
src/sync.rs (1)

192-202: Check concurrency error-handling strategy.
The loop waits for each flush+conversion task, logging errors but continuing execution. This is likely intended, though consider if serious conversion failures should halt or retry.

src/handlers/http/health_check.rs (1)

77-86: Object store upload logic on shutdown.
The code logs any errors during final sync but does not retry. If reliability is critical, consider a retry or delayed re-upload.

src/parseable/streams.rs (1)

728-734: Concurrent flush_and_convert for all streams.
Spawning each stream’s flush+conversion in parallel is efficient. Ensure any critical errors are aggregated or handled, as collecting them via the JoinSet result is up to the caller.

Also applies to: 742-742

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 8219e43 and 2b3b2d4.

📒 Files selected for processing (5)
  • src/handlers/http/health_check.rs (2 hunks)
  • src/handlers/http/modal/mod.rs (0 hunks)
  • src/parseable/mod.rs (0 hunks)
  • src/parseable/streams.rs (4 hunks)
  • src/sync.rs (7 hunks)
💤 Files with no reviewable changes (2)
  • src/parseable/mod.rs
  • src/handlers/http/modal/mod.rs
⏰ Context from checks skipped due to timeout of 90000ms (10)
  • GitHub Check: Build Default x86_64-pc-windows-msvc
  • GitHub Check: Build Default aarch64-apple-darwin
  • GitHub Check: Build Default x86_64-apple-darwin
  • GitHub Check: Build Default aarch64-unknown-linux-gnu
  • GitHub Check: Build Kafka aarch64-apple-darwin
  • GitHub Check: Build Default x86_64-unknown-linux-gnu
  • GitHub Check: Quest Smoke and Load Tests for Standalone deployments
  • GitHub Check: Build Kafka x86_64-unknown-linux-gnu
  • GitHub Check: Quest Smoke and Load Tests for Distributed deployments
  • GitHub Check: coverage
🔇 Additional comments (14)
src/sync.rs (9)

23-23: No issues with this import.


30-30: Import looks good.


77-79: Evaluate the necessity of top-level #[tokio::main].
If this entrypoint is also used from within an existing Tokio runtime, it may result in a nested runtime error. Otherwise, this seems correct for a standalone binary.


89-89: Rename the incorrect log message.
This line references localsync_handler.await but logs an error about remote_sync_handler. This was flagged earlier and remains unfixed.

-                    error!("Error joining remote_sync_handler: {e:?}");
+                    error!("Error joining localsync_handler: {e:?}");

92-92: Logging statement is correct.
This line accurately reports an error for remote_sync_handler.


104-104: Confirmed correct usage for remote_sync_handler.
No changes needed.


172-173: Doc comment & function signature.
This doc properly summarizes the new function local_sync.


186-187: Creation of interval and JoinSet is fine.
This aligns well with the local sync logic.


219-219: Error logging is sufficient for panics.

src/handlers/http/health_check.rs (2)

30-31: New imports for concurrency and logging.
No issues found.


64-75: Managing async tasks with JoinSet upon shutdown.
These lines systematically flush and convert arrow files to parquet. The error cases are logged, and the server proceeds to finalize tasks. Looks solid.
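
For illustration, here is a minimal sketch of the shutdown sequence described above, with assumed function names and error types (not the actual health_check.rs code): drain the JoinSet of flush+convert tasks, then attempt one final upload to object storage, logging failures rather than retrying, matching the behavior noted in the comment on the final sync.

use tokio::task::JoinSet;

async fn shutdown(mut conversion_tasks: JoinSet<Result<(), String>>) {
    // wait for every in-flight flush+convert task before the final sync
    while let Some(joined) = conversion_tasks.join_next().await {
        match joined {
            Ok(Ok(())) => {}
            Ok(Err(e)) => eprintln!("conversion failed during shutdown: {e}"),
            Err(e) => eprintln!("conversion task panicked: {e}"),
        }
    }

    // final push of any remaining parquet files to the object store;
    // errors are logged, not retried
    if let Err(e) = upload_remaining_parquet().await {
        eprintln!("final object store sync failed: {e}");
    }
}

// hypothetical helper standing in for the object-store upload step
async fn upload_remaining_parquet() -> Result<(), String> {
    Ok(())
}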

src/parseable/streams.rs (3)

44-44: Import of JoinSet.
Straightforward addition with no conflicts noted.


53-55: Updated imports.
References to StreamType and OBJECT_STORE_DATA_GRANULARITY remain consistent with the new design.


656-660: Inline flush + parquet conversion.
The flush_and_convert method within Stream effectively reduces steps by calling flush() before prepare_parquet(). Implementation appears consistent.
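
A hedged sketch of the per-stream pattern discussed in the comments above, with illustrative types and error handling (the real Stream API in src/parseable/streams.rs differs): each stream's flush() and prepare_parquet() run back-to-back, and every stream gets its own task on the JoinSet so a slow stream doesn't block the others.

use tokio::task::JoinSet;

struct Stream { name: String }

impl Stream {
    fn flush(&self) {
        // finish in-memory buffers into .arrows files on disk
        println!("flushing stream {}", self.name);
    }
    fn prepare_parquet(&self) -> Result<(), String> {
        // compact finished .arrows files into .parquet
        Ok(())
    }
    fn flush_and_convert(&self) -> Result<(), String> {
        self.flush();
        self.prepare_parquet()
    }
}

// spawn one flush+convert task per stream; the caller joins the set
// and decides how to aggregate or report errors
fn flush_and_convert_all(streams: Vec<Stream>, joinset: &mut JoinSet<Result<(), String>>) {
    for stream in streams {
        joinset.spawn(async move { stream.flush_and_convert() });
    }
}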

Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

♻️ Duplicate comments (1)
src/sync.rs (1)

89-90: ⚠️ Potential issue

Fix incorrect log message.

The error message refers to "remote_sync_handler" but it should refer to "localsync_handler" since it's reporting an error when joining that specific handler.

-                    error!("Error joining remote_sync_handler: {e:?}");
+                    error!("Error joining localsync_handler: {e:?}");
🧹 Nitpick comments (1)
src/sync.rs (1)

77-77: Update the function documentation for accuracy.

The comment still mentions STORAGE_CONVERSION_INTERVAL which no longer exists, but the code now combines flush and convert operations.

-/// Flushes arrows onto disk every `LOCAL_SYNC_INTERVAL` seconds, packs arrows into parquet every
-/// `STORAGE_CONVERSION_INTERVAL` secondsand uploads them every `STORAGE_UPLOAD_INTERVAL` seconds.
+/// Flushes arrows onto disk and converts them to parquet every `LOCAL_SYNC_INTERVAL` seconds,
+/// and uploads them every `STORAGE_UPLOAD_INTERVAL` seconds.
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 0c789eb and 71d643d.

📒 Files selected for processing (1)
  • src/sync.rs (8 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (10)
  • GitHub Check: Build Default x86_64-pc-windows-msvc
  • GitHub Check: Build Default aarch64-apple-darwin
  • GitHub Check: Build Default x86_64-apple-darwin
  • GitHub Check: Build Default aarch64-unknown-linux-gnu
  • GitHub Check: Build Kafka aarch64-apple-darwin
  • GitHub Check: Build Default x86_64-unknown-linux-gnu
  • GitHub Check: Quest Smoke and Load Tests for Standalone deployments
  • GitHub Check: Build Kafka x86_64-unknown-linux-gnu
  • GitHub Check: Quest Smoke and Load Tests for Distributed deployments
  • GitHub Check: coverage
🔇 Additional comments (10)
src/sync.rs (10)

23-23: Excellent addition of JoinSet for concurrent task management.

Using JoinSet is a good approach for managing the concurrent tasks for flushing and conversion operations. This provides better control over spawned tasks and improved error handling.


30-30: Updated imports align with the simplified workflow.

Removing STORAGE_CONVERSION_INTERVAL and only using LOCAL_SYNC_INTERVAL and STORAGE_UPLOAD_INTERVAL reflects the consolidated approach of handling both arrow finishing and parquet conversion in one workflow.


92-93: Error message is correct but repeated.

This error message correctly refers to "remote_sync_handler" since it's reporting an error when joining that specific handler.


104-105: Error message is correct and appropriate.

This error message correctly refers to "remote_sync_handler" since it's reporting an error when joining that specific handler.


122-122: Direct use of STORAGE_UPLOAD_INTERVAL improves code clarity.

Using the constant directly instead of wrapping it in Duration::from_secs() improves readability and makes the code more maintainable.


172-173: Good function rename with clear documentation.

Renaming from run_local_sync to local_sync improves code clarity while the updated comment accurately describes the function's consolidated purpose.


186-188: Excellent implementation of the consolidated workflow.

Using a single interval and introducing a JoinSet for managing flush and convert tasks aligns perfectly with the PR objective of merging these operations into a single sequential workflow.


191-193: Well-structured task spawning for the combined operation.

The comment accurately describes the new functionality, and calling flush_and_convert with the JoinSet effectively implements the merged workflow.


195-202: Comprehensive error handling for spawned tasks.

The new code block for handling task results provides good visibility into successes and failures, with appropriate logging levels for different outcomes.


219-219: Updated error message for clarity.

The error message now correctly identifies the panic as occurring in the "local sync task," which is more specific and helpful for debugging.

Signed-off-by: Devdutt Shenoi <[email protected]>
Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

🧹 Nitpick comments (1)
src/sync.rs (1)

76-76: Update outdated documentation comment.

The documentation comment still references STORAGE_CONVERSION_INTERVAL which has been removed according to the file change summary. This should be updated to match the new architecture.

-/// Flushes arrows onto disk every `LOCAL_SYNC_INTERVAL` seconds, packs arrows into parquet every
-/// `STORAGE_CONVERSION_INTERVAL` secondsand uploads them every `STORAGE_UPLOAD_INTERVAL` seconds.
+/// Flushes arrows onto disk and converts them to parquet every `LOCAL_SYNC_INTERVAL` seconds, 
+/// and uploads them to object store every `STORAGE_UPLOAD_INTERVAL` seconds.

Also applies to: 112-170

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 71d643d and 00b5b71.

📒 Files selected for processing (1)
  • src/sync.rs (8 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (10)
  • GitHub Check: Build Default x86_64-pc-windows-msvc
  • GitHub Check: Build Default aarch64-apple-darwin
  • GitHub Check: Build Default x86_64-apple-darwin
  • GitHub Check: Build Default aarch64-unknown-linux-gnu
  • GitHub Check: Build Default x86_64-unknown-linux-gnu
  • GitHub Check: Build Kafka aarch64-apple-darwin
  • GitHub Check: Quest Smoke and Load Tests for Distributed deployments
  • GitHub Check: Build Kafka x86_64-unknown-linux-gnu
  • GitHub Check: coverage
  • GitHub Check: Quest Smoke and Load Tests for Standalone deployments
🔇 Additional comments (8)
src/sync.rs (8)

23-23: Good addition of JoinSet for concurrent task management.

JoinSet is a more robust approach for managing multiple asynchronous tasks compared to individual task spawning. This aligns well with the PR objective of streamlining arrow file processing.


30-30: Improved organization with constants moved to crate root.

Using centralized constants from the crate root increases maintainability and consistency across the codebase. Good refactoring decision.


89-90: Error message correction applied successfully.

This fix addresses the previously identified issue where the error message incorrectly referred to "remote_sync_handler" instead of "localsync_handler".


172-173: Function renamed to better reflect its responsibility.

The rename from run_local_sync() to local_sync() is more concise while the updated documentation clearly explains the combined responsibility of flushing arrows and converting them to parquet files.


186-188: Good use of standardized intervals and JoinSet initialization.

The code now properly uses the standardized LOCAL_SYNC_INTERVAL constant and sets up a JoinSet for managing concurrent tasks, which supports the PR's goal of streamlining the arrow file processing workflow.


191-194: Streamlined processing with combined flush and convert operations.

The implementation now merges arrow file finishing and conversion to parquet in a single workflow, directly addressing the main objective of this PR: to prevent potential data loss from synchronization issues between these previously separate operations.


196-202: Well-structured error handling for task completion.

The addition of explicit handling for JoinSet task completion provides clear logging for successful conversions and proper error reporting. This makes debugging easier and gives better visibility into the conversion process.


77-77:

❓ Verification inconclusive

Verify the worker thread count is sufficient for task needs.

The multi-thread runtime is configured with 2 worker threads. Ensure this is sufficient for the workload, especially since you're now managing multiple tasks with JoinSet.


🏁 Script executed:

#!/bin/bash
# Check for any performance-related comments or discussions about thread counts
rg -i "worker.?thread|thread.?count|performance|concurrent" --type rust -A 3 -B 3

Length of output: 5626


Action Required: Verify Worker Thread Configuration

In src/sync.rs at line 77, the Tokio multi-thread runtime is initialized with 2 worker threads:

#[tokio::main(flavor = "multi_thread", worker_threads = 2)]

Since you're now managing multiple tasks via JoinSet, please confirm that 2 worker threads are adequate to handle the increased concurrency. Consider whether benchmarking under simulated load or aligning with dynamic thread configurations (as seen in src/connectors/common/mod.rs) might offer better scalability for the workload.
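
If a configurable thread count were preferred over the hard-coded attribute, one option (a sketch under that assumption, not the Parseable code) is to build the runtime explicitly:

use tokio::runtime::Builder;

fn run_local_sync_runtime(worker_threads: usize) {
    let rt = Builder::new_multi_thread()
        .worker_threads(worker_threads)
        .enable_all()
        .build()
        .expect("failed to build tokio runtime");
    rt.block_on(async {
        // local_sync().await;
    });
}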

Contributor

@nikhilsinhaparseable nikhilsinhaparseable left a comment


looks good

@nitisht nitisht merged commit 649c000 into parseablehq:main Feb 25, 2025
14 checks passed
@de-sh de-sh deleted the merge-sync branch February 25, 2025 09:55
7h3cyb3rm0nk pushed a commit to 7h3cyb3rm0nk/parseable that referenced this pull request Feb 26, 2025

Signed-off-by: Devdutt Shenoi <[email protected]>
Co-authored-by: Nikhil Sinha <[email protected]>