diff --git a/src/handlers/http/kinesis.rs b/src/handlers/http/kinesis.rs
index e2f245f73..c714729c7 100644
--- a/src/handlers/http/kinesis.rs
+++ b/src/handlers/http/kinesis.rs
@@ -21,6 +21,8 @@ use serde::{Deserialize, Serialize};
 use serde_json::{Map, Value};
 use std::str;
 
+use crate::utils::json::flatten::{generic_flattening, has_more_than_max_allowed_levels};
+
 #[derive(Serialize, Deserialize, Debug)]
 #[serde(rename_all = "camelCase")]
 pub struct Message {
@@ -57,29 +59,56 @@ struct Data {
 //     "requestId": "b858288a-f5d8-4181-a746-3f3dd716be8a",
 //     "timestamp": "1704964113659"
 // }
-pub fn flatten_kinesis_logs(message: Message) -> Vec<Value> {
+pub async fn flatten_kinesis_logs(message: Message) -> Result<Vec<Value>, anyhow::Error> {
     let mut vec_kinesis_json = Vec::new();
 
     for record in message.records.iter() {
-        let bytes = STANDARD.decode(record.data.clone()).unwrap();
-        let json_string: String = String::from_utf8(bytes).unwrap();
-        let json: serde_json::Value = serde_json::from_str(&json_string).unwrap();
-        let mut kinesis_json: Map<String, Value> = match serde_json::from_value(json) {
-            Ok(value) => value,
-            Err(error) => panic!("Failed to deserialize JSON: {}", error),
-        };
-
-        kinesis_json.insert(
-            "requestId".to_owned(),
-            Value::String(message.request_id.clone()),
-        );
-        kinesis_json.insert(
-            "timestamp".to_owned(),
-            Value::String(message.timestamp.to_string()),
-        );
+        let bytes = STANDARD.decode(record.data.clone())?;
+        if let Ok(json_string) = String::from_utf8(bytes) {
+            let json: serde_json::Value = serde_json::from_str(&json_string)?;
+            // Check whether the JSON exceeds the allowed levels of nesting.
+            // If it is within the allowed levels, we flatten it.
+            // If it exceeds them, we push it as is, without flattening
+            // or modifying it.
+            if !has_more_than_max_allowed_levels(&json, 1) {
+                let flattened_json_arr = generic_flattening(&json)?;
+                for flattened_json in flattened_json_arr {
+                    let mut kinesis_json: Map<String, Value> =
+                        serde_json::from_value(flattened_json)?;
+                    kinesis_json.insert(
+                        "requestId".to_owned(),
+                        Value::String(message.request_id.clone()),
+                    );
+                    kinesis_json.insert(
+                        "timestamp".to_owned(),
+                        Value::String(message.timestamp.to_string()),
+                    );
 
-        vec_kinesis_json.push(Value::Object(kinesis_json));
+                    vec_kinesis_json.push(Value::Object(kinesis_json));
+                }
+            } else {
+                // The JSON has more than the allowed levels of nesting, so we
+                // push it as is, without flattening or modifying it.
+                // This is a fallback to ensure we don't lose data.
+                tracing::warn!(
+                    "Kinesis log with requestId {} and timestamp {} has more than the allowed levels of nesting, skipping flattening for this record.",
+                    message.request_id, message.timestamp
+                );
+                vec_kinesis_json.push(json);
+            }
+        } else {
+            tracing::error!(
+                "Failed to decode base64 data for kinesis log with requestId {} and timestamp {}",
+                message.request_id,
+                message.timestamp
+            );
+            return Err(anyhow::anyhow!(
+                "Failed to decode base64 data for record with requestId {} and timestamp {}",
+                message.request_id,
+                message.timestamp
+            ));
+        }
     }
 
-    vec_kinesis_json
+    Ok(vec_kinesis_json)
 }
diff --git a/src/handlers/http/modal/utils/ingest_utils.rs b/src/handlers/http/modal/utils/ingest_utils.rs
index 04efcbba4..ee1a90cfd 100644
--- a/src/handlers/http/modal/utils/ingest_utils.rs
+++ b/src/handlers/http/modal/utils/ingest_utils.rs
@@ -61,9 +61,9 @@ pub async fn flatten_and_push_logs(
         LogSource::Kinesis => {
             //custom flattening required for Amazon Kinesis
             let message: Message = serde_json::from_value(json)?;
-            for record in flatten_kinesis_logs(message) {
-                push_logs(stream_name, record, log_source, p_custom_fields).await?;
-            }
+            let flattened_kinesis_data = flatten_kinesis_logs(message).await?;
+            let record = convert_to_array(flattened_kinesis_data)?;
+            push_logs(stream_name, record, log_source, p_custom_fields).await?;
         }
         LogSource::OtelLogs => {
             //custom flattening required for otel logs
diff --git a/src/utils/json/mod.rs b/src/utils/json/mod.rs
index e087536be..cb1e2fb81 100644
--- a/src/utils/json/mod.rs
+++ b/src/utils/json/mod.rs
@@ -44,10 +44,8 @@ pub fn flatten_json_body(
     // Flatten the json body only if new schema and has less than 4 levels of nesting
     let mut nested_value = if schema_version == SchemaVersion::V1
         && !has_more_than_max_allowed_levels(&body, 1)
-        && matches!(
-            log_source,
-            LogSource::Json | LogSource::Custom(_) | LogSource::Kinesis
-        ) {
+        && matches!(log_source, LogSource::Json | LogSource::Custom(_))
+    {
         let flattened_json = generic_flattening(&body)?;
         convert_to_array(flattened_json)?
     } else {
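For context, a minimal sketch (not part of the diff) of how the reworked async flatten_kinesis_logs could be exercised in a test. It assumes the Message/Data shapes implied by the example payload comment in kinesis.rs (camelCase requestId, string timestamp, base64 records[].data), a tokio test runtime, and the base64 STANDARD engine the handler already uses; adjust names if the real structs differ.

#[cfg(test)]
mod tests {
    use super::*;
    use base64::{engine::general_purpose::STANDARD, Engine as _};
    use serde_json::json;

    // Hypothetical test: field names and the string timestamp follow the
    // example Kinesis payload documented above; adjust if Message differs.
    #[tokio::test]
    async fn shallow_record_is_flattened_and_tagged() {
        // A record whose decoded payload is a flat JSON object.
        let payload = json!({ "app": "parseable", "level": "info" });
        let encoded = STANDARD.encode(payload.to_string());

        // Build Message the way the HTTP handler would: from the camelCase body.
        let message: Message = serde_json::from_value(json!({
            "requestId": "b858288a-f5d8-4181-a746-3f3dd716be8a",
            "timestamp": "1704964113659",
            "records": [{ "data": encoded }]
        }))
        .expect("body matches the Message shape");

        let flattened = flatten_kinesis_logs(message)
            .await
            .expect("valid base64 and JSON should flatten");

        // Each flattened record keeps its own fields and gains requestId/timestamp.
        assert_eq!(flattened.len(), 1);
        assert_eq!(flattened[0]["app"], "parseable");
        assert_eq!(
            flattened[0]["requestId"],
            "b858288a-f5d8-4181-a746-3f3dd716be8a"
        );
    }
}

A payload nested deeper than the allowed limit would instead come back unchanged, with only the tracing warning emitted, per the else branch in the diff above.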