Skip to content

Commit 6e1dd5c

Browse files
author
Devdutt Shenoi
committed
refactor: Minute
1 parent db7c74a commit 6e1dd5c

File tree

3 files changed

+58
-38
lines changed

3 files changed

+58
-38
lines changed

src/parseable/streams.rs

Lines changed: 13 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ use std::{
2929
use arrow_array::RecordBatch;
3030
use arrow_ipc::writer::StreamWriter;
3131
use arrow_schema::{Field, Fields, Schema};
32-
use chrono::{NaiveDateTime, Timelike, Utc};
32+
use chrono::{NaiveDateTime, Utc};
3333
use derive_more::{Deref, DerefMut};
3434
use itertools::Itertools;
3535
use parquet::{
@@ -52,7 +52,7 @@ use crate::{
5252
storage::{
5353
object_storage::to_bytes, retention::Retention, StreamType, OBJECT_STORE_DATA_GRANULARITY,
5454
},
55-
utils::minute_to_slot,
55+
utils::time::Minute,
5656
LOCK_EXPECT,
5757
};
5858

@@ -157,16 +157,15 @@ impl Stream {
157157
hostname.push_str(id);
158158
}
159159
let filename = format!(
160-
"{}{stream_hash}.date={}.hour={:02}.minute={}.{}{hostname}.{ARROW_FILE_EXTENSION}",
160+
"{}{stream_hash}.{}.minute={}.{}.{hostname}.{ARROW_FILE_EXTENSION}",
161161
Utc::now().format("%Y%m%dT%H%M"),
162-
parsed_timestamp.date(),
163-
parsed_timestamp.hour(),
164-
minute_to_slot(parsed_timestamp.minute(), OBJECT_STORE_DATA_GRANULARITY).unwrap(),
162+
parsed_timestamp.format("date=%Y-%m-%d.hour=%H"),
163+
Minute::from(parsed_timestamp).to_slot(OBJECT_STORE_DATA_GRANULARITY),
165164
custom_partition_values
166165
.iter()
167166
.sorted_by_key(|v| v.0)
168-
.map(|(key, value)| format!("{key}={value}."))
169-
.join("")
167+
.map(|(key, value)| format!("{key}={value}"))
168+
.join(".")
170169
);
171170
self.data_path.join(filename)
172171
}
@@ -860,11 +859,10 @@ mod tests {
860859
);
861860

862861
let expected_path = staging.data_path.join(format!(
863-
"{}{stream_hash}.date={}.hour={:02}.minute={}.{}.{ARROW_FILE_EXTENSION}",
862+
"{}{stream_hash}.{}.minute={}.{}.{ARROW_FILE_EXTENSION}",
864863
Utc::now().format("%Y%m%dT%H%M"),
865-
parsed_timestamp.date(),
866-
parsed_timestamp.hour(),
867-
minute_to_slot(parsed_timestamp.minute(), OBJECT_STORE_DATA_GRANULARITY).unwrap(),
864+
parsed_timestamp.format("date=%Y-%m-%d.hour=%H"),
865+
Minute::from(parsed_timestamp).to_slot(OBJECT_STORE_DATA_GRANULARITY),
868866
hostname::get().unwrap().into_string().unwrap()
869867
));
870868

@@ -895,11 +893,10 @@ mod tests {
895893
);
896894

897895
let expected_path = staging.data_path.join(format!(
898-
"{}{stream_hash}.date={}.hour={:02}.minute={}.key1=value1.key2=value2.{}.{ARROW_FILE_EXTENSION}",
896+
"{}{stream_hash}.{}.minute={}.key1=value1.key2=value2.{}.{ARROW_FILE_EXTENSION}",
899897
Utc::now().format("%Y%m%dT%H%M"),
900-
parsed_timestamp.date(),
901-
parsed_timestamp.hour(),
902-
minute_to_slot(parsed_timestamp.minute(), OBJECT_STORE_DATA_GRANULARITY).unwrap(),
898+
parsed_timestamp.format("date=%Y-%m-%d.hour=%H"),
899+
Minute::from(parsed_timestamp).to_slot(OBJECT_STORE_DATA_GRANULARITY),
903900
hostname::get().unwrap().into_string().unwrap()
904901
));
905902

src/utils/mod.rs

Lines changed: 0 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -35,23 +35,6 @@ use regex::Regex;
3535
use sha2::{Digest, Sha256};
3636
use tracing::debug;
3737

38-
/// Convert minutes to a slot range
39-
/// e.g. given minute = 15 and OBJECT_STORE_DATA_GRANULARITY = 10 returns "10-19"
40-
pub fn minute_to_slot(minute: u32, data_granularity: u32) -> Option<String> {
41-
if minute >= 60 {
42-
return None;
43-
}
44-
45-
let block_n = minute / data_granularity;
46-
let block_start = block_n * data_granularity;
47-
if data_granularity == 1 {
48-
return Some(format!("{block_start:02}"));
49-
}
50-
51-
let block_end = (block_n + 1) * data_granularity - 1;
52-
Some(format!("{block_start:02}-{block_end:02}"))
53-
}
54-
5538
pub fn get_ingestor_id() -> String {
5639
let now = Utc::now().to_rfc3339();
5740
let id = get_hash(&now).to_string().split_at(15).0.to_string();

src/utils/time.rs

Lines changed: 45 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,7 @@
1616
*
1717
*/
1818

19-
use chrono::{DateTime, NaiveDate, TimeDelta, Timelike, Utc};
20-
21-
use super::minute_to_slot;
19+
use chrono::{DateTime, NaiveDate, NaiveDateTime, TimeDelta, Timelike, Utc};
2220

2321
#[derive(Debug, thiserror::Error)]
2422
pub enum TimeParseError {
@@ -231,8 +229,8 @@ impl TimeRange {
231229
prefixes: &mut Vec<String>,
232230
) {
233231
let mut push_prefix = |block: u32| {
234-
if let Some(minute_slot) = minute_to_slot(block * data_granularity, data_granularity) {
235-
let prefix = format!("{hour_prefix}minute={minute_slot}/");
232+
if let Ok(minute) = Minute::try_from(block * data_granularity) {
233+
let prefix = format!("{hour_prefix}minute={}/", minute.to_slot(data_granularity));
236234
prefixes.push(prefix);
237235
}
238236
};
@@ -266,6 +264,48 @@ impl TimeRange {
266264
}
267265
}
268266

267+
/// Describes a minute block
268+
#[derive(Debug, Clone, Copy)]
269+
pub struct Minute {
270+
block: u32,
271+
}
272+
273+
impl TryFrom<u32> for Minute {
274+
type Error = u32;
275+
276+
/// Returns a Minute if block is an acceptable minute value, else returns it as is
277+
fn try_from(block: u32) -> Result<Self, Self::Error> {
278+
if block >= 60 {
279+
return Err(block);
280+
}
281+
282+
Ok(Self { block })
283+
}
284+
}
285+
286+
impl From<NaiveDateTime> for Minute {
287+
fn from(timestamp: NaiveDateTime) -> Self {
288+
Self {
289+
block: timestamp.minute(),
290+
}
291+
}
292+
}
293+
294+
impl Minute {
295+
/// Convert minutes to a slot range
296+
/// e.g. given minute = 15 and OBJECT_STORE_DATA_GRANULARITY = 10 returns "10-19"
297+
pub fn to_slot(self, data_granularity: u32) -> String {
298+
let block_n = self.block / data_granularity;
299+
let block_start = block_n * data_granularity;
300+
if data_granularity == 1 {
301+
return format!("{block_start:02}");
302+
}
303+
304+
let block_end = (block_n + 1) * data_granularity - 1;
305+
format!("{block_start:02}-{block_end:02}")
306+
}
307+
}
308+
269309
#[cfg(test)]
270310
mod tests {
271311
use super::*;

0 commit comments

Comments
 (0)