diff --git a/src/connectors/kafka/processor.rs b/src/connectors/kafka/processor.rs
index 10025ed98..09d826b46 100644
--- a/src/connectors/kafka/processor.rs
+++ b/src/connectors/kafka/processor.rs
@@ -16,12 +16,12 @@
  *
  */
 
-use std::sync::Arc;
-
 use async_trait::async_trait;
 use futures_util::StreamExt;
 use rdkafka::consumer::{CommitMode, Consumer};
 use serde_json::Value;
+use std::collections::HashMap;
+use std::sync::Arc;
 use tokio_stream::wrappers::ReceiverStream;
 use tracing::{debug, error};
 
@@ -29,7 +29,7 @@ use crate::{
     connectors::common::processor::Processor,
     event::{
         format::{json, EventFormat, LogSourceEntry},
-        Event as ParseableEvent,
+        Event as ParseableEvent, USER_AGENT_KEY,
     },
     parseable::PARSEABLE,
     storage::StreamType,
@@ -76,6 +76,9 @@ impl ParseableSinkProcessor {
             }
         }
 
+        let mut p_custom_fields = HashMap::new();
+        p_custom_fields.insert(USER_AGENT_KEY.to_string(), "kafka".to_string());
+
         let p_event = json::Event::new(Value::Array(json_vec)).into_event(
             stream_name.to_string(),
             total_payload_size,
@@ -85,6 +88,7 @@
             time_partition.as_ref(),
             schema_version,
             StreamType::UserDefined,
+            &p_custom_fields,
         )?;
 
         Ok(p_event)
diff --git a/src/event/format/json.rs b/src/event/format/json.rs
index 903ab2752..36ffe0427 100644
--- a/src/event/format/json.rs
+++ b/src/event/format/json.rs
@@ -149,6 +149,7 @@ impl EventFormat for Event {
         time_partition: Option<&String>,
         schema_version: SchemaVersion,
         stream_type: StreamType,
+        p_custom_fields: &HashMap<String, String>,
     ) -> Result<super::Event, anyhow::Error> {
         let custom_partition_values = match custom_partitions.as_ref() {
             Some(custom_partition) => {
@@ -168,6 +169,7 @@ impl EventFormat for Event {
             static_schema_flag,
             time_partition,
             schema_version,
+            p_custom_fields,
         )?;
 
         Ok(super::Event {
diff --git a/src/event/format/mod.rs b/src/event/format/mod.rs
index 58c35fc79..830409d3d 100644
--- a/src/event/format/mod.rs
+++ b/src/event/format/mod.rs
@@ -33,7 +33,7 @@ use serde_json::Value;
 use crate::{
     metadata::SchemaVersion,
     storage::StreamType,
-    utils::arrow::{get_field, get_timestamp_array, replace_columns},
+    utils::arrow::{add_parseable_fields, get_field},
 };
 
 use super::{Event, DEFAULT_TIMESTAMP_KEY};
@@ -145,9 +145,10 @@ pub trait EventFormat: Sized {
         static_schema_flag: bool,
         time_partition: Option<&String>,
         schema_version: SchemaVersion,
+        p_custom_fields: &HashMap<String, String>,
     ) -> Result<(RecordBatch, bool), AnyError> {
         let p_timestamp = self.get_p_timestamp();
-        let (data, mut schema, is_first) = self.to_data(
+        let (data, schema, is_first) = self.to_data(
             storage_schema,
             time_partition,
             schema_version,
@@ -161,16 +162,6 @@ pub trait EventFormat: Sized {
             ));
         };
 
-        // add the p_timestamp field to the event schema to the 0th index
-        schema.insert(
-            0,
-            Arc::new(Field::new(
-                DEFAULT_TIMESTAMP_KEY,
-                DataType::Timestamp(TimeUnit::Millisecond, None),
-                true,
-            )),
-        );
-
         // prepare the record batch and new fields to be added
         let mut new_schema = Arc::new(Schema::new(schema));
         if !Self::is_schema_matching(new_schema.clone(), storage_schema, static_schema_flag) {
@@ -179,13 +170,9 @@ pub trait EventFormat: Sized {
             new_schema =
                 update_field_type_in_schema(new_schema, None, time_partition, None, schema_version);
 
-        let mut rb = Self::decode(data, new_schema.clone())?;
-        rb = replace_columns(
-            rb.schema(),
-            &rb,
-            &[0],
-            &[Arc::new(get_timestamp_array(p_timestamp, rb.num_rows()))],
-        );
+        let rb = Self::decode(data, new_schema.clone())?;
+
+        let rb = add_parseable_fields(rb, p_timestamp, p_custom_fields)?;
 
         Ok((rb, is_first))
     }
@@ -223,6 +210,7 @@ pub trait EventFormat: Sized {
         time_partition: Option<&String>,
         schema_version: SchemaVersion,
         stream_type: StreamType,
+        p_custom_fields: &HashMap<String, String>,
     ) -> Result<Event, AnyError>;
 }
diff --git a/src/event/mod.rs b/src/event/mod.rs
index 29a4a0899..b641643cb 100644
--- a/src/event/mod.rs
+++ b/src/event/mod.rs
@@ -35,6 +35,9 @@ use chrono::NaiveDateTime;
 use std::collections::HashMap;
 
 pub const DEFAULT_TIMESTAMP_KEY: &str = "p_timestamp";
+pub const USER_AGENT_KEY: &str = "p_user_agent";
+pub const SOURCE_IP_KEY: &str = "p_src_ip";
+pub const FORMAT_KEY: &str = "p_format";
 
 #[derive(Clone)]
 pub struct Event {
diff --git a/src/handlers/http/audit.rs b/src/handlers/http/audit.rs
index 79e180d71..bf9991a02 100644
--- a/src/handlers/http/audit.rs
+++ b/src/handlers/http/audit.rs
@@ -23,6 +23,7 @@ use actix_web::{
     middleware::Next,
 };
 use actix_web_httpauth::extractors::basic::BasicAuth;
+use http::header::USER_AGENT;
 use ulid::Ulid;
 
 use crate::{
@@ -85,7 +86,7 @@ pub async fn audit_log_middleware(
         )
         .with_user_agent(
             req.headers()
-                .get("User-Agent")
+                .get(USER_AGENT)
                 .and_then(|a| a.to_str().ok())
                 .unwrap_or_default(),
         )
diff --git a/src/handlers/http/ingest.rs b/src/handlers/http/ingest.rs
index 1e5e6d048..b8e056cac 100644
--- a/src/handlers/http/ingest.rs
+++ b/src/handlers/http/ingest.rs
@@ -41,7 +41,7 @@ use crate::utils::header_parsing::ParseHeaderError;
 use crate::utils::json::flatten::JsonFlattenError;
 
 use super::logstream::error::{CreateStreamError, StreamError};
-use super::modal::utils::ingest_utils::flatten_and_push_logs;
+use super::modal::utils::ingest_utils::{flatten_and_push_logs, get_custom_fields_from_header};
 use super::users::dashboards::DashboardError;
 use super::users::filters::FiltersError;
 
@@ -72,6 +72,8 @@ pub async fn ingest(req: HttpRequest, Json(json): Json<Value>) -> Result
             None,
             SchemaVersion::V0,
             StreamType::Internal,
+            &HashMap::new(),
         )?
         .process()?;
@@ -143,8 +146,9 @@ pub async fn handle_otel_logs_ingestion(
         vec![log_source_entry],
     )
     .await?;
+    let p_custom_fields = get_custom_fields_from_header(req);
 
-    flatten_and_push_logs(json, &stream_name, &log_source).await?;
+    flatten_and_push_logs(json, &stream_name, &log_source, &p_custom_fields).await?;
 
     Ok(HttpResponse::Ok().finish())
 }
@@ -166,6 +170,7 @@ pub async fn handle_otel_metrics_ingestion(
     if log_source != LogSource::OtelMetrics {
         return Err(PostError::IncorrectLogSource(LogSource::OtelMetrics));
     }
+    let stream_name = stream_name.to_str().unwrap().to_owned();
 
     let log_source_entry = LogSourceEntry::new(
         log_source.clone(),
@@ -182,7 +187,9 @@ pub async fn handle_otel_metrics_ingestion(
     )
     .await?;
 
-    flatten_and_push_logs(json, &stream_name, &log_source).await?;
+    let p_custom_fields = get_custom_fields_from_header(req);
+
+    flatten_and_push_logs(json, &stream_name, &log_source, &p_custom_fields).await?;
 
     Ok(HttpResponse::Ok().finish())
 }
@@ -222,7 +229,9 @@ pub async fn handle_otel_traces_ingestion(
     )
     .await?;
 
-    flatten_and_push_logs(json, &stream_name, &log_source).await?;
+    let p_custom_fields = get_custom_fields_from_header(req);
+
+    flatten_and_push_logs(json, &stream_name, &log_source, &p_custom_fields).await?;
 
     Ok(HttpResponse::Ok().finish())
 }
@@ -271,7 +280,8 @@ pub async fn post_event(
         return Err(PostError::OtelNotSupported);
     }
 
-    flatten_and_push_logs(json, &stream_name, &log_source).await?;
+    let p_custom_fields = get_custom_fields_from_header(req);
+    flatten_and_push_logs(json, &stream_name, &log_source, &p_custom_fields).await?;
 
     Ok(HttpResponse::Ok().finish())
 }
@@ -421,7 +431,13 @@ mod tests {
         });
 
         let (rb, _) = json::Event::new(json)
-            .into_recordbatch(&HashMap::default(), false, None, SchemaVersion::V0)
+            .into_recordbatch(
+                &HashMap::default(),
+                false,
+                None,
+                SchemaVersion::V0,
+                &HashMap::new(),
+            )
             .unwrap();
 
         assert_eq!(rb.num_rows(), 1);
@@ -449,7 +465,13 @@ mod tests {
         });
 
         let (rb, _) = json::Event::new(json)
-            .into_recordbatch(&HashMap::default(), false, None, SchemaVersion::V0)
+            .into_recordbatch(
+                &HashMap::default(),
+                false,
+                None,
+                SchemaVersion::V0,
+                &HashMap::new(),
+            )
             .unwrap();
 
         assert_eq!(rb.num_rows(), 1);
@@ -481,7 +503,7 @@ mod tests {
         );
 
         let (rb, _) = json::Event::new(json)
-            .into_recordbatch(&schema, false, None, SchemaVersion::V0)
+            .into_recordbatch(&schema, false, None, SchemaVersion::V0, &HashMap::new())
             .unwrap();
 
         assert_eq!(rb.num_rows(), 1);
@@ -513,7 +535,7 @@ mod tests {
         );
 
         assert!(json::Event::new(json)
-            .into_recordbatch(&schema, false, None, SchemaVersion::V0,)
+            .into_recordbatch(&schema, false, None, SchemaVersion::V0, &HashMap::new())
             .is_err());
     }
 
@@ -531,7 +553,7 @@ mod tests {
         );
 
         let (rb, _) = json::Event::new(json)
-            .into_recordbatch(&schema, false, None, SchemaVersion::V0)
+            .into_recordbatch(&schema, false, None, SchemaVersion::V0, &HashMap::new())
            .unwrap();
 
         assert_eq!(rb.num_rows(), 1);
@@ -572,7 +594,13 @@ mod tests {
         ]);
 
         let (rb, _) = json::Event::new(json)
-            .into_recordbatch(&HashMap::default(), false, None, SchemaVersion::V0)
+            .into_recordbatch(
+                &HashMap::default(),
+                false,
+                None,
+                SchemaVersion::V0,
+                &HashMap::new(),
+            )
             .unwrap();
 
         assert_eq!(rb.num_rows(), 3);
@@ -620,7 +648,13 @@ mod tests {
         ]);
 
         let (rb, _) = json::Event::new(json)
-            .into_recordbatch(&HashMap::default(), false, None, SchemaVersion::V0)
+            .into_recordbatch(
+                &HashMap::default(),
+                false,
+                None,
+                SchemaVersion::V0,
+                &HashMap::new(),
+            )
             .unwrap();
 
         assert_eq!(rb.num_rows(), 3);
@@ -669,7 +703,7 @@ mod tests {
         );
 
         let (rb, _) = json::Event::new(json)
-            .into_recordbatch(&schema, false, None, SchemaVersion::V0)
+            .into_recordbatch(&schema, false, None, SchemaVersion::V0, &HashMap::new())
             .unwrap();
 
         assert_eq!(rb.num_rows(), 3);
@@ -718,7 +752,7 @@ mod tests {
         );
 
         assert!(json::Event::new(json)
-            .into_recordbatch(&schema, false, None, SchemaVersion::V0,)
+            .into_recordbatch(&schema, false, None, SchemaVersion::V0, &HashMap::new())
            .is_err());
     }
 
@@ -758,7 +792,13 @@ mod tests {
         .unwrap();
 
         let (rb, _) = json::Event::new(flattened_json)
-            .into_recordbatch(&HashMap::default(), false, None, SchemaVersion::V0)
+            .into_recordbatch(
+                &HashMap::default(),
+                false,
+                None,
+                SchemaVersion::V0,
+                &HashMap::new(),
+            )
             .unwrap();
         assert_eq!(rb.num_rows(), 4);
         assert_eq!(rb.num_columns(), 5);
@@ -841,7 +881,13 @@ mod tests {
         .unwrap();
 
         let (rb, _) = json::Event::new(flattened_json)
-            .into_recordbatch(&HashMap::default(), false, None, SchemaVersion::V1)
+            .into_recordbatch(
+                &HashMap::default(),
+                false,
+                None,
+                SchemaVersion::V1,
+                &HashMap::new(),
+            )
             .unwrap();
 
         assert_eq!(rb.num_rows(), 4);
diff --git a/src/handlers/http/modal/utils/ingest_utils.rs b/src/handlers/http/modal/utils/ingest_utils.rs
index 84d5ae117..cb932a5c3 100644
--- a/src/handlers/http/modal/utils/ingest_utils.rs
+++ b/src/handlers/http/modal/utils/ingest_utils.rs
@@ -16,17 +16,27 @@
  *
  */
 
+use actix_web::HttpRequest;
 use chrono::Utc;
+use http::header::USER_AGENT;
 use opentelemetry_proto::tonic::{
     logs::v1::LogsData, metrics::v1::MetricsData, trace::v1::TracesData,
 };
 use serde_json::Value;
+use std::collections::HashMap;
+use tracing::warn;
 
 use crate::{
-    event::format::{json, EventFormat, LogSource},
-    handlers::http::{
-        ingest::PostError,
-        kinesis::{flatten_kinesis_logs, Message},
+    event::{
+        format::{json, EventFormat, LogSource},
+        FORMAT_KEY, SOURCE_IP_KEY, USER_AGENT_KEY,
+    },
+    handlers::{
+        http::{
+            ingest::PostError,
+            kinesis::{flatten_kinesis_logs, Message},
+        },
+        LOG_SOURCE_KEY, STREAM_NAME_HEADER_KEY,
     },
     otel::{logs::flatten_otel_logs, metrics::flatten_otel_metrics, traces::flatten_otel_traces},
     parseable::PARSEABLE,
@@ -34,42 +44,47 @@ use crate::{
     utils::json::{convert_array_to_object, flatten::convert_to_array},
 };
 
+const IGNORE_HEADERS: [&str; 2] = [STREAM_NAME_HEADER_KEY, LOG_SOURCE_KEY];
+const MAX_CUSTOM_FIELDS: usize = 10;
+const MAX_FIELD_VALUE_LENGTH: usize = 100;
+
 pub async fn flatten_and_push_logs(
     json: Value,
     stream_name: &str,
     log_source: &LogSource,
+    p_custom_fields: &HashMap<String, String>,
 ) -> Result<(), PostError> {
     match log_source {
         LogSource::Kinesis => {
             //custom flattening required for Amazon Kinesis
             let message: Message = serde_json::from_value(json)?;
             for record in flatten_kinesis_logs(message) {
-                push_logs(stream_name, record, &LogSource::default()).await?;
+                push_logs(stream_name, record, &LogSource::default(), p_custom_fields).await?;
             }
         }
         LogSource::OtelLogs => {
             //custom flattening required for otel logs
             let logs: LogsData = serde_json::from_value(json)?;
             for record in flatten_otel_logs(&logs) {
-                push_logs(stream_name, record, log_source).await?;
+                push_logs(stream_name, record, log_source, p_custom_fields).await?;
             }
         }
         LogSource::OtelTraces => {
             //custom flattening required for otel traces
             let traces: TracesData = serde_json::from_value(json)?;
             for record in flatten_otel_traces(&traces) {
-                push_logs(stream_name, record, log_source).await?;
+                push_logs(stream_name, record, log_source, p_custom_fields).await?;
            }
         }
         LogSource::OtelMetrics => {
             //custom flattening required for otel metrics
             let metrics: MetricsData = serde_json::from_value(json)?;
             for record in flatten_otel_metrics(metrics) {
-                push_logs(stream_name, record, log_source).await?;
+                push_logs(stream_name, record, log_source, p_custom_fields).await?;
             }
         }
         _ => {
-            push_logs(stream_name, json, log_source).await?;
+            push_logs(stream_name, json, log_source, p_custom_fields).await?;
         }
     }
     Ok(())
 }
@@ -79,6 +94,7 @@ async fn push_logs(
     stream_name: &str,
     json: Value,
     log_source: &LogSource,
+    p_custom_fields: &HashMap<String, String>,
 ) -> Result<(), PostError> {
     let stream = PARSEABLE.get_stream(stream_name)?;
     let time_partition = stream.get_time_partition();
@@ -123,8 +139,127 @@ async fn push_logs(
             time_partition.as_ref(),
             schema_version,
             StreamType::UserDefined,
+            p_custom_fields,
         )?
         .process()?;
     }
     Ok(())
 }
+
+pub fn get_custom_fields_from_header(req: HttpRequest) -> HashMap<String, String> {
+    let user_agent = req
+        .headers()
+        .get(USER_AGENT)
+        .and_then(|a| a.to_str().ok())
+        .unwrap_or_default();
+
+    let conn = req.connection_info().clone();
+
+    let source_ip = conn.realip_remote_addr().unwrap_or_default();
+    let mut p_custom_fields = HashMap::new();
+    p_custom_fields.insert(USER_AGENT_KEY.to_string(), user_agent.to_string());
+    p_custom_fields.insert(SOURCE_IP_KEY.to_string(), source_ip.to_string());
+
+    // Iterate through headers and add custom fields
+    for (header_name, header_value) in req.headers().iter() {
+        // Check if we've reached the maximum number of custom fields
+        if p_custom_fields.len() >= MAX_CUSTOM_FIELDS {
+            warn!(
+                "Maximum number of custom fields ({}) reached, ignoring remaining headers",
+                MAX_CUSTOM_FIELDS
+            );
+            break;
+        }
+
+        let header_name = header_name.as_str();
+        if header_name.starts_with("x-p-") && !IGNORE_HEADERS.contains(&header_name) {
+            if let Ok(value) = header_value.to_str() {
+                let key = header_name.trim_start_matches("x-p-");
+                if !key.is_empty() {
+                    // Truncate value if it exceeds the maximum length
+                    let truncated_value = if value.len() > MAX_FIELD_VALUE_LENGTH {
+                        warn!(
+                            "Header value for '{}' exceeds maximum length, truncating",
+                            header_name
+                        );
+                        &value[..MAX_FIELD_VALUE_LENGTH]
+                    } else {
+                        value
+                    };
+                    p_custom_fields.insert(key.to_string(), truncated_value.to_string());
+                } else {
+                    warn!(
+                        "Ignoring header with empty key after prefix: {}",
+                        header_name
+                    );
+                }
+            }
+        }
+
+        if header_name == LOG_SOURCE_KEY {
+            if let Ok(value) = header_value.to_str() {
+                p_custom_fields.insert(FORMAT_KEY.to_string(), value.to_string());
+            }
+        }
+    }
+
+    p_custom_fields
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use actix_web::test::TestRequest;
+
+    #[test]
+    fn test_get_custom_fields_from_header_with_custom_fields() {
+        let req = TestRequest::default()
+            .insert_header((USER_AGENT, "TestUserAgent"))
+            .insert_header(("x-p-environment", "dev"))
+            .to_http_request();
+
+        let custom_fields = get_custom_fields_from_header(req);
+
+        assert_eq!(custom_fields.get(USER_AGENT_KEY).unwrap(), "TestUserAgent");
+        assert_eq!(custom_fields.get("environment").unwrap(), "dev");
+    }
+
+    #[test]
+    fn test_get_custom_fields_from_header_with_ignored_headers() {
+        let req = TestRequest::default()
+            .insert_header((USER_AGENT, "TestUserAgent"))
+            .insert_header((STREAM_NAME_HEADER_KEY, "teststream"))
+            .to_http_request();
+
+        let custom_fields = get_custom_fields_from_header(req);
+
+        assert_eq!(custom_fields.get(USER_AGENT_KEY).unwrap(), "TestUserAgent");
+        assert!(!custom_fields.contains_key(STREAM_NAME_HEADER_KEY));
+    }
+
+    #[test]
+    fn test_get_custom_fields_from_header_with_format_key() {
+        let req = TestRequest::default()
+            .insert_header((USER_AGENT, "TestUserAgent"))
+            .insert_header((LOG_SOURCE_KEY, "otel-logs"))
+            .to_http_request();
+
+        let custom_fields = get_custom_fields_from_header(req);
+
+        assert_eq!(custom_fields.get(USER_AGENT_KEY).unwrap(), "TestUserAgent");
+        assert_eq!(custom_fields.get(FORMAT_KEY).unwrap(), "otel-logs");
+    }
+
+    #[test]
+    fn test_get_custom_fields_empty_header_after_prefix() {
+        let req = TestRequest::default()
+            .insert_header(("x-p-", "empty"))
+            .to_http_request();
+
+        let custom_fields = get_custom_fields_from_header(req);
+
+        assert_eq!(custom_fields.len(), 2);
+        assert_eq!(custom_fields.get(USER_AGENT_KEY).unwrap(), "");
+        assert_eq!(custom_fields.get(SOURCE_IP_KEY).unwrap(), "");
+    }
+}
diff --git a/src/utils/arrow/mod.rs b/src/utils/arrow/mod.rs
index e29762047..659c2bf88 100644
--- a/src/utils/arrow/mod.rs
+++ b/src/utils/arrow/mod.rs
@@ -40,10 +40,15 @@
 //! }
 //! ```
 
-use std::sync::Arc;
+use std::{
+    collections::{HashMap, HashSet},
+    sync::Arc,
+};
 
-use arrow_array::{Array, RecordBatch, TimestampMillisecondArray, UInt64Array};
-use arrow_schema::Schema;
+use arrow_array::{
+    Array, ArrayRef, RecordBatch, StringArray, TimestampMillisecondArray, UInt64Array,
+};
+use arrow_schema::{ArrowError, DataType, Field, Schema, TimeUnit};
 use arrow_select::take::take;
 use chrono::{DateTime, Utc};
 use itertools::Itertools;
@@ -55,6 +60,8 @@ use anyhow::Result;
 pub use batch_adapter::adapt_batch;
 use serde_json::{Map, Value};
 
+use crate::event::DEFAULT_TIMESTAMP_KEY;
+
 /// Replaces columns in a record batch with new arrays.
 ///
 /// # Arguments
@@ -139,6 +146,59 @@ pub fn get_timestamp_array(p_timestamp: DateTime<Utc>, size: usize) -> Timestam
     TimestampMillisecondArray::from_value(p_timestamp.timestamp_millis(), size)
 }
 
+pub fn add_parseable_fields(
+    rb: RecordBatch,
+    p_timestamp: DateTime<Utc>,
+    p_custom_fields: &HashMap<String, String>,
+) -> Result<RecordBatch, ArrowError> {
+    // Return Result for proper error handling
+
+    // Add custom fields in sorted order
+    let mut sorted_keys: Vec<&String> = p_custom_fields.keys().collect();
+    sorted_keys.sort();
+
+    let schema = rb.schema();
+    let row_count = rb.num_rows();
+
+    let mut fields = schema
+        .fields()
+        .iter()
+        .map(|f| f.as_ref().clone())
+        .collect_vec();
+    let mut field_names: HashSet<String> = fields.iter().map(|f| f.name().to_string()).collect();
+
+    fields.insert(
+        0,
+        Field::new(
+            DEFAULT_TIMESTAMP_KEY,
+            DataType::Timestamp(TimeUnit::Millisecond, None),
+            true,
+        ),
+    );
+    let mut columns = rb.columns().iter().map(Arc::clone).collect_vec();
+    columns.insert(
+        0,
+        Arc::new(get_timestamp_array(p_timestamp, row_count)) as ArrayRef,
+    );
+
+    //ignore the duplicate fields, no need to add them again
+    for key in sorted_keys {
+        if !field_names.contains(key) {
+            fields.push(Field::new(key, DataType::Utf8, true));
+            field_names.insert(key.to_string());
+
+            let value = p_custom_fields.get(key).unwrap();
+            columns.push(Arc::new(StringArray::from_iter_values(
+                std::iter::repeat(value).take(row_count),
+            )) as ArrayRef);
+        }
+    }
+
+    // Create the new schema and batch
+    let new_schema = Arc::new(Schema::new(fields));
+    RecordBatch::try_new(new_schema, columns)
+}
+
 pub fn reverse(rb: &RecordBatch) -> RecordBatch {
     let indices = UInt64Array::from_iter_values((0..rb.num_rows()).rev().map(|x| x as u64));
     let arrays = rb