
Commit 14e173b

Merge branch 'main' into file-naming

Author: Devdutt Shenoi
Signed-off-by: Devdutt Shenoi <[email protected]>
2 parents ff9cddf + f209de6

6 files changed: +94 −43 lines changed

src/connectors/kafka/processor.rs

Lines changed: 1 addition & 0 deletions
```diff
@@ -68,6 +68,7 @@ impl ParseableSinkProcessor {

         let (rb, is_first) = batch_json_event.into_recordbatch(
             &schema,
+            Utc::now(),
             static_schema_flag,
             time_partition.as_ref(),
             schema_version,
```

src/event/format/mod.rs

Lines changed: 3 additions & 2 deletions
```diff
@@ -26,7 +26,7 @@ use std::{
 use anyhow::{anyhow, Error as AnyError};
 use arrow_array::RecordBatch;
 use arrow_schema::{DataType, Field, Schema, TimeUnit};
-use chrono::DateTime;
+use chrono::{DateTime, Utc};
 use serde::{Deserialize, Serialize};
 use serde_json::Value;

@@ -108,6 +108,7 @@ pub trait EventFormat: Sized {
     fn into_recordbatch(
         self,
         storage_schema: &HashMap<String, Arc<Field>>,
+        p_timestamp: DateTime<Utc>,
         static_schema_flag: bool,
         time_partition: Option<&String>,
         schema_version: SchemaVersion,
@@ -145,7 +146,7 @@ pub trait EventFormat: Sized {
             rb.schema(),
             &rb,
             &[0],
-            &[Arc::new(get_timestamp_array(rb.num_rows()))],
+            &[Arc::new(get_timestamp_array(p_timestamp, rb.num_rows()))],
         );

         Ok((rb, is_first))
```
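Both the Kafka processor above and the HTTP handlers below thread the new `p_timestamp` argument into `get_timestamp_array`, which fills the batch's timestamp column. That helper's body is outside this diff; the sketch below is only a plausible reconstruction, assuming a millisecond-precision Arrow array (the name and call shape come from the diff, the body is guessed):

```rust
use arrow_array::TimestampMillisecondArray;
use chrono::{DateTime, Utc};

// Plausible shape only: repeat the caller-supplied ingestion timestamp
// once per row, so every record in the batch shares the same instant.
fn get_timestamp_array(p_timestamp: DateTime<Utc>, rows: usize) -> TimestampMillisecondArray {
    TimestampMillisecondArray::from_value(p_timestamp.timestamp_millis(), rows)
}
```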

src/handlers/http/ingest.rs

Lines changed: 54 additions & 16 deletions
```diff
@@ -79,7 +79,7 @@ pub async fn ingest(req: HttpRequest, Json(json): Json<Value>) -> Result<HttpRes

 pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result<(), PostError> {
     let size: usize = body.len();
-    let parsed_timestamp = Utc::now().naive_utc();
+    let now = Utc::now();
     let (rb, is_first) = {
         let body_val: Value = serde_json::from_slice(&body)?;
         let hash_map = PARSEABLE.streams.read().unwrap();
@@ -93,15 +93,15 @@ pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result<
             .clone();
         let event = format::json::Event { data: body_val };
         // For internal streams, use old schema
-        event.into_recordbatch(&schema, false, None, SchemaVersion::V0)?
+        event.into_recordbatch(&schema, now, false, None, SchemaVersion::V0)?
     };
     event::Event {
         rb,
         stream_name,
         origin_format: "json",
         origin_size: size as u64,
         is_first_event: is_first,
-        parsed_timestamp,
+        parsed_timestamp: now.naive_utc(),
         time_partition: None,
         custom_partition_values: HashMap::new(),
         stream_type: StreamType::Internal,
@@ -351,6 +351,7 @@ mod tests {
     use arrow::datatypes::Int64Type;
     use arrow_array::{ArrayRef, Float64Array, Int64Array, ListArray, StringArray};
     use arrow_schema::{DataType, Field};
+    use chrono::Utc;
     use serde_json::json;
     use std::{collections::HashMap, sync::Arc};

@@ -392,8 +393,15 @@
             "b": "hello",
         });

-        let (rb, _) =
-            into_event_batch(json, HashMap::default(), false, None, SchemaVersion::V0).unwrap();
+        let (rb, _) = into_event_batch(
+            json,
+            HashMap::default(),
+            Utc::now(),
+            false,
+            None,
+            SchemaVersion::V0,
+        )
+        .unwrap();

         assert_eq!(rb.num_rows(), 1);
         assert_eq!(rb.num_columns(), 4);
@@ -419,8 +427,15 @@
             "c": null
         });

-        let (rb, _) =
-            into_event_batch(json, HashMap::default(), false, None, SchemaVersion::V0).unwrap();
+        let (rb, _) = into_event_batch(
+            json,
+            HashMap::default(),
+            Utc::now(),
+            false,
+            None,
+            SchemaVersion::V0,
+        )
+        .unwrap();

         assert_eq!(rb.num_rows(), 1);
         assert_eq!(rb.num_columns(), 3);
@@ -450,7 +465,8 @@
             .into_iter(),
         );

-        let (rb, _) = into_event_batch(json, schema, false, None, SchemaVersion::V0).unwrap();
+        let (rb, _) =
+            into_event_batch(json, schema, Utc::now(), false, None, SchemaVersion::V0).unwrap();

         assert_eq!(rb.num_rows(), 1);
         assert_eq!(rb.num_columns(), 3);
@@ -480,7 +496,9 @@
             .into_iter(),
         );

-        assert!(into_event_batch(json, schema, false, None, SchemaVersion::V0,).is_err());
+        assert!(
+            into_event_batch(json, schema, Utc::now(), false, None, SchemaVersion::V0,).is_err()
+        );
     }

     #[test]
@@ -496,7 +514,8 @@
             .into_iter(),
         );

-        let (rb, _) = into_event_batch(json, schema, false, None, SchemaVersion::V0).unwrap();
+        let (rb, _) =
+            into_event_batch(json, schema, Utc::now(), false, None, SchemaVersion::V0).unwrap();

         assert_eq!(rb.num_rows(), 1);
         assert_eq!(rb.num_columns(), 1);
@@ -535,8 +554,15 @@
             },
         ]);

-        let (rb, _) =
-            into_event_batch(json, HashMap::default(), false, None, SchemaVersion::V0).unwrap();
+        let (rb, _) = into_event_batch(
+            json,
+            HashMap::default(),
+            Utc::now(),
+            false,
+            None,
+            SchemaVersion::V0,
+        )
+        .unwrap();

         assert_eq!(rb.num_rows(), 3);
         assert_eq!(rb.num_columns(), 4);
@@ -582,8 +608,15 @@
             },
         ]);

-        let (rb, _) =
-            into_event_batch(json, HashMap::default(), false, None, SchemaVersion::V0).unwrap();
+        let (rb, _) = into_event_batch(
+            json,
+            HashMap::default(),
+            Utc::now(),
+            false,
+            None,
+            SchemaVersion::V0,
+        )
+        .unwrap();

         assert_eq!(rb.num_rows(), 3);
         assert_eq!(rb.num_columns(), 4);
@@ -630,7 +663,8 @@
             .into_iter(),
         );

-        let (rb, _) = into_event_batch(json, schema, false, None, SchemaVersion::V0).unwrap();
+        let (rb, _) =
+            into_event_batch(json, schema, Utc::now(), false, None, SchemaVersion::V0).unwrap();

         assert_eq!(rb.num_rows(), 3);
         assert_eq!(rb.num_columns(), 4);
@@ -677,7 +711,9 @@
             .into_iter(),
         );

-        assert!(into_event_batch(json, schema, false, None, SchemaVersion::V0,).is_err());
+        assert!(
+            into_event_batch(json, schema, Utc::now(), false, None, SchemaVersion::V0,).is_err()
+        );
     }

     #[test]
@@ -718,6 +754,7 @@
         let (rb, _) = into_event_batch(
             flattened_json,
             HashMap::default(),
+            Utc::now(),
             false,
             None,
             SchemaVersion::V0,
@@ -806,6 +843,7 @@
         let (rb, _) = into_event_batch(
             flattened_json,
             HashMap::default(),
+            Utc::now(),
             false,
             None,
             SchemaVersion::V1,
```
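Every call site here passes `Utc::now()`, preserving the old behaviour. A side benefit of the explicit parameter, not exercised in this diff, is that a test can pin a fixed instant for a fully deterministic timestamp column; a hypothetical sketch:

```rust
use chrono::{TimeZone, Utc};

// Hypothetical: inject a fixed DateTime<Utc> instead of the wall clock,
// so the p_timestamp column is identical across test runs.
let fixed = Utc.with_ymd_and_hms(2025, 1, 1, 0, 0, 0).unwrap();
let (rb, _) = into_event_batch(
    json,
    HashMap::default(),
    fixed,
    false,
    None,
    SchemaVersion::V0,
)
.unwrap();
```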

src/handlers/http/modal/utils/ingest_utils.rs

Lines changed: 5 additions & 1 deletion
```diff
@@ -96,6 +96,7 @@ async fn push_logs(
     let static_schema_flag = stream.get_static_schema_flag();
     let custom_partition = stream.get_custom_partition();
     let schema_version = stream.get_schema_version();
+    let p_timestamp = Utc::now();

     let data = if time_partition.is_some() || custom_partition.is_some() {
         convert_array_to_object(
@@ -121,7 +122,7 @@
         let origin_size = serde_json::to_vec(&value).unwrap().len() as u64; // string length need not be the same as byte length
         let parsed_timestamp = match time_partition.as_ref() {
             Some(time_partition) => get_parsed_timestamp(&value, time_partition)?,
-            _ => Utc::now().naive_utc(),
+            _ => p_timestamp.naive_utc(),
         };
         let custom_partition_values = match custom_partition.as_ref() {
             Some(custom_partition) => {
@@ -144,6 +145,7 @@
         let (rb, is_first_event) = into_event_batch(
             value,
             schema,
+            p_timestamp,
             static_schema_flag,
             time_partition.as_ref(),
             schema_version,
@@ -168,12 +170,14 @@
 pub fn into_event_batch(
     data: Value,
     schema: HashMap<String, Arc<Field>>,
+    p_timestamp: DateTime<Utc>,
     static_schema_flag: bool,
     time_partition: Option<&String>,
     schema_version: SchemaVersion,
 ) -> Result<(arrow_array::RecordBatch, bool), PostError> {
     let (rb, is_first) = json::Event { data }.into_recordbatch(
         &schema,
+        p_timestamp,
         static_schema_flag,
         time_partition,
         schema_version,
```
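The net effect in `push_logs` is a capture-once pattern: one `Utc::now()` per request now feeds both the timestamp column and the `parsed_timestamp` fallback, so all events in a single push carry the same instant instead of drifting by a few microseconds per iteration. A simplified sketch of the flow (loop structure condensed from the surrounding code, not a verbatim excerpt):

```rust
use chrono::Utc;

let p_timestamp = Utc::now(); // captured once per request
for value in json_values {
    // fall back to the shared instant when no time partition is configured
    let parsed_timestamp = match time_partition.as_ref() {
        Some(tp) => get_parsed_timestamp(&value, tp)?,
        _ => p_timestamp.naive_utc(),
    };
    // the same instant also stamps the batch's p_timestamp column
    let (rb, is_first_event) = into_event_batch(
        value,
        schema.clone(),
        p_timestamp,
        static_schema_flag,
        time_partition.as_ref(),
        schema_version,
    )?;
    // ...build and process the event...
}
```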

src/parseable/streams.rs

Lines changed: 24 additions & 17 deletions
```diff
@@ -24,12 +24,13 @@ use std::{
     path::{Path, PathBuf},
     process,
     sync::{Arc, Mutex, RwLock},
+    time::{SystemTime, UNIX_EPOCH},
 };

 use arrow_array::RecordBatch;
 use arrow_ipc::writer::StreamWriter;
 use arrow_schema::{Field, Fields, Schema};
-use chrono::{NaiveDateTime, Timelike, Utc};
+use chrono::{NaiveDateTime, Timelike};
 use derive_more::{Deref, DerefMut};
 use itertools::Itertools;
 use parquet::{
@@ -70,6 +71,14 @@ pub struct StreamNotFound(pub String);

 pub type StreamRef = Arc<Stream>;

+/// Gets the unix timestamp for the minute as described by the `SystemTime`
+fn minute_from_system_time(time: SystemTime) -> u128 {
+    time.duration_since(UNIX_EPOCH)
+        .expect("Legitimate time")
+        .as_millis()
+        / 60000
+}
+
 /// All state associated with a single logstream in Parseable.
 pub struct Stream {
     pub stream_name: String,
@@ -154,8 +163,7 @@ impl Stream {
             hostname.push_str(id);
         }
         let filename = format!(
-            "{}{stream_hash}.date={}.hour={:02}.minute={}.{}{hostname}.data.{ARROW_FILE_EXTENSION}",
-            Utc::now().format("%Y%m%dT%H%M"),
+            "{stream_hash}.date={}.hour={:02}.minute={}.{}{hostname}.data.{ARROW_FILE_EXTENSION}",
             parsed_timestamp.date(),
             parsed_timestamp.hour(),
             minute_to_slot(parsed_timestamp.minute(), OBJECT_STORE_DATA_GRANULARITY).unwrap(),
@@ -190,7 +198,7 @@ impl Stream {
     /// Only includes ones starting from the previous minute
     pub fn arrow_files_grouped_exclude_time(
         &self,
-        exclude: NaiveDateTime,
+        exclude: SystemTime,
         shutdown_signal: bool,
     ) -> HashMap<PathBuf, Vec<PathBuf>> {
         let mut grouped_arrow_file: HashMap<PathBuf, Vec<PathBuf>> = HashMap::new();
@@ -200,12 +208,13 @@ impl Stream {
         // don't keep the ones for the current minute
         if !shutdown_signal {
             arrow_files.retain(|path| {
-                !path
-                    .file_name()
-                    .unwrap()
-                    .to_str()
-                    .unwrap()
-                    .starts_with(&exclude.format("%Y%m%dT%H%M").to_string())
+                let creation = path
+                    .metadata()
+                    .expect("Arrow file should exist on disk")
+                    .created()
+                    .expect("Creation time should be accessible");
+                // Compare if creation time is actually from previous minute
+                minute_from_system_time(creation) < minute_from_system_time(exclude)
             });
         }

@@ -427,8 +436,8 @@ impl Stream {
     ) -> Result<Option<Schema>, StagingError> {
         let mut schemas = Vec::new();

-        let time = chrono::Utc::now().naive_utc();
-        let staging_files = self.arrow_files_grouped_exclude_time(time, shutdown_signal);
+        let now = SystemTime::now();
+        let staging_files = self.arrow_files_grouped_exclude_time(now, shutdown_signal);
         if staging_files.is_empty() {
             metrics::STAGING_FILES
                 .with_label_values(&[&self.stream_name])
@@ -764,7 +773,7 @@ mod tests {

     use arrow_array::{Int32Array, StringArray, TimestampMillisecondArray};
     use arrow_schema::{DataType, Field, TimeUnit};
-    use chrono::{NaiveDate, TimeDelta};
+    use chrono::{NaiveDate, TimeDelta, Utc};
     use temp_dir::TempDir;
     use tokio::time::sleep;

@@ -881,8 +890,7 @@
         );

         let expected_path = staging.data_path.join(format!(
-            "{}{stream_hash}.date={}.hour={:02}.minute={}.{}.data.{ARROW_FILE_EXTENSION}",
-            Utc::now().format("%Y%m%dT%H%M"),
+            "{stream_hash}.date={}.hour={:02}.minute={}.{}.data.{ARROW_FILE_EXTENSION}",
             parsed_timestamp.date(),
             parsed_timestamp.hour(),
             minute_to_slot(parsed_timestamp.minute(), OBJECT_STORE_DATA_GRANULARITY).unwrap(),
@@ -916,8 +924,7 @@
         );

         let expected_path = staging.data_path.join(format!(
-            "{}{stream_hash}.date={}.hour={:02}.minute={}.key1=value1.key2=value2.{}.data.{ARROW_FILE_EXTENSION}",
-            Utc::now().format("%Y%m%dT%H%M"),
+            "{stream_hash}.date={}.hour={:02}.minute={}.key1=value1.key2=value2.{}.data.{ARROW_FILE_EXTENSION}",
             parsed_timestamp.date(),
             parsed_timestamp.hour(),
             minute_to_slot(parsed_timestamp.minute(), OBJECT_STORE_DATA_GRANULARITY).unwrap(),
```
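With the wall-clock prefix gone from file names, staging now decides "is this file from a previous minute?" by comparing creation-time minute buckets. Because `minute_from_system_time` divides epoch milliseconds by 60000, two instants compare equal exactly when they fall in the same UTC minute; a self-contained illustration:

```rust
use std::time::{Duration, SystemTime, UNIX_EPOCH};

/// Same helper as in the diff: epoch milliseconds truncated to minutes.
fn minute_from_system_time(time: SystemTime) -> u128 {
    time.duration_since(UNIX_EPOCH)
        .expect("Legitimate time")
        .as_millis()
        / 60000
}

fn main() {
    let t0 = UNIX_EPOCH + Duration::from_secs(120); // 00:02:00
    let t1 = UNIX_EPOCH + Duration::from_secs(179); // 00:02:59
    let t2 = UNIX_EPOCH + Duration::from_secs(180); // 00:03:00
    assert_eq!(minute_from_system_time(t0), minute_from_system_time(t1)); // same minute
    assert!(minute_from_system_time(t1) < minute_from_system_time(t2)); // next minute
}
```

One caveat visible in the diff itself: the retain closure reads `metadata().created()`, which the standard library documents as unavailable on some platforms and filesystems; the code opts to panic via `expect` in that case.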
