Skip to content

Commit c4b67e9

Browse files
author
Devdutt Shenoi
committed
fix: ensure panic safety
1 parent 649c000 commit c4b67e9

File tree

2 files changed

+21
-11
lines changed

2 files changed

+21
-11
lines changed

src/parseable/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,9 @@ use crate::{
5858
mod staging;
5959
mod streams;
6060

61+
/// File extension for arrow files in staging
62+
const ARROW_FILE_EXTENSION: &str = "data.arrows";
63+
6164
/// Name of a Stream
6265
/// NOTE: this used to be a struct, flattened out for simplicity
6366
pub type LogStream = String;

src/parseable/streams.rs

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -61,15 +61,13 @@ use super::{
6161
writer::Writer,
6262
StagingError,
6363
},
64-
LogStream,
64+
LogStream, ARROW_FILE_EXTENSION,
6565
};
6666

6767
#[derive(Debug, thiserror::Error)]
6868
#[error("Stream not found: {0}")]
6969
pub struct StreamNotFound(pub String);
7070

71-
const ARROW_FILE_EXTENSION: &str = "data.arrows";
72-
7371
pub type StreamRef = Arc<Stream>;
7472

7573
/// All state associated with a single logstream in Parseable.
@@ -486,10 +484,12 @@ impl Stream {
486484
}
487485
writer.close()?;
488486

489-
if part_file.metadata().unwrap().len() < parquet::file::FOOTER_SIZE as u64 {
487+
if part_file.metadata().expect("File was just created").len()
488+
< parquet::file::FOOTER_SIZE as u64
489+
{
490490
error!(
491-
"Invalid parquet file {:?} detected for stream {}, removing it",
492-
&part_path, &self.stream_name
491+
"Invalid parquet file {part_path:?} detected for stream {}, removing it",
492+
&self.stream_name
493493
);
494494
remove_file(part_path).unwrap();
495495
} else {
@@ -501,15 +501,22 @@ impl Stream {
501501
}
502502

503503
for file in arrow_files {
504-
// warn!("file-\n{file:?}\n");
505-
let file_size = file.metadata().unwrap().len();
506-
let file_type = file.extension().unwrap().to_str().unwrap();
507-
if remove_file(file.clone()).is_err() {
504+
let file_size = match file.metadata() {
505+
Ok(meta) => meta.len(),
506+
Err(err) => {
507+
warn!(
508+
"Looks like the file ({}) was removed; Error = {err}",
509+
file.display()
510+
);
511+
continue;
512+
}
513+
};
514+
if remove_file(&file).is_err() {
508515
error!("Failed to delete file. Unstable state");
509516
process::abort()
510517
}
511518
metrics::STORAGE_SIZE
512-
.with_label_values(&["staging", &self.stream_name, file_type])
519+
.with_label_values(&["staging", &self.stream_name, ARROW_FILE_EXTENSION])
513520
.sub(file_size as i64);
514521
}
515522
}

0 commit comments

Comments
 (0)