
refactor: retire to_recordbatch in favor of to_event #1181


Closed · wants to merge 3 commits
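This PR retires `EventFormat::into_recordbatch` and `into_event` in favor of a single default `to_event` method that reads schema, partition settings, and stream type from a `&Stream` handle; `Event::process` now takes the target `&Stream` instead of carrying a `stream_name` field. A before/after sketch of the call site, condensed from the `src/connectors/kafka/processor.rs` diff below:

```rust
// Before (retired): the caller fetched each piece of stream metadata and passed it piecemeal.
let p_event = json::Event::new(Value::Array(json_vec)).into_event(
    stream_name.to_string(),
    total_payload_size,
    &schema,
    static_schema_flag,
    custom_partition.as_ref(),
    time_partition.as_ref(),
    schema_version,
    StreamType::UserDefined,
)?;
p_event.process()?;

// After: the Stream handle carries its own metadata, so the caller only
// supplies the stream and the payload size.
let stream = PARSEABLE.get_stream(stream_name)?;
json::Event::new(Value::Array(json_vec))
    .to_event(&stream, total_payload_size)?
    .process(&stream)?;
```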
34 changes: 8 additions & 26 deletions src/connectors/kafka/processor.rs
@@ -27,10 +27,7 @@ use tracing::{debug, error};

use crate::{
connectors::common::processor::Processor,
event::{
format::{json, EventFormat, LogSource},
Event as ParseableEvent,
},
event::format::{json, EventFormat, LogSource},
parseable::PARSEABLE,
storage::StreamType,
};
@@ -41,10 +38,7 @@ use super::{config::BufferConfig, ConsumerRecord, StreamConsumer, TopicPartition
pub struct ParseableSinkProcessor;

impl ParseableSinkProcessor {
async fn build_event_from_chunk(
&self,
records: &[ConsumerRecord],
) -> anyhow::Result<ParseableEvent> {
async fn process_event_from_chunk(&self, records: &[ConsumerRecord]) -> anyhow::Result<()> {
let stream_name = records
.first()
.map(|r| r.topic.as_str())
@@ -55,11 +49,6 @@ impl ParseableSinkProcessor {
.await?;

let stream = PARSEABLE.get_stream(stream_name)?;
let schema = stream.get_schema_raw();
let time_partition = stream.get_time_partition();
let custom_partition = stream.get_custom_partition();
let static_schema_flag = stream.get_static_schema_flag();
let schema_version = stream.get_schema_version();

let mut json_vec = Vec::with_capacity(records.len());
let mut total_payload_size = 0u64;
@@ -71,18 +60,11 @@ }
}
}

let p_event = json::Event::new(Value::Array(json_vec)).into_event(
stream_name.to_string(),
total_payload_size,
&schema,
static_schema_flag,
custom_partition.as_ref(),
time_partition.as_ref(),
schema_version,
StreamType::UserDefined,
)?;

Ok(p_event)
json::Event::new(Value::Array(json_vec))
.to_event(&stream, total_payload_size)?
.process(&stream)?;

Ok(())
}
}

@@ -92,7 +74,7 @@ impl Processor<Vec<ConsumerRecord>, ()> for ParseableSinkProcessor {
let len = records.len();
debug!("Processing {len} records");

self.build_event_from_chunk(&records).await?.process()?;
self.process_event_from_chunk(&records).await?;

debug!("Processed {len} records");
Ok(())
73 changes: 24 additions & 49 deletions src/event/format/json.rs
@@ -30,8 +30,8 @@ use serde_json::Value;
use std::{collections::HashMap, sync::Arc};
use tracing::error;

use super::EventFormat;
use crate::{metadata::SchemaVersion, storage::StreamType, utils::arrow::get_field};
use super::{EventFormat, EventSchema};
use crate::{metadata::SchemaVersion, utils::arrow::get_field};

pub struct Event {
pub json: Value,
@@ -55,16 +55,36 @@ impl EventFormat for Event {
self.p_timestamp
}

fn get_partitions(
&self,
time_partition: Option<&String>,
custom_partitions: Option<&String>,
) -> anyhow::Result<(NaiveDateTime, HashMap<String, String>)> {
let custom_partition_values = match custom_partitions.as_ref() {
Some(custom_partition) => {
let custom_partitions = custom_partition.split(',').collect_vec();
extract_custom_partition_values(&self.json, &custom_partitions)
}
None => HashMap::new(),
};

let parsed_timestamp = match time_partition {
Some(time_partition) => extract_and_parse_time(&self.json, time_partition)?,
_ => self.p_timestamp.naive_utc(),
};

Ok((parsed_timestamp, custom_partition_values))
}

// convert the incoming json to a vector of json values
// also extract the arrow schema, tags and metadata from the incoming json
fn to_data(
self,
schema: &HashMap<String, Arc<Field>>,
time_partition: Option<&String>,
schema_version: SchemaVersion,
) -> Result<(Self::Data, Vec<Arc<Field>>, bool), anyhow::Error> {
) -> anyhow::Result<(Self::Data, EventSchema, bool)> {
let stream_schema = schema;

// incoming event may be a single json or a json array
// but Data (type defined above) is a vector of json values
// hence we need to convert the incoming event to a vector of json values
@@ -136,51 +156,6 @@ impl EventFormat for Event {
Ok(None) => unreachable!("all records are added to one rb"),
}
}

/// Converts a JSON event into a Parseable Event
fn into_event(
self,
stream_name: String,
origin_size: u64,
storage_schema: &HashMap<String, Arc<Field>>,
static_schema_flag: bool,
custom_partitions: Option<&String>,
time_partition: Option<&String>,
schema_version: SchemaVersion,
stream_type: StreamType,
) -> Result<super::Event, anyhow::Error> {
let custom_partition_values = match custom_partitions.as_ref() {
Some(custom_partition) => {
let custom_partitions = custom_partition.split(',').collect_vec();
extract_custom_partition_values(&self.json, &custom_partitions)
}
None => HashMap::new(),
};

let parsed_timestamp = match time_partition {
Some(time_partition) => extract_and_parse_time(&self.json, time_partition)?,
_ => self.p_timestamp.naive_utc(),
};

let (rb, is_first_event) = self.into_recordbatch(
storage_schema,
static_schema_flag,
time_partition,
schema_version,
)?;

Ok(super::Event {
rb,
stream_name,
origin_format: "json",
origin_size,
is_first_event,
parsed_timestamp,
time_partition: None,
custom_partition_values,
stream_type,
})
}
}

/// Extracts custom partition values from provided JSON object
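The partition plumbing that `into_event` used to do inline now lives in the `get_partitions` trait method added above. A hedged sketch of how it behaves, written as if inside `json.rs` (so `Event` is the JSON event type); the field names and the assumption that `extract_and_parse_time` accepts an RFC 3339 value here are illustrative, not taken from this PR:

```rust
fn partition_sketch() -> anyhow::Result<()> {
    // Hypothetical payload; "source_time", "device" and "region" are made-up names.
    let event = Event::new(serde_json::json!({
        "source_time": "2025-01-01T12:34:56Z",
        "device": "sensor-1",
        "region": "eu",
        "level": "info"
    }));

    let (parsed_timestamp, custom_partition_values) = event.get_partitions(
        Some(&"source_time".to_string()),   // stream.get_time_partition()
        Some(&"device,region".to_string()), // stream.get_custom_partition()
    )?;

    // With a time partition configured, parsed_timestamp is read from the payload's
    // "source_time" field; without one it falls back to p_timestamp (ingestion time).
    // custom_partition_values should map "device" -> "sensor-1" and "region" -> "eu".
    println!("{parsed_timestamp} {custom_partition_values:?}");
    Ok(())
}
```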
73 changes: 41 additions & 32 deletions src/event/format/mod.rs
@@ -23,20 +23,20 @@ use std::{
sync::Arc,
};

use anyhow::{anyhow, Error as AnyError};
use anyhow::anyhow;
use arrow_array::RecordBatch;
use arrow_schema::{DataType, Field, Schema, TimeUnit};
use chrono::{DateTime, Utc};
use chrono::{DateTime, NaiveDateTime, Utc};
use serde::{Deserialize, Serialize};
use serde_json::Value;

use crate::{
metadata::SchemaVersion,
storage::StreamType,
parseable::Stream,
utils::arrow::{get_field, get_timestamp_array, replace_columns},
};

use super::{Event, DEFAULT_TIMESTAMP_KEY};
use super::DEFAULT_TIMESTAMP_KEY;

pub mod json;

@@ -97,28 +97,36 @@ impl Display for LogSource {
pub trait EventFormat: Sized {
type Data;

fn get_partitions(
&self,
time_partition: Option<&String>,
custom_partitions: Option<&String>,
) -> anyhow::Result<(NaiveDateTime, HashMap<String, String>)>;

fn to_data(
self,
schema: &HashMap<String, Arc<Field>>,
time_partition: Option<&String>,
schema_version: SchemaVersion,
) -> Result<(Self::Data, EventSchema, bool), AnyError>;
) -> anyhow::Result<(Self::Data, EventSchema, bool)>;

fn decode(data: Self::Data, schema: Arc<Schema>) -> Result<RecordBatch, AnyError>;
fn decode(data: Self::Data, schema: Arc<Schema>) -> anyhow::Result<RecordBatch>;

/// Returns the UTC time at ingestion
fn get_p_timestamp(&self) -> DateTime<Utc>;

fn into_recordbatch(
self,
storage_schema: &HashMap<String, Arc<Field>>,
static_schema_flag: bool,
time_partition: Option<&String>,
schema_version: SchemaVersion,
) -> Result<(RecordBatch, bool), AnyError> {
fn to_event(self, stream: &Stream, origin_size: u64) -> anyhow::Result<super::Event> {
let storage_schema = stream.get_schema_raw();
let static_schema_flag = stream.get_static_schema_flag();
let time_partition = stream.get_time_partition();
let custom_partition = stream.get_custom_partition();
let schema_version = stream.get_schema_version();
let stream_type = stream.get_stream_type();
let p_timestamp = self.get_p_timestamp();
let (data, mut schema, is_first) =
self.to_data(storage_schema, time_partition, schema_version)?;
let (parsed_timestamp, custom_partition_values) =
self.get_partitions(time_partition.as_ref(), custom_partition.as_ref())?;
let (data, mut schema, is_first_event) =
self.to_data(&storage_schema, time_partition.as_ref(), schema_version)?;

if get_field(&schema, DEFAULT_TIMESTAMP_KEY).is_some() {
return Err(anyhow!(
@@ -139,11 +147,16 @@

// prepare the record batch and new fields to be added
let mut new_schema = Arc::new(Schema::new(schema));
if !Self::is_schema_matching(new_schema.clone(), storage_schema, static_schema_flag) {
if !Self::is_schema_matching(new_schema.clone(), &storage_schema, static_schema_flag) {
return Err(anyhow!("Schema mismatch"));
}
new_schema =
update_field_type_in_schema(new_schema, None, time_partition, None, schema_version);
new_schema = update_field_type_in_schema(
new_schema,
None,
time_partition.as_ref(),
None,
schema_version,
);

let mut rb = Self::decode(data, new_schema.clone())?;
rb = replace_columns(
@@ -153,7 +166,16 @@
&[Arc::new(get_timestamp_array(p_timestamp, rb.num_rows()))],
);

Ok((rb, is_first))
Ok(super::Event {
rb,
origin_format: "json",
origin_size,
is_first_event,
time_partition,
parsed_timestamp,
custom_partition_values,
stream_type,
})
}

fn is_schema_matching(
@@ -177,19 +199,6 @@
}
true
}

#[allow(clippy::too_many_arguments)]
fn into_event(
self,
stream_name: String,
origin_size: u64,
storage_schema: &HashMap<String, Arc<Field>>,
static_schema_flag: bool,
custom_partitions: Option<&String>,
time_partition: Option<&String>,
schema_version: SchemaVersion,
stream_type: StreamType,
) -> Result<Event, AnyError>;
}

pub fn get_existing_field_names(
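After `to_data` and `decode`, the default `to_event` overwrites the `p_timestamp` column with the ingestion time via `get_timestamp_array` and `replace_columns`; the collapsed hunk hides the column index, so index 0 below is an assumption. A minimal, self-contained sketch of that step using plain arrow-rs rather than the crate's helpers:

```rust
use std::sync::Arc;

use arrow_array::{ArrayRef, Int64Array, RecordBatch, TimestampMillisecondArray};
use arrow_schema::{DataType, Field, Schema, TimeUnit};
use chrono::Utc;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let schema = Arc::new(Schema::new(vec![
        Field::new("p_timestamp", DataType::Timestamp(TimeUnit::Millisecond, None), true),
        Field::new("value", DataType::Int64, false),
    ]));

    // Roughly what `decode` hands back: the p_timestamp column exists but is not yet filled.
    let rb = RecordBatch::try_new(
        schema.clone(),
        vec![
            Arc::new(TimestampMillisecondArray::from(vec![None, None])) as ArrayRef,
            Arc::new(Int64Array::from(vec![1_i64, 2])) as ArrayRef,
        ],
    )?;

    // Replace the timestamp column with the ingestion time repeated for every row,
    // mirroring what get_timestamp_array + replace_columns do inside to_event.
    let now = Utc::now().timestamp_millis();
    let p_timestamp: ArrayRef =
        Arc::new(TimestampMillisecondArray::from(vec![now; rb.num_rows()]));
    let mut columns: Vec<ArrayRef> = rb.columns().to_vec();
    columns[0] = p_timestamp;
    let rb = RecordBatch::try_new(schema, columns)?;

    println!("{} rows x {} columns", rb.num_rows(), rb.num_columns());
    Ok(())
}
```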
17 changes: 8 additions & 9 deletions src/event/mod.rs
@@ -27,7 +27,7 @@ use std::sync::Arc;
use self::error::EventError;
use crate::{
metadata::update_stats,
parseable::{StagingError, PARSEABLE},
parseable::{StagingError, Stream, PARSEABLE},
storage::StreamType,
LOCK_EXPECT,
};
@@ -38,7 +38,6 @@ pub const DEFAULT_TIMESTAMP_KEY: &str = "p_timestamp";

#[derive(Clone)]
pub struct Event {
pub stream_name: String,
pub rb: RecordBatch,
pub origin_format: &'static str,
pub origin_size: u64,
@@ -51,7 +50,7 @@ pub struct Event {

// Events holds the schema related to a each event for a single log stream
impl Event {
pub fn process(self) -> Result<(), EventError> {
pub fn process(self, stream: &Stream) -> Result<(), EventError> {
let mut key = get_schema_key(&self.rb.schema().fields);
if self.time_partition.is_some() {
let parsed_timestamp_to_min = self.parsed_timestamp.format("%Y%m%dT%H%M").to_string();
@@ -65,10 +64,10 @@ impl Event {
}

if self.is_first_event {
commit_schema(&self.stream_name, self.rb.schema())?;
commit_schema(&stream.stream_name, self.rb.schema())?;
}

PARSEABLE.get_or_create_stream(&self.stream_name).push(
PARSEABLE.get_or_create_stream(&stream.stream_name).push(
&key,
&self.rb,
self.parsed_timestamp,
@@ -77,22 +76,22 @@ impl Event {
)?;

update_stats(
&self.stream_name,
&stream.stream_name,
self.origin_format,
self.origin_size,
self.rb.num_rows(),
self.parsed_timestamp.date(),
);

crate::livetail::LIVETAIL.process(&self.stream_name, &self.rb);
crate::livetail::LIVETAIL.process(&stream.stream_name, &self.rb);

Ok(())
}

pub fn process_unchecked(&self) -> Result<(), EventError> {
pub fn process_unchecked(&self, stream: &Stream) -> Result<(), EventError> {
let key = get_schema_key(&self.rb.schema().fields);

PARSEABLE.get_or_create_stream(&self.stream_name).push(
PARSEABLE.get_or_create_stream(&stream.stream_name).push(
&key,
&self.rb,
self.parsed_timestamp,