diff --git a/src/parseable/mod.rs b/src/parseable/mod.rs
index eb4dd6761..60ec06b55 100644
--- a/src/parseable/mod.rs
+++ b/src/parseable/mod.rs
@@ -58,6 +58,9 @@ use crate::{
 mod staging;
 mod streams;
 
+/// File extension for arrow files in staging
+const ARROW_FILE_EXTENSION: &str = "arrows";
+
 /// Name of a Stream
 /// NOTE: this used to be a struct, flattened out for simplicity
 pub type LogStream = String;
diff --git a/src/parseable/staging/reader.rs b/src/parseable/staging/reader.rs
index 6df0dc324..b9dae11e6 100644
--- a/src/parseable/staging/reader.rs
+++ b/src/parseable/staging/reader.rs
@@ -30,7 +30,7 @@ use arrow_ipc::{reader::StreamReader, root_as_message_unchecked, MessageHeader};
 use arrow_schema::Schema;
 use byteorder::{LittleEndian, ReadBytesExt};
 use itertools::kmerge_by;
-use tracing::error;
+use tracing::{error, warn};
 
 use crate::{
     event::DEFAULT_TIMESTAMP_KEY,
@@ -82,14 +82,22 @@ pub struct MergedReverseRecordReader {
 }
 
 impl MergedReverseRecordReader {
-    pub fn try_new(files: &[PathBuf]) -> Self {
-        let mut readers = Vec::with_capacity(files.len());
-        for file in files {
-            let Ok(reader) = get_reverse_reader(File::open(file).unwrap()) else {
-                error!("Invalid file detected, ignoring it: {:?}", file);
+    pub fn try_new(file_paths: &[PathBuf]) -> Self {
+        let mut readers = Vec::with_capacity(file_paths.len());
+        for path in file_paths {
+            let Ok(file) = File::open(path) else {
+                warn!("Error when trying to read file: {path:?}");
                 continue;
             };
+            let reader = match get_reverse_reader(file) {
+                Ok(r) => r,
+                Err(err) => {
+                    error!("Invalid file detected, ignoring it: {path:?}; error = {err}");
+                    continue;
+                }
+            };
+
             readers.push(reader);
         }
@@ -255,7 +263,7 @@ pub fn get_reverse_reader(
                 messages.push((header, offset, size));
                 offset += size;
             }
-            Err(err) if err.kind() == io::ErrorKind::UnexpectedEof => break,
+            Err(err) if err.kind() == io::ErrorKind::UnexpectedEof && !messages.is_empty() => break,
             Err(err) => return Err(err),
         }
     }
diff --git a/src/parseable/streams.rs b/src/parseable/streams.rs
index 57d24a3fb..088ca509d 100644
--- a/src/parseable/streams.rs
+++ b/src/parseable/streams.rs
@@ -62,15 +62,13 @@ use super::{
         writer::Writer,
         StagingError,
     },
-    LogStream,
+    LogStream, ARROW_FILE_EXTENSION,
 };
 
 #[derive(Debug, thiserror::Error)]
 #[error("Stream not found: {0}")]
 pub struct StreamNotFound(pub String);
 
-const ARROW_FILE_EXTENSION: &str = "data.arrows";
-
 pub type StreamRef = Arc<Stream>;
 
 /// Gets the unix timestamp for the minute as described by the `SystemTime`
@@ -165,7 +163,7 @@ impl Stream {
             hostname.push_str(id);
         }
         let filename = format!(
-            "{stream_hash}.date={}.hour={:02}.minute={}.{}{hostname}.{ARROW_FILE_EXTENSION}",
+            "{stream_hash}.date={}.hour={:02}.minute={}.{}{hostname}.data.{ARROW_FILE_EXTENSION}",
             parsed_timestamp.date(),
             parsed_timestamp.hour(),
             minute_to_slot(parsed_timestamp.minute(), OBJECT_STORE_DATA_GRANULARITY).unwrap(),
@@ -495,10 +493,12 @@
         }
         writer.close()?;
 
-        if part_file.metadata().unwrap().len() < parquet::file::FOOTER_SIZE as u64 {
+        if part_file.metadata().expect("File was just created").len()
+            < parquet::file::FOOTER_SIZE as u64
+        {
             error!(
-                "Invalid parquet file {:?} detected for stream {}, removing it",
-                &part_path, &self.stream_name
+                "Invalid parquet file {part_path:?} detected for stream {}, removing it",
+                &self.stream_name
             );
             remove_file(part_path).unwrap();
         } else {
@@ -510,15 +510,22 @@
         }
 
         for file in arrow_files {
-            // warn!("file-\n{file:?}\n");
-            let file_size = file.metadata().unwrap().len();
-            let file_type = file.extension().unwrap().to_str().unwrap();
-            if remove_file(file.clone()).is_err() {
+            let file_size = match file.metadata() {
+                Ok(meta) => meta.len(),
+                Err(err) => {
+                    warn!(
+                        "File ({}) not found; Error = {err}",
+                        file.display()
+                    );
+                    continue;
+                }
+            };
+            if remove_file(&file).is_err() {
                 error!("Failed to delete file. Unstable state");
                 process::abort()
             }
             metrics::STORAGE_SIZE
-                .with_label_values(&["staging", &self.stream_name, file_type])
+                .with_label_values(&["staging", &self.stream_name, ARROW_FILE_EXTENSION])
                 .sub(file_size as i64);
         }
     }
@@ -883,7 +890,7 @@ mod tests {
         );
 
         let expected_path = staging.data_path.join(format!(
-            "{stream_hash}.date={}.hour={:02}.minute={}.{}.{ARROW_FILE_EXTENSION}",
+            "{stream_hash}.date={}.hour={:02}.minute={}.{}.data.{ARROW_FILE_EXTENSION}",
             parsed_timestamp.date(),
             parsed_timestamp.hour(),
             minute_to_slot(parsed_timestamp.minute(), OBJECT_STORE_DATA_GRANULARITY).unwrap(),
@@ -917,7 +924,7 @@
         );
 
         let expected_path = staging.data_path.join(format!(
-            "{stream_hash}.date={}.hour={:02}.minute={}.key1=value1.key2=value2.{}.{ARROW_FILE_EXTENSION}",
+            "{stream_hash}.date={}.hour={:02}.minute={}.key1=value1.key2=value2.{}.data.{ARROW_FILE_EXTENSION}",
             parsed_timestamp.date(),
             parsed_timestamp.hour(),
             minute_to_slot(parsed_timestamp.minute(), OBJECT_STORE_DATA_GRANULARITY).unwrap(),
diff --git a/src/storage/object_storage.rs b/src/storage/object_storage.rs
index 0b5449c0b..0478918cf 100644
--- a/src/storage/object_storage.rs
+++ b/src/storage/object_storage.rs
@@ -461,16 +461,12 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static {
     ) -> Result<Option<Manifest>, ObjectStorageError> {
         let path = manifest_path(path.as_str());
         match self.get_object(&path).await {
-            Ok(bytes) => Ok(Some(
-                serde_json::from_slice(&bytes).expect("manifest is valid json"),
-            )),
-            Err(err) => {
-                if matches!(err, ObjectStorageError::NoSuchKey(_)) {
-                    Ok(None)
-                } else {
-                    Err(err)
-                }
+            Ok(bytes) => {
+                let manifest = serde_json::from_slice(&bytes)?;
+                Ok(Some(manifest))
             }
+            Err(ObjectStorageError::NoSuchKey(_)) => Ok(None),
+            Err(err) => Err(err),
         }
     }
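
A note on the `get_reverse_reader` change: the new `!messages.is_empty()` guard means an `UnexpectedEof` on the very first message is no longer treated as a clean end of stream, so an empty or immediately-truncated `.arrows` file surfaces as an error instead of yielding a reader over zero messages. A minimal sketch of the pattern, assuming a simplified little-endian size-prefixed framing (the `index_messages` name and the framing itself are illustrative, not the actual Arrow IPC layout the crate parses):

```rust
use std::io::{self, Cursor, Read};

/// Index size-prefixed messages until EOF. EOF is only a clean stop
/// once at least one message has been read; an immediate EOF means the
/// input is empty or truncated and is reported as an error.
fn index_messages(mut reader: impl Read) -> io::Result<Vec<(u64, u64)>> {
    let mut messages = Vec::new();
    let mut offset = 0u64;
    loop {
        let mut len_buf = [0u8; 4];
        match reader.read_exact(&mut len_buf) {
            Ok(()) => {
                let size = u64::from(u32::from_le_bytes(len_buf));
                // Skip over the payload; a real reader would decode it.
                io::copy(&mut reader.by_ref().take(size), &mut io::sink())?;
                messages.push((offset, size));
                offset += 4 + size;
            }
            // The guard that mirrors the diff: EOF breaks the loop only
            // when something was already read.
            Err(err) if err.kind() == io::ErrorKind::UnexpectedEof && !messages.is_empty() => {
                break
            }
            Err(err) => return Err(err),
        }
    }
    Ok(messages)
}

fn main() {
    // An empty buffer now surfaces as an error instead of an empty index.
    assert!(index_messages(Cursor::new(Vec::new())).is_err());

    // One framed message followed by EOF terminates cleanly.
    let mut buf = 3u32.to_le_bytes().to_vec();
    buf.extend_from_slice(b"abc");
    assert_eq!(index_messages(Cursor::new(buf)).unwrap(), vec![(0, 3)]);
}
```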
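On the `streams.rs` rename: the `data` segment moves out of the constant and into the format string, so `ARROW_FILE_EXTENSION` is now the bare extension (`arrows`) and can double as the staging metric label. A rough sketch of how the filename composes under the new scheme; every value below is invented for illustration, and `extra` stands in for whatever the unlabeled `{}` segment interpolates in the real code:

```rust
const ARROW_FILE_EXTENSION: &str = "arrows";

fn main() {
    // All values below are made up for illustration.
    let stream_hash = "abc123";
    let date = "2025-01-09";
    let hour = 9;
    let minute_slot = "30-31"; // what minute_to_slot() would produce
    let extra = "key1=value1."; // stands in for the `{}` segment
    let hostname = "node0";

    let filename = format!(
        "{stream_hash}.date={date}.hour={hour:02}.minute={minute_slot}.{extra}{hostname}.data.{ARROW_FILE_EXTENSION}"
    );

    // The on-disk name still ends in `.data.arrows`, so the naming
    // scheme is unchanged while the constant becomes reusable.
    assert!(filename.ends_with(".data.arrows"));
    println!("{filename}");
}
```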
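The parquet size check relies on `parquet::file::FOOTER_SIZE` being 8 bytes (a 4-byte little-endian metadata length followed by the `PAR1` magic), so any file shorter than that cannot be valid parquet and is safe to delete outright. Restated as a tiny sketch:

```rust
/// Any file shorter than the parquet footer cannot possibly be valid.
fn could_be_valid_parquet(len: u64) -> bool {
    const FOOTER_SIZE: u64 = 8; // mirrors parquet::file::FOOTER_SIZE
    len >= FOOTER_SIZE
}

fn main() {
    assert!(!could_be_valid_parquet(0));
    assert!(could_be_valid_parquet(8));
}
```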
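Finally, in `object_storage.rs`, swapping `.expect("manifest is valid json")` for `?` turns a corrupt manifest into a recoverable `ObjectStorageError` rather than a panic. That `?` only compiles if `ObjectStorageError` converts from `serde_json::Error`; the sketch below assumes a `#[from]` variant for that conversion (the variant name `Serde` and the enum shape are made up, not the crate's actual definition):

```rust
use thiserror::Error;

// Assumed stand-in for the real ObjectStorageError; only the shape of
// the conversion matters here.
#[allow(dead_code)]
#[derive(Debug, Error)]
enum ObjectStorageError {
    #[error("no such key: {0}")]
    NoSuchKey(String),
    #[error("invalid manifest json: {0}")]
    Serde(#[from] serde_json::Error),
}

fn parse_manifest(bytes: &[u8]) -> Result<Option<serde_json::Value>, ObjectStorageError> {
    // `?` routes serde_json::Error through the #[from] conversion,
    // so a bad manifest becomes an Err instead of a panic.
    let manifest = serde_json::from_slice(bytes)?;
    Ok(Some(manifest))
}

fn main() {
    assert!(parse_manifest(br#"{"version": "v1"}"#).is_ok());
    assert!(parse_manifest(b"not json").is_err());
}
```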