Skip to content

Commit fd62135

Browse files
author
Devdutt Shenoi
committed
test: use DiskWriter
1 parent 2a1ac09 commit fd62135

File tree

3 files changed

+17
-11
lines changed

3 files changed

+17
-11
lines changed

src/parseable/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,9 @@ mod streams;
6161
/// File extension for arrow files in staging
6262
const ARROW_FILE_EXTENSION: &str = "arrows";
6363

64+
/// File extension for incomplete arrow files
65+
const PART_FILE_EXTENSION: &str = "part";
66+
6467
/// Name of a Stream
6568
/// NOTE: this used to be a struct, flattened out for simplicity
6669
pub type LogStream = String;

src/parseable/staging/reader.rs

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -322,7 +322,6 @@ fn find_limit_and_type(
322322
#[cfg(test)]
323323
mod tests {
324324
use std::{
325-
fs::File,
326325
io::{self, Cursor, Read},
327326
path::Path,
328327
sync::Arc,
@@ -338,7 +337,10 @@ mod tests {
338337
use arrow_schema::{DataType, Field, Schema};
339338
use temp_dir::TempDir;
340339

341-
use crate::parseable::staging::reader::{MergedReverseRecordReader, OffsetReader};
340+
use crate::parseable::staging::{
341+
reader::{MergedReverseRecordReader, OffsetReader},
342+
writer::DiskWriter,
343+
};
342344

343345
use super::get_reverse_reader;
344346

@@ -482,15 +484,12 @@ mod tests {
482484
schema: &Arc<Schema>,
483485
batches: &[RecordBatch],
484486
) -> io::Result<()> {
485-
let file = File::create(path)?;
486-
let mut writer =
487-
StreamWriter::try_new(file, schema).expect("Failed to create StreamWriter");
487+
let mut writer = DiskWriter::try_new(path, schema).expect("Failed to create StreamWriter");
488488

489489
for batch in batches {
490490
writer.write(batch).expect("Failed to write batch");
491491
}
492492

493-
writer.finish().expect("Failed to finalize writer");
494493
Ok(())
495494
}
496495

@@ -524,7 +523,7 @@ mod tests {
524523
#[test]
525524
fn test_merged_reverse_record_reader() -> io::Result<()> {
526525
let dir = TempDir::new().unwrap();
527-
let file_path = dir.path().join("test.arrow");
526+
let file_path = dir.path().join("test.data.arrows");
528527

529528
// Create a schema
530529
let schema = Arc::new(Schema::new(vec![
@@ -627,7 +626,7 @@ mod tests {
627626
#[test]
628627
fn test_get_reverse_reader_single_message() -> io::Result<()> {
629628
let dir = TempDir::new().unwrap();
630-
let file_path = dir.path().join("test_single.arrow");
629+
let file_path = dir.path().join("test_single.data.arrows");
631630

632631
// Create a schema
633632
let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));

src/parseable/staging/writer.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,10 @@ use arrow_select::concat::concat_batches;
3232
use itertools::Itertools;
3333
use tracing::error;
3434

35-
use crate::{parseable::ARROW_FILE_EXTENSION, utils::arrow::adapt_batch};
35+
use crate::{
36+
parseable::{ARROW_FILE_EXTENSION, PART_FILE_EXTENSION},
37+
utils::arrow::adapt_batch,
38+
};
3639

3740
use super::StagingError;
3841

@@ -50,8 +53,9 @@ pub struct DiskWriter {
5053
impl DiskWriter {
5154
/// Try to create a file to stream arrows into
5255
pub fn try_new(path: impl Into<PathBuf>, schema: &Schema) -> Result<Self, StagingError> {
53-
let path = path.into();
54-
let file = OpenOptions::new().create(true).append(true).open(&path)?;
56+
let mut path = path.into();
57+
path.set_extension(PART_FILE_EXTENSION);
58+
let file = OpenOptions::new().write(true).create(true).open(&path)?;
5559
let inner = StreamWriter::try_new_buffered(file, schema)?;
5660

5761
Ok(Self { inner, path })

0 commit comments

Comments
 (0)