From 6dc39ab99c7eb2c65de44d3c2252c90ea319834b Mon Sep 17 00:00:00 2001
From: Nikhil Sinha
Date: Fri, 7 Mar 2025 08:00:37 -0500
Subject: [PATCH 1/6] feat: add custom fields to events

p_user_agent - fetched from the request's User-Agent header
p_src_ip - fetched from the request's connection info

Users can add additional headers to the ingest APIs in the format
`x-p-<key>: <value>`, e.g. `x-p-environment: dev`. The server then adds
an `environment` field with the value `dev` to the events. Multiple
custom headers can be sent and are inserted as separate fields in the
event.
---
 src/event/format/json.rs                      |  2 +
 src/event/format/mod.rs                       | 24 ++----
 src/event/mod.rs                              |  2 +
 src/handlers/http/audit.rs                    |  3 +-
 src/handlers/http/ingest.rs                   | 80 +++++++++++++++----
 src/handlers/http/modal/utils/ingest_utils.rs | 59 +++++++++++---
 src/utils/arrow/mod.rs                        | 51 +++++++++++-
 7 files changed, 173 insertions(+), 48 deletions(-)

diff --git a/src/event/format/json.rs b/src/event/format/json.rs
index 903ab2752..36ffe0427 100644
--- a/src/event/format/json.rs
+++ b/src/event/format/json.rs
@@ -149,6 +149,7 @@ impl EventFormat for Event {
         time_partition: Option<&String>,
         schema_version: SchemaVersion,
         stream_type: StreamType,
+        p_custom_fields: &HashMap<String, String>,
     ) -> Result {
         let custom_partition_values = match custom_partitions.as_ref() {
             Some(custom_partition) => {
@@ -168,6 +169,7 @@ impl EventFormat for Event {
             static_schema_flag,
             time_partition,
             schema_version,
+            p_custom_fields,
         )?;
 
         Ok(super::Event {
diff --git a/src/event/format/mod.rs b/src/event/format/mod.rs
index 58c35fc79..12368f35e 100644
--- a/src/event/format/mod.rs
+++ b/src/event/format/mod.rs
@@ -33,7 +33,7 @@ use serde_json::Value;
 use crate::{
     metadata::SchemaVersion,
     storage::StreamType,
-    utils::arrow::{get_field, get_timestamp_array, replace_columns},
+    utils::arrow::{add_parseable_fields, get_field},
 };
 
 use super::{Event, DEFAULT_TIMESTAMP_KEY};
@@ -145,6 +145,7 @@ pub trait EventFormat: Sized {
         static_schema_flag: bool,
         time_partition: Option<&String>,
         schema_version: SchemaVersion,
+        p_custom_fields: &HashMap<String, String>,
     ) -> Result<(RecordBatch, bool), AnyError> {
         let p_timestamp = self.get_p_timestamp();
         let (data, mut schema, is_first) = self.to_data(
@@ -161,16 +162,6 @@ pub trait EventFormat: Sized {
             ));
         };
 
-        // add the p_timestamp field to the event schema to the 0th index
-        schema.insert(
-            0,
-            Arc::new(Field::new(
-                DEFAULT_TIMESTAMP_KEY,
-                DataType::Timestamp(TimeUnit::Millisecond, None),
-                true,
-            )),
-        );
-
         // prepare the record batch and new fields to be added
         let mut new_schema = Arc::new(Schema::new(schema));
         if !Self::is_schema_matching(new_schema.clone(), storage_schema, static_schema_flag) {
@@ -179,13 +170,9 @@ pub trait EventFormat: Sized {
         new_schema =
             update_field_type_in_schema(new_schema, None, time_partition, None, schema_version);
 
-        let mut rb = Self::decode(data, new_schema.clone())?;
-        rb = replace_columns(
-            rb.schema(),
-            &rb,
-            &[0],
-            &[Arc::new(get_timestamp_array(p_timestamp, rb.num_rows()))],
-        );
+        let rb = Self::decode(data, new_schema.clone())?;
+
+        let rb = add_parseable_fields(rb, p_timestamp, p_custom_fields);
 
         Ok((rb, is_first))
     }
@@ -223,6 +210,7 @@ pub trait EventFormat: Sized {
         time_partition: Option<&String>,
         schema_version: SchemaVersion,
         stream_type: StreamType,
+        p_custom_fields: &HashMap<String, String>,
     ) -> Result;
 }
 
diff --git a/src/event/mod.rs b/src/event/mod.rs
index 29a4a0899..0b9dd119b 100644
--- a/src/event/mod.rs
+++ b/src/event/mod.rs
@@ -35,6 +35,8 @@ use chrono::NaiveDateTime;
 use std::collections::HashMap;
 
 pub const DEFAULT_TIMESTAMP_KEY:
&str = "p_timestamp"; +pub const USER_AGENT_KEY: &str = "p_user_agent"; +pub const SOURCE_IP_KEY: &str = "p_src_ip"; #[derive(Clone)] pub struct Event { diff --git a/src/handlers/http/audit.rs b/src/handlers/http/audit.rs index 79e180d71..bf9991a02 100644 --- a/src/handlers/http/audit.rs +++ b/src/handlers/http/audit.rs @@ -23,6 +23,7 @@ use actix_web::{ middleware::Next, }; use actix_web_httpauth::extractors::basic::BasicAuth; +use http::header::USER_AGENT; use ulid::Ulid; use crate::{ @@ -85,7 +86,7 @@ pub async fn audit_log_middleware( ) .with_user_agent( req.headers() - .get("User-Agent") + .get(USER_AGENT) .and_then(|a| a.to_str().ok()) .unwrap_or_default(), ) diff --git a/src/handlers/http/ingest.rs b/src/handlers/http/ingest.rs index 1e5e6d048..b8e056cac 100644 --- a/src/handlers/http/ingest.rs +++ b/src/handlers/http/ingest.rs @@ -41,7 +41,7 @@ use crate::utils::header_parsing::ParseHeaderError; use crate::utils::json::flatten::JsonFlattenError; use super::logstream::error::{CreateStreamError, StreamError}; -use super::modal::utils::ingest_utils::flatten_and_push_logs; +use super::modal::utils::ingest_utils::{flatten_and_push_logs, get_custom_fields_from_header}; use super::users::dashboards::DashboardError; use super::users::filters::FiltersError; @@ -72,6 +72,8 @@ pub async fn ingest(req: HttpRequest, Json(json): Json) -> Result) -> Result Result< None, SchemaVersion::V0, StreamType::Internal, + &HashMap::new(), )? .process()?; @@ -143,8 +146,9 @@ pub async fn handle_otel_logs_ingestion( vec![log_source_entry], ) .await?; + let p_custom_fields = get_custom_fields_from_header(req); - flatten_and_push_logs(json, &stream_name, &log_source).await?; + flatten_and_push_logs(json, &stream_name, &log_source, &p_custom_fields).await?; Ok(HttpResponse::Ok().finish()) } @@ -166,6 +170,7 @@ pub async fn handle_otel_metrics_ingestion( if log_source != LogSource::OtelMetrics { return Err(PostError::IncorrectLogSource(LogSource::OtelMetrics)); } + let stream_name = stream_name.to_str().unwrap().to_owned(); let log_source_entry = LogSourceEntry::new( log_source.clone(), @@ -182,7 +187,9 @@ pub async fn handle_otel_metrics_ingestion( ) .await?; - flatten_and_push_logs(json, &stream_name, &log_source).await?; + let p_custom_fields = get_custom_fields_from_header(req); + + flatten_and_push_logs(json, &stream_name, &log_source, &p_custom_fields).await?; Ok(HttpResponse::Ok().finish()) } @@ -222,7 +229,9 @@ pub async fn handle_otel_traces_ingestion( ) .await?; - flatten_and_push_logs(json, &stream_name, &log_source).await?; + let p_custom_fields = get_custom_fields_from_header(req); + + flatten_and_push_logs(json, &stream_name, &log_source, &p_custom_fields).await?; Ok(HttpResponse::Ok().finish()) } @@ -271,7 +280,8 @@ pub async fn post_event( return Err(PostError::OtelNotSupported); } - flatten_and_push_logs(json, &stream_name, &log_source).await?; + let p_custom_fields = get_custom_fields_from_header(req); + flatten_and_push_logs(json, &stream_name, &log_source, &p_custom_fields).await?; Ok(HttpResponse::Ok().finish()) } @@ -421,7 +431,13 @@ mod tests { }); let (rb, _) = json::Event::new(json) - .into_recordbatch(&HashMap::default(), false, None, SchemaVersion::V0) + .into_recordbatch( + &HashMap::default(), + false, + None, + SchemaVersion::V0, + &HashMap::new(), + ) .unwrap(); assert_eq!(rb.num_rows(), 1); @@ -449,7 +465,13 @@ mod tests { }); let (rb, _) = json::Event::new(json) - .into_recordbatch(&HashMap::default(), false, None, SchemaVersion::V0) + .into_recordbatch( + &HashMap::default(), + 
false, + None, + SchemaVersion::V0, + &HashMap::new(), + ) .unwrap(); assert_eq!(rb.num_rows(), 1); @@ -481,7 +503,7 @@ mod tests { ); let (rb, _) = json::Event::new(json) - .into_recordbatch(&schema, false, None, SchemaVersion::V0) + .into_recordbatch(&schema, false, None, SchemaVersion::V0, &HashMap::new()) .unwrap(); assert_eq!(rb.num_rows(), 1); @@ -513,7 +535,7 @@ mod tests { ); assert!(json::Event::new(json) - .into_recordbatch(&schema, false, None, SchemaVersion::V0,) + .into_recordbatch(&schema, false, None, SchemaVersion::V0, &HashMap::new()) .is_err()); } @@ -531,7 +553,7 @@ mod tests { ); let (rb, _) = json::Event::new(json) - .into_recordbatch(&schema, false, None, SchemaVersion::V0) + .into_recordbatch(&schema, false, None, SchemaVersion::V0, &HashMap::new()) .unwrap(); assert_eq!(rb.num_rows(), 1); @@ -572,7 +594,13 @@ mod tests { ]); let (rb, _) = json::Event::new(json) - .into_recordbatch(&HashMap::default(), false, None, SchemaVersion::V0) + .into_recordbatch( + &HashMap::default(), + false, + None, + SchemaVersion::V0, + &HashMap::new(), + ) .unwrap(); assert_eq!(rb.num_rows(), 3); @@ -620,7 +648,13 @@ mod tests { ]); let (rb, _) = json::Event::new(json) - .into_recordbatch(&HashMap::default(), false, None, SchemaVersion::V0) + .into_recordbatch( + &HashMap::default(), + false, + None, + SchemaVersion::V0, + &HashMap::new(), + ) .unwrap(); assert_eq!(rb.num_rows(), 3); @@ -669,7 +703,7 @@ mod tests { ); let (rb, _) = json::Event::new(json) - .into_recordbatch(&schema, false, None, SchemaVersion::V0) + .into_recordbatch(&schema, false, None, SchemaVersion::V0, &HashMap::new()) .unwrap(); assert_eq!(rb.num_rows(), 3); @@ -718,7 +752,7 @@ mod tests { ); assert!(json::Event::new(json) - .into_recordbatch(&schema, false, None, SchemaVersion::V0,) + .into_recordbatch(&schema, false, None, SchemaVersion::V0, &HashMap::new()) .is_err()); } @@ -758,7 +792,13 @@ mod tests { .unwrap(); let (rb, _) = json::Event::new(flattened_json) - .into_recordbatch(&HashMap::default(), false, None, SchemaVersion::V0) + .into_recordbatch( + &HashMap::default(), + false, + None, + SchemaVersion::V0, + &HashMap::new(), + ) .unwrap(); assert_eq!(rb.num_rows(), 4); assert_eq!(rb.num_columns(), 5); @@ -841,7 +881,13 @@ mod tests { .unwrap(); let (rb, _) = json::Event::new(flattened_json) - .into_recordbatch(&HashMap::default(), false, None, SchemaVersion::V1) + .into_recordbatch( + &HashMap::default(), + false, + None, + SchemaVersion::V1, + &HashMap::new(), + ) .unwrap(); assert_eq!(rb.num_rows(), 4); diff --git a/src/handlers/http/modal/utils/ingest_utils.rs b/src/handlers/http/modal/utils/ingest_utils.rs index 84d5ae117..eb327d69a 100644 --- a/src/handlers/http/modal/utils/ingest_utils.rs +++ b/src/handlers/http/modal/utils/ingest_utils.rs @@ -16,6 +16,9 @@ * */ +use std::collections::HashMap; + +use actix_web::HttpRequest; use chrono::Utc; use opentelemetry_proto::tonic::{ logs::v1::LogsData, metrics::v1::MetricsData, trace::v1::TracesData, @@ -23,10 +26,16 @@ use opentelemetry_proto::tonic::{ use serde_json::Value; use crate::{ - event::format::{json, EventFormat, LogSource}, - handlers::http::{ - ingest::PostError, - kinesis::{flatten_kinesis_logs, Message}, + event::{ + format::{json, EventFormat, LogSource}, + SOURCE_IP_KEY, USER_AGENT_KEY, + }, + handlers::{ + http::{ + ingest::PostError, + kinesis::{flatten_kinesis_logs, Message}, + }, + LOG_SOURCE_KEY, STREAM_NAME_HEADER_KEY, }, otel::{logs::flatten_otel_logs, metrics::flatten_otel_metrics, traces::flatten_otel_traces}, parseable::PARSEABLE, 
@@ -34,42 +43,45 @@ use crate::{ utils::json::{convert_array_to_object, flatten::convert_to_array}, }; +const IGNORE_HEADERS: [&str; 2] = [STREAM_NAME_HEADER_KEY, LOG_SOURCE_KEY]; + pub async fn flatten_and_push_logs( json: Value, stream_name: &str, log_source: &LogSource, + p_custom_fields: &HashMap, ) -> Result<(), PostError> { match log_source { LogSource::Kinesis => { //custom flattening required for Amazon Kinesis let message: Message = serde_json::from_value(json)?; for record in flatten_kinesis_logs(message) { - push_logs(stream_name, record, &LogSource::default()).await?; + push_logs(stream_name, record, &LogSource::default(), p_custom_fields).await?; } } LogSource::OtelLogs => { //custom flattening required for otel logs let logs: LogsData = serde_json::from_value(json)?; for record in flatten_otel_logs(&logs) { - push_logs(stream_name, record, log_source).await?; + push_logs(stream_name, record, log_source, p_custom_fields).await?; } } LogSource::OtelTraces => { //custom flattening required for otel traces let traces: TracesData = serde_json::from_value(json)?; for record in flatten_otel_traces(&traces) { - push_logs(stream_name, record, log_source).await?; + push_logs(stream_name, record, log_source, p_custom_fields).await?; } } LogSource::OtelMetrics => { //custom flattening required for otel metrics let metrics: MetricsData = serde_json::from_value(json)?; for record in flatten_otel_metrics(metrics) { - push_logs(stream_name, record, log_source).await?; + push_logs(stream_name, record, log_source, p_custom_fields).await?; } } _ => { - push_logs(stream_name, json, log_source).await?; + push_logs(stream_name, json, log_source, p_custom_fields).await?; } } Ok(()) @@ -79,6 +91,7 @@ async fn push_logs( stream_name: &str, json: Value, log_source: &LogSource, + p_custom_fields: &HashMap, ) -> Result<(), PostError> { let stream = PARSEABLE.get_stream(stream_name)?; let time_partition = stream.get_time_partition(); @@ -123,8 +136,36 @@ async fn push_logs( time_partition.as_ref(), schema_version, StreamType::UserDefined, + p_custom_fields, )? .process()?; } Ok(()) } + +pub fn get_custom_fields_from_header(req: HttpRequest) -> HashMap { + let user_agent = req + .headers() + .get("User-Agent") + .and_then(|a| a.to_str().ok()) + .unwrap_or_default(); + + let conn = req.connection_info().clone(); + + let source_ip = conn.realip_remote_addr().unwrap_or_default(); + let mut p_custom_fields = HashMap::new(); + p_custom_fields.insert(USER_AGENT_KEY.to_string(), user_agent.to_string()); + p_custom_fields.insert(SOURCE_IP_KEY.to_string(), source_ip.to_string()); + + // Iterate through headers and add custom fields + for (header_name, header_value) in req.headers().iter() { + let header_name = header_name.as_str(); + if header_name.starts_with("x-p-") && !IGNORE_HEADERS.contains(&header_name) { + if let Ok(value) = header_value.to_str() { + let key = header_name.trim_start_matches("x-p-"); + p_custom_fields.insert(key.to_string(), value.to_string()); + } + } + } + p_custom_fields +} diff --git a/src/utils/arrow/mod.rs b/src/utils/arrow/mod.rs index e29762047..145bf6b0c 100644 --- a/src/utils/arrow/mod.rs +++ b/src/utils/arrow/mod.rs @@ -40,10 +40,12 @@ //! } //! 
``` -use std::sync::Arc; +use std::{collections::HashMap, sync::Arc}; -use arrow_array::{Array, RecordBatch, TimestampMillisecondArray, UInt64Array}; -use arrow_schema::Schema; +use arrow_array::{ + Array, ArrayRef, RecordBatch, StringArray, TimestampMillisecondArray, UInt64Array, +}; +use arrow_schema::{DataType, Field, Schema, TimeUnit}; use arrow_select::take::take; use chrono::{DateTime, Utc}; use itertools::Itertools; @@ -55,6 +57,8 @@ use anyhow::Result; pub use batch_adapter::adapt_batch; use serde_json::{Map, Value}; +use crate::event::DEFAULT_TIMESTAMP_KEY; + /// Replaces columns in a record batch with new arrays. /// /// # Arguments @@ -139,6 +143,47 @@ pub fn get_timestamp_array(p_timestamp: DateTime, size: usize) -> Timestamp TimestampMillisecondArray::from_value(p_timestamp.timestamp_millis(), size) } +pub fn add_parseable_fields( + rb: RecordBatch, + p_timestamp: DateTime, + p_custom_fields: &HashMap, +) -> RecordBatch { + let schema = rb.schema(); + let row_count = rb.num_rows(); + + let mut fields: Vec = schema.fields().iter().map(|f| f.as_ref().clone()).collect(); + let mut columns: Vec = rb.columns().to_vec(); + + // Create and insert the p_timestamp field and array at index 0 + let p_timestamp_array = Arc::new(TimestampMillisecondArray::from_iter_values( + std::iter::repeat(p_timestamp.timestamp_millis()).take(row_count), + )); + let p_timestamp_field = Field::new( + DEFAULT_TIMESTAMP_KEY.to_string(), + DataType::Timestamp(TimeUnit::Millisecond, None), + false, + ); + fields.insert(0, p_timestamp_field); + columns.insert(0, p_timestamp_array); + + // Sort p_custom_fields by key and insert custom fields at the beginning, after the p_timestamp field + let mut sorted_keys: Vec<&String> = p_custom_fields.keys().collect(); + sorted_keys.sort(); + for key in sorted_keys.iter().rev() { + let value = p_custom_fields.get(*key).unwrap(); + let string_array: ArrayRef = Arc::new(StringArray::from_iter_values( + std::iter::repeat(value).take(row_count), + )); + columns.insert(1, string_array); + + let new_field = Field::new((*key).clone(), DataType::Utf8, true); + fields.insert(1, new_field); + } + + let new_schema = Arc::new(Schema::new(fields)); + RecordBatch::try_new(new_schema, columns).unwrap() +} + pub fn reverse(rb: &RecordBatch) -> RecordBatch { let indices = UInt64Array::from_iter_values((0..rb.num_rows()).rev().map(|x| x as u64)); let arrays = rb From e9d4ae189ef8ef8818131f7a8831fcd130384b51 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Thu, 13 Mar 2025 04:28:25 -0400 Subject: [PATCH 2/6] update logic --- src/event/format/mod.rs | 4 +-- src/utils/arrow/mod.rs | 65 +++++++++++++++++++++++------------------ 2 files changed, 39 insertions(+), 30 deletions(-) diff --git a/src/event/format/mod.rs b/src/event/format/mod.rs index 12368f35e..830409d3d 100644 --- a/src/event/format/mod.rs +++ b/src/event/format/mod.rs @@ -148,7 +148,7 @@ pub trait EventFormat: Sized { p_custom_fields: &HashMap, ) -> Result<(RecordBatch, bool), AnyError> { let p_timestamp = self.get_p_timestamp(); - let (data, mut schema, is_first) = self.to_data( + let (data, schema, is_first) = self.to_data( storage_schema, time_partition, schema_version, @@ -172,7 +172,7 @@ pub trait EventFormat: Sized { let rb = Self::decode(data, new_schema.clone())?; - let rb = add_parseable_fields(rb, p_timestamp, p_custom_fields); + let rb = add_parseable_fields(rb, p_timestamp, p_custom_fields)?; Ok((rb, is_first)) } diff --git a/src/utils/arrow/mod.rs b/src/utils/arrow/mod.rs index 145bf6b0c..79d3aea3e 100644 --- 
a/src/utils/arrow/mod.rs +++ b/src/utils/arrow/mod.rs @@ -45,7 +45,7 @@ use std::{collections::HashMap, sync::Arc}; use arrow_array::{ Array, ArrayRef, RecordBatch, StringArray, TimestampMillisecondArray, UInt64Array, }; -use arrow_schema::{DataType, Field, Schema, TimeUnit}; +use arrow_schema::{ArrowError, DataType, Field, Schema, TimeUnit}; use arrow_select::take::take; use chrono::{DateTime, Utc}; use itertools::Itertools; @@ -147,41 +147,50 @@ pub fn add_parseable_fields( rb: RecordBatch, p_timestamp: DateTime, p_custom_fields: &HashMap, -) -> RecordBatch { +) -> Result { + // Return Result for proper error handling + + // Add custom fields in sorted order + let mut sorted_keys: Vec<&String> = p_custom_fields.keys().collect(); + sorted_keys.sort(); + let schema = rb.schema(); let row_count = rb.num_rows(); - let mut fields: Vec = schema.fields().iter().map(|f| f.as_ref().clone()).collect(); - let mut columns: Vec = rb.columns().to_vec(); - - // Create and insert the p_timestamp field and array at index 0 - let p_timestamp_array = Arc::new(TimestampMillisecondArray::from_iter_values( - std::iter::repeat(p_timestamp.timestamp_millis()).take(row_count), - )); - let p_timestamp_field = Field::new( - DEFAULT_TIMESTAMP_KEY.to_string(), - DataType::Timestamp(TimeUnit::Millisecond, None), - false, + let mut fields = schema + .fields() + .iter() + .map(|f| f.as_ref().clone()) + .collect_vec(); + fields.insert( + 0, + Field::new( + DEFAULT_TIMESTAMP_KEY, + DataType::Timestamp(TimeUnit::Millisecond, None), + true, + ), + ); + fields.extend( + sorted_keys + .iter() + .map(|k| Field::new(*k, DataType::Utf8, true)), ); - fields.insert(0, p_timestamp_field); - columns.insert(0, p_timestamp_array); - // Sort p_custom_fields by key and insert custom fields at the beginning, after the p_timestamp field - let mut sorted_keys: Vec<&String> = p_custom_fields.keys().collect(); - sorted_keys.sort(); - for key in sorted_keys.iter().rev() { - let value = p_custom_fields.get(*key).unwrap(); - let string_array: ArrayRef = Arc::new(StringArray::from_iter_values( + let mut columns = rb.columns().iter().map(Arc::clone).collect_vec(); + columns.insert( + 0, + Arc::new(get_timestamp_array(p_timestamp, row_count)) as ArrayRef, + ); + columns.extend(sorted_keys.iter().map(|k| { + let value = p_custom_fields.get(*k).unwrap(); + Arc::new(StringArray::from_iter_values( std::iter::repeat(value).take(row_count), - )); - columns.insert(1, string_array); - - let new_field = Field::new((*key).clone(), DataType::Utf8, true); - fields.insert(1, new_field); - } + )) as ArrayRef + })); + // Create the new schema and batch let new_schema = Arc::new(Schema::new(fields)); - RecordBatch::try_new(new_schema, columns).unwrap() + RecordBatch::try_new(new_schema, columns) } pub fn reverse(rb: &RecordBatch) -> RecordBatch { From ae7deb57267e7dee06c9ddc34c0826c6be6b4bcc Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Thu, 20 Mar 2025 23:06:59 -0400 Subject: [PATCH 3/6] add p_format to event, add tests --- src/event/mod.rs | 1 + src/handlers/http/modal/utils/ingest_utils.rs | 59 ++++++++++++++++++- 2 files changed, 57 insertions(+), 3 deletions(-) diff --git a/src/event/mod.rs b/src/event/mod.rs index 0b9dd119b..b641643cb 100644 --- a/src/event/mod.rs +++ b/src/event/mod.rs @@ -37,6 +37,7 @@ use std::collections::HashMap; pub const DEFAULT_TIMESTAMP_KEY: &str = "p_timestamp"; pub const USER_AGENT_KEY: &str = "p_user_agent"; pub const SOURCE_IP_KEY: &str = "p_src_ip"; +pub const FORMAT_KEY: &str = "p_format"; #[derive(Clone)] pub struct 
Event { diff --git a/src/handlers/http/modal/utils/ingest_utils.rs b/src/handlers/http/modal/utils/ingest_utils.rs index eb327d69a..77d09e648 100644 --- a/src/handlers/http/modal/utils/ingest_utils.rs +++ b/src/handlers/http/modal/utils/ingest_utils.rs @@ -20,6 +20,7 @@ use std::collections::HashMap; use actix_web::HttpRequest; use chrono::Utc; +use http::header::USER_AGENT; use opentelemetry_proto::tonic::{ logs::v1::LogsData, metrics::v1::MetricsData, trace::v1::TracesData, }; @@ -28,7 +29,7 @@ use serde_json::Value; use crate::{ event::{ format::{json, EventFormat, LogSource}, - SOURCE_IP_KEY, USER_AGENT_KEY, + FORMAT_KEY, SOURCE_IP_KEY, USER_AGENT_KEY, }, handlers::{ http::{ @@ -43,7 +44,7 @@ use crate::{ utils::json::{convert_array_to_object, flatten::convert_to_array}, }; -const IGNORE_HEADERS: [&str; 2] = [STREAM_NAME_HEADER_KEY, LOG_SOURCE_KEY]; +const IGNORE_HEADERS: [&str; 1] = [STREAM_NAME_HEADER_KEY]; pub async fn flatten_and_push_logs( json: Value, @@ -146,7 +147,7 @@ async fn push_logs( pub fn get_custom_fields_from_header(req: HttpRequest) -> HashMap { let user_agent = req .headers() - .get("User-Agent") + .get(USER_AGENT) .and_then(|a| a.to_str().ok()) .unwrap_or_default(); @@ -166,6 +167,58 @@ pub fn get_custom_fields_from_header(req: HttpRequest) -> HashMap Date: Fri, 21 Mar 2025 01:27:39 -0400 Subject: [PATCH 4/6] add user-agent for kafka --- src/connectors/kafka/processor.rs | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/src/connectors/kafka/processor.rs b/src/connectors/kafka/processor.rs index 10025ed98..09d826b46 100644 --- a/src/connectors/kafka/processor.rs +++ b/src/connectors/kafka/processor.rs @@ -16,12 +16,12 @@ * */ -use std::sync::Arc; - use async_trait::async_trait; use futures_util::StreamExt; use rdkafka::consumer::{CommitMode, Consumer}; use serde_json::Value; +use std::collections::HashMap; +use std::sync::Arc; use tokio_stream::wrappers::ReceiverStream; use tracing::{debug, error}; @@ -29,7 +29,7 @@ use crate::{ connectors::common::processor::Processor, event::{ format::{json, EventFormat, LogSourceEntry}, - Event as ParseableEvent, + Event as ParseableEvent, USER_AGENT_KEY, }, parseable::PARSEABLE, storage::StreamType, @@ -76,6 +76,9 @@ impl ParseableSinkProcessor { } } + let mut p_custom_fields = HashMap::new(); + p_custom_fields.insert(USER_AGENT_KEY.to_string(), "kafka".to_string()); + let p_event = json::Event::new(Value::Array(json_vec)).into_event( stream_name.to_string(), total_payload_size, @@ -85,6 +88,7 @@ impl ParseableSinkProcessor { time_partition.as_ref(), schema_version, StreamType::UserDefined, + &p_custom_fields, )?; Ok(p_event) From d547ef5f51b9de3b842c5ad1f69ae62352ae6517 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Fri, 21 Mar 2025 05:23:06 -0400 Subject: [PATCH 5/6] add duplicate and empty check --- src/handlers/http/modal/utils/ingest_utils.rs | 26 +++++++++++++-- src/utils/arrow/mod.rs | 32 +++++++++++-------- 2 files changed, 42 insertions(+), 16 deletions(-) diff --git a/src/handlers/http/modal/utils/ingest_utils.rs b/src/handlers/http/modal/utils/ingest_utils.rs index 77d09e648..318f18e49 100644 --- a/src/handlers/http/modal/utils/ingest_utils.rs +++ b/src/handlers/http/modal/utils/ingest_utils.rs @@ -16,8 +16,6 @@ * */ -use std::collections::HashMap; - use actix_web::HttpRequest; use chrono::Utc; use http::header::USER_AGENT; @@ -25,6 +23,8 @@ use opentelemetry_proto::tonic::{ logs::v1::LogsData, metrics::v1::MetricsData, trace::v1::TracesData, }; use serde_json::Value; +use 
std::collections::HashMap; +use tracing::warn; use crate::{ event::{ @@ -164,7 +164,14 @@ pub fn get_custom_fields_from_header(req: HttpRequest) -> HashMap = fields.iter().map(|f| f.name().to_string()).collect(); + fields.insert( 0, Field::new( @@ -170,23 +175,24 @@ pub fn add_parseable_fields( true, ), ); - fields.extend( - sorted_keys - .iter() - .map(|k| Field::new(*k, DataType::Utf8, true)), - ); - let mut columns = rb.columns().iter().map(Arc::clone).collect_vec(); columns.insert( 0, Arc::new(get_timestamp_array(p_timestamp, row_count)) as ArrayRef, ); - columns.extend(sorted_keys.iter().map(|k| { - let value = p_custom_fields.get(*k).unwrap(); - Arc::new(StringArray::from_iter_values( - std::iter::repeat(value).take(row_count), - )) as ArrayRef - })); + + //ignore the duplicate fields, no need to add them again + for key in sorted_keys { + if !field_names.contains(key) { + fields.push(Field::new(key, DataType::Utf8, true)); + field_names.insert(key.to_string()); + + let value = p_custom_fields.get(key).unwrap(); + columns.push(Arc::new(StringArray::from_iter_values( + std::iter::repeat(value).take(row_count), + )) as ArrayRef); + } + } // Create the new schema and batch let new_schema = Arc::new(Schema::new(fields)); From 718c9b2feebeb07cdeefade9df7d45ee0fe6d2f1 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Fri, 21 Mar 2025 05:37:53 -0400 Subject: [PATCH 6/6] limit custom column counts, limit value length --- src/handlers/http/modal/utils/ingest_utils.rs | 25 +++++++++++++++++-- 1 file changed, 23 insertions(+), 2 deletions(-) diff --git a/src/handlers/http/modal/utils/ingest_utils.rs b/src/handlers/http/modal/utils/ingest_utils.rs index 318f18e49..cb932a5c3 100644 --- a/src/handlers/http/modal/utils/ingest_utils.rs +++ b/src/handlers/http/modal/utils/ingest_utils.rs @@ -44,7 +44,9 @@ use crate::{ utils::json::{convert_array_to_object, flatten::convert_to_array}, }; -const IGNORE_HEADERS: [&str; 1] = [STREAM_NAME_HEADER_KEY]; +const IGNORE_HEADERS: [&str; 2] = [STREAM_NAME_HEADER_KEY, LOG_SOURCE_KEY]; +const MAX_CUSTOM_FIELDS: usize = 10; +const MAX_FIELD_VALUE_LENGTH: usize = 100; pub async fn flatten_and_push_logs( json: Value, @@ -160,12 +162,31 @@ pub fn get_custom_fields_from_header(req: HttpRequest) -> HashMap= MAX_CUSTOM_FIELDS { + warn!( + "Maximum number of custom fields ({}) reached, ignoring remaining headers", + MAX_CUSTOM_FIELDS + ); + break; + } + let header_name = header_name.as_str(); if header_name.starts_with("x-p-") && !IGNORE_HEADERS.contains(&header_name) { if let Ok(value) = header_value.to_str() { let key = header_name.trim_start_matches("x-p-"); if !key.is_empty() { - p_custom_fields.insert(key.to_string(), value.to_string()); + // Truncate value if it exceeds the maximum length + let truncated_value = if value.len() > MAX_FIELD_VALUE_LENGTH { + warn!( + "Header value for '{}' exceeds maximum length, truncating", + header_name + ); + &value[..MAX_FIELD_VALUE_LENGTH] + } else { + value + }; + p_custom_fields.insert(key.to_string(), truncated_value.to_string()); } else { warn!( "Ignoring header with empty key after prefix: {}",
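
As a usage illustration only (not part of any patch in this series), below is a minimal sketch of how `get_custom_fields_from_header` could be exercised with actix-web's `TestRequest`. It assumes `STREAM_NAME_HEADER_KEY` resolves to an `x-p-stream` header (so it falls under `IGNORE_HEADERS`) and that `USER_AGENT_KEY` is the `p_user_agent` constant added in patch 1; the module name and assertions are hypothetical.

```
// Illustrative sketch only; not part of the patch series above.
// Assumes STREAM_NAME_HEADER_KEY == "x-p-stream" so it is skipped via IGNORE_HEADERS.
#[cfg(test)]
mod custom_header_sketch {
    use actix_web::test::TestRequest;

    use super::get_custom_fields_from_header;

    #[test]
    fn extracts_x_p_headers_and_user_agent() {
        let req = TestRequest::default()
            .insert_header(("User-Agent", "test-agent"))
            .insert_header(("x-p-environment", "dev"))
            // the stream-name header is in IGNORE_HEADERS, so it must not become a field
            .insert_header(("x-p-stream", "demo"))
            .to_http_request();

        let fields = get_custom_fields_from_header(req);

        // User-Agent is captured under the reserved p_user_agent key
        assert_eq!(fields.get("p_user_agent").map(String::as_str), Some("test-agent"));
        // x-p-<key> headers have the prefix stripped and the value stored as-is
        assert_eq!(fields.get("environment").map(String::as_str), Some("dev"));
        // ignored headers never show up as custom fields
        assert!(!fields.contains_key("stream"));
    }
}
```

Patches 5 and 6 additionally skip fields that already exist in the record batch schema, cap the number of custom fields at MAX_CUSTOM_FIELDS, and truncate values longer than MAX_FIELD_VALUE_LENGTH, so a test along these lines would also be the natural place to cover those limits.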