From 41b3688f59822a46542dc6b562a3059f039bbfc0 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Fri, 14 Mar 2025 18:55:08 +0530 Subject: [PATCH 1/8] refactor: use regex to extract info from arrow file name --- src/parseable/streams.rs | 59 ++++++++++++++++++++++++++++++---------- 1 file changed, 45 insertions(+), 14 deletions(-) diff --git a/src/parseable/streams.rs b/src/parseable/streams.rs index b21f1ee20..7eb93a8aa 100644 --- a/src/parseable/streams.rs +++ b/src/parseable/streams.rs @@ -33,6 +33,7 @@ use arrow_schema::{Field, Fields, Schema}; use chrono::{NaiveDateTime, Timelike}; use derive_more::{Deref, DerefMut}; use itertools::Itertools; +use once_cell::sync::Lazy; use parquet::{ arrow::ArrowWriter, basic::Encoding, @@ -41,6 +42,7 @@ use parquet::{ schema::types::ColumnPath, }; use rand::distributions::DistString; +use regex::Regex; use relative_path::RelativePathBuf; use tokio::task::JoinSet; use tracing::{error, info, trace, warn}; @@ -68,6 +70,41 @@ use super::{ LogStream, ARROW_FILE_EXTENSION, }; +/// Regex pattern for parsing arrow file names. +/// +/// # Format +/// The expected format is: `..data.arrows` +/// where: +/// - schema_key: A key that is associated with the timestamp at ingestion, hash of arrow schema and the key-value +/// pairs joined by '&' and '=' (e.g., "20200201T1830f8a5fc1edc567d56&key1=value1&key2=value2") +/// - front_part: Captured for parquet file naming, contains the timestamp associted with current/time-partition +/// as well as the custom partitioning key=value pairs (e.g., "date=2020-01-21.hour=10.minute=30.key1=value1.key2=value2.ee529ffc8e76") +/// +/// # Limitations +/// - Partition keys and values must only contain alphanumeric characters +/// - Special characters in partition values will cause the pattern to fail in capturing +/// +/// # Examples +/// Valid: "key1=value1,key2=value2" +/// Invalid: "key1=special!value,key2=special#value" +static ARROWS_NAME_STRUCTURE: Lazy = Lazy::new(|| { + Regex::new(r"^[a-zA-Z0-9&=]+\.(?P\S+)\.data\.arrows$").expect("Validated regex") +}); + +/// Returns the filename for parquet if provided arrows file path is valid as per our expectation +fn arrow_path_to_parquet(path: &Path, random_string: &str) -> Option { + let filename = path.file_name().unwrap().to_str().unwrap(); + let filename = ARROWS_NAME_STRUCTURE + .captures(filename) + .and_then(|c| c.get(1))? + .as_str(); + let filename_with_random_number = format!("{filename}.data.{random_string}.parquet"); + let mut parquet_path = path.to_owned(); + parquet_path.set_file_name(filename_with_random_number); + + Some(parquet_path) +} + #[derive(Debug, thiserror::Error)] #[error("Stream not found: {0}")] pub struct StreamNotFound(pub String); @@ -187,7 +224,11 @@ impl Stream { let paths = dir .flatten() .map(|file| file.path()) - .filter(|file| file.extension().is_some_and(|ext| ext.eq("arrows"))) + .filter(|path| { + path.file_name() + .and_then(|f| f.to_str()) + .is_some_and(|file_name| ARROWS_NAME_STRUCTURE.is_match(file_name)) + }) .sorted_by_key(|f| f.metadata().unwrap().modified().unwrap()) .collect(); @@ -230,12 +271,13 @@ impl Stream { &arrow_file_path, self.stream_name ); remove_file(&arrow_file_path).unwrap(); - } else { - let key = Self::arrow_path_to_parquet(&arrow_file_path, &random_string); + } else if let Some(key) = arrow_path_to_parquet(&arrow_file_path, &random_string) { grouped_arrow_file .entry(key) .or_default() .push(arrow_file_path); + } else { + warn!("Unexpected arrows file: {}", arrow_file_path.display()); } } grouped_arrow_file @@ -294,17 +336,6 @@ impl Stream { } } - fn arrow_path_to_parquet(path: &Path, random_string: &str) -> PathBuf { - let filename = path.file_stem().unwrap().to_str().unwrap(); - let (_, filename) = filename.split_once('.').unwrap(); - assert!(filename.contains('.'), "contains the delim `.`"); - let filename_with_random_number = format!("{filename}.{random_string}.arrows"); - let mut parquet_path = path.to_owned(); - parquet_path.set_file_name(filename_with_random_number); - parquet_path.set_extension("parquet"); - parquet_path - } - /// Converts arrow files in staging into parquet files, does so only for past minutes when run with `!shutdown_signal` pub fn prepare_parquet(&self, shutdown_signal: bool) -> Result<(), StagingError> { info!( From 2995cacecc837dd45dedcbca6b14aedb873fa2ef Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Fri, 14 Mar 2025 18:59:45 +0530 Subject: [PATCH 2/8] test: `arrow_path_to_parquet` --- src/parseable/streams.rs | 64 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 64 insertions(+) diff --git a/src/parseable/streams.rs b/src/parseable/streams.rs index 7eb93a8aa..0c6059d32 100644 --- a/src/parseable/streams.rs +++ b/src/parseable/streams.rs @@ -1230,4 +1230,68 @@ mod tests { assert_eq!(staging.parquet_files().len(), 2); assert_eq!(staging.arrow_files().len(), 1); } + + #[test] + fn test_valid_arrow_path_conversion() { + let path = Path::new("/tmp/12345abcde&key1=value1.date=2020-01-21.hour=10.minute=30.key1=value1.key2=value2.ee529ffc8e76.data.arrows"); + let random_string = "random123"; + + let result = arrow_path_to_parquet(path, random_string); + + assert!(result.is_some()); + let parquet_path = result.unwrap(); + assert_eq!( + parquet_path.to_str().unwrap(), + "/tmp/date=2020-01-21.hour=10.minute=30.key1=value1.key2=value2.ee529ffc8e76.data.random123.parquet" + ); + } + + #[test] + fn test_invalid_arrow_path() { + // Missing the ".data.arrows" suffix + let path = Path::new("/tmp/12345abcde&key1=value1.date=2020-01-21.hour=10.minute=30"); + let random_string = "random123"; + + let result = arrow_path_to_parquet(path, random_string); + + assert!(result.is_none()); + } + + #[test] + fn test_invalid_schema_key() { + // Invalid schema key with special characters + let path = + Path::new("/tmp/12345abcde&key1=value1!.date=2020-01-21.hour=10.minute=30.data.arrows"); + let random_string = "random123"; + + let result = arrow_path_to_parquet(path, random_string); + + assert!(result.is_none()); + } + + #[test] + fn test_complex_path() { + let path = Path::new("/nested/directory/structure/20200201T1830f8a5fc1edc567d56&key1=value1&key2=value2.date=2020-01-21.hour=10.minute=30.region=us-west.ee529ffc8e76.data.arrows"); + let random_string = "random456"; + + let result = arrow_path_to_parquet(path, random_string); + + assert!(result.is_some()); + let parquet_path = result.unwrap(); + assert_eq!( + parquet_path.to_str().unwrap(), + "/nested/directory/structure/date=2020-01-21.hour=10.minute=30.region=us-west.ee529ffc8e76.data.random456.parquet" + ); + } + + #[test] + fn test_empty_front_part() { + // Valid but with empty front part + let path = Path::new("/tmp/schema_key..data.arrows"); + let random_string = "random789"; + + let result = arrow_path_to_parquet(path, random_string); + + assert!(result.is_none()); + } } From c3f133ec94d7aed72b8783fa3faaed7ba1cbaddf Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Fri, 14 Mar 2025 19:06:14 +0530 Subject: [PATCH 3/8] doc: code readability improvements --- src/parseable/streams.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/parseable/streams.rs b/src/parseable/streams.rs index 0c6059d32..a43851d6f 100644 --- a/src/parseable/streams.rs +++ b/src/parseable/streams.rs @@ -93,12 +93,12 @@ static ARROWS_NAME_STRUCTURE: Lazy = Lazy::new(|| { /// Returns the filename for parquet if provided arrows file path is valid as per our expectation fn arrow_path_to_parquet(path: &Path, random_string: &str) -> Option { - let filename = path.file_name().unwrap().to_str().unwrap(); - let filename = ARROWS_NAME_STRUCTURE + let filename = path.file_name()?.to_str()?; + let front = ARROWS_NAME_STRUCTURE .captures(filename) .and_then(|c| c.get(1))? .as_str(); - let filename_with_random_number = format!("{filename}.data.{random_string}.parquet"); + let filename_with_random_number = format!("{front}.data.{random_string}.parquet"); let mut parquet_path = path.to_owned(); parquet_path.set_file_name(filename_with_random_number); From 382a8d883f1f198d350a1d023d01714e9e4ae2bb Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Fri, 14 Mar 2025 19:11:47 +0530 Subject: [PATCH 4/8] use `temp_dir` in tests --- src/parseable/streams.rs | 60 ++++++++++++++++++++++++++++------------ 1 file changed, 43 insertions(+), 17 deletions(-) diff --git a/src/parseable/streams.rs b/src/parseable/streams.rs index a43851d6f..b49a2a187 100644 --- a/src/parseable/streams.rs +++ b/src/parseable/streams.rs @@ -855,7 +855,7 @@ impl Streams { #[cfg(test)] mod tests { - use std::time::Duration; + use std::{io::Write, time::Duration}; use arrow_array::{Int32Array, StringArray, TimestampMillisecondArray}; use arrow_schema::{DataType, Field, TimeUnit}; @@ -1231,66 +1231,92 @@ mod tests { assert_eq!(staging.arrow_files().len(), 1); } + fn create_test_file(dir: &TempDir, filename: &str) -> PathBuf { + let file_path = dir.path().join(filename); + let mut file = File::create(&file_path).expect("Failed to create test file"); + // Write some dummy content + file.write_all(b"test content") + .expect("Failed to write to test file"); + file_path + } + #[test] fn test_valid_arrow_path_conversion() { - let path = Path::new("/tmp/12345abcde&key1=value1.date=2020-01-21.hour=10.minute=30.key1=value1.key2=value2.ee529ffc8e76.data.arrows"); + let temp_dir = TempDir::new().expect("Failed to create temp dir"); + let filename = "12345abcde&key1=value1.date=2020-01-21.hour=10.minute=30.key1=value1.key2=value2.ee529ffc8e76.data.arrows"; + let file_path = create_test_file(&temp_dir, filename); let random_string = "random123"; - let result = arrow_path_to_parquet(path, random_string); + let result = arrow_path_to_parquet(&file_path, random_string); assert!(result.is_some()); let parquet_path = result.unwrap(); assert_eq!( - parquet_path.to_str().unwrap(), - "/tmp/date=2020-01-21.hour=10.minute=30.key1=value1.key2=value2.ee529ffc8e76.data.random123.parquet" - ); + parquet_path.file_name().unwrap().to_str().unwrap(), + "date=2020-01-21.hour=10.minute=30.key1=value1.key2=value2.ee529ffc8e76.data.random123.parquet" + ); } #[test] fn test_invalid_arrow_path() { + let temp_dir = TempDir::new().expect("Failed to create temp dir"); // Missing the ".data.arrows" suffix - let path = Path::new("/tmp/12345abcde&key1=value1.date=2020-01-21.hour=10.minute=30"); + let filename = "12345abcde&key1=value1.date=2020-01-21.hour=10.minute=30"; + let file_path = create_test_file(&temp_dir, filename); let random_string = "random123"; - let result = arrow_path_to_parquet(path, random_string); + let result = arrow_path_to_parquet(&file_path, random_string); assert!(result.is_none()); } #[test] fn test_invalid_schema_key() { + let temp_dir = TempDir::new().expect("Failed to create temp dir"); // Invalid schema key with special characters - let path = - Path::new("/tmp/12345abcde&key1=value1!.date=2020-01-21.hour=10.minute=30.data.arrows"); + let filename = "12345abcde&key1=value1!.date=2020-01-21.hour=10.minute=30.data.arrows"; + let file_path = create_test_file(&temp_dir, filename); let random_string = "random123"; - let result = arrow_path_to_parquet(path, random_string); + let result = arrow_path_to_parquet(&file_path, random_string); assert!(result.is_none()); } #[test] fn test_complex_path() { - let path = Path::new("/nested/directory/structure/20200201T1830f8a5fc1edc567d56&key1=value1&key2=value2.date=2020-01-21.hour=10.minute=30.region=us-west.ee529ffc8e76.data.arrows"); + let temp_dir = TempDir::new().expect("Failed to create temp dir"); + let nested_dir = temp_dir.path().join("nested/directory/structure"); + std::fs::create_dir_all(&nested_dir).expect("Failed to create nested directories"); + + let filename = "20200201T1830f8a5fc1edc567d56&key1=value1&key2=value2.date=2020-01-21.hour=10.minute=30.region=us-west.ee529ffc8e76.data.arrows"; + let file_path = nested_dir.join(filename); + + let mut file = File::create(&file_path).expect("Failed to create test file"); + file.write_all(b"test content") + .expect("Failed to write to test file"); + let random_string = "random456"; - let result = arrow_path_to_parquet(path, random_string); + let result = arrow_path_to_parquet(&file_path, random_string); assert!(result.is_some()); let parquet_path = result.unwrap(); assert_eq!( - parquet_path.to_str().unwrap(), - "/nested/directory/structure/date=2020-01-21.hour=10.minute=30.region=us-west.ee529ffc8e76.data.random456.parquet" + parquet_path.file_name().unwrap().to_str().unwrap(), + "date=2020-01-21.hour=10.minute=30.region=us-west.ee529ffc8e76.data.random456.parquet" ); } #[test] fn test_empty_front_part() { + let temp_dir = TempDir::new().expect("Failed to create temp dir"); // Valid but with empty front part - let path = Path::new("/tmp/schema_key..data.arrows"); + let filename = "schema_key..data.arrows"; + let file_path = create_test_file(&temp_dir, filename); let random_string = "random789"; - let result = arrow_path_to_parquet(path, random_string); + let result = arrow_path_to_parquet(&file_path, random_string); assert!(result.is_none()); } From 684b8b110dda25ea60bb25da9bcc0a0cb18570fc Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Fri, 21 Mar 2025 16:38:33 +0530 Subject: [PATCH 5/8] refactor: rm regex --- src/parseable/streams.rs | 38 ++++++-------------------------------- 1 file changed, 6 insertions(+), 32 deletions(-) diff --git a/src/parseable/streams.rs b/src/parseable/streams.rs index 6dba66013..09e0926c5 100644 --- a/src/parseable/streams.rs +++ b/src/parseable/streams.rs @@ -32,7 +32,6 @@ use arrow_schema::{Field, Fields, Schema}; use chrono::{NaiveDateTime, Timelike, Utc}; use derive_more::{Deref, DerefMut}; use itertools::Itertools; -use once_cell::sync::Lazy; use parquet::{ arrow::ArrowWriter, basic::Encoding, @@ -41,7 +40,6 @@ use parquet::{ schema::types::ColumnPath, }; use rand::distributions::DistString; -use regex::Regex; use relative_path::RelativePathBuf; use tokio::task::JoinSet; use tracing::{error, info, trace, warn}; @@ -69,34 +67,11 @@ use super::{ LogStream, ARROW_FILE_EXTENSION, }; -/// Regex pattern for parsing arrow file names. -/// -/// # Format -/// The expected format is: `..data.arrows` -/// where: -/// - schema_key: A key that is associated with the timestamp at ingestion, hash of arrow schema and the key-value -/// pairs joined by '&' and '=' (e.g., "20200201T1830f8a5fc1edc567d56&key1=value1&key2=value2") -/// - front_part: Captured for parquet file naming, contains the timestamp associted with current/time-partition -/// as well as the custom partitioning key=value pairs (e.g., "date=2020-01-21.hour=10.minute=30.key1=value1.key2=value2.ee529ffc8e76") -/// -/// # Limitations -/// - Partition keys and values must only contain alphanumeric characters -/// - Special characters in partition values will cause the pattern to fail in capturing -/// -/// # Examples -/// Valid: "key1=value1,key2=value2" -/// Invalid: "key1=special!value,key2=special#value" -static ARROWS_NAME_STRUCTURE: Lazy = Lazy::new(|| { - Regex::new(r"^[a-zA-Z0-9&=]+\.(?P\S+)\.data\.arrows$").expect("Validated regex") -}); - /// Returns the filename for parquet if provided arrows file path is valid as per our expectation fn arrow_path_to_parquet(path: &Path, random_string: &str) -> Option { - let filename = path.file_name()?.to_str()?; - let front = ARROWS_NAME_STRUCTURE - .captures(filename) - .and_then(|c| c.get(1))? - .as_str(); + let filename = path.file_stem()?.to_str()?; + let (_, front) = filename.split_once('.')?; + assert!(filename.contains('.'), "contains the delim `.`"); let filename_with_random_number = format!("{front}.data.{random_string}.parquet"); let mut parquet_path = path.to_owned(); parquet_path.set_file_name(filename_with_random_number); @@ -219,10 +194,9 @@ impl Stream { let paths = dir .flatten() .map(|file| file.path()) - .filter(|path| { - path.file_name() - .and_then(|f| f.to_str()) - .is_some_and(|file_name| ARROWS_NAME_STRUCTURE.is_match(file_name)) + .filter(|file| { + file.extension() + .is_some_and(|ext| ext.eq(ARROW_FILE_EXTENSION)) }) .sorted_by_key(|f| f.metadata().unwrap().modified().unwrap()) .collect(); From 5c4d580e7534b16bc7f3451465727965fa6898cb Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Fri, 21 Mar 2025 16:44:00 +0530 Subject: [PATCH 6/8] fix: check front, not filename --- src/parseable/streams.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/parseable/streams.rs b/src/parseable/streams.rs index 09e0926c5..371382359 100644 --- a/src/parseable/streams.rs +++ b/src/parseable/streams.rs @@ -71,7 +71,7 @@ use super::{ fn arrow_path_to_parquet(path: &Path, random_string: &str) -> Option { let filename = path.file_stem()?.to_str()?; let (_, front) = filename.split_once('.')?; - assert!(filename.contains('.'), "contains the delim `.`"); + assert!(front.contains('.'), "contains the delim `.`"); let filename_with_random_number = format!("{front}.data.{random_string}.parquet"); let mut parquet_path = path.to_owned(); parquet_path.set_file_name(filename_with_random_number); From 9a5df0cd38666046752f83829e7f2863d16ead44 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Fri, 21 Mar 2025 21:54:28 +0530 Subject: [PATCH 7/8] fix: parquet name --- src/parseable/streams.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/parseable/streams.rs b/src/parseable/streams.rs index e7c694837..42b5b1235 100644 --- a/src/parseable/streams.rs +++ b/src/parseable/streams.rs @@ -72,7 +72,7 @@ fn arrow_path_to_parquet(path: &Path, random_string: &str) -> Option { let filename = path.file_stem()?.to_str()?; let (_, front) = filename.split_once('.')?; assert!(front.contains('.'), "contains the delim `.`"); - let filename_with_random_number = format!("{front}.data.{random_string}.parquet"); + let filename_with_random_number = format!("{front}.{random_string}.parquet"); let mut parquet_path = path.to_owned(); parquet_path.set_file_name(filename_with_random_number); From d97723e02f6225822a3018e2e1053e8fa4be5e54 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Fri, 21 Mar 2025 23:32:15 +0530 Subject: [PATCH 8/8] test: rm invalidated test --- src/parseable/streams.rs | 39 --------------------------------------- 1 file changed, 39 deletions(-) diff --git a/src/parseable/streams.rs b/src/parseable/streams.rs index 42b5b1235..c67c60043 100644 --- a/src/parseable/streams.rs +++ b/src/parseable/streams.rs @@ -1238,32 +1238,6 @@ mod tests { ); } - #[test] - fn test_invalid_arrow_path() { - let temp_dir = TempDir::new().expect("Failed to create temp dir"); - // Missing the ".data.arrows" suffix - let filename = "12345abcde&key1=value1.date=2020-01-21.hour=10.minute=30"; - let file_path = create_test_file(&temp_dir, filename); - let random_string = "random123"; - - let result = arrow_path_to_parquet(&file_path, random_string); - - assert!(result.is_none()); - } - - #[test] - fn test_invalid_schema_key() { - let temp_dir = TempDir::new().expect("Failed to create temp dir"); - // Invalid schema key with special characters - let filename = "12345abcde&key1=value1!.date=2020-01-21.hour=10.minute=30.data.arrows"; - let file_path = create_test_file(&temp_dir, filename); - let random_string = "random123"; - - let result = arrow_path_to_parquet(&file_path, random_string); - - assert!(result.is_none()); - } - #[test] fn test_complex_path() { let temp_dir = TempDir::new().expect("Failed to create temp dir"); @@ -1289,19 +1263,6 @@ mod tests { ); } - #[test] - fn test_empty_front_part() { - let temp_dir = TempDir::new().expect("Failed to create temp dir"); - // Valid but with empty front part - let filename = "schema_key..data.arrows"; - let file_path = create_test_file(&temp_dir, filename); - let random_string = "random789"; - - let result = arrow_path_to_parquet(&file_path, random_string); - - assert!(result.is_none()); - } - #[test] fn get_or_create_returns_existing_stream() { let streams = Streams::default();