Skip to content

test: recordbatch+row ordering #1237

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Mar 14, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
240 changes: 238 additions & 2 deletions src/parseable/staging/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -321,14 +321,24 @@ fn find_limit_and_type(

#[cfg(test)]
mod tests {
use std::{io::Cursor, sync::Arc};
use std::{
fs::File,
io::{self, Cursor, Read},
path::Path,
sync::Arc,
};

use arrow_array::{
cast::AsArray, types::Int64Type, Array, Float64Array, Int64Array, RecordBatch, StringArray,
cast::AsArray, types::Int64Type, Array, Float64Array, Int32Array, Int64Array, RecordBatch,
StringArray,
};
use arrow_ipc::writer::{
write_message, DictionaryTracker, IpcDataGenerator, IpcWriteOptions, StreamWriter,
};
use arrow_schema::{DataType, Field, Schema};
use temp_dir::TempDir;

use crate::parseable::staging::reader::{MergedReverseRecordReader, OffsetReader};

use super::get_reverse_reader;

Expand Down Expand Up @@ -442,4 +452,230 @@ mod tests {

assert_eq!(sum, 10000);
}

// Builds `count` two-row record batches over `schema` (Int32 id + Utf8 name).
// Batch N carries ids N*10 and N*10+1, with matching "Name N-1"/"Name N-2"
// name strings, so tests can identify each batch by its id values.
fn create_test_batches(schema: &Arc<Schema>, count: usize) -> Vec<RecordBatch> {
    (1..=count as i32)
        .map(|batch_num| {
            let ids = Int32Array::from_iter(batch_num * 10..=batch_num * 10 + 1);
            let names = StringArray::from(vec![
                format!("Name {batch_num}-1"),
                format!("Name {batch_num}-2"),
            ]);
            RecordBatch::try_new(schema.clone(), vec![Arc::new(ids), Arc::new(names)])
                .expect("Failed to create test batch")
        })
        .collect()
}

// Serializes `batches` to an Arrow IPC stream file at `path`, finalizing the
// stream footer before returning. Only file creation errors surface as
// io::Error; Arrow-level failures panic, which is acceptable in test setup.
fn write_test_batches(
    path: &Path,
    schema: &Arc<Schema>,
    batches: &[RecordBatch],
) -> io::Result<()> {
    let mut writer = StreamWriter::try_new(File::create(path)?, schema)
        .expect("Failed to create StreamWriter");

    batches
        .iter()
        .for_each(|batch| writer.write(batch).expect("Failed to write batch"));

    writer.finish().expect("Failed to finalize writer");
    Ok(())
}

#[test]
fn test_offset_reader() {
    // Backing store: the ten sequential bytes 1..=10.
    let source = Cursor::new((1u8..=10).collect::<Vec<_>>());

    // Two (offset, size) sections: bytes at [2, 5) -> 3,4,5 and [7, 9) -> 8,9.
    let mut reader = OffsetReader::new(source, vec![(2, 3), (7, 2)]);
    let mut buf = [0u8; 10];

    // First section comes out in full.
    let n = reader.read(&mut buf).unwrap();
    assert_eq!(n, 3);
    assert_eq!(&buf[..n], &[3, 4, 5]);

    // Second section follows immediately.
    let n = reader.read(&mut buf).unwrap();
    assert_eq!(n, 2);
    assert_eq!(&buf[..n], &[8, 9]);

    // After the last section the reader reports EOF.
    assert_eq!(reader.read(&mut buf).unwrap(), 0);
}

#[test]
fn test_merged_reverse_record_reader() -> io::Result<()> {
    let dir = TempDir::new().unwrap();
    let file_path = dir.path().join("test.arrow");

    let schema = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Int32, false),
        Field::new("name", DataType::Utf8, false),
    ]));

    // Three batches with id pairs [10, 11], [20, 21], [30, 31].
    let batches = create_test_batches(&schema, 3);
    write_test_batches(&file_path, &schema, &batches)?;

    let mut reader = MergedReverseRecordReader::try_new(&[file_path]).merged_iter(schema, None);

    // Batches are yielded newest-first (3, 2, 1), and each batch's rows are
    // themselves reversed, so the higher id of a pair appears before the lower.
    for base in [30, 20, 10] {
        let batch = reader.next().expect("Failed to read batch");
        assert_eq!(batch.num_rows(), 2);
        let id_array = batch
            .column(0)
            .as_any()
            .downcast_ref::<Int32Array>()
            .unwrap();
        assert_eq!(id_array.value(0), base + 1); // effect of per-batch row reversal
        assert_eq!(id_array.value(1), base);
    }

    // Stream is exhausted after the three batches.
    assert!(reader.next().is_none());

    Ok(())
}

#[test]
fn test_empty_offset_list() {
    // With no sections to serve, the reader must be exhausted immediately,
    // regardless of what the underlying source contains.
    let mut reader = OffsetReader::new(Cursor::new(vec![1u8, 2, 3, 4, 5]), Vec::new());
    let mut buf = [0u8; 10];

    assert_eq!(reader.read(&mut buf).unwrap(), 0);
}

#[test]
fn test_partial_reads() {
    // A single 5-byte section read through a 3-byte buffer, forcing the
    // section to be consumed across two reads.
    let data: Vec<u8> = (1..=10).collect();
    let mut reader = OffsetReader::new(Cursor::new(data), vec![(2, 5)]); // bytes 3,4,5,6,7
    let mut buf = [0u8; 3];

    // First fill: as much of the section as the buffer can hold.
    let n = reader.read(&mut buf).unwrap();
    assert_eq!(n, 3);
    assert_eq!(&buf[..n], &[3, 4, 5]);

    // Second read picks up where the section left off.
    let n = reader.read(&mut buf).unwrap();
    assert_eq!(n, 2);
    assert_eq!(&buf[..n], &[6, 7]);

    // Section fully consumed.
    assert_eq!(reader.read(&mut buf).unwrap(), 0);
}

#[test]
fn test_get_reverse_reader_single_message() -> io::Result<()> {
    let dir = TempDir::new().unwrap();
    let file_path = dir.path().join("test_single.arrow");

    // A single one-row batch: id == 42.
    let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
    let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(Int32Array::from(vec![42]))])
        .expect("Failed to create batch");
    write_test_batches(&file_path, &schema, &[batch])?;

    let mut reader = MergedReverseRecordReader::try_new(&[file_path]).merged_iter(schema, None);

    // The lone batch must survive the reverse round trip unchanged.
    let result_batch = reader.next().expect("Failed to read batch");
    let id_array = result_batch
        .column(0)
        .as_any()
        .downcast_ref::<Int32Array>()
        .unwrap();
    assert_eq!(id_array.value(0), 42);

    // Nothing else follows.
    assert!(reader.next().is_none());

    Ok(())
}

#[test]
fn test_large_buffer_resizing() {
    // Verifies the reader's internal buffer grows correctly for a large
    // section: 10 KB of 1s in the backing store, one 8 KB section.
    let data = vec![1; 10000];
    let cursor = Cursor::new(data);

    // One large offset (8KB)
    let offsets = vec![(1000, 8000)];

    let mut reader = OffsetReader::new(cursor, offsets);
    let mut buffer = [0u8; 10000];

    // The full 8 KB section should be served in a single read.
    let read_bytes = reader.read(&mut buffer).unwrap();
    assert_eq!(read_bytes, 8000);

    // Every byte served comes from the all-1s source. Iterator form replaces
    // the original indexed loop (clippy::needless_range_loop) and avoids
    // per-element bounds checks.
    assert!(buffer[..read_bytes].iter().all(|&b| b == 1));
}
}
Loading