Skip to content

Commit aa37f9c

Browse files
author
Devdutt Shenoi
committed
refactor: into_event brings conversion closer
1 parent 10ec3ba commit aa37f9c

File tree

5 files changed

+188
-179
lines changed

5 files changed

+188
-179
lines changed

src/connectors/kafka/processor.rs

Lines changed: 7 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,9 @@
1616
*
1717
*/
1818

19-
use std::{collections::HashMap, sync::Arc};
19+
use std::sync::Arc;
2020

2121
use async_trait::async_trait;
22-
use chrono::Utc;
2322
use futures_util::StreamExt;
2423
use rdkafka::consumer::{CommitMode, Consumer};
2524
use serde_json::Value;
@@ -58,6 +57,7 @@ impl ParseableSinkProcessor {
5857
let stream = PARSEABLE.get_stream(stream_name)?;
5958
let schema = stream.get_schema_raw();
6059
let time_partition = stream.get_time_partition();
60+
let custom_partition = stream.get_custom_partition();
6161
let static_schema_flag = stream.get_static_schema_flag();
6262
let schema_version = stream.get_schema_version();
6363

@@ -71,28 +71,17 @@ impl ParseableSinkProcessor {
7171
}
7272
}
7373

74-
let (rb, is_first) = json::Event {
75-
json: Value::Array(json_vec),
76-
}
77-
.into_recordbatch(
74+
let p_event = json::Event::new(Value::Array(json_vec)).into_event(
75+
stream_name.to_string(),
76+
total_payload_size,
7877
&schema,
7978
static_schema_flag,
79+
custom_partition.as_ref(),
8080
time_partition.as_ref(),
8181
schema_version,
82+
StreamType::UserDefined,
8283
)?;
8384

84-
let p_event = ParseableEvent {
85-
rb,
86-
stream_name: stream_name.to_string(),
87-
origin_format: "json",
88-
origin_size: total_payload_size,
89-
is_first_event: is_first,
90-
parsed_timestamp: Utc::now().naive_utc(),
91-
time_partition: None,
92-
custom_partition_values: HashMap::new(),
93-
stream_type: StreamType::UserDefined,
94-
};
95-
9685
Ok(p_event)
9786
}
9887
}

src/event/format/json.rs

Lines changed: 122 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,17 +23,28 @@ use anyhow::anyhow;
2323
use arrow_array::RecordBatch;
2424
use arrow_json::reader::{infer_json_schema_from_iterator, ReaderBuilder};
2525
use arrow_schema::{DataType, Field, Fields, Schema};
26+
use chrono::{DateTime, NaiveDateTime, Utc};
2627
use datafusion::arrow::util::bit_util::round_upto_multiple_of_64;
2728
use itertools::Itertools;
2829
use serde_json::Value;
2930
use std::{collections::HashMap, sync::Arc};
3031
use tracing::error;
3132

3233
use super::EventFormat;
33-
use crate::{metadata::SchemaVersion, utils::arrow::get_field};
34+
use crate::{metadata::SchemaVersion, storage::StreamType, utils::arrow::get_field};
3435

3536
pub struct Event {
3637
pub json: Value,
38+
ingestion_time: DateTime<Utc>,
39+
}
40+
41+
impl Event {
42+
pub fn new(json: Value) -> Self {
43+
Self {
44+
json,
45+
ingestion_time: Utc::now(),
46+
}
47+
}
3748
}
3849

3950
impl EventFormat for Event {
@@ -120,6 +131,82 @@ impl EventFormat for Event {
120131
Ok(None) => unreachable!("all records are added to one rb"),
121132
}
122133
}
134+
135+
fn into_event(
136+
self,
137+
stream_name: String,
138+
origin_size: u64,
139+
storage_schema: &HashMap<String, Arc<Field>>,
140+
static_schema_flag: bool,
141+
custom_partitions: Option<&String>,
142+
time_partition: Option<&String>,
143+
schema_version: SchemaVersion,
144+
stream_type: StreamType,
145+
) -> Result<super::Event, anyhow::Error> {
146+
let custom_partition_values = match custom_partitions.as_ref() {
147+
Some(custom_partition) => {
148+
let custom_partitions = custom_partition.split(',').collect_vec();
149+
get_custom_partition_values(&self.json, &custom_partitions)
150+
}
151+
None => HashMap::new(),
152+
};
153+
154+
let parsed_timestamp = match time_partition {
155+
Some(time_partition) => get_parsed_timestamp(&self.json, time_partition)?,
156+
_ => self.ingestion_time.naive_utc(),
157+
};
158+
159+
let (rb, is_first_event) = self.into_recordbatch(
160+
storage_schema,
161+
static_schema_flag,
162+
time_partition,
163+
schema_version,
164+
)?;
165+
166+
Ok(super::Event {
167+
rb,
168+
stream_name,
169+
origin_format: "json",
170+
origin_size,
171+
is_first_event,
172+
parsed_timestamp,
173+
time_partition: None,
174+
custom_partition_values,
175+
stream_type,
176+
})
177+
}
178+
}
179+
180+
pub fn get_custom_partition_values(
181+
json: &Value,
182+
custom_partition_list: &[&str],
183+
) -> HashMap<String, String> {
184+
let mut custom_partition_values: HashMap<String, String> = HashMap::new();
185+
for custom_partition_field in custom_partition_list {
186+
let custom_partition_value = json.get(custom_partition_field.trim()).unwrap().to_owned();
187+
let custom_partition_value = match custom_partition_value {
188+
e @ Value::Number(_) | e @ Value::Bool(_) => e.to_string(),
189+
Value::String(s) => s,
190+
_ => "".to_string(),
191+
};
192+
custom_partition_values.insert(
193+
custom_partition_field.trim().to_string(),
194+
custom_partition_value,
195+
);
196+
}
197+
custom_partition_values
198+
}
199+
200+
fn get_parsed_timestamp(
201+
json: &Value,
202+
time_partition: &str,
203+
) -> Result<NaiveDateTime, anyhow::Error> {
204+
let current_time = json
205+
.get(time_partition)
206+
.ok_or_else(|| anyhow!("Missing field for time partition in json: {time_partition}"))?;
207+
let parsed_time: DateTime<Utc> = serde_json::from_value(current_time.clone())?;
208+
209+
Ok(parsed_time.naive_utc())
123210
}
124211

125212
// Returns arrow schema with the fields that are present in the request body
@@ -225,3 +312,37 @@ fn valid_type(data_type: &DataType, value: &Value, schema_version: SchemaVersion
225312
}
226313
}
227314
}
315+
316+
#[cfg(test)]
mod tests {
    use std::str::FromStr;

    use serde_json::json;

    use super::*;

    #[test]
    fn parse_time_partition_from_value() {
        let json = json!({"timestamp": "2025-05-15T15:30:00Z"});
        let parsed = get_parsed_timestamp(&json, "timestamp");

        let expected = NaiveDateTime::from_str("2025-05-15T15:30:00").unwrap();
        assert_eq!(parsed.unwrap(), expected);
    }

    #[test]
    fn time_partition_not_in_json() {
        // Fixed: the payload must NOT contain the looked-up field, otherwise
        // this asserts `is_err()` on a successful parse and fails.
        let json = json!({"hello": "world!"});
        let parsed = get_parsed_timestamp(&json, "timestamp");

        assert!(parsed.is_err());
    }

    #[test]
    fn time_partition_not_parseable_as_datetime() {
        // Fixed: the field is present but holds a non-datetime value, so the
        // error path under test is actually exercised.
        let json = json!({"timestamp": "not time"});
        let parsed = get_parsed_timestamp(&json, "timestamp");

        assert!(parsed.is_err());
    }
}

src/event/format/mod.rs

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,11 @@ use serde_json::Value;
3232

3333
use crate::{
3434
metadata::SchemaVersion,
35+
storage::StreamType,
3536
utils::arrow::{get_field, get_timestamp_array, replace_columns},
3637
};
3738

38-
use super::DEFAULT_TIMESTAMP_KEY;
39+
use super::{Event, DEFAULT_TIMESTAMP_KEY};
3940

4041
pub mod json;
4142

@@ -172,6 +173,18 @@ pub trait EventFormat: Sized {
172173
}
173174
true
174175
}
176+
177+
fn into_event(
178+
self,
179+
stream_name: String,
180+
origin_size: u64,
181+
storage_schema: &HashMap<String, Arc<Field>>,
182+
static_schema_flag: bool,
183+
custom_partitions: Option<&String>,
184+
time_partition: Option<&String>,
185+
schema_version: SchemaVersion,
186+
stream_type: StreamType,
187+
) -> Result<Event, AnyError>;
175188
}
176189

177190
pub fn get_existing_field_names(

0 commit comments

Comments
 (0)