@@ -32,7 +32,6 @@ use arrow_schema::{Field, Fields, Schema};
3232use chrono:: { NaiveDateTime , Timelike , Utc } ;
3333use derive_more:: { Deref , DerefMut } ;
3434use itertools:: Itertools ;
35- use once_cell:: sync:: Lazy ;
3635use parquet:: {
3736 arrow:: ArrowWriter ,
3837 basic:: Encoding ,
@@ -41,7 +40,6 @@ use parquet::{
4140 schema:: types:: ColumnPath ,
4241} ;
4342use rand:: distributions:: DistString ;
44- use regex:: Regex ;
4543use relative_path:: RelativePathBuf ;
4644use tokio:: task:: JoinSet ;
4745use tracing:: { error, info, trace, warn} ;
@@ -69,34 +67,11 @@ use super::{
6967 LogStream , ARROW_FILE_EXTENSION ,
7068} ;
7169
72- /// Regex pattern for parsing arrow file names.
73- ///
74- /// # Format
75- /// The expected format is: `<schema_key>.<front_part>.data.arrows`
76- /// where:
77- /// - schema_key: A key that is associated with the timestamp at ingestion, hash of arrow schema and the key-value
78- /// pairs joined by '&' and '=' (e.g., "20200201T1830f8a5fc1edc567d56&key1=value1&key2=value2")
79- /// - front_part: Captured for parquet file naming, contains the timestamp associted with current/time-partition
80- /// as well as the custom partitioning key=value pairs (e.g., "date=2020-01-21.hour=10.minute=30.key1=value1.key2=value2.ee529ffc8e76")
81- ///
82- /// # Limitations
83- /// - Partition keys and values must only contain alphanumeric characters
84- /// - Special characters in partition values will cause the pattern to fail in capturing
85- ///
86- /// # Examples
87- /// Valid: "key1=value1,key2=value2"
88- /// Invalid: "key1=special!value,key2=special#value"
89- static ARROWS_NAME_STRUCTURE : Lazy < Regex > = Lazy :: new ( || {
90- Regex :: new ( r"^[a-zA-Z0-9&=]+\.(?P<front>\S+)\.data\.arrows$" ) . expect ( "Validated regex" )
91- } ) ;
92-
9370/// Returns the filename for parquet if provided arrows file path is valid as per our expectation
9471fn arrow_path_to_parquet ( path : & Path , random_string : & str ) -> Option < PathBuf > {
95- let filename = path. file_name ( ) ?. to_str ( ) ?;
96- let front = ARROWS_NAME_STRUCTURE
97- . captures ( filename)
98- . and_then ( |c| c. get ( 1 ) ) ?
99- . as_str ( ) ;
72+ let filename = path. file_stem ( ) ?. to_str ( ) ?;
73+ let ( _, front) = filename. split_once ( '.' ) ?;
74+ assert ! ( filename. contains( '.' ) , "contains the delim `.`" ) ;
10075 let filename_with_random_number = format ! ( "{front}.data.{random_string}.parquet" ) ;
10176 let mut parquet_path = path. to_owned ( ) ;
10277 parquet_path. set_file_name ( filename_with_random_number) ;
@@ -219,10 +194,9 @@ impl Stream {
219194 let paths = dir
220195 . flatten ( )
221196 . map ( |file| file. path ( ) )
222- . filter ( |path| {
223- path. file_name ( )
224- . and_then ( |f| f. to_str ( ) )
225- . is_some_and ( |file_name| ARROWS_NAME_STRUCTURE . is_match ( file_name) )
197+ . filter ( |file| {
198+ file. extension ( )
199+ . is_some_and ( |ext| ext. eq ( ARROW_FILE_EXTENSION ) )
226200 } )
227201 . sorted_by_key ( |f| f. metadata ( ) . unwrap ( ) . modified ( ) . unwrap ( ) )
228202 . collect ( ) ;
0 commit comments