Skip to content

fix: flattening for kinesis #1329

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 48 additions & 19 deletions src/handlers/http/kinesis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ use serde::{Deserialize, Serialize};
use serde_json::{Map, Value};
use std::str;

use crate::utils::json::flatten::{generic_flattening, has_more_than_max_allowed_levels};

#[derive(Serialize, Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct Message {
Expand Down Expand Up @@ -57,29 +59,56 @@ struct Data {
// "requestId": "b858288a-f5d8-4181-a746-3f3dd716be8a",
// "timestamp": "1704964113659"
// }
pub fn flatten_kinesis_logs(message: Message) -> Vec<Value> {
pub async fn flatten_kinesis_logs(message: Message) -> Result<Vec<Value>, anyhow::Error> {
let mut vec_kinesis_json = Vec::new();

for record in message.records.iter() {
let bytes = STANDARD.decode(record.data.clone()).unwrap();
let json_string: String = String::from_utf8(bytes).unwrap();
let json: serde_json::Value = serde_json::from_str(&json_string).unwrap();
let mut kinesis_json: Map<String, Value> = match serde_json::from_value(json) {
Ok(value) => value,
Err(error) => panic!("Failed to deserialize JSON: {}", error),
};

kinesis_json.insert(
"requestId".to_owned(),
Value::String(message.request_id.clone()),
);
kinesis_json.insert(
"timestamp".to_owned(),
Value::String(message.timestamp.to_string()),
);
let bytes = STANDARD.decode(record.data.clone())?;
if let Ok(json_string) = String::from_utf8(bytes) {
let json: serde_json::Value = serde_json::from_str(&json_string)?;
// Check if the JSON has more than the allowed levels of nesting
// If it has less than or equal to the allowed levels, we flatten it.
// If it has more than the allowed levels, we just push it as is
// without flattening or modifying it.
if !has_more_than_max_allowed_levels(&json, 1) {
let flattened_json_arr = generic_flattening(&json)?;
for flattened_json in flattened_json_arr {
let mut kinesis_json: Map<String, Value> =
serde_json::from_value(flattened_json)?;
kinesis_json.insert(
"requestId".to_owned(),
Value::String(message.request_id.clone()),
);
kinesis_json.insert(
"timestamp".to_owned(),
Value::String(message.timestamp.to_string()),
);

vec_kinesis_json.push(Value::Object(kinesis_json));
vec_kinesis_json.push(Value::Object(kinesis_json));
}
} else {
// If the JSON has more than the allowed levels, we just push it as is
// without flattening or modifying it.
// This is a fallback to ensure we don't lose data.
tracing::warn!(
"Kinesis log with requestId {} and timestamp {} has more than the allowed levels of nesting, skipping flattening for this record.",
message.request_id, message.timestamp
);
vec_kinesis_json.push(json);
}
Comment on lines +89 to +98
Copy link
Contributor

@coderabbitai coderabbitai bot May 28, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Consider adding metadata consistency for deeply nested records.

When skipping flattening due to excessive nesting, the original JSON is pushed without adding requestId and timestamp fields. This creates inconsistent data structure compared to flattened records.

Consider adding the metadata even for non-flattened records:

             } else {
                 // If the JSON has more than the allowed levels, we just push it as is
                 // without flattening or modifying it.
                 // This is a fallback to ensure we don't lose data.
+                let mut kinesis_json = json.as_object().unwrap_or(&Map::new()).clone();
+                kinesis_json.insert("requestId".to_owned(), Value::String(message.request_id.clone()));
+                kinesis_json.insert("timestamp".to_owned(), Value::String(message.timestamp.to_string()));
                 tracing::warn!(
                     "Kinesis log with requestId {} and timestamp {} has more than the allowed levels of nesting, skipping flattening for this record.",
                     message.request_id, message.timestamp
                 );
-                vec_kinesis_json.push(json);
+                vec_kinesis_json.push(Value::Object(kinesis_json));
             }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
} else {
// If the JSON has more than the allowed levels, we just push it as is
// without flattening or modifying it.
// This is a fallback to ensure we don't lose data.
tracing::warn!(
"Kinesis log with requestId {} and timestamp {} has more than the allowed levels of nesting, skipping flattening for this record.",
message.request_id, message.timestamp
);
vec_kinesis_json.push(json);
}
} else {
// If the JSON has more than the allowed levels, we just push it as is
// without flattening or modifying it.
// This is a fallback to ensure we don't lose data.
let mut kinesis_json = json.as_object().unwrap_or(&Map::new()).clone();
kinesis_json.insert(
"requestId".to_owned(),
Value::String(message.request_id.clone()),
);
kinesis_json.insert(
"timestamp".to_owned(),
Value::String(message.timestamp.to_string()),
);
tracing::warn!(
"Kinesis log with requestId {} and timestamp {} has more than the allowed levels of nesting, skipping flattening for this record.",
message.request_id, message.timestamp
);
vec_kinesis_json.push(Value::Object(kinesis_json));
}
🤖 Prompt for AI Agents
In src/handlers/http/kinesis.rs around lines 89 to 98, when skipping flattening
for deeply nested JSON records, the code pushes the original JSON without adding
the requestId and timestamp metadata fields. To maintain consistent data
structure, modify the code to insert the requestId and timestamp into the JSON
object before pushing it to vec_kinesis_json, ensuring all records include these
metadata fields regardless of flattening.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if the decoded json is heavily nested and it's a json array, unwrapping it as object will always fail and unwrap_or(&Map::new()) will end up in data loss, hence pushing the actual nested json to process is the best we can do here. Obviously, we cannot add metadata to the json array unless flattened, hence, we will have to push the original json as is for processing. Hope the description clarifies the changes.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@nikhilsinhaparseable Thank you for the detailed explanation! You're absolutely right. I didn't consider the case where the decoded JSON could be an array at the root level. Your analysis is spot-on:

  1. For JSON arrays, json.as_object() would return None
  2. Using unwrap_or(&Map::new()) would indeed cause data loss by replacing the array with an empty object
  3. Metadata fields cannot be directly added to JSON arrays - they need to be flattened first to become individual objects
  4. When flattening is skipped due to nesting depth, preserving the original structure is the only viable option

The current implementation correctly handles this edge case by pushing the original JSON as-is. I appreciate the clarification!


✏️ Learnings added
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1329
File: src/handlers/http/kinesis.rs:89-98
Timestamp: 2025-05-28T04:45:40.846Z
Learning: In Kinesis log processing, when handling deeply nested JSON that exceeds flattening limits, the original JSON structure must be preserved as-is. If the JSON is an array at the root level, attempting to add metadata fields like `requestId` and `timestamp` would fail because arrays don't support direct key-value insertion. The flattening process handles arrays by converting elements to individual objects, but when flattening is skipped, the original array structure must remain intact to prevent data loss.

Note: Learnings are effective only in the context of similar code segments. To apply general coding guidelines, please configure review instructions. You can manage existing learnings in the app.

} else {
tracing::error!(
"Failed to decode base64 data for kinesis log with requestId {} and timestamp {}",
message.request_id,
message.timestamp
);
return Err(anyhow::anyhow!(
"Failed to decode base64 data for record with requestId {} and timestamp {}",
message.request_id,
message.timestamp
));
}
}

vec_kinesis_json
Ok(vec_kinesis_json)
}
6 changes: 3 additions & 3 deletions src/handlers/http/modal/utils/ingest_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,9 @@ pub async fn flatten_and_push_logs(
LogSource::Kinesis => {
//custom flattening required for Amazon Kinesis
let message: Message = serde_json::from_value(json)?;
for record in flatten_kinesis_logs(message) {
push_logs(stream_name, record, log_source, p_custom_fields).await?;
}
let flattened_kinesis_data = flatten_kinesis_logs(message).await?;
let record = convert_to_array(flattened_kinesis_data)?;
push_logs(stream_name, record, log_source, p_custom_fields).await?;
}
LogSource::OtelLogs => {
//custom flattening required for otel logs
Expand Down
6 changes: 2 additions & 4 deletions src/utils/json/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,8 @@ pub fn flatten_json_body(
// Flatten the json body only if new schema and has less than 4 levels of nesting
let mut nested_value = if schema_version == SchemaVersion::V1
&& !has_more_than_max_allowed_levels(&body, 1)
&& matches!(
log_source,
LogSource::Json | LogSource::Custom(_) | LogSource::Kinesis
) {
&& matches!(log_source, LogSource::Json | LogSource::Custom(_))
{
let flattened_json = generic_flattening(&body)?;
convert_to_array(flattened_json)?
} else {
Expand Down
Loading