From 9a01050715bdcb14cdcbe91228e94cfac8c4717c Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Fri, 28 Mar 2025 23:36:17 -0400 Subject: [PATCH 1/2] fix: add custom fields for pmeta stream add p_custom_field - p_user_agent = parseable p_format = pmeta --- src/handlers/http/ingest.rs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/handlers/http/ingest.rs b/src/handlers/http/ingest.rs index 8a37e4c51..f2637fa4c 100644 --- a/src/handlers/http/ingest.rs +++ b/src/handlers/http/ingest.rs @@ -26,10 +26,10 @@ use chrono::Utc; use http::StatusCode; use serde_json::Value; -use crate::event; use crate::event::error::EventError; use crate::event::format::known_schema::{self, KNOWN_SCHEMA_LIST}; use crate::event::format::{self, EventFormat, LogSource, LogSourceEntry}; +use crate::event::{self, FORMAT_KEY, USER_AGENT_KEY}; use crate::handlers::{EXTRACT_LOG_KEY, LOG_SOURCE_KEY, STREAM_NAME_HEADER_KEY}; use crate::metadata::SchemaVersion; use crate::option::Mode; @@ -124,7 +124,9 @@ pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result< let size: usize = body.len(); let json: Value = serde_json::from_slice(&body)?; let schema = PARSEABLE.get_stream(&stream_name)?.get_schema_raw(); - + let mut p_custom_fields = HashMap::new(); + p_custom_fields.insert(USER_AGENT_KEY.to_string(), "Parseable".to_string()); + p_custom_fields.insert(FORMAT_KEY.to_string(), LogSource::Pmeta.to_string()); // For internal streams, use old schema format::json::Event::new(json) .into_event( @@ -136,7 +138,7 @@ pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result< None, SchemaVersion::V0, StreamType::Internal, - &HashMap::new(), + &p_custom_fields, )? .process()?; From 85ff1960d3d8f64e851a0e42b00e4be12f012e43 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha <131262146+nikhilsinhaparseable@users.noreply.github.com> Date: Sat, 29 Mar 2025 12:06:05 +0530 Subject: [PATCH 2/2] Update ingest.rs changed user_agent_key to `parseable` to keep it consistent with the user_agent_key used in kafka (all lowercase) Signed-off-by: Nikhil Sinha <131262146+nikhilsinhaparseable@users.noreply.github.com> --- src/handlers/http/ingest.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/handlers/http/ingest.rs b/src/handlers/http/ingest.rs index f2637fa4c..02bab9e97 100644 --- a/src/handlers/http/ingest.rs +++ b/src/handlers/http/ingest.rs @@ -125,7 +125,7 @@ pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result< let json: Value = serde_json::from_slice(&body)?; let schema = PARSEABLE.get_stream(&stream_name)?.get_schema_raw(); let mut p_custom_fields = HashMap::new(); - p_custom_fields.insert(USER_AGENT_KEY.to_string(), "Parseable".to_string()); + p_custom_fields.insert(USER_AGENT_KEY.to_string(), "parseable".to_string()); p_custom_fields.insert(FORMAT_KEY.to_string(), LogSource::Pmeta.to_string()); // For internal streams, use old schema format::json::Event::new(json)