
update log source in stream info #1231

Merged · 14 commits · Mar 14, 2025
src/connectors/kafka/processor.rs (+7 -2)

```diff
@@ -28,7 +28,7 @@ use tracing::{debug, error};
 use crate::{
     connectors::common::processor::Processor,
     event::{
-        format::{json, EventFormat, LogSource},
+        format::{json, EventFormat, LogSourceEntry},
         Event as ParseableEvent,
     },
     parseable::PARSEABLE,
@@ -49,9 +49,14 @@ impl ParseableSinkProcessor {
             .first()
             .map(|r| r.topic.as_str())
             .unwrap_or_default();
+        let log_source_entry = LogSourceEntry::default();
 
         PARSEABLE
-            .create_stream_if_not_exists(stream_name, StreamType::UserDefined, LogSource::Json)
+            .create_stream_if_not_exists(
+                stream_name,
+                StreamType::UserDefined,
+                vec![log_source_entry],
+            )
             .await?;
 
         let stream = PARSEABLE.get_stream(stream_name)?;
```
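The signature change above is the core of this PR: `create_stream_if_not_exists` now takes a `Vec<LogSourceEntry>` instead of a single `LogSource`, so a stream can be registered against several formats at once. A minimal sketch of a multi-source call, assuming an async context (the helper name and the simplified error type are hypothetical, not part of this diff):

```rust
use std::collections::HashSet;

use crate::{
    event::format::{LogSource, LogSourceEntry},
    parseable::PARSEABLE,
    storage::StreamType,
};

// Hypothetical call site, not part of this diff: one stream registered with
// two source formats. A real OTEL entry would carry its known field names
// rather than an empty set.
async fn create_multi_source_stream(stream_name: &str) -> Result<(), Box<dyn std::error::Error>> {
    PARSEABLE
        .create_stream_if_not_exists(
            stream_name,
            StreamType::UserDefined,
            vec![
                LogSourceEntry::new(LogSource::Json, HashSet::new()),
                LogSourceEntry::new(LogSource::OtelLogs, HashSet::new()),
            ],
        )
        .await?;
    Ok(())
}
```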
src/event/format/mod.rs (+18 -1)

```diff
@@ -44,7 +44,7 @@ static TIME_FIELD_NAME_PARTS: [&str; 2] = ["time", "date"];
 type EventSchema = Vec<Arc<Field>>;
 
 /// Source of the logs, used to perform special processing for certain sources
-#[derive(Default, Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
+#[derive(Default, Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Hash)]
 pub enum LogSource {
     // AWS Kinesis sends logs in the format of a json array
     Kinesis,
@@ -92,6 +92,23 @@ impl Display for LogSource {
     }
 }
 
+/// Contains the format name and a list of known field names that are associated with the said format.
+/// Stored on disk as part of `ObjectStoreFormat` in stream.json
+#[derive(Default, Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
+pub struct LogSourceEntry {
+    pub log_source_format: LogSource,
+    pub fields: HashSet<String>,
+}
+
+impl LogSourceEntry {
+    pub fn new(log_source_format: LogSource, fields: HashSet<String>) -> Self {
+        LogSourceEntry {
+            log_source_format,
+            fields,
+        }
+    }
+}
+
 // Global Trait for event format
 // This trait is implemented by all the event formats
 pub trait EventFormat: Sized {
```
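`LogSourceEntry` pairs a `LogSource` variant with the set of field names that format is known to produce. A short construction sketch (the field names below are illustrative, not the actual known-field lists):

```rust
use std::collections::HashSet;

use crate::event::format::{LogSource, LogSourceEntry};

// Field names here are made up for illustration; the real lists live in
// constants such as OTEL_LOG_KNOWN_FIELD_LIST (see ingest.rs below).
fn example_entry() -> LogSourceEntry {
    let fields: HashSet<String> = ["trace_id", "span_id", "severity_text"]
        .iter()
        .map(|&s| s.to_string())
        .collect();
    LogSourceEntry::new(LogSource::OtelLogs, fields)
}
```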
src/handlers/http/ingest.rs (+48 -8)

```diff
@@ -16,7 +16,7 @@
  *
  */
 
-use std::collections::HashMap;
+use std::collections::{HashMap, HashSet};
 
 use actix_web::web::{Json, Path};
 use actix_web::{http::header::ContentType, HttpRequest, HttpResponse};
@@ -28,10 +28,13 @@ use serde_json::Value;
 
 use crate::event;
 use crate::event::error::EventError;
-use crate::event::format::{self, EventFormat, LogSource};
+use crate::event::format::{self, EventFormat, LogSource, LogSourceEntry};
 use crate::handlers::{LOG_SOURCE_KEY, STREAM_NAME_HEADER_KEY};
 use crate::metadata::SchemaVersion;
 use crate::option::Mode;
+use crate::otel::logs::OTEL_LOG_KNOWN_FIELD_LIST;
+use crate::otel::metrics::OTEL_METRICS_KNOWN_FIELD_LIST;
+use crate::otel::traces::OTEL_TRACES_KNOWN_FIELD_LIST;
 use crate::parseable::{StreamNotFound, PARSEABLE};
 use crate::storage::{ObjectStorageError, StreamType};
 use crate::utils::header_parsing::ParseHeaderError;
@@ -55,9 +58,6 @@ pub async fn ingest(req: HttpRequest, Json(json): Json<Value>) -> Result<HttpRes
     if internal_stream_names.contains(&stream_name) {
         return Err(PostError::InternalStream(stream_name));
     }
-    PARSEABLE
-        .create_stream_if_not_exists(&stream_name, StreamType::UserDefined, LogSource::default())
-        .await?;
 
     let log_source = req
         .headers()
@@ -72,6 +72,15 @@ pub async fn ingest(req: HttpRequest, Json(json): Json<Value>) -> Result<HttpRes
         return Err(PostError::OtelNotSupported);
     }
 
+    let log_source_entry = LogSourceEntry::new(log_source.clone(), HashSet::new());
+    PARSEABLE
+        .create_stream_if_not_exists(
+            &stream_name,
+            StreamType::UserDefined,
+            vec![log_source_entry],
+        )
+        .await?;
+
     flatten_and_push_logs(json, &stream_name, &log_source).await?;
 
     Ok(HttpResponse::Ok().finish())
@@ -119,8 +128,20 @@ pub async fn handle_otel_logs_ingestion(
     }
 
     let stream_name = stream_name.to_str().unwrap().to_owned();
+
+    let log_source_entry = LogSourceEntry::new(
+        log_source.clone(),
+        OTEL_LOG_KNOWN_FIELD_LIST
+            .iter()
+            .map(|&s| s.to_string())
+            .collect(),
+    );
     PARSEABLE
-        .create_stream_if_not_exists(&stream_name, StreamType::UserDefined, LogSource::OtelLogs)
+        .create_stream_if_not_exists(
+            &stream_name,
+            StreamType::UserDefined,
+            vec![log_source_entry],
+        )
         .await?;
 
     flatten_and_push_logs(json, &stream_name, &log_source).await?;
@@ -146,11 +167,18 @@ pub async fn handle_otel_metrics_ingestion(
         return Err(PostError::IncorrectLogSource(LogSource::OtelMetrics));
     }
     let stream_name = stream_name.to_str().unwrap().to_owned();
+    let log_source_entry = LogSourceEntry::new(
+        log_source.clone(),
+        OTEL_METRICS_KNOWN_FIELD_LIST
+            .iter()
+            .map(|&s| s.to_string())
+            .collect(),
+    );
     PARSEABLE
         .create_stream_if_not_exists(
             &stream_name,
             StreamType::UserDefined,
-            LogSource::OtelMetrics,
+            vec![log_source_entry],
         )
         .await?;
 
@@ -178,8 +206,20 @@ pub async fn handle_otel_traces_ingestion(
         return Err(PostError::IncorrectLogSource(LogSource::OtelTraces));
     }
     let stream_name = stream_name.to_str().unwrap().to_owned();
+    let log_source_entry = LogSourceEntry::new(
+        log_source.clone(),
+        OTEL_TRACES_KNOWN_FIELD_LIST
+            .iter()
+            .map(|&s| s.to_string())
+            .collect(),
+    );
+
     PARSEABLE
-        .create_stream_if_not_exists(&stream_name, StreamType::UserDefined, LogSource::OtelTraces)
+        .create_stream_if_not_exists(
+            &stream_name,
+            StreamType::UserDefined,
+            vec![log_source_entry],
+        )
         .await?;
 
     flatten_and_push_logs(json, &stream_name, &log_source).await?;
```
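All three OTEL handlers repeat the same list-to-`HashSet` conversion. A hypothetical helper, not part of this PR, that would factor it out:

```rust
use std::collections::HashSet;

use crate::event::format::{LogSource, LogSourceEntry};

// Hypothetical refactor, not in this diff: build a LogSourceEntry from one
// of the static known-field lists instead of repeating the map/collect.
fn entry_from_known_fields(format: LogSource, known_fields: &[&str]) -> LogSourceEntry {
    let fields: HashSet<String> = known_fields.iter().map(|&s| s.to_string()).collect();
    LogSourceEntry::new(format, fields)
}
```

Each handler would then reduce to something like `entry_from_known_fields(log_source.clone(), &OTEL_LOG_KNOWN_FIELD_LIST)`, assuming the constants coerce to `&[&str]`.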
src/handlers/http/logstream.rs (+8 -0)

```diff
@@ -309,6 +309,14 @@ pub async fn get_stream_info(stream_name: Path<String>) -> Result<impl Responder
         None
     };
 
+    let stream_log_source = storage
+        .get_log_source_from_storage(&stream_name)
+        .await
+        .unwrap_or_default();
+    PARSEABLE
+        .update_log_source(&stream_name, stream_log_source)
+        .await?;
+
     let hash_map = PARSEABLE.streams.read().unwrap();
     let stream_meta = hash_map
         .get(&stream_name)
```
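This hunk is what the PR title refers to: before serving stream info, the handler re-reads the stream's `log_source` entries from object storage and syncs them into the in-memory metadata, so the response reflects what stream.json actually holds. `get_log_source_from_storage` and `update_log_source` are introduced elsewhere in this PR; their bodies are not shown here, but given the `LogStreamMetadata` change below, the sync plausibly amounts to:

```rust
use crate::event::format::LogSourceEntry;
use crate::metadata::LogStreamMetadata;

// Assumption, not the PR's actual implementation: replace the cached
// entries wholesale with the list read back from object storage.
fn sync_log_source(meta: &mut LogStreamMetadata, from_storage: Vec<LogSourceEntry>) {
    meta.log_source = from_storage;
}
```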
src/metadata.rs (+3 -3)

```diff
@@ -24,7 +24,7 @@ use std::num::NonZeroU32;
 use std::sync::Arc;
 
 use crate::catalog::snapshot::ManifestItem;
-use crate::event::format::LogSource;
+use crate::event::format::LogSourceEntry;
 use crate::metrics::{
     EVENTS_INGESTED, EVENTS_INGESTED_DATE, EVENTS_INGESTED_SIZE, EVENTS_INGESTED_SIZE_DATE,
     EVENTS_STORAGE_SIZE_DATE, LIFETIME_EVENTS_INGESTED, LIFETIME_EVENTS_INGESTED_SIZE,
@@ -87,7 +87,7 @@ pub struct LogStreamMetadata {
     pub static_schema_flag: bool,
     pub hot_tier_enabled: bool,
     pub stream_type: StreamType,
-    pub log_source: LogSource,
+    pub log_source: Vec<LogSourceEntry>,
 }
 
 impl LogStreamMetadata {
@@ -101,7 +101,7 @@ impl LogStreamMetadata {
         static_schema: HashMap<String, Arc<Field>>,
         stream_type: StreamType,
         schema_version: SchemaVersion,
-        log_source: LogSource,
+        log_source: Vec<LogSourceEntry>,
     ) -> Self {
         LogStreamMetadata {
             created_at: if created_at.is_empty() {
```
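With `log_source` now a `Vec<LogSourceEntry>`, call sites that previously stored a single `LogSource` wrap it in a one-element vector. A migration sketch, assuming no known fields are recorded for the legacy value:

```rust
use std::collections::HashSet;

use crate::event::format::{LogSource, LogSourceEntry};

// Wrap a legacy single-value log source in the new representation.
fn migrate(legacy: LogSource) -> Vec<LogSourceEntry> {
    vec![LogSourceEntry::new(legacy, HashSet::new())]
}
```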