refactor: capture ingestion time at receive #1210

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged · 2 commits · Feb 25, 2025
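
This PR threads a single ingestion timestamp through the write path: `Utc::now()` is captured once where an event is received (the HTTP ingest handlers and the Kafka sink processor) and passed down as `p_timestamp` to `EventFormat::into_recordbatch` and `get_timestamp_array`, instead of each helper sampling the clock on its own. A minimal sketch of the pattern, using simplified, hypothetical signatures rather than the actual Parseable ones:

```rust
use chrono::{DateTime, Utc};

// Hypothetical helper mirroring get_timestamp_array: the caller supplies the
// instant, so the same received-at time stamps every row in the batch.
fn timestamp_column(p_timestamp: DateTime<Utc>, rows: usize) -> Vec<i64> {
    vec![p_timestamp.timestamp_millis(); rows]
}

// Hypothetical receive path: capture the clock once, reuse it everywhere.
fn ingest(rows: usize) {
    let p_timestamp = Utc::now(); // captured at receive time
    let column = timestamp_column(p_timestamp, rows);
    // parsed_timestamp and the arrow timestamp column now agree exactly.
    println!("{} rows stamped at {}", column.len(), p_timestamp);
}

fn main() {
    ingest(3);
}
```

Capturing the instant at receive time also makes the lower-level helpers deterministic, which the reworked tests in `src/utils/arrow/mod.rs` rely on.
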
1 change: 1 addition & 0 deletions src/connectors/kafka/processor.rs
@@ -68,6 +68,7 @@ impl ParseableSinkProcessor {

let (rb, is_first) = batch_json_event.into_recordbatch(
&schema,
Utc::now(),
static_schema_flag,
time_partition.as_ref(),
schema_version,
5 changes: 3 additions & 2 deletions src/event/format/mod.rs
Expand Up @@ -26,7 +26,7 @@ use std::{
use anyhow::{anyhow, Error as AnyError};
use arrow_array::RecordBatch;
use arrow_schema::{DataType, Field, Schema, TimeUnit};
use chrono::DateTime;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use serde_json::Value;

@@ -108,6 +108,7 @@ pub trait EventFormat: Sized {
fn into_recordbatch(
self,
storage_schema: &HashMap<String, Arc<Field>>,
p_timestamp: DateTime<Utc>,
static_schema_flag: bool,
time_partition: Option<&String>,
schema_version: SchemaVersion,
@@ -145,7 +146,7 @@ pub trait EventFormat: Sized {
rb.schema(),
&rb,
&[0],
&[Arc::new(get_timestamp_array(rb.num_rows()))],
&[Arc::new(get_timestamp_array(p_timestamp, rb.num_rows()))],
);

Ok((rb, is_first))
70 changes: 54 additions & 16 deletions src/handlers/http/ingest.rs
@@ -79,7 +79,7 @@ pub async fn ingest(req: HttpRequest, Json(json): Json<Value>) -> Result<HttpRes

pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result<(), PostError> {
let size: usize = body.len();
let parsed_timestamp = Utc::now().naive_utc();
let now = Utc::now();
let (rb, is_first) = {
let body_val: Value = serde_json::from_slice(&body)?;
let hash_map = PARSEABLE.streams.read().unwrap();
@@ -93,15 +93,15 @@
.clone();
let event = format::json::Event { data: body_val };
// For internal streams, use old schema
event.into_recordbatch(&schema, false, None, SchemaVersion::V0)?
event.into_recordbatch(&schema, now, false, None, SchemaVersion::V0)?
};
event::Event {
rb,
stream_name,
origin_format: "json",
origin_size: size as u64,
is_first_event: is_first,
parsed_timestamp,
parsed_timestamp: now.naive_utc(),
time_partition: None,
custom_partition_values: HashMap::new(),
stream_type: StreamType::Internal,
@@ -351,6 +351,7 @@ mod tests {
use arrow::datatypes::Int64Type;
use arrow_array::{ArrayRef, Float64Array, Int64Array, ListArray, StringArray};
use arrow_schema::{DataType, Field};
use chrono::Utc;
use serde_json::json;
use std::{collections::HashMap, sync::Arc};

@@ -392,8 +393,15 @@
"b": "hello",
});

let (rb, _) =
into_event_batch(json, HashMap::default(), false, None, SchemaVersion::V0).unwrap();
let (rb, _) = into_event_batch(
json,
HashMap::default(),
Utc::now(),
false,
None,
SchemaVersion::V0,
)
.unwrap();

assert_eq!(rb.num_rows(), 1);
assert_eq!(rb.num_columns(), 4);
@@ -419,8 +427,15 @@
"c": null
});

let (rb, _) =
into_event_batch(json, HashMap::default(), false, None, SchemaVersion::V0).unwrap();
let (rb, _) = into_event_batch(
json,
HashMap::default(),
Utc::now(),
false,
None,
SchemaVersion::V0,
)
.unwrap();

assert_eq!(rb.num_rows(), 1);
assert_eq!(rb.num_columns(), 3);
@@ -450,7 +465,8 @@ mod tests {
.into_iter(),
);

let (rb, _) = into_event_batch(json, schema, false, None, SchemaVersion::V0).unwrap();
let (rb, _) =
into_event_batch(json, schema, Utc::now(), false, None, SchemaVersion::V0).unwrap();

assert_eq!(rb.num_rows(), 1);
assert_eq!(rb.num_columns(), 3);
@@ -480,7 +496,9 @@ mod tests {
.into_iter(),
);

assert!(into_event_batch(json, schema, false, None, SchemaVersion::V0,).is_err());
assert!(
into_event_batch(json, schema, Utc::now(), false, None, SchemaVersion::V0,).is_err()
);
}

#[test]
@@ -496,7 +514,8 @@
.into_iter(),
);

let (rb, _) = into_event_batch(json, schema, false, None, SchemaVersion::V0).unwrap();
let (rb, _) =
into_event_batch(json, schema, Utc::now(), false, None, SchemaVersion::V0).unwrap();

assert_eq!(rb.num_rows(), 1);
assert_eq!(rb.num_columns(), 1);
@@ -535,8 +554,15 @@ mod tests {
},
]);

let (rb, _) =
into_event_batch(json, HashMap::default(), false, None, SchemaVersion::V0).unwrap();
let (rb, _) = into_event_batch(
json,
HashMap::default(),
Utc::now(),
false,
None,
SchemaVersion::V0,
)
.unwrap();

assert_eq!(rb.num_rows(), 3);
assert_eq!(rb.num_columns(), 4);
@@ -582,8 +608,15 @@ mod tests {
},
]);

let (rb, _) =
into_event_batch(json, HashMap::default(), false, None, SchemaVersion::V0).unwrap();
let (rb, _) = into_event_batch(
json,
HashMap::default(),
Utc::now(),
false,
None,
SchemaVersion::V0,
)
.unwrap();

assert_eq!(rb.num_rows(), 3);
assert_eq!(rb.num_columns(), 4);
@@ -630,7 +663,8 @@ mod tests {
.into_iter(),
);

let (rb, _) = into_event_batch(json, schema, false, None, SchemaVersion::V0).unwrap();
let (rb, _) =
into_event_batch(json, schema, Utc::now(), false, None, SchemaVersion::V0).unwrap();

assert_eq!(rb.num_rows(), 3);
assert_eq!(rb.num_columns(), 4);
@@ -677,7 +711,9 @@ mod tests {
.into_iter(),
);

assert!(into_event_batch(json, schema, false, None, SchemaVersion::V0,).is_err());
assert!(
into_event_batch(json, schema, Utc::now(), false, None, SchemaVersion::V0,).is_err()
);
}

#[test]
@@ -718,6 +754,7 @@ mod tests {
let (rb, _) = into_event_batch(
flattened_json,
HashMap::default(),
Utc::now(),
false,
None,
SchemaVersion::V0,
@@ -806,6 +843,7 @@ mod tests {
let (rb, _) = into_event_batch(
flattened_json,
HashMap::default(),
Utc::now(),
false,
None,
SchemaVersion::V1,
6 changes: 5 additions & 1 deletion src/handlers/http/modal/utils/ingest_utils.rs
@@ -96,6 +96,7 @@ async fn push_logs(
let static_schema_flag = stream.get_static_schema_flag();
let custom_partition = stream.get_custom_partition();
let schema_version = stream.get_schema_version();
let p_timestamp = Utc::now();

let data = if time_partition.is_some() || custom_partition.is_some() {
convert_array_to_object(
@@ -121,7 +122,7 @@
let origin_size = serde_json::to_vec(&value).unwrap().len() as u64; // string length need not be the same as byte length
let parsed_timestamp = match time_partition.as_ref() {
Some(time_partition) => get_parsed_timestamp(&value, time_partition)?,
_ => Utc::now().naive_utc(),
_ => p_timestamp.naive_utc(),
};
let custom_partition_values = match custom_partition.as_ref() {
Some(custom_partition) => {
@@ -144,6 +145,7 @@
let (rb, is_first_event) = into_event_batch(
value,
schema,
p_timestamp,
static_schema_flag,
time_partition.as_ref(),
schema_version,
@@ -168,12 +170,14 @@
pub fn into_event_batch(
data: Value,
schema: HashMap<String, Arc<Field>>,
p_timestamp: DateTime<Utc>,
static_schema_flag: bool,
time_partition: Option<&String>,
schema_version: SchemaVersion,
) -> Result<(arrow_array::RecordBatch, bool), PostError> {
let (rb, is_first) = json::Event { data }.into_recordbatch(
&schema,
p_timestamp,
static_schema_flag,
time_partition,
schema_version,
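
In `push_logs`, the captured `p_timestamp` now serves both purposes: it is the fallback for `parsed_timestamp` when the stream has no time partition, and it is forwarded to `into_event_batch` for the arrow timestamp column, so the two can no longer drift apart within one request. A rough sketch of that fallback, with a simplified signature (the real code extracts the partition field from the JSON value via `get_parsed_timestamp`):

```rust
use chrono::{DateTime, NaiveDateTime, Utc};

// Simplified stand-in for the push_logs decision: prefer the timestamp parsed
// from the event when a time partition is configured, otherwise fall back to
// the single received-at instant shared by the whole request.
fn resolve_parsed_timestamp(
    from_time_partition: Option<NaiveDateTime>,
    p_timestamp: DateTime<Utc>,
) -> NaiveDateTime {
    match from_time_partition {
        Some(ts) => ts,
        None => p_timestamp.naive_utc(),
    }
}

fn main() {
    let p_timestamp = Utc::now();
    let ts = resolve_parsed_timestamp(None, p_timestamp);
    assert_eq!(ts, p_timestamp.naive_utc());
}
```
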
14 changes: 7 additions & 7 deletions src/utils/arrow/mod.rs
@@ -45,7 +45,7 @@ use std::sync::Arc;
use arrow_array::{Array, RecordBatch, TimestampMillisecondArray, UInt64Array};
use arrow_schema::Schema;
use arrow_select::take::take;
use chrono::Utc;
use chrono::{DateTime, Utc};
use itertools::Itertools;

pub mod batch_adapter;
@@ -133,8 +133,8 @@ pub fn get_field<'a>(
/// # Returns
///
/// A column in arrow, containing the current timestamp in millis.
pub fn get_timestamp_array(size: usize) -> TimestampMillisecondArray {
TimestampMillisecondArray::from_value(Utc::now().timestamp_millis(), size)
pub fn get_timestamp_array(p_timestamp: DateTime<Utc>, size: usize) -> TimestampMillisecondArray {
TimestampMillisecondArray::from_value(p_timestamp.timestamp_millis(), size)
}

pub fn reverse(rb: &RecordBatch) -> RecordBatch {
@@ -196,19 +196,19 @@ mod tests {
#[test]
fn test_timestamp_array_has_correct_size_and_value() {
let size = 5;
let now = Utc::now().timestamp_millis();
let now = Utc::now();

let array = get_timestamp_array(size);
let array = get_timestamp_array(now, size);

assert_eq!(array.len(), size);
for i in 0..size {
assert!(array.value(i) >= now);
assert!(array.value(i) >= now.timestamp_millis());
}
}

#[test]
fn test_timestamp_array_with_zero_size() {
let array = get_timestamp_array(0);
let array = get_timestamp_array(Utc::now(), 0);

assert_eq!(array.len(), 0);
assert!(array.is_empty());
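
With the instant passed in as an argument, `get_timestamp_array` is deterministic: every element equals `p_timestamp.timestamp_millis()`. A small standalone sketch of the same construction, calling `arrow_array` directly since the helper itself lives inside the crate:

```rust
use arrow_array::{Array, TimestampMillisecondArray};
use chrono::Utc;

fn main() {
    let p_timestamp = Utc::now();
    let size = 5;

    // Same construction the reworked helper performs: one millisecond value,
    // repeated for every row in the batch.
    let array = TimestampMillisecondArray::from_value(p_timestamp.timestamp_millis(), size);

    assert_eq!(array.len(), size);
    for i in 0..size {
        assert_eq!(array.value(i), p_timestamp.timestamp_millis());
    }
}
```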