update log source in stream info #1231

Merged
merged 14 commits into parseablehq:main on Mar 14, 2025

Conversation

Contributor

@nikhilsinhaparseable commented Mar 10, 2025

  • updated to store multiple log source formats along with the known fields list
  • added fields list for otel-logs, otel-traces and otel-metrics
  • merge log sources when fetching from storage (in distributed)

Summary by CodeRabbit

  • New Features

    • Introduced support for multiple log sources per stream, providing more flexible and robust logging capabilities.
    • Added predefined field sets for enhanced monitoring of logs, metrics, and traces.
  • Refactor

    • Streamlined log ingestion and metadata migration processes, ensuring smoother updates and improved log source management.

Contributor

coderabbitai bot commented Mar 10, 2025

Walkthrough

This pull request introduces the LogSourceEntry struct to encapsulate a log source’s format along with its associated field names. Functions across ingestion, parsing, metadata management, migration, and storage modules are updated to work with a vector of LogSourceEntry instances instead of a single LogSource. The changes include updating method signatures, adding new helper functions, and modifying migration logic to handle version upgrades. Additionally, new constants are added for known OpenTelemetry fields, and related methods in the Kafka connector have been adjusted to support the new log source representation.
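
For orientation, here is a minimal sketch of the shape described above; the field and variant names follow the review comments later in this thread, while the exact derives and serde attributes are assumptions rather than the crate's actual definitions:

use serde::{Deserialize, Serialize};
use std::collections::HashSet;

// Simplified stand-in for the crate's LogSource enum (variant set assumed).
#[derive(Debug, Clone, Default, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum LogSource {
    #[default]
    Json,
    OtelLogs,
    OtelMetrics,
    OtelTraces,
    Pmeta,
}

/// One known log source format for a stream, together with the field names
/// that format is expected to produce.
#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct LogSourceEntry {
    pub log_source_format: LogSource,
    pub fields: HashSet<String>,
}

impl LogSourceEntry {
    pub fn new(log_source_format: LogSource, fields: HashSet<String>) -> Self {
        Self { log_source_format, fields }
    }
}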

Changes

  • src/event/format/mod.rs: Introduced LogSourceEntry struct with fields for log source format and field set; updated LogSource enum to derive Hash.
  • src/handlers/http/ingest.rs, src/handlers/http/logstream.rs: Modified stream creation functions to accept a vector of LogSourceEntry; added log source retrieval/updating in stream info.
  • src/metadata.rs: Changed LogStreamMetadata's log_source from a single LogSource to Vec<LogSourceEntry>; modified the constructor accordingly.
  • src/migration/mod.rs, src/migration/stream_metadata_migration.rs: Added helper functions (fetch_or_create_schema, fetch_or_create_stream_metadata, migrate_stream_metadata) and updated migration logic to transform log source data into an array of LogSourceEntry.
  • src/otel/logs.rs, src/otel/metrics.rs, src/otel/traces.rs: Added new constants defining known field lists for OpenTelemetry logs, metrics, and traces.
  • src/parseable/mod.rs, src/parseable/streams.rs: Updated methods to accept vectors of LogSourceEntry and introduced new methods (add_update_log_source, update_log_source, set_log_source, get_log_source, etc.) for managing log sources.
  • src/storage/mod.rs, src/storage/object_storage.rs: Changed log source fields in storage-related structs to use vectors of LogSourceEntry; updated defaults and added methods for updating and retrieving log source information.
  • src/connectors/kafka/processor.rs: Replaced LogSource with LogSourceEntry in stream creation, using a default instance wrapped in a vector.

Sequence Diagram(s)

sequenceDiagram
    participant C as Client
    participant H as HTTP Ingest Handler
    participant P as Parseable Module
    participant S as Object Storage

    C->>H: Send ingestion request
    H->>H: Create LogSourceEntry with known fields
    H->>P: Call create_stream_if_not_exists([LogSourceEntry])
    P->>S: Update stream metadata with new log source vector
    S-->>P: Confirm metadata update
    P-->>H: Stream creation succeeded
    H-->>C: Respond with success
sequenceDiagram
    participant M as Migration Module
    participant O as Object Storage
    participant X as Migration Helper

    M->>O: fetch_or_create_schema(stream)
    M->>O: fetch_or_create_stream_metadata(stream)
    O-->>M: Return schema & metadata
    M->>X: migrate_stream_metadata(metadata)
    X-->>M: Return migrated metadata
    M->>O: Update stream metadata in storage


Suggested labels

for next release

Suggested reviewers

  • de-sh

Poem

I'm a bunny, hopping with glee,
New log sources set our data free.
Fields organized in a vector array,
Migration paths now lead the way.
With each update, our code hops high—
A joyful leap beneath the digital sky!
🐇💻


Contributor

coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (6)
src/handlers/http/logstream.rs (1)

312-318: Added code to retrieve and update log sources

This new code segment retrieves log sources from storage and updates them in memory, which fulfills the PR objective to merge log sources when fetching data in a distributed environment.

However, the unwrap_or_default() could potentially mask storage errors by treating them as "no data found" cases.

Consider separating error handling from the "not found" case:

-    let stream_log_source = storage
-        .get_log_source_from_storage(&stream_name)
-        .await
-        .unwrap_or_default();
-    PARSEABLE
-        .update_log_source(&stream_name, stream_log_source)
-        .await?;
+    let stream_log_source = match storage.get_log_source_from_storage(&stream_name).await {
+        Ok(sources) => sources,
+        Err(e) if matches!(e, ObjectStorageError::NoSuchKey(_)) => Vec::new(),
+        Err(e) => return Err(StreamError::Storage(e)),
+    };
+    PARSEABLE
+        .update_log_source(&stream_name, stream_log_source)
+        .await?;
src/migration/stream_metadata_migration.rs (1)

183-208: New migration function to handle log source format changes

The v5_v6 function properly migrates from a single LogSource to a vector of LogSourceEntry objects, handling both cases where a log source exists and where it doesn't.

There's a potential issue with error handling when deserializing the log source.

The use of unwrap() on line 199 could cause a panic if the deserialization fails. Consider a more robust error handling approach:

-            let log_source: LogSource = serde_json::from_value(stream_log_source.clone()).unwrap();
+            if let Ok(log_source) = serde_json::from_value::<LogSource>(stream_log_source.clone()) {
+                log_source_entry.add_log_source(log_source, HashSet::new());
+                stream_metadata_map.insert("log_source".to_owned(), log_source_entry.to_value());
+            } else {
+                // If deserialization fails, use default
+                stream_metadata_map.insert("log_source".to_owned(), LogSourceEntry::default().to_value());
+            }
src/storage/object_storage.rs (1)

563-604: Updated code to merge log sources from multiple ingestors

This section refactors the stream creation process to merge log sources from multiple ingestors, aligning with the PR objective to handle multiple log source formats.

There's code duplication between this method and get_log_source_from_storage.

Consider extracting the log source merging logic into a helper function to avoid duplication:

+    /// Merges multiple LogSourceEntry objects into a consolidated list with combined fields
+    fn merge_log_sources(all_log_sources: Vec<LogSourceEntry>) -> Vec<LogSourceEntry> {
+        let mut merged_log_sources: Vec<LogSourceEntry> = Vec::new();
+        let mut log_source_map: HashMap<LogSource, HashSet<String>> = HashMap::new();
+        
+        for log_source_entry in all_log_sources {
+            let log_source_format = log_source_entry.log_source_format;
+            let fields = log_source_entry.fields;
+            
+            log_source_map
+                .entry(log_source_format)
+                .or_default()
+                .extend(fields);
+        }
+        
+        for (log_source_format, fields) in log_source_map {
+            merged_log_sources.push(LogSourceEntry {
+                log_source_format,
+                fields: fields.into_iter().collect(),
+            });
+        }
+        
+        merged_log_sources
+    }

Then you could use this function in both places:

-                // Merge log sources
-                let mut merged_log_sources: Vec<LogSourceEntry> = Vec::new();
-                let mut log_source_map: HashMap<LogSource, HashSet<String>> = HashMap::new();
-
-                for log_source_entry in all_log_sources {
-                    let log_source_format = log_source_entry.log_source_format;
-                    let fields = log_source_entry.fields;
-
-                    log_source_map
-                        .entry(log_source_format)
-                        .or_default()
-                        .extend(fields);
-                }
-
-                for (log_source_format, fields) in log_source_map {
-                    merged_log_sources.push(LogSourceEntry {
-                        log_source_format,
-                        fields: fields.into_iter().collect(),
-                    });
-                }
+                let merged_log_sources = merge_log_sources(all_log_sources);
src/parseable/streams.rs (2)

683-685: New method: get_log_source

Method added to retrieve the current log source entries from the stream's metadata. Note that this uses clone() which could be inefficient for large collections.

Consider returning a reference to the log source vector instead of cloning it to avoid unnecessary copying:

-pub fn get_log_source(&self) -> Vec<LogSourceEntry> {
-    self.metadata.read().expect(LOCK_EXPECT).log_source.clone()
+pub fn get_log_source(&self) -> &Vec<LogSourceEntry> {
+    &self.metadata.read().expect(LOCK_EXPECT).log_source
}

Or, if mutability is needed later:

-pub fn get_log_source(&self) -> Vec<LogSourceEntry> {
-    self.metadata.read().expect(LOCK_EXPECT).log_source.clone()
+pub fn get_log_source_mut(&self) -> &mut Vec<LogSourceEntry> {
+    &mut self.metadata.write().expect(LOCK_EXPECT).log_source
}

687-690: New method: add_log_source

Method added to append a new log source entry to the existing list in the stream's metadata.

Consider checking for duplicate log source formats before adding a new entry. This would prevent redundant entries with the same format:

pub fn add_log_source(&self, log_source: LogSourceEntry) {
    let mut metadata = self.metadata.write().expect(LOCK_EXPECT);
+   // Check if this log source format already exists
+   for existing in &metadata.log_source {
+       if existing.log_source_format == log_source.log_source_format {
+           return; // or merge fields, depending on requirements
+       }
+   }
    metadata.log_source.push(log_source);
}
src/parseable/mod.rs (1)

421-470: Incremental merging of log source fields
This method cleverly merges only new fields into existing entries. If multiple writes occur concurrently, ensure the underlying locking in streams handles potential race conditions. Otherwise, the approach to sync changes to storage only when new fields are detected is efficient.
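
For readers who have not opened the diff, the core of that merge can be sketched roughly as follows; this is a standalone simplification with stub types, and the real method in src/parseable/mod.rs also persists the change to object storage:

use std::collections::HashSet;

// Minimal stand-ins for the crate's types, for illustration only.
#[derive(Clone, PartialEq, Eq, Hash)]
enum LogSource { Json, OtelLogs }

struct LogSourceEntry {
    log_source_format: LogSource,
    fields: HashSet<String>,
}

/// Merge `incoming` into `existing`: extend the field set of a matching format,
/// or append a new entry. Returns true when something actually changed, which is
/// the signal used to decide whether a storage sync is needed.
fn add_update_log_source(existing: &mut Vec<LogSourceEntry>, incoming: LogSourceEntry) -> bool {
    if let Some(entry) = existing
        .iter_mut()
        .find(|e| e.log_source_format == incoming.log_source_format)
    {
        let before = entry.fields.len();
        entry.fields.extend(incoming.fields);
        return entry.fields.len() != before;
    }
    existing.push(incoming);
    true
}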

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 8cbee4d and cb883df.

📒 Files selected for processing (14)
  • src/event/format/mod.rs (3 hunks)
  • src/handlers/http/ingest.rs (6 hunks)
  • src/handlers/http/logstream.rs (1 hunks)
  • src/metadata.rs (3 hunks)
  • src/migration/mod.rs (1 hunks)
  • src/migration/stream_metadata_migration.rs (2 hunks)
  • src/otel/logs.rs (1 hunks)
  • src/otel/metrics.rs (1 hunks)
  • src/otel/traces.rs (1 hunks)
  • src/parseable/mod.rs (9 hunks)
  • src/parseable/streams.rs (3 hunks)
  • src/query/mod.rs (0 hunks)
  • src/storage/mod.rs (5 hunks)
  • src/storage/object_storage.rs (5 hunks)
💤 Files with no reviewable changes (1)
  • src/query/mod.rs
⏰ Context from checks skipped due to timeout of 90000ms (10)
  • GitHub Check: Build Default x86_64-pc-windows-msvc
  • GitHub Check: Build Default aarch64-apple-darwin
  • GitHub Check: Build Default x86_64-apple-darwin
  • GitHub Check: Build Kafka aarch64-apple-darwin
  • GitHub Check: Build Default aarch64-unknown-linux-gnu
  • GitHub Check: Build Kafka x86_64-unknown-linux-gnu
  • GitHub Check: Quest Smoke and Load Tests for Distributed deployments
  • GitHub Check: Build Default x86_64-unknown-linux-gnu
  • GitHub Check: coverage
  • GitHub Check: Quest Smoke and Load Tests for Standalone deployments
🔇 Additional comments (38)
src/otel/metrics.rs (1)

30-36: Well-structured constant for OTEL metrics fields.

The addition of the OTEL_METRICS_KNOWN_FIELD_LIST constant is a good enhancement that provides a centralized definition of known fields for OpenTelemetry metrics. This aligns well with the PR objective of adding fields lists for various OpenTelemetry components.
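
As an illustration of the pattern only (the entries below are hypothetical examples, not the actual list in src/otel/metrics.rs):

use std::collections::HashSet;

// Hypothetical known-field list; the real constant's entries may differ.
pub const OTEL_METRICS_KNOWN_FIELD_LIST: [&str; 5] = [
    "metric_name",
    "metric_description",
    "metric_unit",
    "start_time_unix_nano",
    "time_unix_nano",
];

/// Convert the static list into the owned HashSet that a LogSourceEntry expects.
pub fn otel_metrics_fields() -> HashSet<String> {
    OTEL_METRICS_KNOWN_FIELD_LIST
        .iter()
        .map(|s| s.to_string())
        .collect()
}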

src/otel/logs.rs (1)

30-37: Clear definition of standard OTEL logs fields.

The OTEL_LOG_KNOWN_FIELD_LIST constant provides a well-defined set of known fields for OpenTelemetry logs, which is consistent with the approach taken for metrics and traces. This supports the PR objective of enhancing field management for OpenTelemetry formats.

src/otel/traces.rs (1)

30-46: Comprehensive field list for OTEL traces.

The OTEL_TRACES_KNOWN_FIELD_LIST constant includes a thorough set of fields related to OpenTelemetry traces. The naming is consistent with the other OpenTelemetry constants, and the inclusion of 15 specific fields provides good coverage for trace data attributes.

src/migration/mod.rs (1)

257-262:

✅ Verification successful

Migration path for v5 to v6 properly implemented.

The addition of the v5 to v6 migration path is essential for supporting the new log source entry format. This ensures backward compatibility with existing data while enabling the enhanced log source functionality introduced in this PR.


🏁 Script executed:

#!/bin/bash
# Verify that the v5_v6 function exists and properly handles log sources

# Check the implementation of v5_v6 function
rg -A 10 "fn v5_v6" src/migration/stream_metadata_migration.rs

Length of output: 514


Verification Confirmed: Migration Implementation is Correct

After verifying the code in src/migration/stream_metadata_migration.rs, the v5_v6 function was found to correctly handle the migration. The function properly updates the metadata by inserting the new "objectstore-format" and "version" keys, ensuring backward compatibility with the existing data while supporting the new log source entry format.

  • The v5_v6 function is implemented as expected in src/migration/stream_metadata_migration.rs.
  • The call to v5_v6 in src/migration/mod.rs at lines 257-262 properly updates and persists the transformed metadata.
src/metadata.rs (3)

27-27: Updated import to use LogSourceEntry instead of LogSource

The import statement is correctly updated to import LogSourceEntry from the format module.


90-90: Field type change to support multiple log sources

The log_source field has been changed from a single LogSource to a vector of LogSourceEntry objects, enabling the storage of multiple log source formats with their associated fields, as per the PR objectives.


104-104: Updated method signature to accept multiple log sources

The constructor signature is properly updated to accept a vector of LogSourceEntry objects instead of a single LogSource, maintaining consistency with the field type change.

src/migration/stream_metadata_migration.rs (1)

20-27: Added imports for new functionality

The imports for HashSet, CURRENT_SNAPSHOT_VERSION, LogSource, and LogSourceEntry are correctly added to support the new migration function.

src/storage/object_storage.rs (3)

48-49: Added imports for log source types

The imports for LogSource and LogSourceEntry are correctly added to support the new functionality.


273-285: New method to update log sources in stream metadata

This method correctly updates the log source in the object store format for a given stream, following the same pattern as other update methods in this file.


690-723:

❓ Verification inconclusive

New method to retrieve and merge log sources from storage

This method correctly retrieves log sources from all stream metadata files in storage and merges them into a consolidated list, properly handling fields from sources with the same format.

As noted above, there's duplication with the merging logic in create_stream_from_ingestor.

To ensure proper merging of log sources, let's validate the function by checking what kind of log sources and fields might exist in the codebase:


🏁 Script executed:

#!/bin/bash
# Check for LogSource enum values and their usages
rg -A 5 "enum LogSource" --glob "*.rs"

# Check for LogSourceEntry struct definition
rg -A 10 "struct LogSourceEntry" --glob "*.rs"

# Look for OpenTelemetry field definitions mentioned in PR
rg -A 5 "otel-(logs|traces|metrics)" --glob "*.rs"

Length of output: 2558


Merged Log Sources Retrieval – Verify Duplication Refactor

The new method in src/storage/object_storage.rs correctly retrieves log sources from all stream metadata files and consolidates their fields by merging entries that share the same log source format. Verification confirms that the merging logic is consistent with our project’s definitions (as seen with LogSource and LogSourceEntry in src/event/format/mod.rs).

  • The merging process uses a HashMap to combine fields from duplicate sources, matching the structure used elsewhere.
  • Note that similar merging logic exists in create_stream_from_ingestor, which may be an opportunity for refactoring to reduce duplication.

Please verify the impact of this duplication and consider consolidating the merging routines if appropriate.

src/event/format/mod.rs (3)

31-31: Import updated to include json macro

The import is now correctly updated to include the json macro which supports the new to_value method in the LogSourceEntry struct.


47-47: Added Hash trait to LogSource enum

The Hash trait has been added to the LogSource enum's derivation list, allowing it to be used as a key in hash-based collections such as HashSet.


95-120:

✅ Verification successful

New LogSourceEntry struct implementation

This new struct effectively encapsulates a log source format with its associated fields, supporting the PR objective of storing multiple log source formats with their fields.

The implementation includes essential methods:

  • Constructor with appropriate initialization
  • Method to update the log source format and fields
  • Method to convert to JSON value

The to_value method creates a JSON array with a single object, which seems unusual for a single entry. This suggests the schema might be designed to support multiple entries in the future.

Verify that the to_value method's return format (array with one object) is the expected format for consumer code:


🏁 Script executed:

#!/bin/bash
# Check how to_value is used in the codebase
rg "\\.to_value\\(" --type rust

Length of output: 305


Verified: The JSON array format returned by to_value() is consistent with consumer expectations.

The new LogSourceEntry implementation meets the PR objective. The migration code (in src/migration/stream_metadata_migration.rs) uses to_value() by inserting its output directly into a metadata map—indicating that the array containing a single object is indeed the expected format for downstream processing. There are no further issues noted.
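
To make that verified shape concrete, a simplified stand-in for to_value() might look like this; the key names and layout are inferred from the review, not copied from the code:

use serde_json::{json, Value};
use std::collections::HashSet;

// Hypothetical simplification: even a single entry is serialized as a
// one-element JSON array, which is the shape the migration code stores.
fn to_value(log_source_format: &str, fields: &HashSet<String>) -> Value {
    json!([{
        "log_source_format": log_source_format,
        "fields": fields.iter().collect::<Vec<_>>()
    }])
}

fn main() {
    let fields = HashSet::from(["body".to_string(), "severity_text".to_string()]);
    println!("{}", to_value("otel-logs", &fields));
    // Prints something like:
    // [{"fields":["body","severity_text"],"log_source_format":"otel-logs"}]
    // (HashSet iteration order, and hence the "fields" order, may vary.)
}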

src/storage/mod.rs (5)

26-26: Updated import for LogSourceEntry

Import for the newly created LogSourceEntry struct is correctly added.


120-120: Updated log_source field type in ObjectStoreFormat

The log_source field has been changed from LogSource to Vec<LogSourceEntry>, allowing multiple log source formats with their associated fields to be stored.


145-145: Updated log_source field type in StreamInfo

Similar to ObjectStoreFormat, the log_source field in StreamInfo has been changed from LogSource to Vec<LogSourceEntry>.


222-222: Default implementation updated

The default implementation of ObjectStoreFormat now correctly initializes log_source as a vector containing a default LogSourceEntry.


74-75:

✅ Verification successful

Version constants updated

The CURRENT_OBJECT_STORE_VERSION and CURRENT_SCHEMA_VERSION constants have been incremented from "v5" to "v6". This is appropriate since the changes introduce a breaking change in data structures.

Let's verify if there are any migration functions to handle the version change:


🏁 Script executed:

#!/bin/bash
# Check for migration logic between v5 and v6
rg "v5.*v6|version.*migration" --type rust

Length of output: 257


Verified: Constants update and migration logic are in place.

The version constants update from "v5" to "v6" is reflected correctly, and we verified that a migration function (stream_metadata_migration::v5_v6) exists in the codebase (located in src/migration/mod.rs and implemented in src/migration/stream_metadata_migration.rs). This confirms that a proper migration path is implemented for the breaking change.

src/parseable/streams.rs (5)

21-21: Added HashSet import

HashSet is now imported to support storing unique field names in the LogSourceEntry struct.


51-53: Updated imports for event module

Imports have been restructured to include LogSource and LogSourceEntry from the format module.


679-681: New method: set_log_source

Method added to set the log source entries in the stream's metadata.


692-700: New method: add_fields_to_log_source

Method added to add fields to a specific log source entry if it matches the provided log source format.

The implementation correctly finds the matching log source entry and extends its fields.


702-710: New method: get_fields_from_log_source

Method added to retrieve fields associated with a specific log source entry.

The implementation correctly returns the fields as a HashSet if the log source format is found.

src/handlers/http/ingest.rs (7)

19-19: Added HashSet import

HashSet is now imported to support creating the field sets for LogSourceEntry.


31-31: Updated import for format module

Import has been updated to include LogSourceEntry alongside the existing imports.


35-37: Added imports for OpenTelemetry field constants

Imports added for the OpenTelemetry known field lists that will be used with the new LogSourceEntry instances.


75-82: Updated stream creation in ingest handler

The code now creates a LogSourceEntry with an empty field set and uses it in create_stream_if_not_exists.


132-138: Create LogSourceEntry with known fields for OtelLogs

The OtelLogs handler now creates a LogSourceEntry with pre-populated fields from OTEL_LOG_KNOWN_FIELD_LIST.


170-176: Create LogSourceEntry with known fields for OtelMetrics

The OtelMetrics handler now creates a LogSourceEntry with pre-populated fields from OTEL_METRICS_KNOWN_FIELD_LIST.


209-215: Create LogSourceEntry with known fields for OtelTraces

The OtelTraces handler now creates a LogSourceEntry with pre-populated fields from OTEL_TRACES_KNOWN_FIELD_LIST.

src/parseable/mod.rs (7)

20-26: Imports for HashSet & NonZeroU32 look appropriate.
They are used effectively for handling unique sets of fields and partition limits. No concerns.


44-44: Expanded event format import appears consistent.
Bringing in LogSourceEntry aligns well with the newly added methods managing multiple log sources.


355-360: Use of a default LogSourceEntry for internal streams
Creating a new LogSourceEntry with LogSource::Pmeta and an empty field set is correct. This provides a placeholder for any metadata fields if needed in the future.


384-391: Iterating over all new log source entries for an existing stream
Looping through each entry and calling add_update_log_source ensures incremental merging of fields. Looks conceptually sound.


535-544: Creating a LogSourceEntry with an empty field set
Consider verifying that having no fields initially is desired. If you intend to populate fields later, confirm that subsequent merges provide correct updates.


600-600: Expanded method signature
Accepting a vector of LogSourceEntry allows more flexible stream creation with multiple log sources in one go. No issues spotted here.


825-843: Full overwrite of log source metadata
Calling update_log_source overwrites the entire log source vector for a stream. This may remove data added by add_update_log_source if both are used concurrently. Confirm this approach won’t cause data loss in your multi-tenant or distributed setting.

coderabbitai bot previously approved these changes Mar 10, 2025
coderabbitai bot previously approved these changes Mar 10, 2025
coderabbitai bot previously approved these changes Mar 10, 2025
Contributor

coderabbitai bot left a comment

Actionable comments posted: 2

🧹 Nitpick comments (2)
src/migration/stream_metadata_migration.rs (1)

183-214: Consider consolidating redundant field insertion logic.

The v5_v6 function contains redundant code in the error handling path. When deserialization fails (line 200), you insert a default LogSourceEntry at line 204, but then immediately overwrite it at line 208 with the local log_source_entry variable. This makes the insertion at line 204 unnecessary.

Additionally, consider adding error logging for the deserialization failure to help with troubleshooting during migration.

pub fn v5_v6(mut stream_metadata: Value) -> Value {
    let stream_metadata_map: &mut serde_json::Map<String, Value> =
        stream_metadata.as_object_mut().unwrap();
    stream_metadata_map.insert(
        "objectstore-format".to_owned(),
        Value::String(storage::CURRENT_OBJECT_STORE_VERSION.into()),
    );
    stream_metadata_map.insert(
        "version".to_owned(),
        Value::String(storage::CURRENT_SCHEMA_VERSION.into()),
    );
    let log_source = stream_metadata_map.get("log_source");
    let mut log_source_entry = LogSourceEntry::default();
    match log_source {
        Some(stream_log_source) => {
            if let Ok(log_source) = serde_json::from_value::<LogSource>(stream_log_source.clone()) {
                log_source_entry.add_log_source(log_source, HashSet::new());
            } else {
                // If deserialization fails, use default and log the error
-               stream_metadata_map.insert(
-                   "log_source".to_owned(),
-                   LogSourceEntry::default().to_value(),
-               );
+               // Consider adding error logging here
+               // e.g., tracing::warn!("Failed to deserialize log source during v5_v6 migration");
            }
            stream_metadata_map.insert("log_source".to_owned(), log_source_entry.to_value());
        }
        None => {
            stream_metadata_map.insert("log_source".to_owned(), log_source_entry.to_value());
        }
    }
    stream_metadata
}
src/parseable/streams.rs (1)

702-710: Missing error handling for non-existent log source.

The method silently does nothing when the log source doesn't exist. Consider adding a return value to indicate success/failure or return/create a new entry if one doesn't exist.

-pub fn add_fields_to_log_source(&self, log_source: &LogSource, fields: HashSet<String>) {
+pub fn add_fields_to_log_source(&self, log_source: &LogSource, fields: HashSet<String>) -> bool {
    let mut metadata = self.metadata.write().expect(LOCK_EXPECT);
    for log_source_entry in metadata.log_source.iter_mut() {
        if log_source_entry.log_source_format == *log_source {
            log_source_entry.fields.extend(fields);
-            return;
+            return true;
        }
    }
+    return false;
}
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between c56ccca and 06beee3.

📒 Files selected for processing (2)
  • src/migration/stream_metadata_migration.rs (2 hunks)
  • src/parseable/streams.rs (3 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (10)
  • GitHub Check: Build Default x86_64-pc-windows-msvc
  • GitHub Check: Build Default aarch64-apple-darwin
  • GitHub Check: Build Default x86_64-apple-darwin
  • GitHub Check: Build Default aarch64-unknown-linux-gnu
  • GitHub Check: Build Kafka aarch64-apple-darwin
  • GitHub Check: Quest Smoke and Load Tests for Distributed deployments
  • GitHub Check: Build Default x86_64-unknown-linux-gnu
  • GitHub Check: Build Kafka x86_64-unknown-linux-gnu
  • GitHub Check: Quest Smoke and Load Tests for Standalone deployments
  • GitHub Check: coverage
🔇 Additional comments (7)
src/migration/stream_metadata_migration.rs (2)

20-21: LGTM - Appropriate imports for supporting the new structure.

The new imports of HashSet and LogSourceEntry are correctly added to support the migration logic.

Also applies to: 26-27


195-199:

✅ Verification successful

Verify if empty field collection is intentional.

When migrating an existing log source, an empty HashSet is used for fields. If the original log source had associated fields that should be preserved, ensure this information is migrated correctly. If fields weren't previously stored or if starting with an empty set is intentional, you might want to add a comment explaining this decision.


🏁 Script executed:

#!/bin/bash
# Check for any existing field-related properties in the old LogSource type
# that might need to be migrated to the new LogSourceEntry

echo "Looking for the LogSource struct definition..."
rg -A 15 "struct LogSource" --type rust

echo "Looking for field-related properties in log source implementations..."
rg -B 3 -A 10 "impl.*LogSource" --type rust

Length of output: 3517


Clarification on Empty Field Collection in LogSource Migration

Based on the review of the legacy implementation in src/event/format/mod.rs, it appears that the original LogSource data did not include any associated fields—the LogSourceEntry structure was later extended to include a fields: HashSet<String>, and the migration presently initializes this with an empty set. If this behavior is intentional (i.e., legacy log sources never stored field data), please add a brief comment in the migration code to document that this empty collection is deliberate. Otherwise, consider revisiting the migration logic to preserve any potential legacy field data.

src/parseable/streams.rs (5)

21-21: Import for HashSet added correctly.

The HashSet import is needed for the new log source field management functionality.


49-53: Appropriate imports added for the new log source functionality.

The imports for LogSource, LogSourceEntry, and DEFAULT_TIMESTAMP_KEY are correctly added to support the changes in this file.


679-681: Simple setter method follows existing patterns.

This setter method follows the project's pattern for updating metadata fields with appropriate locking.


683-685: Getter method with appropriate cloning.

The getter clones the log_source vector to avoid exposing internal references, maintaining thread safety.


712-720: Getter method implementation is correct.

The implementation correctly returns a cloned HashSet when found or None when not found, maintaining thread safety.

Contributor

coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (3)
src/migration/stream_metadata_migration.rs (1)

197-200: Consider more explicit error handling for deserialization failures.

When deserialization fails, the function silently falls back to inserting a default LogSourceEntry. While this gracefully handles corrupted data, consider adding a warning log to track these cases.

 if let Ok(log_source) = serde_json::from_value::<LogSource>(log_source.unwrap().clone()) {
     log_source_entry.add_log_source(log_source, HashSet::new());
+} else {
+    warn!("Failed to deserialize log source during v5_v6 migration, using default");
 }
src/parseable/streams.rs (2)

684-685: Consider optimizing clone for large log source collections.

The method returns a clone of the entire log source vector. While necessary for ownership purposes, this could be inefficient if the collection becomes large. For most use cases this is likely fine, but consider adding a non-cloning version that returns a reference if needed for high-performance paths.


724-732: Consider providing a referenced version of get_fields_from_log_source.

Similar to the get_log_source method, this makes a clone of potentially large HashSets. For cases where the caller doesn't need ownership, consider adding a method that returns a reference to the fields to avoid unnecessary cloning.

+pub fn get_fields_from_log_source_ref<'a>(&'a self, log_source: &LogSource) -> Option<&'a HashSet<String>> {
+    let metadata = self.metadata.read().expect(LOCK_EXPECT);
+    for log_source_entry in metadata.log_source.iter() {
+        if log_source_entry.log_source_format == *log_source {
+            return Some(&log_source_entry.fields);
+        }
+    }
+    None
+}
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 06beee3 and 01d7e8b.

📒 Files selected for processing (2)
  • src/migration/stream_metadata_migration.rs (3 hunks)
  • src/parseable/streams.rs (3 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (10)
  • GitHub Check: Build Default x86_64-pc-windows-msvc
  • GitHub Check: Build Default aarch64-apple-darwin
  • GitHub Check: Build Default x86_64-apple-darwin
  • GitHub Check: Build Default aarch64-unknown-linux-gnu
  • GitHub Check: Build Kafka aarch64-apple-darwin
  • GitHub Check: Build Default x86_64-unknown-linux-gnu
  • GitHub Check: Quest Smoke and Load Tests for Standalone deployments
  • GitHub Check: coverage
  • GitHub Check: Quest Smoke and Load Tests for Distributed deployments
  • GitHub Check: Build Kafka x86_64-unknown-linux-gnu
🔇 Additional comments (3)
src/migration/stream_metadata_migration.rs (2)

183-203: Implementation of v5_v6 migration looks solid.

The migration function correctly transforms a single log source format into the new array-based structure, updating necessary versions in the process. It handles the case where log_source may or may not exist, and properly initializes a default LogSourceEntry if needed.


231-256: Great test coverage for migration edge cases.

The tests comprehensively cover all important migration scenarios: existing OtelLogs source, existing Json source, and no log source. This directly addresses the previous review comment asking for better test coverage of migration edge cases.

src/parseable/streams.rs (1)

679-712: Good job fixing the potential deadlock issue in add_log_source.

This implementation properly addresses the deadlock risk mentioned in the previous review by:

  1. Taking a read lock first to check for existing log sources
  2. Releasing the read lock before calling add_fields_to_log_source which takes another lock
  3. Only acquiring a write lock when necessary
  4. Double-checking with the write lock to handle race conditions

This pattern prevents nested lock acquisition that could lead to deadlocks.
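
A condensed, self-contained sketch of that lock discipline, using std::sync::RwLock and unwrap() in place of the crate's expect(LOCK_EXPECT); the type shapes here are approximations:

use std::collections::HashSet;
use std::sync::RwLock;

struct LogSourceEntry {
    log_source_format: String, // stand-in for the LogSource enum
    fields: HashSet<String>,
}

struct Metadata {
    log_source: Vec<LogSourceEntry>,
}

struct Stream {
    metadata: RwLock<Metadata>,
}

impl Stream {
    fn add_log_source(&self, new: LogSourceEntry) {
        // 1. Read lock only: check whether this format already exists.
        let already_present = {
            let meta = self.metadata.read().unwrap();
            meta.log_source
                .iter()
                .any(|e| e.log_source_format == new.log_source_format)
        }; // read lock released here, before any write lock is taken

        if already_present {
            self.add_fields_to_log_source(&new.log_source_format, new.fields);
            return;
        }

        // 2. Write lock, with a double check in case another thread inserted
        //    the same format between the two lock acquisitions.
        let mut meta = self.metadata.write().unwrap();
        if let Some(entry) = meta
            .log_source
            .iter_mut()
            .find(|e| e.log_source_format == new.log_source_format)
        {
            entry.fields.extend(new.fields);
            return;
        }
        meta.log_source.push(new);
    }

    fn add_fields_to_log_source(&self, format: &str, fields: HashSet<String>) {
        let mut meta = self.metadata.write().unwrap();
        if let Some(entry) = meta
            .log_source
            .iter_mut()
            .find(|e| e.log_source_format == format)
        {
            entry.fields.extend(fields);
        }
    }
}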

Contributor

@de-sh left a comment

Contributor

coderabbitai bot left a comment

Caution

Inline review comments failed to post. This is likely due to GitHub's limits when posting large numbers of comments. If you are seeing this consistently it is likely a permissions issue. Please check "Moderation" -> "Code review limits" under your organization settings.

Actionable comments posted: 1

🧹 Nitpick comments (4)
CONTRIBUTING.md (4)

5-5: Grammar correction needed in first paragraph

The sentence uses an incorrect verb form.

-Thank you for considering to contribute to Parseable. The goal of this document is to provide everything you need to start your contribution. We encourage all contributions, including but not limited to:
+Thank you for considering contributing to Parseable. The goal of this document is to provide everything you need to start your contribution. We encourage all contributions, including but not limited to:
🧰 Tools
🪛 LanguageTool

[grammar] ~5-~5: The verb ‘considering’ is used with the gerund form.
Context: ...contribute to Parseable. Thank you for considering to contribute to Parseable. The goal of this document...

(ADMIT_ENJOY_VB)


14-14: Typo in prerequisite statement

There's a typo in the second prerequisite that affects readability.

-You've discussed the with [Parseable community](https://logg.ing/community).
+You've discussed it with the [Parseable community](https://logg.ing/community).

15-15: Missing article before "GitHub Pull Requests"

For grammatical correctness, add the article "the" before "GitHub Pull Requests".

-You're familiar with GitHub Pull Requests(PR) workflow.
+You're familiar with the GitHub Pull Requests (PR) workflow.
🧰 Tools
🪛 LanguageTool

[uncategorized] ~15-~15: You might be missing the article “the” here.
Context: ....ing/community). - You're familiar with GitHub Pull Requests(PR) workflow. - You've re...

(AI_EN_LECTOR_MISSING_DETERMINER_THE)


25-25: Improve CLA signing instruction formatting

The CLA signing instruction would be clearer with quotation marks around the required comment text.

-You'll be asked to review & sign the [Parseable Contributor License Agreement (CLA)](https://github.com/parseablehq/.github/blob/main/CLA.md) on the GitHub PR. Please ensure that you review the document. Once you're ready, please sign the CLA by adding a comment I have read the CLA Document and I hereby sign the CLA in your Pull Request.
+You'll be asked to review & sign the [Parseable Contributor License Agreement (CLA)](https://github.com/parseablehq/.github/blob/main/CLA.md) on the GitHub PR. Please ensure that you review the document. Once you're ready, please sign the CLA by adding a comment "I have read the CLA Document and I hereby sign the CLA" in your Pull Request.
🛑 Comments failed to post (1)
src/migration/stream_metadata_migration.rs (1)

197-207: ⚠️ Potential issue

Excellent error handling improvement, but tests need updates

The change from direct unwrapping to using if let Ok(...) for log source deserialization is a robust improvement that prevents panics. However, all three tests for this function are currently failing.

The tests are failing because they expect specific output structures that need to be updated to match the new implementation. The tests should verify:

  1. Successful log source conversion to LogSourceEntry
  2. Fallback to default when deserialization fails
  3. Handling of missing log source

Run the failing tests with more verbose output to identify the exact differences:

#!/bin/bash
cargo test --package parseable --lib migration::stream_metadata_migration::tests::test_v5_v6_with_log_source -- --nocapture
cargo test --package parseable --lib migration::stream_metadata_migration::tests::test_v5_v6_with_default_log_source -- --nocapture
cargo test --package parseable --lib migration::stream_metadata_migration::tests::test_v5_v6_without_log_source -- --nocapture

Contributor

@de-sh commented Mar 11, 2025

nikhilsinhaparseable#10

Contributor

coderabbitai bot left a comment

Caution

Inline review comments failed to post. This is likely due to GitHub's limits when posting large numbers of comments. If you are seeing this consistently it is likely a permissions issue. Please check "Moderation" -> "Code review limits" under your organization settings.

Actionable comments posted: 2

🔭 Outside diff range comments (1)
src/migration/stream_metadata_migration.rs (1)

29-29: ⚠️ Potential issue

Fix syntax error: Remove stray closing bracket.

There appears to be a stray closing bracket }; on line 29 that doesn't match any opening bracket. This could lead to compilation errors.

-};
🧹 Nitpick comments (1)
src/migration/stream_metadata_migration.rs (1)

193-204: Consider handling potential corrupt data more robustly.

The migration function handles the case when log_source is missing, but should also handle potential cases where the log_source field exists but contains invalid or corrupted data. While your tests cover valid scenarios, production data may contain unexpected values.

Consider adding a test case for handling invalid log source values that might cause deserialization failures to ensure the migration is robust against all forms of input data.

🛑 Comments failed to post (2)
src/migration/stream_metadata_migration.rs (2)

178-179: ⚠️ Potential issue

Add missing closing brace for the v4_v5 function.

The v4_v5 function is missing a closing brace, which causes the last stream_metadata statement to be outside the function. This is likely why the stray bracket appears on line 29.

        }
+    }
    stream_metadata
}

Committable suggestion skipped: line range outside the PR's diff.


181-206: 🛠️ Refactor suggestion

Improve error handling in the migration function.

The v5_v6 function uses unwrap() when deserializing the log source, which could panic if the deserialization fails. For a migration function that handles user data, it would be better to handle deserialization errors gracefully.

 pub fn v5_v6(mut stream_metadata: Value) -> Value {
     let stream_metadata_map: &mut serde_json::Map<String, Value> =
         stream_metadata.as_object_mut().unwrap();
     stream_metadata_map.insert(
         "objectstore-format".to_owned(),
         Value::String(storage::CURRENT_OBJECT_STORE_VERSION.into()),
     );
     stream_metadata_map.insert(
         "version".to_owned(),
         Value::String(storage::CURRENT_SCHEMA_VERSION.into()),
     );
     let log_source = stream_metadata_map.get("log_source");
     let mut log_source_entry = LogSourceEntry::default();

     match log_source {
         Some(stream_log_source) => {
-            let log_source: LogSource = serde_json::from_value(stream_log_source.clone()).unwrap();
-            log_source_entry.add_log_source(log_source, vec![]);
+            match serde_json::from_value::<LogSource>(stream_log_source.clone()) {
+                Ok(log_source) => {
+                    log_source_entry.add_log_source(log_source, vec![]);
+                }
+                Err(_) => {
+                    // Log error but continue with default
+                    log::warn!("Failed to deserialize log source, using default");
+                }
+            }
             stream_metadata_map.insert("log_source".to_owned(), log_source_entry.to_value());
         }
         None => {
             stream_metadata_map.insert("log_source".to_owned(), log_source_entry.to_value());
         }
     }
     stream_metadata
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

}

pub fn v5_v6(mut stream_metadata: Value) -> Value {
    let stream_metadata_map: &mut serde_json::Map<String, Value> =
        stream_metadata.as_object_mut().unwrap();
    stream_metadata_map.insert(
        "objectstore-format".to_owned(),
        Value::String(storage::CURRENT_OBJECT_STORE_VERSION.into()),
    );
    stream_metadata_map.insert(
        "version".to_owned(),
        Value::String(storage::CURRENT_SCHEMA_VERSION.into()),
    );
    let log_source = stream_metadata_map.get("log_source");
    let mut log_source_entry = LogSourceEntry::default();

    match log_source {
        Some(stream_log_source) => {
            match serde_json::from_value::<LogSource>(stream_log_source.clone()) {
                Ok(log_source) => {
                    log_source_entry.add_log_source(log_source, vec![]);
                }
                Err(_) => {
                    // Log error but continue with default
                    log::warn!("Failed to deserialize log source, using default");
                }
            }
            stream_metadata_map.insert("log_source".to_owned(), log_source_entry.to_value());
        }
        None => {
            stream_metadata_map.insert("log_source".to_owned(), log_source_entry.to_value());
        }
    }
    stream_metadata
}

fn v1_v2_snapshot_migration(mut snapshot: Value) -> Value {
    let manifest_list = snapshot.get("manifest_list").unwrap();
    let mut new_manifest_list = Vec::new();

nikhilsinhaparseable and others added 13 commits March 13, 2025 02:06

  • updated to store multiple log source formats along with the known fields list
  • added fields list for otel-logs, otel-traces and otel-metrics
  • added logic to add log source, add fields to existing log source, merge log sources when fetching from storage (in distributed)
  • refactor: use owned type instead of cloning
Contributor

coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (2)
src/handlers/http/ingest.rs (1)

132-222: Consider reducing duplicated code for OTEL ingestion.

Although each OTEL ingestion handler needs its own known field list, the repeated pattern of creating a LogSourceEntry and invoking create_stream_if_not_exists could be lifted into a helper function. This would follow DRY principles and streamline future maintenance.
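
One possible shape for such a helper, sketched with stub types standing in for the crate's items; the actual handler signatures, PARSEABLE, and create_stream_if_not_exists are only referenced in comments here, so treat this as shape rather than API:

use std::collections::HashSet;

// Stubs standing in for the crate's LogSource / LogSourceEntry.
#[derive(Clone, PartialEq, Eq, Hash)]
enum LogSource { OtelLogs, OtelMetrics, OtelTraces }

struct LogSourceEntry {
    log_source_format: LogSource,
    fields: HashSet<String>,
}

impl LogSourceEntry {
    fn new(log_source_format: LogSource, fields: HashSet<String>) -> Self {
        Self { log_source_format, fields }
    }
}

/// Shared helper: build a LogSourceEntry from a static known-field list so each
/// OTEL handler only has to supply its own constant and LogSource variant.
fn otel_log_source_entry(log_source: LogSource, known_fields: &[&str]) -> LogSourceEntry {
    LogSourceEntry::new(
        log_source,
        known_fields.iter().map(|s| s.to_string()).collect(),
    )
}

// Each handler would then reduce to something like (pseudo-call, real signature differs):
// PARSEABLE
//     .create_stream_if_not_exists(&stream_name, vec![otel_log_source_entry(
//         LogSource::OtelLogs,
//         &OTEL_LOG_KNOWN_FIELD_LIST,
//     )])
//     .await?;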

src/migration/mod.rs (1)

230-296: Stepwise migration handles multiple versions well, but is repetitive.

Your match-arm logic handles migrations from v1 through v5 thoroughly by calling the corresponding migration functions and updating the schema and stream metadata. While the functionality looks accurate, there is repeated code invoking v4_v5, v5_v6, and storage updates. Consolidating or abstracting repeated portions could reduce code duplication.
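
As a rough illustration of that consolidation idea, with stub step functions standing in for stream_metadata_migration::{v4_v5, v5_v6} (the real ones do more, and the dispatcher in src/migration/mod.rs also persists schema and metadata):

use serde_json::{json, Value};

// Stub migration steps; the real v4_v5 / v5_v6 live in stream_metadata_migration.
fn v4_v5(mut m: Value) -> Value {
    m["version"] = json!("v5");
    m
}

fn v5_v6(mut m: Value) -> Value {
    m["version"] = json!("v6");
    // The single log_source value becomes a one-element array of entries.
    let old = m["log_source"].clone();
    m["log_source"] = json!([{ "log_source_format": old, "fields": [] }]);
    m
}

/// Walk metadata from whatever version it is at up to the current one, keeping
/// the per-version arms in the dispatcher thin.
fn migrate_to_current(version: &str, metadata: Value) -> Value {
    match version {
        "v4" => v5_v6(v4_v5(metadata)),
        "v5" => v5_v6(metadata),
        _ => metadata, // older versions would chain their earlier steps first
    }
}

fn main() {
    let old = json!({ "version": "v5", "log_source": "json" });
    println!("{}", migrate_to_current("v5", old));
}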

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 4f7ff4a and d42ad0b.

📒 Files selected for processing (14)
  • src/connectors/kafka/processor.rs (2 hunks)
  • src/event/format/mod.rs (2 hunks)
  • src/handlers/http/ingest.rs (6 hunks)
  • src/handlers/http/logstream.rs (1 hunks)
  • src/metadata.rs (3 hunks)
  • src/migration/mod.rs (3 hunks)
  • src/migration/stream_metadata_migration.rs (3 hunks)
  • src/otel/logs.rs (1 hunks)
  • src/otel/metrics.rs (1 hunks)
  • src/otel/traces.rs (1 hunks)
  • src/parseable/mod.rs (9 hunks)
  • src/parseable/streams.rs (3 hunks)
  • src/storage/mod.rs (5 hunks)
  • src/storage/object_storage.rs (5 hunks)
🚧 Files skipped from review as they are similar to previous changes (7)
  • src/metadata.rs
  • src/otel/metrics.rs
  • src/otel/logs.rs
  • src/otel/traces.rs
  • src/handlers/http/logstream.rs
  • src/storage/mod.rs
  • src/connectors/kafka/processor.rs
⏰ Context from checks skipped due to timeout of 90000ms (10)
  • GitHub Check: coverage
  • GitHub Check: Build Default x86_64-pc-windows-msvc
  • GitHub Check: Build Default x86_64-unknown-linux-gnu
  • GitHub Check: Build Kafka aarch64-apple-darwin
  • GitHub Check: Quest Smoke and Load Tests for Distributed deployments
  • GitHub Check: Build Default x86_64-apple-darwin
  • GitHub Check: Build Kafka x86_64-unknown-linux-gnu
  • GitHub Check: Quest Smoke and Load Tests for Standalone deployments
  • GitHub Check: Build Default aarch64-unknown-linux-gnu
  • GitHub Check: Build Default aarch64-apple-darwin
🔇 Additional comments (26)
src/migration/stream_metadata_migration.rs (2)

183-202: Well-implemented migration function with good error handling.

The new v5_v6 migration function correctly transforms the log source structure from a single value to an array of LogSourceEntry objects, maintaining backward compatibility. The error handling properly defaults to the LogSourceEntry::default() when deserialization fails or no log source is present.


232-270: Comprehensive test coverage for migration edge cases.

The test suite thoroughly covers all relevant scenarios:

  • Migration with a valid log source (test_v5_v6_with_log_source)
  • Migration with the default log source (test_v5_v6_with_default_log_source)
  • Migration with no existing log source (test_v5_v6_without_log_source)
  • Migration with an unknown/invalid log source format (test_v5_v6_unknown_log_source)
  • Migration with a malformed log source object (test_v5_v6_invalid_log_source)

This robust test coverage ensures the migration function handles all edge cases correctly.

src/event/format/mod.rs (2)

47-47: Hash trait added to LogSource correctly.

Adding the Hash trait derivation to the LogSource enum enables its use in hash-based collections like HashMap and HashSet. This is a necessary change to support storing log sources as keys in the new implementation.


95-111: Good implementation of LogSourceEntry struct with clear documentation.

The new LogSourceEntry struct properly encapsulates a log source format with its associated field names. The default implementation and constructor method provide convenient ways to create instances. The documentation clearly explains the purpose and storage details of this struct.

src/storage/object_storage.rs (3)

273-285: Well-designed method for updating log sources.

The update_log_source_in_stream method efficiently updates the log source entries for a stream. It retrieves the current format, updates the log source field, and persists the changes back to storage.


563-610: Robust implementation for merging log sources during stream creation.

The enhanced create_stream_from_ingestor method now properly handles multiple log sources by:

  1. Collecting all log sources from multiple metadata observations
  2. Merging fields from log sources with the same format
  3. Creating a consolidated list of unique LogSourceEntry instances

This approach ensures no field information is lost when aggregating data from distributed environments.


690-723: Efficient implementation of log source retrieval and merging.

The get_log_source_from_storage method effectively:

  1. Collects all log sources from stream metadata
  2. Merges fields of duplicate log source formats into a single entry
  3. Returns a deduplicated list of log source entries

This implementation avoids redundant storage and ensures all fields are preserved when retrieving log source information.

src/parseable/streams.rs (3)

679-686: Simple and effective getter/setter for log sources.

These methods provide clean access to the log source entries stored in the stream's metadata.


687-712: Safe implementation for adding log sources with proper locking.

The method correctly uses a read-lock first to check if the log source format already exists, then drops that lock before potentially taking a write lock. The double-check pattern prevents race conditions where another thread might have added the same log source between the locks. This addresses the potential deadlock risk mentioned in previous reviews.


714-732: Utility methods for field manipulation look good.

The add_fields_to_log_source and get_fields_from_log_source methods provide convenient ways to modify and access the fields associated with a specific log source format.

src/handlers/http/ingest.rs (5)

19-37: No issues with new imports.

These added imports are straightforward, align with the usage of HashMap, HashSet, LogSource, and LogSourceEntry, and do not introduce any security or performance concerns.


75-83: Stream creation with empty field set looks good.

Here, you wrap the cloned log_source into a LogSourceEntry with no known fields, then invoke create_stream_if_not_exists. This straightforwardly initializes a custom log source for standard ingestion, and the error handling on .await? is properly performed.


132-144: OTEL logs ingestion logic is correct.

You generate a LogSourceEntry using OTEL_LOG_KNOWN_FIELD_LIST and then call create_stream_if_not_exists. The approach effectively sets up a specialized OTEL logs stream with known fields. Error handling is properly handled.


170-181: OTEL metrics ingestion logic is consistent.

Similarly to OTEL logs, you prepare a LogSourceEntry using OTEL_METRICS_KNOWN_FIELD_LIST before creating the stream. This keeps the known fields centralized and explicit.


209-222: OTEL traces ingestion logic is consistent.

Same reasoning applies here as with OTEL logs/metrics. The code properly creates a specialized OTEL traces stream through create_stream_if_not_exists.

src/migration/mod.rs (5)

156-174: Migration logic correctly fetches schema and metadata.

By first calling fetch_or_create_schema and fetch_or_create_stream_metadata, you neatly centralize the retrieval of necessary data for migration. The checks for empty metadata and short-circuit for non-ingest mode appear correct.


175-182: Proper fallback for initializing an empty schema.

If arrow_schema is empty, the code deserializes from the fetched schema bytes, ensuring a valid schema for downstream usage. This approach protects against potential NPE-like scenarios.


184-205: Graceful fallback to querier or ingestor for schema creation.

The function attempts to load schema from storage first, then falls back to create_schema_from_querier or create_schema_from_ingestor when storage retrieval fails. This is a sensible approach and correctly returns an empty schema if none is found.


207-228: Fetching or creating stream metadata follows the same robust approach.

Mirroring the schema logic, the code tries retrieving the existing metadata, then attempts from querier or ingestor if missing. This ensures continuity for distributed setups.


298-353: setup_logstream_metadata accurately constructs LogStreamMetadata.

After retrieving relevant fields from ObjectStoreFormat, you call update_data_type_time_partition, save the schema, and finalize the LogStreamMetadata. The overall flow is clear with correct interactions among storage, arrow schema, and internal in-memory data.

src/parseable/mod.rs (6)

20-26: Trivial import additions.

These imports from std::collections and related crates align with the newly introduced storage of log source entries. No issues noted.


44-44: Added imports for LogSource and LogSourceEntry.

No concerns. This import is needed to utilize multiple log source entries throughout the module.


355-360: Properly creating a LogSourceEntry for internal usage.

Using LogSource::Pmeta with an empty field set is consistent with how internal streams do not require external known fields.


384-391: Updated method signature to accept multiple log sources.

Adopting Vec<LogSourceEntry> in create_stream_if_not_exists accounts for the multi-log-source refactor. The loop then calls add_update_log_source for each entry, ensuring incremental updates if the stream already exists.


421-470: add_update_log_source logic is well-structured.

The method merges new fields into existing log sources or appends a new log source if not present. Only persisting changes on actual modifications prevents unnecessary writes. This approach is both efficient and clear.


825-843: update_log_source ensures direct replacement.

This function straightforwardly replaces the entire log source array with the new entries and persists them. It complements add_update_log_source for cases where a direct overwrite is needed.

@nitisht merged commit db4a68d into parseablehq:main Mar 14, 2025
14 checks passed
coderabbitai bot mentioned this pull request Apr 7, 2025
@nikhilsinhaparseable deleted the multiple-log-sources branch July 12, 2025 09:00