@@ -31,7 +31,7 @@ use crate::{
31
31
storage:: StreamType ,
32
32
LOCK_EXPECT ,
33
33
} ;
34
- use chrono:: NaiveDateTime ;
34
+ use chrono:: { NaiveDateTime , Utc } ;
35
35
use std:: collections:: HashMap ;
36
36
37
37
pub const DEFAULT_TIMESTAMP_KEY : & str = "p_timestamp" ;
@@ -47,7 +47,7 @@ pub struct Event {
47
47
pub origin_size : u64 ,
48
48
pub is_first_event : bool ,
49
49
pub parsed_timestamp : NaiveDateTime ,
50
- pub time_partition : Option < String > ,
50
+ pub time_partitioned : bool ,
51
51
pub custom_partition_values : HashMap < String , String > ,
52
52
pub stream_type : StreamType ,
53
53
}
@@ -56,12 +56,14 @@ pub struct Event {
56
56
impl Event {
57
57
pub fn process ( self ) -> Result < ( ) , EventError > {
58
58
let mut key = get_schema_key ( & self . rb . schema ( ) . fields ) ;
59
- if self . time_partition . is_some ( ) {
60
- let parsed_timestamp_to_min = self . parsed_timestamp . format ( "%Y%m%dT%H%M" ) . to_string ( ) ;
61
- key. push_str ( & parsed_timestamp_to_min) ;
59
+ if self . time_partitioned {
60
+ // For time partitioned streams, concatenate timestamp to filename, ensuring we don't write to a finished arrows file
61
+ let curr_timestamp = Utc :: now ( ) . format ( "%Y%m%dT%H%M" ) . to_string ( ) ;
62
+ key. push_str ( & curr_timestamp) ;
62
63
}
63
64
64
65
if !self . custom_partition_values . is_empty ( ) {
66
+ // For custom partitioned streams, concatenate values to filename, ensuring we write to different arrows files
65
67
for ( k, v) in self . custom_partition_values . iter ( ) . sorted_by_key ( |v| v. 0 ) {
66
68
key. push_str ( & format ! ( "&{k}={v}" ) ) ;
67
69
}
0 commit comments