diff --git a/src/parseable/streams.rs b/src/parseable/streams.rs index 5ea1c7d91..c67c60043 100644 --- a/src/parseable/streams.rs +++ b/src/parseable/streams.rs @@ -67,6 +67,18 @@ use super::{ LogStream, ARROW_FILE_EXTENSION, }; +/// Returns the filename for parquet if provided arrows file path is valid as per our expectation +fn arrow_path_to_parquet(path: &Path, random_string: &str) -> Option { + let filename = path.file_stem()?.to_str()?; + let (_, front) = filename.split_once('.')?; + assert!(front.contains('.'), "contains the delim `.`"); + let filename_with_random_number = format!("{front}.{random_string}.parquet"); + let mut parquet_path = path.to_owned(); + parquet_path.set_file_name(filename_with_random_number); + + Some(parquet_path) +} + #[derive(Debug, thiserror::Error)] #[error("Stream not found: {0}")] pub struct StreamNotFound(pub String); @@ -182,7 +194,10 @@ impl Stream { let paths = dir .flatten() .map(|file| file.path()) - .filter(|file| file.extension().is_some_and(|ext| ext.eq("arrows"))) + .filter(|file| { + file.extension() + .is_some_and(|ext| ext.eq(ARROW_FILE_EXTENSION)) + }) .sorted_by_key(|f| f.metadata().unwrap().modified().unwrap()) .collect(); @@ -225,12 +240,13 @@ impl Stream { &arrow_file_path, self.stream_name ); remove_file(&arrow_file_path).unwrap(); - } else { - let key = Self::arrow_path_to_parquet(&arrow_file_path, &random_string); + } else if let Some(key) = arrow_path_to_parquet(&arrow_file_path, &random_string) { grouped_arrow_file .entry(key) .or_default() .push(arrow_file_path); + } else { + warn!("Unexpected arrows file: {}", arrow_file_path.display()); } } grouped_arrow_file @@ -289,17 +305,6 @@ impl Stream { } } - fn arrow_path_to_parquet(path: &Path, random_string: &str) -> PathBuf { - let filename = path.file_stem().unwrap().to_str().unwrap(); - let (_, filename) = filename.split_once('.').unwrap(); - assert!(filename.contains('.'), "contains the delim `.`"); - let filename_with_random_number = format!("{filename}.{random_string}.arrows"); - let mut parquet_path = path.to_owned(); - parquet_path.set_file_name(filename_with_random_number); - parquet_path.set_extension("parquet"); - parquet_path - } - /// Converts arrow files in staging into parquet files, does so only for past minutes when run with `!shutdown_signal` pub fn prepare_parquet(&self, shutdown_signal: bool) -> Result<(), StagingError> { info!( @@ -831,7 +836,7 @@ impl Streams { #[cfg(test)] mod tests { - use std::{sync::Barrier, thread::spawn, time::Duration}; + use std::{io::Write, sync::Barrier, thread::spawn, time::Duration}; use arrow_array::{Int32Array, StringArray, TimestampMillisecondArray}; use arrow_schema::{DataType, Field, TimeUnit}; @@ -1207,6 +1212,57 @@ mod tests { assert_eq!(staging.arrow_files().len(), 1); } + fn create_test_file(dir: &TempDir, filename: &str) -> PathBuf { + let file_path = dir.path().join(filename); + let mut file = File::create(&file_path).expect("Failed to create test file"); + // Write some dummy content + file.write_all(b"test content") + .expect("Failed to write to test file"); + file_path + } + + #[test] + fn test_valid_arrow_path_conversion() { + let temp_dir = TempDir::new().expect("Failed to create temp dir"); + let filename = "12345abcde&key1=value1.date=2020-01-21.hour=10.minute=30.key1=value1.key2=value2.ee529ffc8e76.data.arrows"; + let file_path = create_test_file(&temp_dir, filename); + let random_string = "random123"; + + let result = arrow_path_to_parquet(&file_path, random_string); + + assert!(result.is_some()); + let parquet_path = result.unwrap(); + assert_eq!( + parquet_path.file_name().unwrap().to_str().unwrap(), + "date=2020-01-21.hour=10.minute=30.key1=value1.key2=value2.ee529ffc8e76.data.random123.parquet" + ); + } + + #[test] + fn test_complex_path() { + let temp_dir = TempDir::new().expect("Failed to create temp dir"); + let nested_dir = temp_dir.path().join("nested/directory/structure"); + std::fs::create_dir_all(&nested_dir).expect("Failed to create nested directories"); + + let filename = "20200201T1830f8a5fc1edc567d56&key1=value1&key2=value2.date=2020-01-21.hour=10.minute=30.region=us-west.ee529ffc8e76.data.arrows"; + let file_path = nested_dir.join(filename); + + let mut file = File::create(&file_path).expect("Failed to create test file"); + file.write_all(b"test content") + .expect("Failed to write to test file"); + + let random_string = "random456"; + + let result = arrow_path_to_parquet(&file_path, random_string); + + assert!(result.is_some()); + let parquet_path = result.unwrap(); + assert_eq!( + parquet_path.file_name().unwrap().to_str().unwrap(), + "date=2020-01-21.hour=10.minute=30.region=us-west.ee529ffc8e76.data.random456.parquet" + ); + } + #[test] fn get_or_create_returns_existing_stream() { let streams = Streams::default();