diff --git a/src/parseable/staging/reader.rs b/src/parseable/staging/reader.rs
index b9dae11e6..5c9b6b5be 100644
--- a/src/parseable/staging/reader.rs
+++ b/src/parseable/staging/reader.rs
@@ -321,14 +321,24 @@ fn find_limit_and_type(
 #[cfg(test)]
 mod tests {
-    use std::{io::Cursor, sync::Arc};
+    use std::{
+        fs::File,
+        io::{self, Cursor, Read},
+        path::Path,
+        sync::Arc,
+    };
 
     use arrow_array::{
-        cast::AsArray, types::Int64Type, Array, Float64Array, Int64Array, RecordBatch, StringArray,
+        cast::AsArray, types::Int64Type, Array, Float64Array, Int32Array, Int64Array, RecordBatch,
+        StringArray,
     };
     use arrow_ipc::writer::{
         write_message, DictionaryTracker, IpcDataGenerator, IpcWriteOptions, StreamWriter,
     };
+    use arrow_schema::{DataType, Field, Schema};
+    use temp_dir::TempDir;
+
+    use crate::parseable::staging::reader::{MergedReverseRecordReader, OffsetReader};
 
     use super::get_reverse_reader;
 
@@ -442,4 +452,230 @@ mod tests {
         assert_eq!(sum, 10000);
     }
+
+    // Helper function to create test record batches
+    fn create_test_batches(schema: &Arc<Schema>, count: usize) -> Vec<RecordBatch> {
+        let mut batches = Vec::with_capacity(count);
+
+        for batch_num in 1..=count as i32 {
+            let id_array = Int32Array::from_iter(batch_num * 10..=batch_num * 10 + 1);
+            let name_array = StringArray::from(vec![
+                format!("Name {batch_num}-1"),
+                format!("Name {batch_num}-2"),
+            ]);
+
+            let batch = RecordBatch::try_new(
+                schema.clone(),
+                vec![Arc::new(id_array), Arc::new(name_array)],
+            )
+            .expect("Failed to create test batch");
+
+            batches.push(batch);
+        }
+
+        batches
+    }
+
+    // Helper function to write batches to a file
+    fn write_test_batches(
+        path: &Path,
+        schema: &Arc<Schema>,
+        batches: &[RecordBatch],
+    ) -> io::Result<()> {
+        let file = File::create(path)?;
+        let mut writer =
+            StreamWriter::try_new(file, schema).expect("Failed to create StreamWriter");
+
+        for batch in batches {
+            writer.write(batch).expect("Failed to write batch");
+        }
+
+        writer.finish().expect("Failed to finalize writer");
+        Ok(())
+    }
+
+    #[test]
+    fn test_offset_reader() {
+        // Create a simple binary buffer in memory
+        let data = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
+        let cursor = Cursor::new(data);
+
+        // Define the offset list as (offset, size) pairs
+        let offsets = vec![(2, 3), (7, 2)]; // Read bytes 2-4 (values 3, 4, 5), then bytes 7-8 (values 8, 9)
+
+        let mut reader = OffsetReader::new(cursor, offsets);
+        let mut buffer = [0u8; 10];
+
+        // First read should get bytes 3, 4, 5
+        let read_bytes = reader.read(&mut buffer).unwrap();
+        assert_eq!(read_bytes, 3);
+        assert_eq!(&buffer[..read_bytes], &[3, 4, 5]);
+
+        // Second read should get bytes 8, 9
+        let read_bytes = reader.read(&mut buffer).unwrap();
+        assert_eq!(read_bytes, 2);
+        assert_eq!(&buffer[..read_bytes], &[8, 9]);
+
+        // No more data
+        let read_bytes = reader.read(&mut buffer).unwrap();
+        assert_eq!(read_bytes, 0);
+    }
+
+    #[test]
+    fn test_merged_reverse_record_reader() -> io::Result<()> {
+        let dir = TempDir::new().unwrap();
+        let file_path = dir.path().join("test.arrow");
+
+        // Create a schema
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("id", DataType::Int32, false),
+            Field::new("name", DataType::Utf8, false),
+        ]));
+
+        // Create test batches (3 batches)
+        let batches = create_test_batches(&schema, 3);
+
+        // Write batches to file
+        write_test_batches(&file_path, &schema, &batches)?;
+
+        // Now read them back in reverse order
+        let mut reader =
+            MergedReverseRecordReader::try_new(&[file_path]).merged_iter(schema, None);
+
+        // Batches should come back in reverse order: 3, 2, 1. The schema message
+        // still comes first in the stream, so iteration itself works as usual.
+
+        // Read batch 3
+        let batch = reader.next().expect("Failed to read batch");
+        assert_eq!(batch.num_rows(), 2);
+        let id_array = batch
+            .column(0)
+            .as_any()
+            .downcast_ref::<Int32Array>()
+            .unwrap();
+        assert_eq!(id_array.value(0), 31); // rows within each record batch are reversed too
+        assert_eq!(id_array.value(1), 30);
+
+        // Read batch 2
+        let batch = reader.next().expect("Failed to read batch");
+        assert_eq!(batch.num_rows(), 2);
+        let id_array = batch
+            .column(0)
+            .as_any()
+            .downcast_ref::<Int32Array>()
+            .unwrap();
+        assert_eq!(id_array.value(0), 21);
+        assert_eq!(id_array.value(1), 20);
+
+        // Read batch 1
+        let batch = reader.next().expect("Failed to read batch");
+        assert_eq!(batch.num_rows(), 2);
+        let id_array = batch
+            .column(0)
+            .as_any()
+            .downcast_ref::<Int32Array>()
+            .unwrap();
+        assert_eq!(id_array.value(0), 11);
+        assert_eq!(id_array.value(1), 10);
+
+        // No more batches
+        assert!(reader.next().is_none());
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_empty_offset_list() {
+        // Test with an empty offset list
+        let data = vec![1, 2, 3, 4, 5];
+        let cursor = Cursor::new(data);
+
+        let mut reader = OffsetReader::new(cursor, vec![]);
+        let mut buffer = [0u8; 10];
+
+        // Should return 0 bytes read
+        let read_bytes = reader.read(&mut buffer).unwrap();
+        assert_eq!(read_bytes, 0);
+    }
+
+    #[test]
+    fn test_partial_reads() {
+        // Test reading with a buffer smaller than the section size
+        let data = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
+        let cursor = Cursor::new(data);
+
+        // One section of 5 bytes
+        let offsets = vec![(2, 5)]; // Read bytes 2-6 (values 3, 4, 5, 6, 7)
+
+        let mut reader = OffsetReader::new(cursor, offsets);
+        let mut buffer = [0u8; 3]; // Buffer smaller than the 5 bytes we want to read
+
+        // First read should get the first 3 bytes: 3, 4, 5
+        let read_bytes = reader.read(&mut buffer).unwrap();
+        assert_eq!(read_bytes, 3);
+        assert_eq!(&buffer[..read_bytes], &[3, 4, 5]);
+
+        // Second read should get the remaining 2 bytes: 6, 7
+        let read_bytes = reader.read(&mut buffer).unwrap();
+        assert_eq!(read_bytes, 2);
+        assert_eq!(&buffer[..read_bytes], &[6, 7]);
+
+        // No more data
+        let read_bytes = reader.read(&mut buffer).unwrap();
+        assert_eq!(read_bytes, 0);
+    }
+
+    #[test]
+    fn test_get_reverse_reader_single_message() -> io::Result<()> {
+        let dir = TempDir::new().unwrap();
+        let file_path = dir.path().join("test_single.arrow");
+
+        // Create a schema
+        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
+
+        // Create a single batch
+        let batch =
+            RecordBatch::try_new(schema.clone(), vec![Arc::new(Int32Array::from(vec![42]))])
+                .expect("Failed to create batch");
+
+        // Write the batch to file
+        write_test_batches(&file_path, &schema, &[batch])?;
+
+        let mut reader =
+            MergedReverseRecordReader::try_new(&[file_path]).merged_iter(schema, None);
+
+        // Should get the batch back
+        let result_batch = reader.next().expect("Failed to read batch");
+        let id_array = result_batch
+            .column(0)
+            .as_any()
+            .downcast_ref::<Int32Array>()
+            .unwrap();
+        assert_eq!(id_array.value(0), 42);
+
+        // No more batches
+        assert!(reader.next().is_none());
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_large_buffer_resizing() {
+        // Test that the internal buffer resizes correctly for large sections
+        let data = vec![1; 10000]; // 10KB of data
+        let cursor = Cursor::new(data);
+
+        // One large section (8KB)
+        let offsets = vec![(1000, 8000)];
+
+        let mut reader = OffsetReader::new(cursor, offsets);
+        let mut buffer = [0u8; 10000];
+
+        // Should read 8KB
+        let read_bytes = reader.read(&mut buffer).unwrap();
+        assert_eq!(read_bytes, 8000);
+
+        // All bytes should be 1
+        for i in 0..read_bytes {
+            assert_eq!(buffer[i], 1);
+        }
+    }
 }
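To summarize the contract the `OffsetReader` tests above pin down: each `read` call stays within the current `(offset, size)` section, partial reads resume where they left off, and the reader reports EOF only once every section is consumed. The sketch below is a minimal illustration of that behavior, not the crate's actual implementation; the `SectionReader` name and the `VecDeque` bookkeeping are assumptions made for the example.

```rust
use std::collections::VecDeque;
use std::io::{self, Read, Seek, SeekFrom};

// Minimal sketch of an offset-section reader with the semantics the tests
// exercise. Names and fields here are illustrative, not the real OffsetReader.
struct SectionReader<R: Read + Seek> {
    inner: R,
    sections: VecDeque<(u64, u64)>, // remaining (offset, size) pairs
    consumed: u64,                  // bytes already read from the front section
}

impl<R: Read + Seek> SectionReader<R> {
    fn new(inner: R, sections: Vec<(u64, u64)>) -> Self {
        Self {
            inner,
            sections: sections.into(),
            consumed: 0,
        }
    }
}

impl<R: Read + Seek> Read for SectionReader<R> {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        loop {
            // EOF once every section has been consumed
            let Some(&(offset, size)) = self.sections.front() else {
                return Ok(0);
            };
            let remaining = size - self.consumed;
            if remaining == 0 {
                // Front section exhausted; move on to the next one
                self.sections.pop_front();
                self.consumed = 0;
                continue;
            }
            // Seek to the unread part of the section and read at most
            // `remaining` bytes, so one call never crosses a section boundary.
            self.inner.seek(SeekFrom::Start(offset + self.consumed))?;
            let want = remaining.min(buf.len() as u64) as usize;
            let n = self.inner.read(&mut buf[..want])?;
            self.consumed += n as u64;
            return Ok(n);
        }
    }
}
```

With the test's inputs, `SectionReader::new(Cursor::new(data), vec![(2, 3), (7, 2)])` produces the same three-read sequence `test_offset_reader` asserts: 3 bytes, then 2 bytes, then 0.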
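Likewise, the `MergedReverseRecordReader` assertions encode two layers of reversal: batches are yielded last-to-first, and the rows inside each batch come out back-to-front (hence `31` before `30`). A naive equivalent that buffers the whole IPC stream and reverses rows with the arrow `take` kernel looks roughly like the sketch below; `read_reversed` is a hypothetical helper, and the real reader works from IPC message offsets rather than buffering everything.

```rust
use std::{fs::File, path::Path};

use arrow_array::{RecordBatch, UInt64Array};
use arrow_ipc::reader::StreamReader;
use arrow_select::take::take;

// Hypothetical helper: read an Arrow IPC stream file and return its batches
// last-to-first, with the rows of each batch reversed as well.
fn read_reversed(path: &Path) -> Vec<RecordBatch> {
    let file = File::open(path).expect("Failed to open IPC file");
    let reader = StreamReader::try_new(file, None).expect("Failed to read IPC stream");
    let mut batches: Vec<RecordBatch> = reader.map(|b| b.expect("bad batch")).collect();
    batches.reverse(); // last batch first

    batches
        .into_iter()
        .map(|batch| {
            // Reverse rows by taking indices n-1, n-2, ..., 0
            let indices = UInt64Array::from_iter_values((0..batch.num_rows() as u64).rev());
            let columns = batch
                .columns()
                .iter()
                .map(|col| take(col, &indices, None).expect("take failed"))
                .collect();
            RecordBatch::try_new(batch.schema(), columns).expect("Failed to rebuild batch")
        })
        .collect()
}
```

Under those assumptions, `read_reversed` on the three-batch test file would yield ids `[31, 30]`, `[21, 20]`, `[11, 10]`, matching the expectations in `test_merged_reverse_record_reader`.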