Skip to content

Commit 764cba3

Browse files
author
Devdutt Shenoi
committed
fix: capture time created from metadata not filename
1 parent 245ec54 commit 764cba3

File tree

1 file changed

+12
-10
lines changed

1 file changed

+12
-10
lines changed

src/parseable/streams.rs

Lines changed: 12 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -24,12 +24,13 @@ use std::{
2424
path::{Path, PathBuf},
2525
process,
2626
sync::{Arc, Mutex, RwLock},
27+
time::UNIX_EPOCH,
2728
};
2829

2930
use arrow_array::RecordBatch;
3031
use arrow_ipc::writer::StreamWriter;
3132
use arrow_schema::{Field, Fields, Schema};
32-
use chrono::{NaiveDateTime, Timelike, Utc};
33+
use chrono::{DateTime, NaiveDateTime, Timelike, Utc};
3334
use derive_more::{Deref, DerefMut};
3435
use itertools::Itertools;
3536
use parquet::{
@@ -193,7 +194,7 @@ impl Stream {
193194
/// Only includes ones starting from the previous minute
194195
pub fn arrow_files_grouped_exclude_time(
195196
&self,
196-
exclude: NaiveDateTime,
197+
exclude: DateTime<Utc>,
197198
shutdown_signal: bool,
198199
) -> HashMap<PathBuf, Vec<PathBuf>> {
199200
let mut grouped_arrow_file: HashMap<PathBuf, Vec<PathBuf>> = HashMap::new();
@@ -203,12 +204,14 @@ impl Stream {
203204
// don't keep the ones for the current minute
204205
if !shutdown_signal {
205206
arrow_files.retain(|path| {
206-
!path
207-
.file_name()
208-
.unwrap()
209-
.to_str()
210-
.unwrap()
211-
.starts_with(&exclude.format("%Y%m%dT%H%M").to_string())
207+
path.metadata()
208+
.expect("Arrow file should exist on disk")
209+
.created()
210+
.expect("Creation time should be accessible")
211+
.duration_since(UNIX_EPOCH)
212+
.expect("Unix Timestamp Duration")
213+
.as_millis()
214+
< exclude.timestamp_millis() as u128
212215
});
213216
}
214217

@@ -430,8 +433,7 @@ impl Stream {
430433
) -> Result<Option<Schema>, StagingError> {
431434
let mut schemas = Vec::new();
432435

433-
let time = chrono::Utc::now().naive_utc();
434-
let staging_files = self.arrow_files_grouped_exclude_time(time, shutdown_signal);
436+
let staging_files = self.arrow_files_grouped_exclude_time(Utc::now(), shutdown_signal);
435437
if staging_files.is_empty() {
436438
metrics::STAGING_FILES
437439
.with_label_values(&[&self.stream_name])

0 commit comments

Comments
 (0)