From 764cba33751d9e5866ba99c5b89f00451b901e17 Mon Sep 17 00:00:00 2001
From: Devdutt Shenoi
Date: Tue, 25 Feb 2025 15:23:02 +0530
Subject: [PATCH 1/3] fix: capture time created from metadata not filename

---
 src/parseable/streams.rs | 22 ++++++++++++----------
 1 file changed, 12 insertions(+), 10 deletions(-)

diff --git a/src/parseable/streams.rs b/src/parseable/streams.rs
index 7d8decdca..85314feef 100644
--- a/src/parseable/streams.rs
+++ b/src/parseable/streams.rs
@@ -24,12 +24,13 @@ use std::{
     path::{Path, PathBuf},
     process,
     sync::{Arc, Mutex, RwLock},
+    time::UNIX_EPOCH,
 };
 
 use arrow_array::RecordBatch;
 use arrow_ipc::writer::StreamWriter;
 use arrow_schema::{Field, Fields, Schema};
-use chrono::{NaiveDateTime, Timelike, Utc};
+use chrono::{DateTime, NaiveDateTime, Timelike, Utc};
 use derive_more::{Deref, DerefMut};
 use itertools::Itertools;
 use parquet::{
@@ -193,7 +194,7 @@ impl Stream {
     /// Only includes ones starting from the previous minute
     pub fn arrow_files_grouped_exclude_time(
         &self,
-        exclude: NaiveDateTime,
+        exclude: DateTime<Utc>,
         shutdown_signal: bool,
     ) -> HashMap<PathBuf, Vec<PathBuf>> {
         let mut grouped_arrow_file: HashMap<PathBuf, Vec<PathBuf>> = HashMap::new();
@@ -203,12 +204,14 @@ impl Stream {
         // don't keep the ones for the current minute
         if !shutdown_signal {
             arrow_files.retain(|path| {
-                !path
-                    .file_name()
-                    .unwrap()
-                    .to_str()
-                    .unwrap()
-                    .starts_with(&exclude.format("%Y%m%dT%H%M").to_string())
+                path.metadata()
+                    .expect("Arrow file should exist on disk")
+                    .created()
+                    .expect("Creation time should be accessible")
+                    .duration_since(UNIX_EPOCH)
+                    .expect("Unix Timestamp Duration")
+                    .as_millis()
+                    < exclude.timestamp_millis() as u128
             });
         }
 
@@ -430,8 +433,7 @@ impl Stream {
     ) -> Result<Option<Schema>, StagingError> {
         let mut schemas = Vec::new();
 
-        let time = chrono::Utc::now().naive_utc();
-        let staging_files = self.arrow_files_grouped_exclude_time(time, shutdown_signal);
+        let staging_files = self.arrow_files_grouped_exclude_time(Utc::now(), shutdown_signal);
         if staging_files.is_empty() {
             metrics::STAGING_FILES
                 .with_label_values(&[&self.stream_name])

From 3048c637312c485b54addc0f179ac480854fb4b5 Mon Sep 17 00:00:00 2001
From: Devdutt Shenoi
Date: Tue, 25 Feb 2025 15:47:32 +0530
Subject: [PATCH 2/3] refactor: use `SystemTime`

---
 src/parseable/streams.rs | 28 ++++++++++++++++++----------
 1 file changed, 18 insertions(+), 10 deletions(-)

diff --git a/src/parseable/streams.rs b/src/parseable/streams.rs
index 4429bea41..b2a766260 100644
--- a/src/parseable/streams.rs
+++ b/src/parseable/streams.rs
@@ -24,13 +24,13 @@ use std::{
     path::{Path, PathBuf},
     process,
     sync::{Arc, Mutex, RwLock},
-    time::UNIX_EPOCH,
+    time::{SystemTime, UNIX_EPOCH},
 };
 
 use arrow_array::RecordBatch;
 use arrow_ipc::writer::StreamWriter;
 use arrow_schema::{Field, Fields, Schema};
-use chrono::{DateTime, NaiveDateTime, Timelike, Utc};
+use chrono::{NaiveDateTime, Timelike, Utc};
 use derive_more::{Deref, DerefMut};
 use itertools::Itertools;
 use parquet::{
@@ -73,6 +73,14 @@ const ARROW_FILE_EXTENSION: &str = "data.arrows";
 
 pub type StreamRef = Arc<Stream>;
 
+/// Gets the unix timestamp for the minute as described by the `SystemTime`
+fn minute_from_system_time(time: SystemTime) -> u128 {
+    time.duration_since(UNIX_EPOCH)
+        .expect("Legitimate time")
+        .as_millis()
+        / 60000
+}
+
 /// All state associated with a single logstream in Parseable.
 pub struct Stream {
     pub stream_name: String,
@@ -193,7 +201,7 @@ impl Stream {
     /// Only includes ones starting from the previous minute
     pub fn arrow_files_grouped_exclude_time(
         &self,
-        exclude: DateTime<Utc>,
+        exclude: SystemTime,
         shutdown_signal: bool,
     ) -> HashMap<PathBuf, Vec<PathBuf>> {
         let mut grouped_arrow_file: HashMap<PathBuf, Vec<PathBuf>> = HashMap::new();
@@ -203,14 +211,13 @@ impl Stream {
         // don't keep the ones for the current minute
         if !shutdown_signal {
            arrow_files.retain(|path| {
-                path.metadata()
+                let creation = path
+                    .metadata()
                     .expect("Arrow file should exist on disk")
                     .created()
-                    .expect("Creation time should be accessible")
-                    .duration_since(UNIX_EPOCH)
-                    .expect("Unix Timestamp Duration")
-                    .as_millis()
-                    < exclude.timestamp_millis() as u128
+                    .expect("Creation time should be accessible");
+                // Compare if creation time is actually from previous minute
+                minute_from_system_time(creation) < minute_from_system_time(exclude)
             });
         }
 
@@ -432,7 +439,8 @@ impl Stream {
     ) -> Result<Option<Schema>, StagingError> {
         let mut schemas = Vec::new();
 
-        let staging_files = self.arrow_files_grouped_exclude_time(Utc::now(), shutdown_signal);
+        let now = SystemTime::now();
+        let staging_files = self.arrow_files_grouped_exclude_time(now, shutdown_signal);
         if staging_files.is_empty() {
             metrics::STAGING_FILES
                 .with_label_values(&[&self.stream_name])

From 354dc913a4e8e12a2c439d1690c4767d4f5e3925 Mon Sep 17 00:00:00 2001
From: Devdutt Shenoi
Date: Tue, 25 Feb 2025 15:56:37 +0530
Subject: [PATCH 3/3] refactor: don't prefix time

---
 src/parseable/streams.rs | 13 +++++--------
 1 file changed, 5 insertions(+), 8 deletions(-)

diff --git a/src/parseable/streams.rs b/src/parseable/streams.rs
index b2a766260..57d24a3fb 100644
--- a/src/parseable/streams.rs
+++ b/src/parseable/streams.rs
@@ -30,7 +30,7 @@ use std::{
 use arrow_array::RecordBatch;
 use arrow_ipc::writer::StreamWriter;
 use arrow_schema::{Field, Fields, Schema};
-use chrono::{NaiveDateTime, Timelike, Utc};
+use chrono::{NaiveDateTime, Timelike};
 use derive_more::{Deref, DerefMut};
 use itertools::Itertools;
 use parquet::{
@@ -165,8 +165,7 @@ impl Stream {
             hostname.push_str(id);
         }
         let filename = format!(
-            "{}{stream_hash}.date={}.hour={:02}.minute={}.{}{hostname}.{ARROW_FILE_EXTENSION}",
-            Utc::now().format("%Y%m%dT%H%M"),
+            "{stream_hash}.date={}.hour={:02}.minute={}.{}{hostname}.{ARROW_FILE_EXTENSION}",
             parsed_timestamp.date(),
             parsed_timestamp.hour(),
             minute_to_slot(parsed_timestamp.minute(), OBJECT_STORE_DATA_GRANULARITY).unwrap(),
@@ -767,7 +766,7 @@ mod tests {
 
     use arrow_array::{Int32Array, StringArray, TimestampMillisecondArray};
     use arrow_schema::{DataType, Field, TimeUnit};
-    use chrono::{NaiveDate, TimeDelta};
+    use chrono::{NaiveDate, TimeDelta, Utc};
     use temp_dir::TempDir;
     use tokio::time::sleep;
 
@@ -884,8 +883,7 @@
         );
 
         let expected_path = staging.data_path.join(format!(
-            "{}{stream_hash}.date={}.hour={:02}.minute={}.{}.{ARROW_FILE_EXTENSION}",
-            Utc::now().format("%Y%m%dT%H%M"),
+            "{stream_hash}.date={}.hour={:02}.minute={}.{}.{ARROW_FILE_EXTENSION}",
             parsed_timestamp.date(),
             parsed_timestamp.hour(),
             minute_to_slot(parsed_timestamp.minute(), OBJECT_STORE_DATA_GRANULARITY).unwrap(),
@@ -919,8 +917,7 @@
         );
 
         let expected_path = staging.data_path.join(format!(
-            "{}{stream_hash}.date={}.hour={:02}.minute={}.key1=value1.key2=value2.{}.{ARROW_FILE_EXTENSION}",
-            Utc::now().format("%Y%m%dT%H%M"),
+            "{stream_hash}.date={}.hour={:02}.minute={}.key1=value1.key2=value2.{}.{ARROW_FILE_EXTENSION}",
             parsed_timestamp.date(),
             parsed_timestamp.hour(),
             minute_to_slot(parsed_timestamp.minute(), OBJECT_STORE_DATA_GRANULARITY).unwrap(),
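
The standalone sketch below illustrates the eligibility check this series converges on: read the creation time from filesystem metadata and compare minute buckets, instead of parsing a timestamp prefix out of the filename. `minute_from_system_time` mirrors the helper added in PATCH 2/3; `created_before_current_minute`, `main`, and the example path are hypothetical and only for illustration, and `std::fs::Metadata::created()` can return an error on platforms or filesystems that do not record creation time.

use std::{
    fs,
    path::Path,
    time::{SystemTime, UNIX_EPOCH},
};

/// Minute bucket since the unix epoch; mirrors the `minute_from_system_time`
/// helper introduced in PATCH 2/3.
fn minute_from_system_time(time: SystemTime) -> u128 {
    time.duration_since(UNIX_EPOCH)
        .expect("time should be after the unix epoch")
        .as_millis()
        / 60_000
}

/// True when `path` was created in an earlier minute than `now`, i.e. the
/// arrow file is old enough to be picked up for conversion.
/// (Hypothetical helper name, for illustration only.)
fn created_before_current_minute(path: &Path, now: SystemTime) -> std::io::Result<bool> {
    // `Metadata::created()` errors on filesystems that don't track creation time.
    let creation = fs::metadata(path)?.created()?;
    Ok(minute_from_system_time(creation) < minute_from_system_time(now))
}

fn main() -> std::io::Result<()> {
    // Hypothetical staging path, for illustration only.
    let path = Path::new("staging/example.data.arrows");
    let eligible = created_before_current_minute(path, SystemTime::now())?;
    println!("{} eligible for conversion: {eligible}", path.display());
    Ok(())
}

Bucketing both timestamps to whole minutes is what actually excludes files from the current minute; the raw millisecond comparison in PATCH 1/3 would still have admitted them.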