diff --git a/src/connectors/kafka/processor.rs b/src/connectors/kafka/processor.rs index b74754003..10025ed98 100644 --- a/src/connectors/kafka/processor.rs +++ b/src/connectors/kafka/processor.rs @@ -28,7 +28,7 @@ use tracing::{debug, error}; use crate::{ connectors::common::processor::Processor, event::{ - format::{json, EventFormat, LogSource}, + format::{json, EventFormat, LogSourceEntry}, Event as ParseableEvent, }, parseable::PARSEABLE, @@ -49,9 +49,14 @@ impl ParseableSinkProcessor { .first() .map(|r| r.topic.as_str()) .unwrap_or_default(); + let log_source_entry = LogSourceEntry::default(); PARSEABLE - .create_stream_if_not_exists(stream_name, StreamType::UserDefined, LogSource::Json) + .create_stream_if_not_exists( + stream_name, + StreamType::UserDefined, + vec![log_source_entry], + ) .await?; let stream = PARSEABLE.get_stream(stream_name)?; diff --git a/src/event/format/mod.rs b/src/event/format/mod.rs index ce90cfc52..6e9aca131 100644 --- a/src/event/format/mod.rs +++ b/src/event/format/mod.rs @@ -44,7 +44,7 @@ static TIME_FIELD_NAME_PARTS: [&str; 2] = ["time", "date"]; type EventSchema = Vec>; /// Source of the logs, used to perform special processing for certain sources -#[derive(Default, Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[derive(Default, Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Hash)] pub enum LogSource { // AWS Kinesis sends logs in the format of a json array Kinesis, @@ -92,6 +92,23 @@ impl Display for LogSource { } } +/// Contains the format name and a list of known field names that are associated with the said format. +/// Stored on disk as part of `ObjectStoreFormat` in stream.json +#[derive(Default, Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct LogSourceEntry { + pub log_source_format: LogSource, + pub fields: HashSet, +} + +impl LogSourceEntry { + pub fn new(log_source_format: LogSource, fields: HashSet) -> Self { + LogSourceEntry { + log_source_format, + fields, + } + } +} + // Global Trait for event format // This trait is implemented by all the event formats pub trait EventFormat: Sized { diff --git a/src/handlers/http/ingest.rs b/src/handlers/http/ingest.rs index 0523e8757..1e5e6d048 100644 --- a/src/handlers/http/ingest.rs +++ b/src/handlers/http/ingest.rs @@ -16,7 +16,7 @@ * */ -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use actix_web::web::{Json, Path}; use actix_web::{http::header::ContentType, HttpRequest, HttpResponse}; @@ -28,10 +28,13 @@ use serde_json::Value; use crate::event; use crate::event::error::EventError; -use crate::event::format::{self, EventFormat, LogSource}; +use crate::event::format::{self, EventFormat, LogSource, LogSourceEntry}; use crate::handlers::{LOG_SOURCE_KEY, STREAM_NAME_HEADER_KEY}; use crate::metadata::SchemaVersion; use crate::option::Mode; +use crate::otel::logs::OTEL_LOG_KNOWN_FIELD_LIST; +use crate::otel::metrics::OTEL_METRICS_KNOWN_FIELD_LIST; +use crate::otel::traces::OTEL_TRACES_KNOWN_FIELD_LIST; use crate::parseable::{StreamNotFound, PARSEABLE}; use crate::storage::{ObjectStorageError, StreamType}; use crate::utils::header_parsing::ParseHeaderError; @@ -55,9 +58,6 @@ pub async fn ingest(req: HttpRequest, Json(json): Json) -> Result) -> Result) -> Result, } impl LogStreamMetadata { @@ -101,7 +101,7 @@ impl LogStreamMetadata { static_schema: HashMap>, stream_type: StreamType, schema_version: SchemaVersion, - log_source: LogSource, + log_source: Vec, ) -> Self { LogStreamMetadata { created_at: if created_at.is_empty() { diff --git 
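For orientation, a sketch of what a LogSourceEntry looks like once serialized into stream.json as part of ObjectStoreFormat. Only the type, its constructor, and the LogSource variant come from this diff; the concrete values are illustrative.

    use std::collections::HashSet;

    // Hedged example: construct an entry and inspect its JSON shape.
    let entry = LogSourceEntry::new(
        LogSource::OtelLogs,
        HashSet::from(["body".to_string(), "trace_id".to_string()]),
    );
    // serde_json::to_value(&entry) yields (the order of "fields" is not
    // deterministic, since it is a HashSet):
    // {"log_source_format": "OtelLogs", "fields": ["body", "trace_id"]}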
a/src/migration/mod.rs b/src/migration/mod.rs index 5e82ec0d6..20c159a63 100644 --- a/src/migration/mod.rs +++ b/src/migration/mod.rs @@ -153,116 +153,153 @@ async fn migration_stream( ) -> anyhow::Result> { let mut arrow_schema: Schema = Schema::empty(); - //check if schema exists for the node - //if not, create schema from querier schema from storage - //if not present with querier, create schema from ingestor schema from storage + let schema = fetch_or_create_schema(stream, storage).await?; + let stream_metadata = fetch_or_create_stream_metadata(stream, storage).await?; + + let mut stream_meta_found = true; + if stream_metadata.is_empty() { + if PARSEABLE.options.mode != Mode::Ingest { + return Ok(None); + } + stream_meta_found = false; + } + + let mut stream_metadata_value = Value::Null; + if stream_meta_found { + stream_metadata_value = + serde_json::from_slice(&stream_metadata).expect("stream.json is valid json"); + stream_metadata_value = + migrate_stream_metadata(stream_metadata_value, stream, storage, &schema).await?; + } + + if arrow_schema.fields().is_empty() { + arrow_schema = serde_json::from_slice(&schema)?; + } + + let metadata = + setup_logstream_metadata(stream, &mut arrow_schema, stream_metadata_value).await?; + Ok(Some(metadata)) +} + +async fn fetch_or_create_schema( + stream: &str, + storage: &dyn ObjectStorage, +) -> anyhow::Result { let schema_path = schema_path(stream); - let schema = if let Ok(schema) = storage.get_object(&schema_path).await { - schema + if let Ok(schema) = storage.get_object(&schema_path).await { + Ok(schema) } else { let querier_schema = storage .create_schema_from_querier(stream) .await .unwrap_or_default(); if !querier_schema.is_empty() { - querier_schema + Ok(querier_schema) } else { - storage + Ok(storage .create_schema_from_ingestor(stream) .await - .unwrap_or_default() + .unwrap_or_default()) } - }; + } +} - //check if stream.json exists for the node - //if not, create stream.json from querier stream.json from storage - //if not present with querier, create from ingestor stream.json from storage +async fn fetch_or_create_stream_metadata( + stream: &str, + storage: &dyn ObjectStorage, +) -> anyhow::Result { let path = stream_json_path(stream); - let stream_metadata = if let Ok(stream_metadata) = storage.get_object(&path).await { - stream_metadata + if let Ok(stream_metadata) = storage.get_object(&path).await { + Ok(stream_metadata) } else { let querier_stream = storage .create_stream_from_querier(stream) .await .unwrap_or_default(); if !querier_stream.is_empty() { - querier_stream + Ok(querier_stream) } else { - storage + Ok(storage .create_stream_from_ingestor(stream) .await - .unwrap_or_default() - } - }; - - let mut stream_meta_found = true; - if stream_metadata.is_empty() { - if PARSEABLE.options.mode != Mode::Ingest { - return Ok(None); + .unwrap_or_default()) } - stream_meta_found = false; } - let mut stream_metadata_value = Value::Null; - if stream_meta_found { - stream_metadata_value = - serde_json::from_slice(&stream_metadata).expect("stream.json is valid json"); - let version = stream_metadata_value - .as_object() - .and_then(|meta| meta.get("version")) - .and_then(|version| version.as_str()); +} - match version { - Some("v1") => { - stream_metadata_value = stream_metadata_migration::v1_v4(stream_metadata_value); - stream_metadata_value = - stream_metadata_migration::v4_v5(stream_metadata_value, stream); - storage - .put_object(&path, to_bytes(&stream_metadata_value)) - .await?; - let schema = serde_json::from_slice(&schema).ok(); 
- arrow_schema = schema_migration::v1_v4(schema)?; - storage - .put_object(&schema_path, to_bytes(&arrow_schema)) - .await?; - } - Some("v2") => { - stream_metadata_value = stream_metadata_migration::v2_v4(stream_metadata_value); - stream_metadata_value = - stream_metadata_migration::v4_v5(stream_metadata_value, stream); - storage - .put_object(&path, to_bytes(&stream_metadata_value)) - .await?; - - let schema = serde_json::from_slice(&schema)?; - arrow_schema = schema_migration::v2_v4(schema)?; - storage - .put_object(&schema_path, to_bytes(&arrow_schema)) - .await?; - } - Some("v3") => { - stream_metadata_value = stream_metadata_migration::v3_v4(stream_metadata_value); - stream_metadata_value = - stream_metadata_migration::v4_v5(stream_metadata_value, stream); - storage - .put_object(&path, to_bytes(&stream_metadata_value)) - .await?; - } - Some("v4") => { - stream_metadata_value = - stream_metadata_migration::v4_v5(stream_metadata_value, stream); - storage - .put_object(&path, to_bytes(&stream_metadata_value)) - .await?; - } - _ => (), +async fn migrate_stream_metadata( + mut stream_metadata_value: Value, + stream: &str, + storage: &dyn ObjectStorage, + schema: &Bytes, +) -> anyhow::Result { + let path = stream_json_path(stream); + let schema_path = schema_path(stream); + + let version = stream_metadata_value + .as_object() + .and_then(|meta| meta.get("version")) + .and_then(|version| version.as_str()); + + match version { + Some("v1") => { + stream_metadata_value = stream_metadata_migration::v1_v4(stream_metadata_value); + stream_metadata_value = stream_metadata_migration::v4_v5(stream_metadata_value, stream); + stream_metadata_value = stream_metadata_migration::v5_v6(stream_metadata_value); + storage + .put_object(&path, to_bytes(&stream_metadata_value)) + .await?; + let schema = serde_json::from_slice(schema).ok(); + let arrow_schema = schema_migration::v1_v4(schema)?; + storage + .put_object(&schema_path, to_bytes(&arrow_schema)) + .await?; + } + Some("v2") => { + stream_metadata_value = stream_metadata_migration::v2_v4(stream_metadata_value); + stream_metadata_value = stream_metadata_migration::v4_v5(stream_metadata_value, stream); + stream_metadata_value = stream_metadata_migration::v5_v6(stream_metadata_value); + storage + .put_object(&path, to_bytes(&stream_metadata_value)) + .await?; + let schema = serde_json::from_slice(schema)?; + let arrow_schema = schema_migration::v2_v4(schema)?; + storage + .put_object(&schema_path, to_bytes(&arrow_schema)) + .await?; + } + Some("v3") => { + stream_metadata_value = stream_metadata_migration::v3_v4(stream_metadata_value); + stream_metadata_value = stream_metadata_migration::v4_v5(stream_metadata_value, stream); + stream_metadata_value = stream_metadata_migration::v5_v6(stream_metadata_value); + storage + .put_object(&path, to_bytes(&stream_metadata_value)) + .await?; + } + Some("v4") => { + stream_metadata_value = stream_metadata_migration::v4_v5(stream_metadata_value, stream); + stream_metadata_value = stream_metadata_migration::v5_v6(stream_metadata_value); + storage + .put_object(&path, to_bytes(&stream_metadata_value)) + .await?; + } + Some("v5") => { + stream_metadata_value = stream_metadata_migration::v5_v6(stream_metadata_value); + storage + .put_object(&path, to_bytes(&stream_metadata_value)) + .await?; } + _ => (), } - if arrow_schema.fields().is_empty() { - arrow_schema = serde_json::from_slice(&schema)?; - } + Ok(stream_metadata_value) +} - // Setup logstream meta on startup +async fn setup_logstream_metadata( + stream: &str, + 
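Each arm above chains the remaining migrations, so a stream stored under any earlier layout (v1 through v5) ends at v6 in a single pass. The net effect of the new v5_v6 step on log_source, shown in isolation (a serde_json sketch that mirrors the full-document tests further down):

    // Before (v5): a single scalar log source.
    let before = serde_json::json!({ "log_source": "OtelLogs" });
    // After (v6): a one-element array of LogSourceEntry with an empty field set,
    // alongside the version/objectstore-format bumps to "v6".
    let after = serde_json::json!({
        "log_source": [ { "log_source_format": "OtelLogs", "fields": [] } ]
    });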
arrow_schema: &mut Schema, + stream_metadata_value: Value, +) -> anyhow::Result { let ObjectStoreFormat { schema_version, created_at, @@ -279,19 +316,17 @@ async fn migration_stream( log_source, .. } = serde_json::from_value(stream_metadata_value).unwrap_or_default(); + let storage = PARSEABLE.storage.get_object_store(); - // update the schema and store it back - // NOTE: write could be saved, but the cost is cheap, given the low possibilities of being called multiple times - update_data_type_time_partition(&mut arrow_schema, time_partition.as_ref()).await?; - storage.put_schema(stream, &arrow_schema).await?; - //load stats from storage + update_data_type_time_partition(arrow_schema, time_partition.as_ref()).await?; + storage.put_schema(stream, arrow_schema).await?; fetch_stats_from_storage(stream, stats).await; load_daily_metrics(&snapshot.manifest_list, stream); let schema = PARSEABLE .get_or_create_stream(stream) - .updated_schema(arrow_schema); + .updated_schema(arrow_schema.clone()); let schema = HashMap::from_iter( schema .fields @@ -314,7 +349,7 @@ async fn migration_stream( log_source, }; - Ok(Some(metadata)) + Ok(metadata) } #[inline(always)] diff --git a/src/migration/stream_metadata_migration.rs b/src/migration/stream_metadata_migration.rs index 82c676f82..ab29e124b 100644 --- a/src/migration/stream_metadata_migration.rs +++ b/src/migration/stream_metadata_migration.rs @@ -17,10 +17,14 @@ * */ +use std::collections::HashSet; + use serde_json::{json, Value}; use crate::{ - catalog::snapshot::CURRENT_SNAPSHOT_VERSION, handlers::http::cluster::INTERNAL_STREAM_NAME, + catalog::snapshot::CURRENT_SNAPSHOT_VERSION, + event::format::{LogSource, LogSourceEntry}, + handlers::http::cluster::INTERNAL_STREAM_NAME, storage, }; @@ -176,6 +180,27 @@ pub fn v4_v5(mut stream_metadata: Value, stream_name: &str) -> Value { stream_metadata } +pub fn v5_v6(mut stream_metadata: Value) -> Value { + let stream_metadata_map: &mut serde_json::Map = + stream_metadata.as_object_mut().unwrap(); + stream_metadata_map.insert( + "objectstore-format".to_owned(), + Value::String(storage::CURRENT_OBJECT_STORE_VERSION.into()), + ); + stream_metadata_map.insert( + "version".to_owned(), + Value::String(storage::CURRENT_SCHEMA_VERSION.into()), + ); + let mut log_source_entry = LogSourceEntry::default(); + if let Some(log_source) = stream_metadata_map.get("log_source") { + if let Ok(log_source) = serde_json::from_value::(log_source.clone()) { + log_source_entry = LogSourceEntry::new(log_source, HashSet::new()); + } + } + stream_metadata_map.insert("log_source".to_owned(), json!([log_source_entry])); + stream_metadata +} + fn v1_v2_snapshot_migration(mut snapshot: Value) -> Value { let manifest_list = snapshot.get("manifest_list").unwrap(); let mut new_manifest_list = Vec::new(); @@ -201,3 +226,46 @@ fn v1_v2_snapshot_migration(mut snapshot: Value) -> Value { snapshot_map.insert("manifest_list".to_owned(), Value::Array(new_manifest_list)); snapshot } + +#[cfg(test)] +mod tests { + #[test] + fn test_v5_v6_with_log_source() { + let stream_metadata = 
serde_json::json!({"version":"v5","schema_version":"v0","objectstore-format":"v5","created-at":"2025-03-10T14:38:29.355131524-04:00","first-event-at":"2025-03-10T14:38:29.356-04:00","owner":{"id":"admin","group":"admin"},"permissions":[{"id":"admin","group":"admin","access":["all"]}],"stats":{"lifetime_stats":{"events":3,"ingestion":70,"storage":1969},"current_stats":{"events":3,"ingestion":70,"storage":1969},"deleted_stats":{"events":0,"ingestion":0,"storage":0}},"snapshot":{"version":"v2","manifest_list":[{"manifest_path":"home/nikhilsinha/Parseable/parseable/data/test10/date=2025-03-10/manifest.json","time_lower_bound":"2025-03-10T00:00:00Z","time_upper_bound":"2025-03-10T23:59:59.999999999Z","events_ingested":3,"ingestion_size":70,"storage_size":1969}]},"hot_tier_enabled":false,"stream_type":"UserDefined","log_source":"OtelLogs"}); + let expected = serde_json::json!({"version":"v6","schema_version":"v0","objectstore-format":"v6","created-at":"2025-03-10T14:38:29.355131524-04:00","first-event-at":"2025-03-10T14:38:29.356-04:00","owner":{"id":"admin","group":"admin"},"permissions":[{"id":"admin","group":"admin","access":["all"]}],"stats":{"lifetime_stats":{"events":3,"ingestion":70,"storage":1969},"current_stats":{"events":3,"ingestion":70,"storage":1969},"deleted_stats":{"events":0,"ingestion":0,"storage":0}},"snapshot":{"version":"v2","manifest_list":[{"manifest_path":"home/nikhilsinha/Parseable/parseable/data/test10/date=2025-03-10/manifest.json","time_lower_bound":"2025-03-10T00:00:00Z","time_upper_bound":"2025-03-10T23:59:59.999999999Z","events_ingested":3,"ingestion_size":70,"storage_size":1969}]},"hot_tier_enabled":false,"stream_type":"UserDefined","log_source":[{"log_source_format":"OtelLogs","fields":[]}]}); + let updated_stream_metadata = super::v5_v6(stream_metadata.clone()); + assert_eq!(updated_stream_metadata, expected); + } + + #[test] + fn test_v5_v6_with_default_log_source() { + let stream_metadata = serde_json::json!({"version":"v5","schema_version":"v0","objectstore-format":"v5","created-at":"2025-03-10T14:38:29.355131524-04:00","first-event-at":"2025-03-10T14:38:29.356-04:00","owner":{"id":"admin","group":"admin"},"permissions":[{"id":"admin","group":"admin","access":["all"]}],"stats":{"lifetime_stats":{"events":3,"ingestion":70,"storage":1969},"current_stats":{"events":3,"ingestion":70,"storage":1969},"deleted_stats":{"events":0,"ingestion":0,"storage":0}},"snapshot":{"version":"v2","manifest_list":[{"manifest_path":"home/nikhilsinha/Parseable/parseable/data/test10/date=2025-03-10/manifest.json","time_lower_bound":"2025-03-10T00:00:00Z","time_upper_bound":"2025-03-10T23:59:59.999999999Z","events_ingested":3,"ingestion_size":70,"storage_size":1969}]},"hot_tier_enabled":false,"stream_type":"UserDefined","log_source":"Json"}); + let expected = 
serde_json::json!({"version":"v6","schema_version":"v0","objectstore-format":"v6","created-at":"2025-03-10T14:38:29.355131524-04:00","first-event-at":"2025-03-10T14:38:29.356-04:00","owner":{"id":"admin","group":"admin"},"permissions":[{"id":"admin","group":"admin","access":["all"]}],"stats":{"lifetime_stats":{"events":3,"ingestion":70,"storage":1969},"current_stats":{"events":3,"ingestion":70,"storage":1969},"deleted_stats":{"events":0,"ingestion":0,"storage":0}},"snapshot":{"version":"v2","manifest_list":[{"manifest_path":"home/nikhilsinha/Parseable/parseable/data/test10/date=2025-03-10/manifest.json","time_lower_bound":"2025-03-10T00:00:00Z","time_upper_bound":"2025-03-10T23:59:59.999999999Z","events_ingested":3,"ingestion_size":70,"storage_size":1969}]},"hot_tier_enabled":false,"stream_type":"UserDefined","log_source":[{"log_source_format":"Json","fields":[]}]}); + let updated_stream_metadata = super::v5_v6(stream_metadata.clone()); + assert_eq!(updated_stream_metadata, expected); + } + + #[test] + fn test_v5_v6_without_log_source() { + let stream_metadata = serde_json::json!({"version":"v4","schema_version":"v0","objectstore-format":"v4","created-at":"2025-03-10T14:38:29.355131524-04:00","first-event-at":"2025-03-10T14:38:29.356-04:00","owner":{"id":"admin","group":"admin"},"permissions":[{"id":"admin","group":"admin","access":["all"]}],"stats":{"lifetime_stats":{"events":3,"ingestion":70,"storage":1969},"current_stats":{"events":3,"ingestion":70,"storage":1969},"deleted_stats":{"events":0,"ingestion":0,"storage":0}},"snapshot":{"version":"v2","manifest_list":[{"manifest_path":"home/nikhilsinha/Parseable/parseable/data/test10/date=2025-03-10/manifest.json","time_lower_bound":"2025-03-10T00:00:00Z","time_upper_bound":"2025-03-10T23:59:59.999999999Z","events_ingested":3,"ingestion_size":70,"storage_size":1969}]},"hot_tier_enabled":false,"stream_type":"UserDefined"}); + let expected = serde_json::json!({"version":"v6","schema_version":"v0","objectstore-format":"v6","created-at":"2025-03-10T14:38:29.355131524-04:00","first-event-at":"2025-03-10T14:38:29.356-04:00","owner":{"id":"admin","group":"admin"},"permissions":[{"id":"admin","group":"admin","access":["all"]}],"stats":{"lifetime_stats":{"events":3,"ingestion":70,"storage":1969},"current_stats":{"events":3,"ingestion":70,"storage":1969},"deleted_stats":{"events":0,"ingestion":0,"storage":0}},"snapshot":{"version":"v2","manifest_list":[{"manifest_path":"home/nikhilsinha/Parseable/parseable/data/test10/date=2025-03-10/manifest.json","time_lower_bound":"2025-03-10T00:00:00Z","time_upper_bound":"2025-03-10T23:59:59.999999999Z","events_ingested":3,"ingestion_size":70,"storage_size":1969}]},"hot_tier_enabled":false,"stream_type":"UserDefined","log_source":[{"log_source_format":"Json","fields":[]}]}); + let updated_stream_metadata = super::v5_v6(stream_metadata.clone()); + assert_eq!(updated_stream_metadata, expected); + } + + #[test] + fn test_v5_v6_unknown_log_source() { + let stream_metadata = 
serde_json::json!({"version":"v5","schema_version":"v0","objectstore-format":"v5","created-at":"2025-03-10T14:38:29.355131524-04:00","first-event-at":"2025-03-10T14:38:29.356-04:00","owner":{"id":"admin","group":"admin"},"permissions":[{"id":"admin","group":"admin","access":["all"]}],"stats":{"lifetime_stats":{"events":3,"ingestion":70,"storage":1969},"current_stats":{"events":3,"ingestion":70,"storage":1969},"deleted_stats":{"events":0,"ingestion":0,"storage":0}},"snapshot":{"version":"v2","manifest_list":[{"manifest_path":"home/nikhilsinha/Parseable/parseable/data/test10/date=2025-03-10/manifest.json","time_lower_bound":"2025-03-10T00:00:00Z","time_upper_bound":"2025-03-10T23:59:59.999999999Z","events_ingested":3,"ingestion_size":70,"storage_size":1969}]},"hot_tier_enabled":false,"stream_type":"UserDefined","log_source":"Invalid"}); + let expected = serde_json::json!({"version":"v6","schema_version":"v0","objectstore-format":"v6","created-at":"2025-03-10T14:38:29.355131524-04:00","first-event-at":"2025-03-10T14:38:29.356-04:00","owner":{"id":"admin","group":"admin"},"permissions":[{"id":"admin","group":"admin","access":["all"]}],"stats":{"lifetime_stats":{"events":3,"ingestion":70,"storage":1969},"current_stats":{"events":3,"ingestion":70,"storage":1969},"deleted_stats":{"events":0,"ingestion":0,"storage":0}},"snapshot":{"version":"v2","manifest_list":[{"manifest_path":"home/nikhilsinha/Parseable/parseable/data/test10/date=2025-03-10/manifest.json","time_lower_bound":"2025-03-10T00:00:00Z","time_upper_bound":"2025-03-10T23:59:59.999999999Z","events_ingested":3,"ingestion_size":70,"storage_size":1969}]},"hot_tier_enabled":false,"stream_type":"UserDefined","log_source":[{"log_source_format":"Json","fields":[]}]}); + let updated_stream_metadata = super::v5_v6(stream_metadata.clone()); + assert_eq!(updated_stream_metadata, expected); + } + + #[test] + fn test_v5_v6_invalid_log_source() { + let stream_metadata = serde_json::json!({"version":"v5","schema_version":"v0","objectstore-format":"v5","created-at":"2025-03-10T14:38:29.355131524-04:00","first-event-at":"2025-03-10T14:38:29.356-04:00","owner":{"id":"admin","group":"admin"},"permissions":[{"id":"admin","group":"admin","access":["all"]}],"stats":{"lifetime_stats":{"events":3,"ingestion":70,"storage":1969},"current_stats":{"events":3,"ingestion":70,"storage":1969},"deleted_stats":{"events":0,"ingestion":0,"storage":0}},"snapshot":{"version":"v2","manifest_list":[{"manifest_path":"home/nikhilsinha/Parseable/parseable/data/test10/date=2025-03-10/manifest.json","time_lower_bound":"2025-03-10T00:00:00Z","time_upper_bound":"2025-03-10T23:59:59.999999999Z","events_ingested":3,"ingestion_size":70,"storage_size":1969}]},"hot_tier_enabled":false,"stream_type":"UserDefined","log_source":{"log_source": "Invalid"}}); + let expected = 
serde_json::json!({"version":"v6","schema_version":"v0","objectstore-format":"v6","created-at":"2025-03-10T14:38:29.355131524-04:00","first-event-at":"2025-03-10T14:38:29.356-04:00","owner":{"id":"admin","group":"admin"},"permissions":[{"id":"admin","group":"admin","access":["all"]}],"stats":{"lifetime_stats":{"events":3,"ingestion":70,"storage":1969},"current_stats":{"events":3,"ingestion":70,"storage":1969},"deleted_stats":{"events":0,"ingestion":0,"storage":0}},"snapshot":{"version":"v2","manifest_list":[{"manifest_path":"home/nikhilsinha/Parseable/parseable/data/test10/date=2025-03-10/manifest.json","time_lower_bound":"2025-03-10T00:00:00Z","time_upper_bound":"2025-03-10T23:59:59.999999999Z","events_ingested":3,"ingestion_size":70,"storage_size":1969}]},"hot_tier_enabled":false,"stream_type":"UserDefined","log_source":[{"log_source_format":"Json","fields":[]}]}); + let updated_stream_metadata = super::v5_v6(stream_metadata.clone()); + assert_eq!(updated_stream_metadata, expected); + } +} diff --git a/src/otel/logs.rs b/src/otel/logs.rs index 969758d5a..cc7fe327f 100644 --- a/src/otel/logs.rs +++ b/src/otel/logs.rs @@ -27,6 +27,14 @@ use super::otel_utils::collect_json_from_values; use super::otel_utils::convert_epoch_nano_to_timestamp; use super::otel_utils::insert_attributes; +pub const OTEL_LOG_KNOWN_FIELD_LIST: [&str; 6] = [ + "time_unix_nano", + "severity_number", + "severity_text", + "body", + "span_id", + "trace_id", +]; /// otel log event has severity number /// there is a mapping of severity number to severity text provided in proto /// this function fetches the severity text from the severity number diff --git a/src/otel/metrics.rs b/src/otel/metrics.rs index aa621b03e..f35a7f88a 100644 --- a/src/otel/metrics.rs +++ b/src/otel/metrics.rs @@ -27,6 +27,14 @@ use super::otel_utils::{ convert_epoch_nano_to_timestamp, insert_attributes, insert_number_if_some, }; +pub const OTEL_METRICS_KNOWN_FIELD_LIST: [&str; 5] = [ + "metric_name", + "metric_description", + "metric_unit", + "start_time_unix_nano", + "time_unix_nano", +]; + /// otel metrics event has json array for exemplar /// this function flatten the exemplar json array /// and returns a `Map` of the exemplar json diff --git a/src/otel/traces.rs b/src/otel/traces.rs index ebea012a6..596e788fc 100644 --- a/src/otel/traces.rs +++ b/src/otel/traces.rs @@ -27,6 +27,23 @@ use serde_json::{Map, Value}; use super::otel_utils::convert_epoch_nano_to_timestamp; use super::otel_utils::insert_attributes; +pub const OTEL_TRACES_KNOWN_FIELD_LIST: [&str; 15] = [ + "span_trace_id", + "span_span_id", + "span_name", + "span_parent_span_id", + "flags", + "name", + "span_kind", + "span_kind_description", + "span_start_time_unix_nano", + "span_end_time_unix_nano", + "event_name", + "event_time_unix_nano", + "span_status_code", + "span_status_description", + "span_status_message", +]; /// this function flattens the `ScopeSpans` object /// and returns a `Vec` of `Map` of the flattened json fn flatten_scope_span(scope_span: &ScopeSpans) -> Vec> { diff --git a/src/parseable/mod.rs b/src/parseable/mod.rs index 25558a46d..443445474 100644 --- a/src/parseable/mod.rs +++ b/src/parseable/mod.rs @@ -17,7 +17,13 @@ * */ -use std::{collections::HashMap, num::NonZeroU32, path::PathBuf, str::FromStr, sync::Arc}; +use std::{ + collections::{HashMap, HashSet}, + num::NonZeroU32, + path::PathBuf, + str::FromStr, + sync::Arc, +}; use actix_web::http::header::HeaderMap; use arrow_schema::{Field, Schema}; @@ -35,7 +41,7 @@ use tracing::error; use 
crate::connectors::kafka::config::KafkaConfig; use crate::{ cli::{Cli, Options, StorageOptions}, - event::format::LogSource, + event::format::{LogSource, LogSourceEntry}, handlers::{ http::{ cluster::{sync_streams_with_ingestors, INTERNAL_STREAM_NAME}, @@ -346,11 +352,12 @@ impl Parseable { } pub async fn create_internal_stream_if_not_exists(&self) -> Result<(), StreamError> { + let log_source_entry = LogSourceEntry::new(LogSource::Pmeta, HashSet::new()); match self .create_stream_if_not_exists( INTERNAL_STREAM_NAME, StreamType::Internal, - LogSource::Pmeta, + vec![log_source_entry], ) .await { @@ -374,9 +381,14 @@ impl Parseable { &self, stream_name: &str, stream_type: StreamType, - log_source: LogSource, + log_source: Vec, ) -> Result { if self.streams.contains(stream_name) { + for stream_log_source in log_source { + self.add_update_log_source(stream_name, stream_log_source) + .await?; + } + return Ok(true); } @@ -406,6 +418,57 @@ impl Parseable { Ok(false) } + pub async fn add_update_log_source( + &self, + stream_name: &str, + log_source: LogSourceEntry, + ) -> Result<(), StreamError> { + let stream = self.get_stream(stream_name).expect(STREAM_EXISTS); + let mut log_sources = stream.get_log_source(); + let mut changed = false; + + // Try to find existing log source with the same format + if let Some(stream_log_source) = log_sources + .iter_mut() + .find(|source| source.log_source_format == log_source.log_source_format) + { + // Use a HashSet to efficiently track only new fields + let existing_fields: HashSet = + stream_log_source.fields.iter().cloned().collect(); + let new_fields: HashSet = log_source + .fields + .iter() + .filter(|field| !existing_fields.contains(*field)) + .cloned() + .collect(); + + // Only update if there are new fields to add + if !new_fields.is_empty() { + stream_log_source.fields.extend(new_fields); + changed = true; + } + } else { + // If no matching log source found, add the new one + log_sources.push(log_source); + changed = true; + } + + // Only persist to storage if we made changes + if changed { + stream.set_log_source(log_sources.clone()); + + let storage = self.storage.get_object_store(); + if let Err(err) = storage + .update_log_source_in_stream(stream_name, &log_sources) + .await + { + return Err(StreamError::Storage(err)); + } + } + + Ok(()) + } + pub async fn create_update_stream( &self, headers: &HeaderMap, @@ -469,7 +532,7 @@ impl Parseable { custom_partition.as_ref(), static_schema_flag, )?; - + let log_source_entry = LogSourceEntry::new(log_source, HashSet::new()); self.create_stream( stream_name.to_string(), &time_partition, @@ -478,7 +541,7 @@ impl Parseable { static_schema_flag, schema, stream_type, - log_source, + vec![log_source_entry], ) .await?; @@ -534,7 +597,7 @@ impl Parseable { static_schema_flag: bool, schema: Arc, stream_type: StreamType, - log_source: LogSource, + log_source: Vec, ) -> Result<(), CreateStreamError> { // fail to proceed if invalid stream name if stream_type != StreamType::Internal { @@ -759,6 +822,25 @@ impl Parseable { Some(first_event_at.to_string()) } + + pub async fn update_log_source( + &self, + stream_name: &str, + log_source: Vec, + ) -> Result<(), StreamError> { + let storage = self.storage.get_object_store(); + if let Err(err) = storage + .update_log_source_in_stream(stream_name, &log_source) + .await + { + return Err(StreamError::Storage(err)); + } + + let stream = self.get_stream(stream_name).expect(STREAM_EXISTS); + stream.set_log_source(log_source); + + Ok(()) + } } pub fn validate_static_schema( diff 
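The core of add_update_log_source is a merge keyed on the log source format; distilled as a standalone helper (the function below is ours for illustration, not part of the diff):

    // Merge one incoming entry into an existing list; returns true when the list
    // changed and therefore needs to be persisted back to stream.json.
    fn merge_log_source(existing: &mut Vec<LogSourceEntry>, incoming: LogSourceEntry) -> bool {
        if let Some(entry) = existing
            .iter_mut()
            .find(|e| e.log_source_format == incoming.log_source_format)
        {
            let before = entry.fields.len();
            entry.fields.extend(incoming.fields);
            entry.fields.len() != before
        } else {
            existing.push(incoming);
            true
        }
    }

Because fields is a HashSet, extend already discards duplicates, so comparing the set's length before and after is enough to decide whether storage needs updating.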
--git a/src/parseable/streams.rs b/src/parseable/streams.rs index 0a1b0ba96..b21f1ee20 100644 --- a/src/parseable/streams.rs +++ b/src/parseable/streams.rs @@ -18,7 +18,7 @@ */ use std::{ - collections::HashMap, + collections::{HashMap, HashSet}, fs::{remove_file, write, File, OpenOptions}, num::NonZeroU32, path::{Path, PathBuf}, @@ -47,7 +47,10 @@ use tracing::{error, info, trace, warn}; use crate::{ cli::Options, - event::DEFAULT_TIMESTAMP_KEY, + event::{ + format::{LogSource, LogSourceEntry}, + DEFAULT_TIMESTAMP_KEY, + }, metadata::{LogStreamMetadata, SchemaVersion}, metrics, option::Mode, @@ -673,6 +676,61 @@ impl Stream { self.metadata.read().expect(LOCK_EXPECT).stream_type } + pub fn set_log_source(&self, log_source: Vec) { + self.metadata.write().expect(LOCK_EXPECT).log_source = log_source; + } + + pub fn get_log_source(&self) -> Vec { + self.metadata.read().expect(LOCK_EXPECT).log_source.clone() + } + + pub fn add_log_source(&self, log_source: LogSourceEntry) { + let metadata = self.metadata.read().expect(LOCK_EXPECT); + for existing in &metadata.log_source { + if existing.log_source_format == log_source.log_source_format { + drop(metadata); + self.add_fields_to_log_source( + &log_source.log_source_format, + log_source.fields.clone(), + ); + return; + } + } + drop(metadata); + + let mut metadata = self.metadata.write().expect(LOCK_EXPECT); + for existing in &metadata.log_source { + if existing.log_source_format == log_source.log_source_format { + self.add_fields_to_log_source( + &log_source.log_source_format, + log_source.fields.clone(), + ); + return; + } + } + metadata.log_source.push(log_source); + } + + pub fn add_fields_to_log_source(&self, log_source: &LogSource, fields: HashSet) { + let mut metadata = self.metadata.write().expect(LOCK_EXPECT); + for log_source_entry in metadata.log_source.iter_mut() { + if log_source_entry.log_source_format == *log_source { + log_source_entry.fields.extend(fields); + return; + } + } + } + + pub fn get_fields_from_log_source(&self, log_source: &LogSource) -> Option> { + let metadata = self.metadata.read().expect(LOCK_EXPECT); + for log_source_entry in metadata.log_source.iter() { + if log_source_entry.log_source_format == *log_source { + return Some(log_source_entry.fields.clone()); + } + } + None + } + /// First flushes arrows onto disk and then converts the arrow into parquet pub fn flush_and_convert(&self, shutdown_signal: bool) -> Result<(), StagingError> { self.flush(); diff --git a/src/storage/mod.rs b/src/storage/mod.rs index 6c138fa84..6a272c8e4 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -23,7 +23,7 @@ use tokio::task::JoinError; use crate::{ catalog::snapshot::Snapshot, - event::format::LogSource, + event::format::LogSourceEntry, metadata::SchemaVersion, option::StandaloneWithDistributed, parseable::StreamNotFound, @@ -71,8 +71,8 @@ const MAX_OBJECT_STORE_REQUESTS: usize = 1000; // const PERMISSIONS_READ_WRITE: &str = "readwrite"; const ACCESS_ALL: &str = "all"; -pub const CURRENT_OBJECT_STORE_VERSION: &str = "v5"; -pub const CURRENT_SCHEMA_VERSION: &str = "v5"; +pub const CURRENT_OBJECT_STORE_VERSION: &str = "v6"; +pub const CURRENT_SCHEMA_VERSION: &str = "v6"; const CONNECT_TIMEOUT_SECS: u64 = 5; const REQUEST_TIMEOUT_SECS: u64 = 300; @@ -117,7 +117,7 @@ pub struct ObjectStoreFormat { #[serde(default)] pub stream_type: StreamType, #[serde(default)] - pub log_source: LogSource, + pub log_source: Vec, } #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] @@ -142,7 +142,7 @@ pub struct StreamInfo { pub 
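Assumed usage of the new Stream accessors (the stream name is hypothetical). Note that these only mutate the in-memory LogStreamMetadata; persisting the change to stream.json still goes through Parseable::add_update_log_source or update_log_source.

    use std::collections::HashSet;

    let stream = PARSEABLE.get_stream("demo")?;
    stream.add_log_source(LogSourceEntry::new(LogSource::OtelTraces, HashSet::new()));
    stream.add_fields_to_log_source(
        &LogSource::OtelTraces,
        HashSet::from(["span_name".to_string()]),
    );
    assert!(stream
        .get_fields_from_log_source(&LogSource::OtelTraces)
        .is_some_and(|fields| fields.contains("span_name")));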
static_schema_flag: bool, #[serde(default)] pub stream_type: StreamType, - pub log_source: LogSource, + pub log_source: Vec, } #[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize, Default)] @@ -219,7 +219,7 @@ impl Default for ObjectStoreFormat { custom_partition: None, static_schema_flag: false, hot_tier_enabled: false, - log_source: LogSource::default(), + log_source: vec![LogSourceEntry::default()], } } } diff --git a/src/storage/object_storage.rs b/src/storage/object_storage.rs index 5b2984890..680d20f3e 100644 --- a/src/storage/object_storage.rs +++ b/src/storage/object_storage.rs @@ -45,6 +45,8 @@ use ulid::Ulid; use crate::alerts::AlertConfig; use crate::catalog::{self, manifest::Manifest, snapshot::Snapshot}; use crate::correlation::{CorrelationConfig, CorrelationError}; +use crate::event::format::LogSource; +use crate::event::format::LogSourceEntry; use crate::handlers::http::modal::ingest_server::INGESTOR_EXPECT; use crate::handlers::http::users::CORRELATION_DIR; use crate::handlers::http::users::{DASHBOARDS_DIR, FILTER_DIR, USERS_ROOT_DIR}; @@ -268,6 +270,20 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static { Ok(()) } + async fn update_log_source_in_stream( + &self, + stream_name: &str, + log_source: &[LogSourceEntry], + ) -> Result<(), ObjectStorageError> { + let mut format = self.get_object_store_format(stream_name).await?; + format.log_source = log_source.to_owned(); + let format_json = to_bytes(&format); + self.put_object(&stream_json_path(stream_name), format_json) + .await?; + + Ok(()) + } + /// Updates the first event timestamp in the object store for the specified stream. /// /// This function retrieves the current object-store format for the given stream, @@ -544,6 +560,8 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static { stream_name: &str, ) -> Result { let stream_path = RelativePathBuf::from_iter([stream_name, STREAM_ROOT_DIRECTORY]); + let mut all_log_sources: Vec = Vec::new(); + if let Some(stream_metadata_obs) = self .get_objects( Some(&stream_path), @@ -556,12 +574,39 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static { .next() { if !stream_metadata_obs.is_empty() { - let stream_metadata_bytes = &stream_metadata_obs[0]; + for stream_metadata_bytes in stream_metadata_obs.iter() { + let stream_ob_metadata = + serde_json::from_slice::(stream_metadata_bytes)?; + all_log_sources.extend(stream_ob_metadata.log_source.clone()); + } + + // Merge log sources + let mut merged_log_sources: Vec = Vec::new(); + let mut log_source_map: HashMap> = HashMap::new(); + + for log_source_entry in all_log_sources { + let log_source_format = log_source_entry.log_source_format; + let fields = log_source_entry.fields; + + log_source_map + .entry(log_source_format) + .or_default() + .extend(fields); + } + + for (log_source_format, fields) in log_source_map { + merged_log_sources.push(LogSourceEntry { + log_source_format, + fields: fields.into_iter().collect(), + }); + } + let stream_ob_metadata = - serde_json::from_slice::(stream_metadata_bytes)?; + serde_json::from_slice::(&stream_metadata_obs[0])?; let stream_metadata = ObjectStoreFormat { stats: FullStats::default(), snapshot: Snapshot::default(), + log_source: merged_log_sources, ..stream_ob_metadata }; @@ -642,6 +687,41 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static { Ok(stream_metas) } + async fn get_log_source_from_storage( + &self, + stream_name: &str, + ) -> Result, ObjectStorageError> { + let mut all_log_sources: Vec = Vec::new(); + let stream_metas = 
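update_log_source_in_stream is a plain read-modify-write of the stream's stream.json: fetch the current ObjectStoreFormat, swap in the new log_source list, and put the object back. A sketch of an assumed call site (stream name illustrative):

    let storage = PARSEABLE.storage.get_object_store();
    storage
        .update_log_source_in_stream("demo", &[LogSourceEntry::default()])
        .await?;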
self.get_stream_meta_from_storage(stream_name).await; + if let Ok(stream_metas) = stream_metas { + for stream_meta in stream_metas.iter() { + // fetch unique log sources and their fields + all_log_sources.extend(stream_meta.log_source.clone()); + } + } + + //merge fields of same log source + let mut merged_log_sources: Vec = Vec::new(); + let mut log_source_map: HashMap> = HashMap::new(); + for log_source_entry in all_log_sources { + let log_source_format = log_source_entry.log_source_format; + let fields = log_source_entry.fields; + + log_source_map + .entry(log_source_format) + .or_default() + .extend(fields); + } + + for (log_source_format, fields) in log_source_map { + merged_log_sources.push(LogSourceEntry { + log_source_format, + fields: fields.into_iter().collect(), + }); + } + Ok(merged_log_sources) + } + /// Retrieves the earliest first-event-at from the storage for the specified stream. /// /// This function fetches the object-store format from all the stream.json files for the given stream from the storage,
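The same union-by-format merge appears twice in this file (here and in the stream-metadata path above). An equivalent standalone helper, shown only to make the technique explicit; it relies on the Hash derive added to LogSource at the top of this diff:

    use std::collections::{HashMap, HashSet};

    // Union the field sets of entries that share a log source format.
    fn merge_log_sources(all: Vec<LogSourceEntry>) -> Vec<LogSourceEntry> {
        let mut by_format: HashMap<LogSource, HashSet<String>> = HashMap::new();
        for entry in all {
            by_format
                .entry(entry.log_source_format)
                .or_default()
                .extend(entry.fields);
        }
        by_format
            .into_iter()
            .map(|(log_source_format, fields)| LogSourceEntry {
                log_source_format,
                fields,
            })
            .collect()
    }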