Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/parseable/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ use crate::{
mod staging;
mod streams;

/// File extension for arrow files in staging
const ARROW_FILE_EXTENSION: &str = "arrows";

/// Name of a Stream
/// NOTE: this used to be a struct, flattened out for simplicity
pub type LogStream = String;
Expand Down
35 changes: 21 additions & 14 deletions src/parseable/streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,15 +61,13 @@ use super::{
writer::Writer,
StagingError,
},
LogStream,
LogStream, ARROW_FILE_EXTENSION,
};

#[derive(Debug, thiserror::Error)]
#[error("Stream not found: {0}")]
pub struct StreamNotFound(pub String);

const ARROW_FILE_EXTENSION: &str = "data.arrows";

pub type StreamRef = Arc<Stream>;

/// All state associated with a single logstream in Parseable.
Expand Down Expand Up @@ -156,7 +154,7 @@ impl Stream {
hostname.push_str(id);
}
let filename = format!(
"{}{stream_hash}.date={}.hour={:02}.minute={}.{}{hostname}.{ARROW_FILE_EXTENSION}",
"{}{stream_hash}.date={}.hour={:02}.minute={}.{}{hostname}.data.{ARROW_FILE_EXTENSION}",
Utc::now().format("%Y%m%dT%H%M"),
parsed_timestamp.date(),
parsed_timestamp.hour(),
Expand Down Expand Up @@ -486,10 +484,12 @@ impl Stream {
}
writer.close()?;

if part_file.metadata().unwrap().len() < parquet::file::FOOTER_SIZE as u64 {
if part_file.metadata().expect("File was just created").len()
< parquet::file::FOOTER_SIZE as u64
{
error!(
"Invalid parquet file {:?} detected for stream {}, removing it",
&part_path, &self.stream_name
"Invalid parquet file {part_path:?} detected for stream {}, removing it",
&self.stream_name
);
remove_file(part_path).unwrap();
} else {
Expand All @@ -501,15 +501,22 @@ impl Stream {
}

for file in arrow_files {
// warn!("file-\n{file:?}\n");
let file_size = file.metadata().unwrap().len();
let file_type = file.extension().unwrap().to_str().unwrap();
if remove_file(file.clone()).is_err() {
let file_size = match file.metadata() {
Ok(meta) => meta.len(),
Err(err) => {
warn!(
"File ({}) not found; Error = {err}",
file.display()
);
continue;
}
};
if remove_file(&file).is_err() {
error!("Failed to delete file. Unstable state");
process::abort()
}
metrics::STORAGE_SIZE
.with_label_values(&["staging", &self.stream_name, file_type])
.with_label_values(&["staging", &self.stream_name, ARROW_FILE_EXTENSION])
.sub(file_size as i64);
}
}
Expand Down Expand Up @@ -874,7 +881,7 @@ mod tests {
);

let expected_path = staging.data_path.join(format!(
"{}{stream_hash}.date={}.hour={:02}.minute={}.{}.{ARROW_FILE_EXTENSION}",
"{}{stream_hash}.date={}.hour={:02}.minute={}.{}.data.{ARROW_FILE_EXTENSION}",
Utc::now().format("%Y%m%dT%H%M"),
parsed_timestamp.date(),
parsed_timestamp.hour(),
Expand Down Expand Up @@ -909,7 +916,7 @@ mod tests {
);

let expected_path = staging.data_path.join(format!(
"{}{stream_hash}.date={}.hour={:02}.minute={}.key1=value1.key2=value2.{}.{ARROW_FILE_EXTENSION}",
"{}{stream_hash}.date={}.hour={:02}.minute={}.key1=value1.key2=value2.{}.data.{ARROW_FILE_EXTENSION}",
Utc::now().format("%Y%m%dT%H%M"),
parsed_timestamp.date(),
parsed_timestamp.hour(),
Expand Down
Loading