Skip to content

test: arrow_path_to_parquet #1239

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Mar 24, 2025
Merged
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 110 additions & 15 deletions src/parseable/streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,18 @@ use super::{
LogStream, ARROW_FILE_EXTENSION,
};

/// Returns the parquet path for a staged arrows file, or `None` if the arrows file name does
/// not have the expected structure.
///
/// The expected file name is `<schema_key>.<front>.data.arrows`:
/// - `<schema_key>` is everything before the first `.`; it is dropped from the output.
/// - `<front>` is the non-empty remainder (e.g. `date=2020-01-21.hour=10.minute=30.<host>`).
///
/// The result lives in the same directory as `path` and is named
/// `<front>.data.<random_string>.parquet`.
///
/// `None` is returned (never a panic) when the name is not valid UTF-8, lacks the
/// `.data.arrows` suffix, has no `.` separating schema key and front, has an empty front, or
/// has an empty/invalid schema key. The function only inspects the path; it does not touch
/// the filesystem.
fn arrow_path_to_parquet(path: &Path, random_string: &str) -> Option<PathBuf> {
    // Parse the full file name instead of `file_stem()`: the stem still ends in `.data`,
    // which used to leak into the output as `….data.data.<random>.parquet`.
    // NOTE(review): the suffix is spelled out because the value of `ARROW_FILE_EXTENSION`
    // is not visible from here — switch to the constant if it matches.
    let filename = path.file_name()?.to_str()?;
    let stem = filename.strip_suffix(".data.arrows")?;
    let (schema_key, front) = stem.split_once('.')?;

    // NOTE(review): the allowed schema-key characters (ASCII alphanumerics, `&`, `=`) are
    // inferred from this module's tests, which require a key containing `!` to be rejected —
    // confirm against how schema keys are actually generated.
    let schema_key_is_valid = !schema_key.is_empty()
        && schema_key
            .chars()
            .all(|c| c.is_ascii_alphanumeric() || matches!(c, '&' | '='));
    if !schema_key_is_valid || front.is_empty() {
        return None;
    }

    Some(path.with_file_name(format!(
        "{front}.data.{random_string}.parquet"
    )))
}

#[derive(Debug, thiserror::Error)]
#[error("Stream not found: {0}")]
pub struct StreamNotFound(pub String);
Expand Down Expand Up @@ -182,7 +194,10 @@ impl Stream {
let paths = dir
.flatten()
.map(|file| file.path())
.filter(|file| file.extension().is_some_and(|ext| ext.eq("arrows")))
.filter(|file| {
file.extension()
.is_some_and(|ext| ext.eq(ARROW_FILE_EXTENSION))
})
.sorted_by_key(|f| f.metadata().unwrap().modified().unwrap())
.collect();

Expand Down Expand Up @@ -225,12 +240,13 @@ impl Stream {
&arrow_file_path, self.stream_name
);
remove_file(&arrow_file_path).unwrap();
} else {
let key = Self::arrow_path_to_parquet(&arrow_file_path, &random_string);
} else if let Some(key) = arrow_path_to_parquet(&arrow_file_path, &random_string) {
grouped_arrow_file
.entry(key)
.or_default()
.push(arrow_file_path);
} else {
warn!("Unexpected arrows file: {}", arrow_file_path.display());
}
}
grouped_arrow_file
Expand Down Expand Up @@ -289,17 +305,6 @@ impl Stream {
}
}

/// Builds the parquet path that corresponds to an arrows file path.
///
/// Everything in the file stem up to and including the first `.` is discarded; the remainder
/// gets `random_string` appended and the extension `parquet`, in the same directory as `path`.
///
/// # Panics
///
/// Panics if `path` has no file stem, the stem is not valid UTF-8, the stem contains no `.`,
/// or the part after the first `.` contains no further `.`.
fn arrow_path_to_parquet(path: &Path, random_string: &str) -> PathBuf {
    let stem = path.file_stem().unwrap().to_str().unwrap();
    // Keep only what follows the first `.` of the stem.
    let front = stem.split_once('.').unwrap().1;
    assert!(front.contains('.'), "contains the delim `.`");

    // Name the file `….arrows` first, then swap that final extension for `parquet`.
    path.with_file_name(format!("{front}.{random_string}.arrows"))
        .with_extension("parquet")
}

/// Converts arrow files in staging into parquet files, does so only for past minutes when run with `!shutdown_signal`
pub fn prepare_parquet(&self, shutdown_signal: bool) -> Result<(), StagingError> {
info!(
Expand Down Expand Up @@ -812,7 +817,7 @@ impl Streams {

#[cfg(test)]
mod tests {
use std::time::Duration;
use std::{io::Write, time::Duration};

use arrow_array::{Int32Array, StringArray, TimestampMillisecondArray};
use arrow_schema::{DataType, Field, TimeUnit};
Expand Down Expand Up @@ -1187,4 +1192,94 @@ mod tests {
assert_eq!(staging.parquet_files().len(), 2);
assert_eq!(staging.arrow_files().len(), 1);
}

/// Creates `filename` inside `dir`, writes a few placeholder bytes into it and returns the
/// full path of the new file. Panics if the file cannot be created or written.
fn create_test_file(dir: &TempDir, filename: &str) -> PathBuf {
    let target = dir.path().join(filename);
    // Placeholder payload so the file is not empty.
    File::create(&target)
        .expect("Failed to create test file")
        .write_all(b"test content")
        .expect("Failed to write to test file");
    target
}

#[test]
fn test_valid_arrow_path_conversion() {
    // A well-formed arrows name must convert, dropping the leading schema-key segment.
    let staging = TempDir::new().expect("Failed to create temp dir");
    let arrows_path = create_test_file(
        &staging,
        "12345abcde&key1=value1.date=2020-01-21.hour=10.minute=30.key1=value1.key2=value2.ee529ffc8e76.data.arrows",
    );

    let converted = arrow_path_to_parquet(&arrows_path, "random123");

    assert!(converted.is_some());
    let produced = converted.unwrap();
    let produced_name = produced.file_name().unwrap().to_str().unwrap();
    assert_eq!(
        produced_name,
        "date=2020-01-21.hour=10.minute=30.key1=value1.key2=value2.ee529ffc8e76.data.random123.parquet"
    );
}
Comment on lines +1224 to +1239
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Test doesn't check filename structure expectation properly

The expected output filename in test_valid_arrow_path_conversion is inconsistent with the current implementation of arrow_path_to_parquet. The test assumes a format that includes the "data" component which would be created by format!("{front}.data.{random_string}.parquet"), but the implementation uses format!("{front}.{random_string}.parquet").

Either update the test expectation or change the implementation to match. If you change the implementation, be sure to update line 75 as suggested above.


#[test]
fn test_invalid_arrow_path() {
    let staging = TempDir::new().expect("Failed to create temp dir");
    // This name has no ".data.arrows" suffix, so no parquet path is expected.
    let arrows_path = create_test_file(
        &staging,
        "12345abcde&key1=value1.date=2020-01-21.hour=10.minute=30",
    );

    let converted = arrow_path_to_parquet(&arrows_path, "random123");

    assert!(converted.is_none());
}

#[test]
fn test_invalid_schema_key() {
    let staging = TempDir::new().expect("Failed to create temp dir");
    // The schema-key segment carries a special character (`!`), so no parquet path is expected.
    let arrows_path = create_test_file(
        &staging,
        "12345abcde&key1=value1!.date=2020-01-21.hour=10.minute=30.data.arrows",
    );

    let converted = arrow_path_to_parquet(&arrows_path, "random123");

    assert!(converted.is_none());
}

#[test]
fn test_complex_path() {
    // Same conversion as the simple case, but for a file several directories deep.
    let staging = TempDir::new().expect("Failed to create temp dir");
    let deep_dir = staging.path().join("nested/directory/structure");
    std::fs::create_dir_all(&deep_dir).expect("Failed to create nested directories");

    let arrows_path = deep_dir.join(
        "20200201T1830f8a5fc1edc567d56&key1=value1&key2=value2.date=2020-01-21.hour=10.minute=30.region=us-west.ee529ffc8e76.data.arrows",
    );
    File::create(&arrows_path)
        .expect("Failed to create test file")
        .write_all(b"test content")
        .expect("Failed to write to test file");

    let converted = arrow_path_to_parquet(&arrows_path, "random456");

    assert!(converted.is_some());
    let produced = converted.unwrap();
    let produced_name = produced.file_name().unwrap().to_str().unwrap();
    assert_eq!(
        produced_name,
        "date=2020-01-21.hour=10.minute=30.region=us-west.ee529ffc8e76.data.random456.parquet"
    );
}

#[test]
fn test_empty_front_part() {
    let staging = TempDir::new().expect("Failed to create temp dir");
    // Nothing sits between the schema key and ".data.arrows", so no parquet path is expected.
    let arrows_path = create_test_file(&staging, "schema_key..data.arrows");

    let converted = arrow_path_to_parquet(&arrows_path, "random789");

    assert!(converted.is_none());
}
}