Skip to content

fix: ensure panic safety #1212

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Feb 27, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/parseable/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ use crate::{
mod staging;
mod streams;

/// File extension (without the leading dot) for arrow files in staging.
const ARROW_FILE_EXTENSION: &str = "arrows";

/// Name of a stream.
///
/// NOTE: this used to be a struct; it was flattened out into a plain
/// `String` alias for simplicity.
pub type LogStream = String;
Expand Down
22 changes: 15 additions & 7 deletions src/parseable/staging/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use arrow_ipc::{reader::StreamReader, root_as_message_unchecked, MessageHeader};
use arrow_schema::Schema;
use byteorder::{LittleEndian, ReadBytesExt};
use itertools::kmerge_by;
use tracing::error;
use tracing::{error, warn};

use crate::{
event::DEFAULT_TIMESTAMP_KEY,
Expand Down Expand Up @@ -82,14 +82,22 @@ pub struct MergedReverseRecordReader {
}

impl MergedReverseRecordReader {
pub fn try_new(files: &[PathBuf]) -> Self {
let mut readers = Vec::with_capacity(files.len());
for file in files {
let Ok(reader) = get_reverse_reader(File::open(file).unwrap()) else {
error!("Invalid file detected, ignoring it: {:?}", file);
pub fn try_new(file_paths: &[PathBuf]) -> Self {
let mut readers = Vec::with_capacity(file_paths.len());
for path in file_paths {
let Ok(file) = File::open(path) else {
warn!("Error when trying to read file: {path:?}");
continue;
};

let reader = match get_reverse_reader(file) {
Ok(r) => r,
Err(err) => {
error!("Invalid file detected, ignoring it: {path:?}; error = {err}");
continue;
}
};

readers.push(reader);
}

Expand Down Expand Up @@ -255,7 +263,7 @@ pub fn get_reverse_reader<T: Read + Seek>(
messages.push((header, offset, size));
offset += size;
}
Err(err) if err.kind() == io::ErrorKind::UnexpectedEof => break,
Err(err) if err.kind() == io::ErrorKind::UnexpectedEof && !messages.is_empty() => break,
Err(err) => return Err(err),
}
}
Expand Down
35 changes: 21 additions & 14 deletions src/parseable/streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,15 +62,13 @@ use super::{
writer::Writer,
StagingError,
},
LogStream,
LogStream, ARROW_FILE_EXTENSION,
};

/// Error value whose message reads `Stream not found: <value>`, where
/// `<value>` is the wrapped `String` (presumably the name of the stream that
/// was looked up — confirm against callers).
#[derive(Debug, thiserror::Error)]
#[error("Stream not found: {0}")]
pub struct StreamNotFound(pub String);

const ARROW_FILE_EXTENSION: &str = "data.arrows";

/// Reference-counted ([`Arc`]) handle to a [`Stream`].
pub type StreamRef = Arc<Stream>;

/// Gets the unix timestamp for the minute as described by the `SystemTime`
Expand Down Expand Up @@ -165,7 +163,7 @@ impl Stream {
hostname.push_str(id);
}
let filename = format!(
"{stream_hash}.date={}.hour={:02}.minute={}.{}{hostname}.{ARROW_FILE_EXTENSION}",
"{stream_hash}.date={}.hour={:02}.minute={}.{}{hostname}.data.{ARROW_FILE_EXTENSION}",
parsed_timestamp.date(),
parsed_timestamp.hour(),
minute_to_slot(parsed_timestamp.minute(), OBJECT_STORE_DATA_GRANULARITY).unwrap(),
Expand Down Expand Up @@ -495,10 +493,12 @@ impl Stream {
}
writer.close()?;

if part_file.metadata().unwrap().len() < parquet::file::FOOTER_SIZE as u64 {
if part_file.metadata().expect("File was just created").len()
< parquet::file::FOOTER_SIZE as u64
{
error!(
"Invalid parquet file {:?} detected for stream {}, removing it",
&part_path, &self.stream_name
"Invalid parquet file {part_path:?} detected for stream {}, removing it",
&self.stream_name
);
remove_file(part_path).unwrap();
} else {
Expand All @@ -510,15 +510,22 @@ impl Stream {
}

for file in arrow_files {
// warn!("file-\n{file:?}\n");
let file_size = file.metadata().unwrap().len();
let file_type = file.extension().unwrap().to_str().unwrap();
if remove_file(file.clone()).is_err() {
let file_size = match file.metadata() {
Ok(meta) => meta.len(),
Err(err) => {
warn!(
"File ({}) not found; Error = {err}",
file.display()
);
continue;
}
};
if remove_file(&file).is_err() {
error!("Failed to delete file. Unstable state");
process::abort()
}
metrics::STORAGE_SIZE
.with_label_values(&["staging", &self.stream_name, file_type])
.with_label_values(&["staging", &self.stream_name, ARROW_FILE_EXTENSION])
.sub(file_size as i64);
}
}
Expand Down Expand Up @@ -883,7 +890,7 @@ mod tests {
);

let expected_path = staging.data_path.join(format!(
"{stream_hash}.date={}.hour={:02}.minute={}.{}.{ARROW_FILE_EXTENSION}",
"{stream_hash}.date={}.hour={:02}.minute={}.{}.data.{ARROW_FILE_EXTENSION}",
parsed_timestamp.date(),
parsed_timestamp.hour(),
minute_to_slot(parsed_timestamp.minute(), OBJECT_STORE_DATA_GRANULARITY).unwrap(),
Expand Down Expand Up @@ -917,7 +924,7 @@ mod tests {
);

let expected_path = staging.data_path.join(format!(
"{stream_hash}.date={}.hour={:02}.minute={}.key1=value1.key2=value2.{}.{ARROW_FILE_EXTENSION}",
"{stream_hash}.date={}.hour={:02}.minute={}.key1=value1.key2=value2.{}.data.{ARROW_FILE_EXTENSION}",
parsed_timestamp.date(),
parsed_timestamp.hour(),
minute_to_slot(parsed_timestamp.minute(), OBJECT_STORE_DATA_GRANULARITY).unwrap(),
Expand Down
14 changes: 5 additions & 9 deletions src/storage/object_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -461,16 +461,12 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static {
) -> Result<Option<Manifest>, ObjectStorageError> {
let path = manifest_path(path.as_str());
match self.get_object(&path).await {
Ok(bytes) => Ok(Some(
serde_json::from_slice(&bytes).expect("manifest is valid json"),
)),
Err(err) => {
if matches!(err, ObjectStorageError::NoSuchKey(_)) {
Ok(None)
} else {
Err(err)
}
Ok(bytes) => {
let manifest = serde_json::from_slice(&bytes)?;
Ok(Some(manifest))
}
Err(ObjectStorageError::NoSuchKey(_)) => Ok(None),
Err(err) => Err(err),
}
}

Expand Down
Loading