@@ -119,23 +119,21 @@ impl Stream {
     ) -> Result<(), StagingError> {
         let mut guard = self.writer.lock().unwrap();
         if self.options.mode != Mode::Query || stream_type == StreamType::Internal {
+            let filename =
+                self.filename_by_partition(schema_key, parsed_timestamp, custom_partition_values);
             match guard.disk.get_mut(schema_key) {
                 Some(writer) => {
                     writer.write(record)?;
                 }
                 None => {
                     // entry is not present thus we create it
-                    let file_path = self.path_by_current_time(
-                        schema_key,
-                        parsed_timestamp,
-                        custom_partition_values,
-                    );
                     std::fs::create_dir_all(&self.data_path)?;

                     let range = TimeRange::granularity_range(
                         parsed_timestamp.and_local_timezone(Utc).unwrap(),
                         OBJECT_STORE_DATA_GRANULARITY,
                     );
+                    let file_path = self.data_path.join(filename);
                     let mut writer = DiskWriter::try_new(file_path, &record.schema(), range)
                         .expect("File and RecordBatch both are checked");

@@ -150,17 +148,17 @@ impl Stream {
         Ok(())
     }

-    pub fn path_by_current_time(
+    pub fn filename_by_partition(
         &self,
         stream_hash: &str,
         parsed_timestamp: NaiveDateTime,
         custom_partition_values: &HashMap<String, String>,
-    ) -> PathBuf {
+    ) -> String {
         let mut hostname = hostname::get().unwrap().into_string().unwrap();
         if let Some(id) = &self.ingestor_id {
             hostname.push_str(id);
         }
-        let filename = format!(
+        format!(
             "{stream_hash}.date={}.hour={:02}.minute={}.{}{hostname}.data.{ARROW_FILE_EXTENSION}",
             parsed_timestamp.date(),
             parsed_timestamp.hour(),
@@ -170,8 +168,7 @@ impl Stream {
                 .sorted_by_key(|v| v.0)
                 .map(|(key, value)| format!("{key}={value}."))
                 .join("")
-        );
-        self.data_path.join(filename)
+        )
     }

     pub fn arrow_files(&self) -> Vec<PathBuf> {
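
For reference, here is a minimal standalone sketch of the filename scheme the renamed helper produces, using only the standard library. The `ARROW_FILE_EXTENSION` value and the plain-string date/hour/minute-slot parameters are stand-ins for the crate's constant and its chrono-based types, not the real signatures:

```rust
use std::collections::HashMap;

// Stand-in for the crate's constant; the real value may differ.
const ARROW_FILE_EXTENSION: &str = "arrows";

/// Sketch of the partitioned filename scheme: stream hash, date/hour/minute
/// slot, custom partitions sorted by key, then hostname.
fn filename_by_partition(
    stream_hash: &str,
    date: &str,       // stand-in for parsed_timestamp.date()
    hour: u32,        // stand-in for parsed_timestamp.hour()
    minute_slot: u32, // stand-in for Minute::from(..).to_slot(..)
    custom_partition_values: &HashMap<String, String>,
    hostname: &str,
) -> String {
    // Sort custom partitions by key (as `sorted_by_key` does in the diff)
    // so the generated filename is deterministic.
    let mut pairs: Vec<_> = custom_partition_values.iter().collect();
    pairs.sort_by(|a, b| a.0.cmp(b.0));
    let customs: String = pairs.iter().map(|(k, v)| format!("{k}={v}.")).collect();
    format!(
        "{stream_hash}.date={date}.hour={hour:02}.minute={minute_slot}.\
         {customs}{hostname}.data.{ARROW_FILE_EXTENSION}"
    )
}

fn main() {
    let customs = HashMap::from([
        ("key1".to_string(), "value1".to_string()),
        ("key2".to_string(), "value2".to_string()),
    ]);
    // Prints: abc123.date=2024-01-01.hour=09.minute=15.key1=value1.key2=value2.host42.data.arrows
    println!(
        "{}",
        filename_by_partition("abc123", "2024-01-01", 9, 15, &customs, "host42")
    );
}
```

The real method additionally appends `ingestor_id` to the hostname when one is set, as the hunk above shows.
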
@@ -877,18 +874,18 @@ mod tests {
             None,
         );

-        let expected_path = staging.data_path.join(format!(
+        let expected = format!(
             "{stream_hash}.date={}.hour={:02}.minute={}.{}.data.{ARROW_FILE_EXTENSION}",
             parsed_timestamp.date(),
             parsed_timestamp.hour(),
             Minute::from(parsed_timestamp).to_slot(OBJECT_STORE_DATA_GRANULARITY),
             hostname::get().unwrap().into_string().unwrap()
-        ));
+        );

-        let generated_path =
-            staging.path_by_current_time(stream_hash, parsed_timestamp, &custom_partition_values);
+        let generated =
+            staging.filename_by_partition(stream_hash, parsed_timestamp, &custom_partition_values);

-        assert_eq!(generated_path, expected_path);
+        assert_eq!(generated, expected);
     }

     #[test]
@@ -911,18 +908,18 @@ mod tests {
             None,
         );

-        let expected_path = staging.data_path.join(format!(
+        let expected = format!(
             "{stream_hash}.date={}.hour={:02}.minute={}.key1=value1.key2=value2.{}.data.{ARROW_FILE_EXTENSION}",
             parsed_timestamp.date(),
             parsed_timestamp.hour(),
             Minute::from(parsed_timestamp).to_slot(OBJECT_STORE_DATA_GRANULARITY),
             hostname::get().unwrap().into_string().unwrap()
-        ));
+        );

-        let generated_path =
-            staging.path_by_current_time(stream_hash, parsed_timestamp, &custom_partition_values);
+        let generated =
+            staging.filename_by_partition(stream_hash, parsed_timestamp, &custom_partition_values);

-        assert_eq!(generated_path, expected_path);
+        assert_eq!(generated, expected);
     }

     #[test]
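
Design note on the refactor above: returning the bare filename `String` rather than a joined `PathBuf` separates how a staged file is named from where it lives. The caller in the first hunk now joins the name onto `self.data_path` itself, and the tests can assert on plain strings instead of platform-dependent paths.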