Commit 25747b8

Author: Devdutt Shenoi (committed)

refactor: retire to_recordbatch in favor of to_event

1 parent 4e9bb6e · commit 25747b8

File tree

4 files changed: +567 -583 lines changed

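Taken together, the two diffs below collapse a two-step flow (build a RecordBatch with into_recordbatch, then assemble the Event struct by hand at every call site) into a single trait call that returns the fully populated event. A schematic sketch of the change, using names taken from the diffs; it is an illustration only, not compilable as written:

    // Before: callers received only the RecordBatch plus a first-event flag,
    // then assembled the Event struct by hand.
    let (rb, is_first) = batch_json_event.into_recordbatch(/* schema, flags, time partition */)?;
    let p_event = Event { rb, is_first_event: is_first, /* ...remaining metadata... */ };

    // After: EventFormat::to_event accepts the metadata up front
    // and returns the fully built super::Event.
    let p_event = batch_json_event.to_event(/* stream, size, schema, timestamp, partitions, ... */)?;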

src/connectors/kafka/processor.rs

Lines changed: 7 additions & 14 deletions
@@ -60,25 +60,18 @@ impl ParseableSinkProcessor {
             data: Value::Array(json_vec.to_vec()),
         };
 
-        let (rb, is_first) = batch_json_event.into_recordbatch(
+        let p_event = batch_json_event.to_event(
+            stream_name,
+            total_payload_size,
             &schema,
             static_schema_flag,
-            time_partition.as_ref(),
+            Utc::now().naive_utc(),
+            time_partition,
+            HashMap::new(),
             schema_version,
+            StreamType::UserDefined,
         )?;
 
-        let p_event = ParseableEvent {
-            rb,
-            stream_name: stream_name.to_string(),
-            origin_format: "json",
-            origin_size: total_payload_size,
-            is_first_event: is_first,
-            parsed_timestamp: Utc::now().naive_utc(),
-            time_partition: None,
-            custom_partition_values: HashMap::new(),
-            stream_type: StreamType::UserDefined,
-        };
-
         Ok(p_event)
     }
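The removed struct literal above lists exactly the fields that to_event now fills in on the caller's behalf. For reference, a sketch of the target Event struct with field types inferred from the new trait signature; the real definition lives in src/event/mod.rs and is not part of this diff, so the types shown (origin_format in particular) are assumptions:

    // Inferred sketch only; see src/event/mod.rs for the actual definition.
    pub struct Event {
        pub rb: RecordBatch,
        pub stream_name: String,
        pub origin_format: &'static str,                   // "json" here (type assumed)
        pub origin_size: u64,
        pub is_first_event: bool,
        pub parsed_timestamp: NaiveDateTime,
        pub time_partition: Option<String>,
        pub custom_partition_values: HashMap<String, String>,
        pub stream_type: StreamType,
    }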

src/event/format/mod.rs

Lines changed: 30 additions & 9 deletions
@@ -26,12 +26,13 @@ use std::{
 use anyhow::{anyhow, Error as AnyError};
 use arrow_array::RecordBatch;
 use arrow_schema::{DataType, Field, Schema, TimeUnit};
-use chrono::DateTime;
+use chrono::{DateTime, NaiveDateTime};
 use serde::{Deserialize, Serialize};
 use serde_json::Value;
 
 use crate::{
     metadata::SchemaVersion,
+    storage::StreamType,
     utils::arrow::{get_field, get_timestamp_array, replace_columns},
 };
 
@@ -105,15 +106,20 @@ pub trait EventFormat: Sized {
 
     fn decode(data: Self::Data, schema: Arc<Schema>) -> Result<RecordBatch, AnyError>;
 
-    fn into_recordbatch(
+    fn to_event(
         self,
+        stream_name: &str,
+        origin_size: u64,
         storage_schema: &HashMap<String, Arc<Field>>,
         static_schema_flag: bool,
-        time_partition: Option<&String>,
+        parsed_timestamp: NaiveDateTime,
+        time_partition: Option<String>,
+        custom_partition_values: HashMap<String, String>,
         schema_version: SchemaVersion,
-    ) -> Result<(RecordBatch, bool), AnyError> {
-        let (data, mut schema, is_first) =
-            self.to_data(storage_schema, time_partition, schema_version)?;
+        stream_type: StreamType,
+    ) -> Result<super::Event, AnyError> {
+        let (data, mut schema, is_first_event) =
+            self.to_data(storage_schema, time_partition.as_ref(), schema_version)?;
 
         if get_field(&schema, DEFAULT_TIMESTAMP_KEY).is_some() {
             return Err(anyhow!(
@@ -137,8 +143,13 @@
         if !Self::is_schema_matching(new_schema.clone(), storage_schema, static_schema_flag) {
             return Err(anyhow!("Schema mismatch"));
         }
-        new_schema =
-            update_field_type_in_schema(new_schema, None, time_partition, None, schema_version);
+        new_schema = update_field_type_in_schema(
+            new_schema,
+            None,
+            time_partition.as_ref(),
+            None,
+            schema_version,
+        );
 
         let mut rb = Self::decode(data, new_schema.clone())?;
         rb = replace_columns(
@@ -148,7 +159,17 @@
             &[Arc::new(get_timestamp_array(rb.num_rows()))],
         );
 
-        Ok((rb, is_first))
+        Ok(super::Event {
+            rb,
+            stream_name: stream_name.to_string(),
+            origin_format: "json",
+            origin_size,
+            is_first_event,
+            parsed_timestamp,
+            time_partition,
+            custom_partition_values,
+            stream_type,
+        })
     }
 
     fn is_schema_matching(
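With the new signature, a caller hands over all event metadata in one place and gets back a ready-to-process super::Event. A minimal usage sketch under the assumption of a hypothetical ingestion path where the stream's schema map, flags, and partition configuration are already in scope (the variable names are illustrative, not taken from this codebase):

    // Hypothetical call site; `json_event` is any type implementing EventFormat.
    let event = json_event.to_event(
        stream_name,              // &str
        payload_size,             // u64: original payload size in bytes
        &storage_schema,          // &HashMap<String, Arc<Field>>
        static_schema_flag,       // bool
        Utc::now().naive_utc(),   // parsed_timestamp: NaiveDateTime
        time_partition.clone(),   // Option<String>, now passed by value
        HashMap::new(),           // custom_partition_values
        schema_version,           // SchemaVersion
        StreamType::UserDefined,  // stream_type
    )?;
    // `event` carries the RecordBatch plus all metadata that callers previously set by hand.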

0 commit comments