diff --git a/arrow-avro/Cargo.toml b/arrow-avro/Cargo.toml
index d5c9dc184e26..1a1fc2f066ea 100644
--- a/arrow-avro/Cargo.toml
+++ b/arrow-avro/Cargo.toml
@@ -55,8 +55,10 @@ zstd = { version = "0.13", default-features = false, optional = true }
 bzip2 = { version = "0.6.0", optional = true }
 xz = { version = "0.1", default-features = false, optional = true }
 crc = { version = "3.0", optional = true }
-uuid = "1.17"
 strum_macros = "0.27"
+uuid = "1.17"
+indexmap = "2.10"
+
 [dev-dependencies]
 arrow-data = { workspace = true }
diff --git a/arrow-avro/benches/decoder.rs b/arrow-avro/benches/decoder.rs
index 452f44e09e2c..df802daea154 100644
--- a/arrow-avro/benches/decoder.rs
+++ b/arrow-avro/benches/decoder.rs
@@ -27,58 +27,78 @@ extern crate uuid;
 use apache_avro::types::Value;
 use apache_avro::{to_avro_datum, Decimal, Schema as ApacheSchema};
-use arrow_avro::{reader::ReaderBuilder, schema::Schema as AvroSchema};
+use arrow_avro::schema::{Fingerprint, SINGLE_OBJECT_MAGIC};
+use arrow_avro::{reader::ReaderBuilder, schema::AvroSchema};
 use criterion::{criterion_group, criterion_main, BatchSize, BenchmarkId, Criterion, Throughput};
 use once_cell::sync::Lazy;
-use std::{hint::black_box, io, time::Duration};
+use std::{hint::black_box, time::Duration};
 use uuid::Uuid;

-fn encode_records(schema: &ApacheSchema, rows: impl Iterator<Item = Value>) -> Vec<u8> {
+fn make_prefix(fp: Fingerprint) -> [u8; 10] {
+    let Fingerprint::Rabin(val) = fp;
+    let mut buf = [0u8; 10];
+    buf[..2].copy_from_slice(&SINGLE_OBJECT_MAGIC); // C3 01
+    buf[2..].copy_from_slice(&val.to_le_bytes()); // little-endian 64-bit
+    buf
+}
+
+fn encode_records_with_prefix(
+    schema: &ApacheSchema,
+    prefix: &[u8],
+    rows: impl Iterator<Item = Value>,
+) -> Vec<u8> {
     let mut out = Vec::new();
     for v in rows {
+        out.extend_from_slice(prefix);
         out.extend_from_slice(&to_avro_datum(schema, v).expect("encode datum failed"));
     }
     out
 }

-fn gen_int(sc: &ApacheSchema, n: usize) -> Vec<u8> {
-    encode_records(
+fn gen_int(sc: &ApacheSchema, n: usize, prefix: &[u8]) -> Vec<u8> {
+    encode_records_with_prefix(
         sc,
+        prefix,
         (0..n).map(|i| Value::Record(vec![("field1".into(), Value::Int(i as i32))])),
     )
 }

-fn gen_long(sc: &ApacheSchema, n: usize) -> Vec<u8> {
-    encode_records(
+fn gen_long(sc: &ApacheSchema, n: usize, prefix: &[u8]) -> Vec<u8> {
+    encode_records_with_prefix(
         sc,
+        prefix,
         (0..n).map(|i| Value::Record(vec![("field1".into(), Value::Long(i as i64))])),
     )
 }

-fn gen_float(sc: &ApacheSchema, n: usize) -> Vec<u8> {
-    encode_records(
+fn gen_float(sc: &ApacheSchema, n: usize, prefix: &[u8]) -> Vec<u8> {
+    encode_records_with_prefix(
         sc,
+        prefix,
         (0..n).map(|i| Value::Record(vec![("field1".into(), Value::Float(i as f32 + 0.5678))])),
     )
 }

-fn gen_bool(sc: &ApacheSchema, n: usize) -> Vec<u8> {
-    encode_records(
+fn gen_bool(sc: &ApacheSchema, n: usize, prefix: &[u8]) -> Vec<u8> {
+    encode_records_with_prefix(
         sc,
+        prefix,
         (0..n).map(|i| Value::Record(vec![("field1".into(), Value::Boolean(i % 2 == 0))])),
     )
 }

-fn gen_double(sc: &ApacheSchema, n: usize) -> Vec<u8> {
-    encode_records(
+fn gen_double(sc: &ApacheSchema, n: usize, prefix: &[u8]) -> Vec<u8> {
+    encode_records_with_prefix(
         sc,
+        prefix,
         (0..n).map(|i| Value::Record(vec![("field1".into(), Value::Double(i as f64 + 0.1234))])),
     )
 }

-fn gen_bytes(sc: &ApacheSchema, n: usize) -> Vec<u8> {
-    encode_records(
+fn gen_bytes(sc: &ApacheSchema, n: usize, prefix: &[u8]) -> Vec<u8> {
+    encode_records_with_prefix(
         sc,
+        prefix,
         (0..n).map(|i| {
             let payload = vec![(i & 0xFF) as u8; 16];
             Value::Record(vec![("field1".into(), Value::Bytes(payload))])
@@ -86,9 +106,10 @@ fn gen_bytes(sc: &ApacheSchema, n: usize) -> Vec<u8> {
     )
 }

-fn gen_string(sc: &ApacheSchema, n: usize) -> Vec<u8> {
-    encode_records(
+fn gen_string(sc: &ApacheSchema, n: usize, prefix: &[u8]) -> Vec<u8> {
+    encode_records_with_prefix(
         sc,
+        prefix,
         (0..n).map(|i| {
             let s = if i % 3 == 0 {
                 format!("value-{i}")
@@ -100,30 +121,34 @@ fn gen_string(sc: &ApacheSchema, n: usize) -> Vec<u8> {
     )
 }

-fn gen_date(sc: &ApacheSchema, n: usize) -> Vec<u8> {
-    encode_records(
+fn gen_date(sc: &ApacheSchema, n: usize, prefix: &[u8]) -> Vec<u8> {
+    encode_records_with_prefix(
         sc,
+        prefix,
         (0..n).map(|i| Value::Record(vec![("field1".into(), Value::Int(i as i32))])),
     )
 }

-fn gen_timemillis(sc: &ApacheSchema, n: usize) -> Vec<u8> {
-    encode_records(
+fn gen_timemillis(sc: &ApacheSchema, n: usize, prefix: &[u8]) -> Vec<u8> {
+    encode_records_with_prefix(
         sc,
+        prefix,
         (0..n).map(|i| Value::Record(vec![("field1".into(), Value::Int((i * 37) as i32))])),
     )
 }

-fn gen_timemicros(sc: &ApacheSchema, n: usize) -> Vec<u8> {
-    encode_records(
+fn gen_timemicros(sc: &ApacheSchema, n: usize, prefix: &[u8]) -> Vec<u8> {
+    encode_records_with_prefix(
         sc,
+        prefix,
         (0..n).map(|i| Value::Record(vec![("field1".into(), Value::Long((i * 1_001) as i64))])),
     )
 }

-fn gen_ts_millis(sc: &ApacheSchema, n: usize) -> Vec<u8> {
-    encode_records(
+fn gen_ts_millis(sc: &ApacheSchema, n: usize, prefix: &[u8]) -> Vec<u8> {
+    encode_records_with_prefix(
         sc,
+        prefix,
         (0..n).map(|i| {
             Value::Record(vec![(
                 "field1".into(),
@@ -133,9 +158,10 @@ fn gen_ts_millis(sc: &ApacheSchema, n: usize) -> Vec<u8> {
     )
 }

-fn gen_ts_micros(sc: &ApacheSchema, n: usize) -> Vec<u8> {
-    encode_records(
+fn gen_ts_micros(sc: &ApacheSchema, n: usize, prefix: &[u8]) -> Vec<u8> {
+    encode_records_with_prefix(
         sc,
+        prefix,
         (0..n).map(|i| {
             Value::Record(vec![(
                 "field1".into(),
@@ -145,10 +171,11 @@ fn gen_ts_micros(sc: &ApacheSchema, n: usize) -> Vec<u8> {
     )
 }

-fn gen_map(sc: &ApacheSchema, n: usize) -> Vec<u8> {
+fn gen_map(sc: &ApacheSchema, n: usize, prefix: &[u8]) -> Vec<u8> {
     use std::collections::HashMap;
-    encode_records(
+    encode_records_with_prefix(
         sc,
+        prefix,
         (0..n).map(|i| {
             let mut m = HashMap::new();
             let int_val = |v: i32| Value::Union(0, Box::new(Value::Int(v)));
@@ -165,9 +192,10 @@ fn gen_map(sc: &ApacheSchema, n: usize) -> Vec<u8> {
     )
 }

-fn gen_array(sc: &ApacheSchema, n: usize) -> Vec<u8> {
-    encode_records(
+fn gen_array(sc: &ApacheSchema, n: usize, prefix: &[u8]) -> Vec<u8> {
+    encode_records_with_prefix(
         sc,
+        prefix,
         (0..n).map(|i| {
             let items = (0..5).map(|j| Value::Int(i as i32 + j)).collect();
             Value::Record(vec![("field1".into(), Value::Array(items))])
@@ -189,9 +217,10 @@ fn trim_i128_be(v: i128) -> Vec<u8> {
     full[first..].to_vec()
 }

-fn gen_decimal(sc: &ApacheSchema, n: usize) -> Vec<u8> {
-    encode_records(
+fn gen_decimal(sc: &ApacheSchema, n: usize, prefix: &[u8]) -> Vec<u8> {
+    encode_records_with_prefix(
         sc,
+        prefix,
         (0..n).map(|i| {
             let unscaled = if i % 2 == 0 { i as i128 } else { -(i as i128) };
             Value::Record(vec![(
@@ -202,9 +231,10 @@ fn gen_decimal(sc: &ApacheSchema, n: usize) -> Vec<u8> {
     )
 }

-fn gen_uuid(sc: &ApacheSchema, n: usize) -> Vec<u8> {
-    encode_records(
+fn gen_uuid(sc: &ApacheSchema, n: usize, prefix: &[u8]) -> Vec<u8> {
+    encode_records_with_prefix(
         sc,
+        prefix,
         (0..n).map(|i| {
             let mut raw = (i as u128).to_be_bytes();
             raw[6] = (raw[6] & 0x0F) | 0x40;
@@ -214,9 +244,10 @@ fn gen_uuid(sc: &ApacheSchema, n: usize) -> Vec<u8> {
     )
 }

-fn gen_fixed(sc: &ApacheSchema, n: usize) -> Vec<u8> {
-    encode_records(
+fn gen_fixed(sc: &ApacheSchema, n: usize, prefix: &[u8]) -> Vec<u8> {
+    encode_records_with_prefix(
         sc,
+        prefix,
         (0..n).map(|i| {
             let mut buf = vec![0u8; 16];
             buf[..8].copy_from_slice(&(i as u64).to_be_bytes());
@@ -225,9 +256,10 @@ fn gen_fixed(sc: &ApacheSchema, n: usize) -> Vec<u8> {
     )
 }

-fn gen_interval(sc: &ApacheSchema, n: usize) -> Vec<u8> {
-    encode_records(
+fn gen_interval(sc: &ApacheSchema, n: usize, prefix: &[u8]) -> Vec<u8> {
+    encode_records_with_prefix(
         sc,
+        prefix,
         (0..n).map(|i| {
             let months = (i % 24) as u32;
             let days = (i % 32) as u32;
@@ -241,10 +273,11 @@ fn gen_interval(sc: &ApacheSchema, n: usize) -> Vec<u8> {
     )
 }

-fn gen_enum(sc: &ApacheSchema, n: usize) -> Vec<u8> {
+fn gen_enum(sc: &ApacheSchema, n: usize, prefix: &[u8]) -> Vec<u8> {
     const SYMBOLS: [&str; 3] = ["A", "B", "C"];
-    encode_records(
+    encode_records_with_prefix(
         sc,
+        prefix,
         (0..n).map(|i| {
             let idx = i % 3;
             Value::Record(vec![(
@@ -255,9 +288,10 @@ fn gen_enum(sc: &ApacheSchema, n: usize) -> Vec<u8> {
     )
 }

-fn gen_mixed(sc: &ApacheSchema, n: usize) -> Vec<u8> {
-    encode_records(
+fn gen_mixed(sc: &ApacheSchema, n: usize, prefix: &[u8]) -> Vec<u8> {
+    encode_records_with_prefix(
         sc,
+        prefix,
         (0..n).map(|i| {
             Value::Record(vec![
                 ("f1".into(), Value::Int(i as i32)),
@@ -269,9 +303,10 @@ fn gen_mixed(sc: &ApacheSchema, n: usize) -> Vec<u8> {
     )
 }

-fn gen_nested(sc: &ApacheSchema, n: usize) -> Vec<u8> {
-    encode_records(
+fn gen_nested(sc: &ApacheSchema, n: usize, prefix: &[u8]) -> Vec<u8> {
+    encode_records_with_prefix(
         sc,
+        prefix,
         (0..n).map(|i| {
             let sub = Value::Record(vec![
                 ("x".into(), Value::Int(i as i32)),
@@ -290,12 +325,14 @@ fn new_decoder(
     batch_size: usize,
     utf8view: bool,
 ) -> arrow_avro::reader::Decoder {
-    let schema: AvroSchema<'static> = serde_json::from_str(schema_json).unwrap();
+    let schema = AvroSchema::new(schema_json.to_string());
+    let mut store = arrow_avro::schema::SchemaStore::new();
+    store.register(schema.clone()).unwrap();
     ReaderBuilder::new()
-        .with_schema(schema)
+        .with_writer_schema_store(store)
         .with_batch_size(batch_size)
         .with_utf8_view(utf8view)
-        .build_decoder(io::empty())
+        .build_decoder()
         .expect("failed to build decoder")
 }

@@ -325,8 +362,8 @@ const ARRAY_SCHEMA: &str = r#"{"type":"record","name":"ArrRec","fields":[{"name"
 const DECIMAL_SCHEMA: &str = r#"{"type":"record","name":"DecRec","fields":[{"name":"field1","type":{"type":"bytes","logicalType":"decimal","precision":10,"scale":3}}]}"#;
 const UUID_SCHEMA: &str = r#"{"type":"record","name":"UuidRec","fields":[{"name":"field1","type":{"type":"string","logicalType":"uuid"}}]}"#;
 const FIXED_SCHEMA: &str = r#"{"type":"record","name":"FixRec","fields":[{"name":"field1","type":{"type":"fixed","name":"Fixed16","size":16}}]}"#;
-const INTERVAL_SCHEMA_ENCODE: &str = r#"{"type":"record","name":"DurRecEnc","fields":[{"name":"field1","type":{"type":"fixed","name":"Duration12","size":12}}]}"#;
 const INTERVAL_SCHEMA: &str = r#"{"type":"record","name":"DurRec","fields":[{"name":"field1","type":{"type":"fixed","name":"Duration12","size":12,"logicalType":"duration"}}]}"#;
+const INTERVAL_SCHEMA_ENCODE: &str = r#"{"type":"record","name":"DurRec","fields":[{"name":"field1","type":{"type":"fixed","name":"Duration12","size":12}}]}"#;
 const ENUM_SCHEMA: &str = r#"{"type":"record","name":"EnumRec","fields":[{"name":"field1","type":{"type":"enum","name":"MyEnum","symbols":["A","B","C"]}}]}"#;
 const MIX_SCHEMA: &str = r#"{"type":"record","name":"MixRec","fields":[{"name":"f1","type":"int"},{"name":"f2","type":"long"},{"name":"f3","type":"string"},{"name":"f4","type":"double"}]}"#;
 const NEST_SCHEMA: &str =
r#"{"type":"record","name":"NestRec","fields":[{"name":"sub","type":{"type":"record","name":"Sub","fields":[{"name":"x","type":"int"},{"name":"y","type":"string"}]}}]}"#; @@ -336,7 +373,13 @@ macro_rules! dataset { static $name: Lazy>> = Lazy::new(|| { let schema = ApacheSchema::parse_str($schema_json).expect("invalid schema for generator"); - SIZES.iter().map(|&n| $gen_fn(&schema, n)).collect() + let arrow_schema = AvroSchema::new($schema_json.to_string()); + let fingerprint = arrow_schema.fingerprint().expect("fingerprint failed"); + let prefix = make_prefix(fingerprint); + SIZES + .iter() + .map(|&n| $gen_fn(&schema, n, &prefix)) + .collect() }); }; } @@ -406,6 +449,14 @@ fn bench_scenario( fn criterion_benches(c: &mut Criterion) { for &batch_size in &[SMALL_BATCH, LARGE_BATCH] { + bench_scenario( + c, + "Interval", + INTERVAL_SCHEMA, + &INTERVAL_DATA, + false, + batch_size, + ); bench_scenario(c, "Int32", INT_SCHEMA, &INT_DATA, false, batch_size); bench_scenario(c, "Int64", LONG_SCHEMA, &LONG_DATA, false, batch_size); bench_scenario(c, "Float32", FLOAT_SCHEMA, &FLOAT_DATA, false, batch_size); @@ -480,14 +531,6 @@ fn criterion_benches(c: &mut Criterion) { false, batch_size, ); - bench_scenario( - c, - "Interval", - INTERVAL_SCHEMA, - &INTERVAL_DATA, - false, - batch_size, - ); bench_scenario( c, "Enum(Dictionary)", diff --git a/arrow-avro/src/codec.rs b/arrow-avro/src/codec.rs index d4bba9a1ff03..dcd39845014f 100644 --- a/arrow-avro/src/codec.rs +++ b/arrow-avro/src/codec.rs @@ -15,10 +15,10 @@ // specific language governing permissions and limitations // under the License. -use crate::schema::{Attributes, ComplexType, PrimitiveType, Record, Schema, TypeName}; +use crate::schema::{Attributes, AvroSchema, ComplexType, PrimitiveType, Record, Schema, TypeName}; use arrow_schema::{ - ArrowError, DataType, Field, FieldRef, Fields, IntervalUnit, SchemaBuilder, SchemaRef, - TimeUnit, DECIMAL128_MAX_PRECISION, DECIMAL128_MAX_SCALE, + ArrowError, DataType, Field, Fields, IntervalUnit, TimeUnit, DECIMAL128_MAX_PRECISION, + DECIMAL128_MAX_SCALE, }; use std::borrow::Cow; use std::collections::HashMap; @@ -139,6 +139,22 @@ impl AvroField { pub fn name(&self) -> &str { &self.name } + + /// Performs schema resolution between a writer and reader schema. + /// + /// This is the primary entry point for handling schema evolution. It produces an + /// `AvroField` that contains all the necessary information to read data written + /// with the `writer` schema as if it were written with the `reader` schema. + pub(crate) fn resolve_from_writer_and_reader<'a>( + writer_schema: &'a Schema<'a>, + reader_schema: &'a Schema<'a>, + use_utf8view: bool, + strict_mode: bool, + ) -> Result { + Err(ArrowError::NotYetImplemented( + "Resolving schema from a writer and reader schema is not yet implemented".to_string(), + )) + } } impl<'a> TryFrom<&Schema<'a>> for AvroField { @@ -164,21 +180,33 @@ impl<'a> TryFrom<&Schema<'a>> for AvroField { /// Builder for an [`AvroField`] #[derive(Debug)] pub struct AvroFieldBuilder<'a> { - schema: &'a Schema<'a>, + writer_schema: &'a Schema<'a>, + reader_schema: Option, use_utf8view: bool, strict_mode: bool, } impl<'a> AvroFieldBuilder<'a> { - /// Creates a new [`AvroFieldBuilder`] - pub fn new(schema: &'a Schema<'a>) -> Self { + /// Creates a new [`AvroFieldBuilder`] for a given writer schema. 
+    pub fn new(writer_schema: &'a Schema<'a>) -> Self {
         Self {
-            schema,
+            writer_schema,
+            reader_schema: None,
             use_utf8view: false,
             strict_mode: false,
         }
     }

+    /// Sets the reader schema for schema resolution.
+    ///
+    /// If a reader schema is provided, the builder will produce a resolved `AvroField`
+    /// that can handle differences between the writer's and reader's schemas.
+    #[inline]
+    pub fn with_reader_schema(mut self, reader_schema: AvroSchema) -> Self {
+        self.reader_schema = Some(reader_schema);
+        self
+    }
+
     /// Enable or disable Utf8View support
     pub fn with_utf8view(mut self, use_utf8view: bool) -> Self {
         self.use_utf8view = use_utf8view;
@@ -193,11 +221,11 @@ impl<'a> AvroFieldBuilder<'a> {

     /// Build an [`AvroField`] from the builder
     pub fn build(self) -> Result<AvroField, ArrowError> {
-        match self.schema {
+        match self.writer_schema {
             Schema::Complex(ComplexType::Record(r)) => {
                 let mut resolver = Resolver::default();
                 let data_type = make_data_type(
-                    self.schema,
+                    self.writer_schema,
                     None,
                     &mut resolver,
                     self.use_utf8view,
@@ -210,11 +238,12 @@ impl<'a> AvroFieldBuilder<'a> {
             }
             _ => Err(ArrowError::ParseError(format!(
                 "Expected a Record schema to build an AvroField, but got {:?}",
-                self.schema
+                self.writer_schema
             ))),
         }
     }
 }
+
 /// An Avro encoding
 ///
 ///
@@ -446,7 +475,7 @@ impl<'a> Resolver<'a> {
     }
 }

-/// Parses a [`AvroDataType`] from the provided [`Schema`] and the given `name` and `namespace`
+/// Parses an [`AvroDataType`] from the provided `schema` and the given `name` and `namespace`
 ///
 /// `name`: is name used to refer to `schema` in its parent
 /// `namespace`: an optional qualifier used as part of a type hierarchy
diff --git a/arrow-avro/src/reader/header.rs b/arrow-avro/src/reader/header.rs
index 0f7ffd3f8d6e..2d26df07aa9c 100644
--- a/arrow-avro/src/reader/header.rs
+++ b/arrow-avro/src/reader/header.rs
@@ -92,7 +92,7 @@ impl Header {
     }

     /// Returns the [`Schema`] if any
-    pub fn schema(&self) -> Result<Option<Schema<'_>>, ArrowError> {
+    pub(crate) fn schema(&self) -> Result<Option<Schema<'_>>, ArrowError> {
         self.get(SCHEMA_METADATA_KEY)
             .map(|x| {
                 serde_json::from_slice(x).map_err(|e| {
diff --git a/arrow-avro/src/reader/mod.rs b/arrow-avro/src/reader/mod.rs
index 18bc498cd21d..e9bf7af61e1c 100644
--- a/arrow-avro/src/reader/mod.rs
+++ b/arrow-avro/src/reader/mod.rs
@@ -90,13 +90,18 @@
 //! ```
 //!

-use crate::codec::AvroFieldBuilder;
-use crate::schema::Schema as AvroSchema;
-use arrow_array::{RecordBatch, RecordBatchReader};
+use crate::codec::{AvroField, AvroFieldBuilder};
+use crate::schema::{
+    compare_schemas, generate_fingerprint, AvroSchema, Fingerprint, FingerprintAlgorithm, Schema,
+    SchemaStore, SINGLE_OBJECT_MAGIC,
+};
+use arrow_array::{Array, RecordBatch, RecordBatchReader};
 use arrow_schema::{ArrowError, SchemaRef};
 use block::BlockDecoder;
 use header::{Header, HeaderDecoder};
+use indexmap::IndexMap;
 use record::RecordDecoder;
+use std::collections::HashMap;
 use std::io::BufRead;

 mod block;
@@ -128,23 +133,22 @@ fn read_header<R: BufRead>(mut reader: R) -> Result<Header, ArrowError> {

 /// A low-level interface for decoding Avro-encoded bytes into Arrow `RecordBatch`.
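+///
+/// Feed bytes with [`Decoder::decode`] and drain completed rows with
+/// [`Decoder::flush`]. A rough driving loop (a sketch only; `next_chunk` is a
+/// stand-in for any byte source, not an API of this crate):
+///
+/// ```ignore
+/// while let Some(chunk) = next_chunk() {
+///     let _consumed = decoder.decode(&chunk)?;
+///     // Any unconsumed tail of `chunk` must be re-fed on the next call.
+///     if let Some(batch) = decoder.flush()? {
+///         // Process the RecordBatch.
+///     }
+/// }
+/// ```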
 #[derive(Debug)]
 pub struct Decoder {
-    record_decoder: RecordDecoder,
+    active_decoder: RecordDecoder,
+    active_fingerprint: Option<Fingerprint>,
     batch_size: usize,
-    decoded_rows: usize,
+    remaining_capacity: usize,
+    cache: IndexMap<Fingerprint, RecordDecoder>,
+    fingerprint_algorithm: FingerprintAlgorithm,
+    expect_prefix: bool,
+    utf8_view: bool,
+    strict_mode: bool,
+    pending_schema: Option<(Fingerprint, RecordDecoder)>,
 }

 impl Decoder {
-    fn new(record_decoder: RecordDecoder, batch_size: usize) -> Self {
-        Self {
-            record_decoder,
-            batch_size,
-            decoded_rows: 0,
-        }
-    }
-
     /// Return the Arrow schema for the rows decoded by this decoder
     pub fn schema(&self) -> SchemaRef {
-        self.record_decoder.schema().clone()
+        self.active_decoder.schema().clone()
     }

     /// Return the configured maximum number of rows per batch
@@ -158,39 +162,125 @@ impl Decoder {
     ///
     /// Returns the number of bytes consumed.
     pub fn decode(&mut self, data: &[u8]) -> Result<usize, ArrowError> {
+        if self.expect_prefix
+            && data.len() >= SINGLE_OBJECT_MAGIC.len()
+            && !data.starts_with(&SINGLE_OBJECT_MAGIC)
+        {
+            return Err(ArrowError::ParseError(
+                "Expected single-object encoding fingerprint prefix for first message \
+                 (writer_schema_store is set but active_fingerprint is None)"
+                    .into(),
+            ));
+        }
         let mut total_consumed = 0usize;
-        while total_consumed < data.len() && self.decoded_rows < self.batch_size {
-            let consumed = self.record_decoder.decode(&data[total_consumed..], 1)?;
-            // A successful call to record_decoder.decode means one row was decoded.
-            // If `consumed` is 0 on a non-empty buffer, it implies a valid zero-byte record.
-            // We increment `decoded_rows` to mark progress and avoid an infinite loop.
-            // We add `consumed` (which can be 0) to `total_consumed`.
-            total_consumed += consumed;
-            self.decoded_rows += 1;
+        // The loop stops when the batch is full, a schema change is staged,
+        // or handle_prefix indicates we need more bytes (Some(0)).
+        while total_consumed < data.len() && self.remaining_capacity > 0 {
+            if let Some(n) = self.handle_prefix(&data[total_consumed..])? {
+                // We either consumed a prefix (n > 0) and need a schema switch, or we need
+                // more bytes to make a decision. Either way, this decoding attempt is finished.
+                total_consumed += n;
+                break;
+            }
+            // No prefix: decode one row and keep going.
+            let n = self.active_decoder.decode(&data[total_consumed..], 1)?;
+            self.remaining_capacity -= 1;
+            total_consumed += n;
         }
         Ok(total_consumed)
     }

+    // Attempt to handle a single-object-encoding prefix at the current position.
+    //
+    // * Ok(None) – buffer does not start with the prefix.
+    // * Ok(Some(0)) – prefix detected, but the buffer is too short; caller should await more bytes.
+    // * Ok(Some(n)) – consumed `n > 0` bytes of a complete prefix (magic and fingerprint).
+    fn handle_prefix(&mut self, buf: &[u8]) -> Result<Option<usize>, ArrowError> {
+        // If there is no schema store, prefixes are unrecognized.
+        if !self.expect_prefix {
+            return Ok(None);
+        }
+        // Need at least the magic bytes to decide (2 bytes).
+        let Some(magic_bytes) = buf.get(..SINGLE_OBJECT_MAGIC.len()) else {
+            return Ok(Some(0)); // Get more bytes
+        };
+        // Bail out early if the magic does not match.
+        if magic_bytes != SINGLE_OBJECT_MAGIC {
+            return Ok(None); // Continue to decode the next record
+        }
+        // Try to parse the fingerprint that follows the magic.
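+        // Prefix layout (single-object encoding): 2 magic bytes 0xC3 0x01, then the
+        // schema fingerprint; for Rabin that is 8 little-endian bytes, so a complete
+        // prefix spans 10 bytes.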
+        let fingerprint_size = match self.fingerprint_algorithm {
+            FingerprintAlgorithm::Rabin => self
+                .handle_fingerprint(&buf[SINGLE_OBJECT_MAGIC.len()..], |bytes| {
+                    Fingerprint::Rabin(u64::from_le_bytes(bytes))
+                })?,
+        };
+        // Convert the inner result into a "bytes consumed" count.
+        // NOTE: An incomplete fingerprint consumes no bytes.
+        let consumed = fingerprint_size.map_or(0, |n| n + SINGLE_OBJECT_MAGIC.len());
+        Ok(Some(consumed))
+    }
+
+    // Attempts to read and install a new fingerprint of `N` bytes.
+    //
+    // * Ok(None) – insufficient bytes (`buf.len()` < `N`).
+    // * Ok(Some(N)) – fingerprint consumed (always `N`).
+    fn handle_fingerprint<const N: usize>(
+        &mut self,
+        buf: &[u8],
+        fingerprint_from: impl FnOnce([u8; N]) -> Fingerprint,
+    ) -> Result<Option<usize>, ArrowError> {
+        // Need enough bytes to read the fingerprint (the next N bytes).
+        let Some(fingerprint_bytes) = buf.get(..N) else {
+            return Ok(None); // Insufficient bytes
+        };
+        // The `try_into` cannot fail: the length was checked above.
+        let new_fingerprint = fingerprint_from(fingerprint_bytes.try_into().unwrap());
+        // If the fingerprint indicates a schema change, prepare to switch decoders.
+        if self.active_fingerprint != Some(new_fingerprint) {
+            let Some(new_decoder) = self.cache.shift_remove(&new_fingerprint) else {
+                return Err(ArrowError::ParseError(format!(
+                    "Unknown fingerprint: {new_fingerprint:?}"
+                )));
+            };
+            self.pending_schema = Some((new_fingerprint, new_decoder));
+            // If there are already decoded rows, we must flush them first.
+            // Reducing `remaining_capacity` to 0 ensures `flush` is called next.
+            if self.remaining_capacity < self.batch_size {
+                self.remaining_capacity = 0;
+            }
+        }
+        Ok(Some(N))
+    }
+
     /// Produce a `RecordBatch` if at least one row is fully decoded, returning
     /// `Ok(None)` if no new rows are available.
     pub fn flush(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
-        if self.decoded_rows == 0 {
-            Ok(None)
-        } else {
-            let batch = self.record_decoder.flush()?;
-            self.decoded_rows = 0;
-            Ok(Some(batch))
+        if self.remaining_capacity == self.batch_size {
+            return Ok(None);
+        }
+        let batch = self.active_decoder.flush()?;
+        self.remaining_capacity = self.batch_size;
+        // Apply any staged schema switch.
+        if let Some((new_fingerprint, new_decoder)) = self.pending_schema.take() {
+            if let Some(old_fingerprint) = self.active_fingerprint.replace(new_fingerprint) {
+                let old_decoder = std::mem::replace(&mut self.active_decoder, new_decoder);
+                self.cache.shift_remove(&old_fingerprint);
+                self.cache.insert(old_fingerprint, old_decoder);
+            } else {
+                self.active_decoder = new_decoder;
+            }
         }
+        Ok(Some(batch))
     }

     /// Returns the number of rows that can be added to this decoder before it is full.
     pub fn capacity(&self) -> usize {
-        self.batch_size.saturating_sub(self.decoded_rows)
+        self.remaining_capacity
     }

     /// Returns true if the decoder has reached its capacity for the current batch.
     pub fn batch_is_full(&self) -> bool {
-        self.capacity() == 0
+        self.remaining_capacity == 0
     }
 }

@@ -201,7 +291,9 @@ pub struct ReaderBuilder {
     batch_size: usize,
     strict_mode: bool,
     utf8_view: bool,
-    schema: Option<AvroSchema<'static>>,
+    reader_schema: Option<AvroSchema>,
+    writer_schema_store: Option<SchemaStore>,
+    active_fingerprint: Option<Fingerprint>,
 }

 impl Default for ReaderBuilder {
@@ -210,7 +302,9 @@ impl Default for ReaderBuilder {
             batch_size: 1024,
             strict_mode: false,
             utf8_view: false,
-            schema: None,
+            reader_schema: None,
+            writer_schema_store: None,
+            active_fingerprint: None,
         }
     }
 }

@@ -220,34 +314,118 @@ impl ReaderBuilder {
     /// - `batch_size` = 1024
     /// - `strict_mode` = false
     /// - `utf8_view` = false
-    /// - `schema` = None
+    /// - `reader_schema` = None
+    /// - `writer_schema_store` = None
+    /// - `active_fingerprint` = None
     pub fn new() -> Self {
         Self::default()
     }

-    fn make_record_decoder(&self, schema: &AvroSchema<'_>) -> Result<RecordDecoder, ArrowError> {
-        let root_field = AvroFieldBuilder::new(schema)
+    fn make_record_decoder(
+        &self,
+        writer_schema: &Schema,
+        reader_schema: Option<&AvroSchema>,
+    ) -> Result<RecordDecoder, ArrowError> {
+        let mut builder = AvroFieldBuilder::new(writer_schema);
+        if let Some(reader_schema) = reader_schema {
+            builder = builder.with_reader_schema(reader_schema.clone());
+        }
+        let root = builder
             .with_utf8view(self.utf8_view)
             .with_strict_mode(self.strict_mode)
             .build()?;
-        RecordDecoder::try_new_with_options(root_field.data_type(), self.utf8_view)
+        RecordDecoder::try_new_with_options(root.data_type(), self.utf8_view)
     }

-    fn build_impl<R: BufRead>(self, reader: &mut R) -> Result<(Header, Decoder), ArrowError> {
-        let header = read_header(reader)?;
-        let record_decoder = if let Some(schema) = &self.schema {
-            self.make_record_decoder(schema)?
-        } else {
-            let avro_schema: Option<AvroSchema<'_>> = header
+    fn make_decoder_with_parts(
+        &self,
+        active_decoder: RecordDecoder,
+        active_fingerprint: Option<Fingerprint>,
+        cache: IndexMap<Fingerprint, RecordDecoder>,
+        expect_prefix: bool,
+        fingerprint_algorithm: FingerprintAlgorithm,
+    ) -> Decoder {
+        Decoder {
+            batch_size: self.batch_size,
+            remaining_capacity: self.batch_size,
+            active_fingerprint,
+            active_decoder,
+            cache,
+            expect_prefix,
+            utf8_view: self.utf8_view,
+            fingerprint_algorithm,
+            strict_mode: self.strict_mode,
+            pending_schema: None,
+        }
+    }
+
+    fn make_decoder(
+        &self,
+        header: Option<&Header>,
+        reader_schema: Option<&AvroSchema>,
+    ) -> Result<Decoder, ArrowError> {
+        if let Some(hdr) = header {
+            let writer_schema = hdr
                 .schema()
-                .map_err(|e| ArrowError::ExternalError(Box::new(e)))?;
-            let avro_schema = avro_schema.ok_or_else(|| {
-                ArrowError::ParseError("No Avro schema present in file header".to_string())
+                .map_err(|e| ArrowError::ExternalError(Box::new(e)))?
+                .ok_or_else(|| {
+                    ArrowError::ParseError("No Avro schema present in file header".into())
+                })?;
+            let record_decoder = self.make_record_decoder(&writer_schema, reader_schema)?;
+            return Ok(self.make_decoder_with_parts(
+                record_decoder,
+                None,
+                IndexMap::new(),
+                false,
+                FingerprintAlgorithm::Rabin,
+            ));
+        }
+        let store = self.writer_schema_store.as_ref().ok_or_else(|| {
+            ArrowError::ParseError("Writer schema store required for raw Avro".into())
+        })?;
+        let fingerprints = store.fingerprints();
+        if fingerprints.is_empty() {
+            return Err(ArrowError::ParseError(
+                "Writer schema store must contain at least one schema".into(),
+            ));
+        }
+        let start_fingerprint = self
+            .active_fingerprint
+            .or_else(|| fingerprints.first().copied())
+            .ok_or_else(|| {
+                ArrowError::ParseError("Could not determine initial schema fingerprint".into())
             })?;
-            self.make_record_decoder(&avro_schema)?
-        };
-        let decoder = Decoder::new(record_decoder, self.batch_size);
-        Ok((header, decoder))
+        let mut cache = IndexMap::with_capacity(fingerprints.len().saturating_sub(1));
+        let mut active_decoder: Option<RecordDecoder> = None;
+        for fingerprint in store.fingerprints() {
+            let avro_schema = match store.lookup(&fingerprint) {
+                Some(schema) => schema,
+                None => {
+                    return Err(ArrowError::ComputeError(format!(
+                        "Fingerprint {fingerprint:?} not found in schema store",
+                    )));
+                }
+            };
+            let writer_schema = avro_schema.schema()?;
+            let decoder = self.make_record_decoder(&writer_schema, reader_schema)?;
+            if fingerprint == start_fingerprint {
+                active_decoder = Some(decoder);
+            } else {
+                cache.insert(fingerprint, decoder);
+            }
+        }
+        let active_decoder = active_decoder.ok_or_else(|| {
+            ArrowError::ComputeError(format!(
+                "Initial fingerprint {start_fingerprint:?} not found in schema store"
+            ))
+        })?;
+        Ok(self.make_decoder_with_parts(
+            active_decoder,
+            Some(start_fingerprint),
+            cache,
+            true,
+            store.fingerprint_algorithm(),
+        ))
     }

     /// Sets the row-based batch size
@@ -276,17 +454,42 @@ impl ReaderBuilder {
         self
     }

-    /// Sets the Avro schema.
+    /// Sets the Avro reader schema.
     ///
     /// If a schema is not provided, the schema will be read from the Avro file header.
-    pub fn with_schema(mut self, schema: AvroSchema<'static>) -> Self {
-        self.schema = Some(schema);
+    pub fn with_reader_schema(mut self, schema: AvroSchema) -> Self {
+        self.reader_schema = Some(schema);
+        self
+    }
+
+    /// Sets the `SchemaStore` used for resolving writer schemas.
+    ///
+    /// This is necessary when decoding single-object encoded data that identifies
+    /// schemas by a fingerprint. The store allows the decoder to look up the
+    /// full writer schema from a fingerprint embedded in the data.
+    ///
+    /// Defaults to `None`.
+    pub fn with_writer_schema_store(mut self, store: SchemaStore) -> Self {
+        self.writer_schema_store = Some(store);
+        self
+    }
+
+    /// Sets the initial schema fingerprint for decoding single-object encoded data.
+    ///
+    /// This is useful when the data stream does not begin with a schema definition
+    /// or fingerprint, allowing the decoder to start with a known schema from the
+    /// `SchemaStore`.
+    ///
+    /// Defaults to `None`.
+    pub fn with_active_fingerprint(mut self, fp: Fingerprint) -> Self {
+        self.active_fingerprint = Some(fp);
         self
     }

     /// Create a [`Reader`] from this builder and a `BufRead`
     pub fn build<R: BufRead>(self, mut reader: R) -> Result<Reader<R>, ArrowError> {
-        let (header, decoder) = self.build_impl(&mut reader)?;
+        let header = read_header(&mut reader)?;
+        let decoder = self.make_decoder(Some(&header), self.reader_schema.as_ref())?;
         Ok(Reader {
             reader,
             header,
@@ -298,20 +501,14 @@ impl ReaderBuilder {
         })
     }

-    /// Create a [`Decoder`] from this builder and a `BufRead` by
-    /// reading and parsing the Avro file's header. This will
-    /// not create a full [`Reader`].
-    pub fn build_decoder<R: BufRead>(self, mut reader: R) -> Result<Decoder, ArrowError> {
-        match self.schema {
-            Some(ref schema) => {
-                let record_decoder = self.make_record_decoder(schema)?;
-                Ok(Decoder::new(record_decoder, self.batch_size))
-            }
-            None => {
-                let (_, decoder) = self.build_impl(&mut reader)?;
-                Ok(decoder)
-            }
+    /// Create a [`Decoder`] from this builder.
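+    ///
+    /// Unlike [`ReaderBuilder::build`], no file header is read here, so a writer
+    /// schema store is required to resolve the fingerprints embedded in the stream.
+    ///
+    /// # Example
+    ///
+    /// A minimal sketch (assumes `schema_json` holds a record schema definition):
+    ///
+    /// ```ignore
+    /// let schema = AvroSchema::new(schema_json.to_string());
+    /// let mut store = SchemaStore::new();
+    /// let fingerprint = store.register(schema)?;
+    /// let mut decoder = ReaderBuilder::new()
+    ///     .with_writer_schema_store(store)
+    ///     .with_active_fingerprint(fingerprint)
+    ///     .build_decoder()?;
+    /// ```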
+    pub fn build_decoder(self) -> Result<Decoder, ArrowError> {
+        if self.writer_schema_store.is_none() {
+            return Err(ArrowError::InvalidArgumentError(
+                "Building a decoder requires a writer schema store".to_string(),
+            ));
         }
+        self.make_decoder(None, self.reader_schema.as_ref())
     }
 }

@@ -391,11 +588,15 @@ impl<R: BufRead> RecordBatchReader for Reader<R> {
 #[cfg(test)]
 mod test {
-    use crate::codec::{AvroDataType, AvroField, Codec};
+    use crate::codec::{AvroDataType, AvroField, AvroFieldBuilder, Codec};
     use crate::compression::CompressionCodec;
     use crate::reader::record::RecordDecoder;
     use crate::reader::vlq::VLQDecoder;
     use crate::reader::{read_header, Decoder, Reader, ReaderBuilder};
+    use crate::schema::{
+        AvroSchema, Fingerprint, FingerprintAlgorithm, PrimitiveType, Schema as AvroRaw,
+        SchemaStore, SINGLE_OBJECT_MAGIC,
+    };
     use crate::test_util::arrow_test_data;
     use arrow::array::ArrayDataBuilder;
     use arrow_array::builder::{
@@ -433,7 +634,7 @@ mod test {
         batch_size: usize,
         utf8_view: bool,
     ) -> Result<Vec<RecordBatch>, ArrowError> {
-        let file = File::open(path).unwrap();
+        let file = File::open(path)?;
         ReaderBuilder::new()
             .with_batch_size(batch_size)
             .with_utf8_view(utf8_view)
@@ -460,6 +661,160 @@ mod test {
         }
     }

+    fn make_record_schema(pt: PrimitiveType) -> AvroSchema {
+        let js = format!(
+            r#"{{"type":"record","name":"TestRecord","fields":[{{"name":"a","type":"{}"}}]}}"#,
+            pt.as_ref()
+        );
+        AvroSchema::new(js)
+    }
+
+    fn make_two_schema_store() -> (
+        SchemaStore,
+        Fingerprint,
+        Fingerprint,
+        AvroSchema,
+        AvroSchema,
+    ) {
+        let schema_int = make_record_schema(PrimitiveType::Int);
+        let schema_long = make_record_schema(PrimitiveType::Long);
+        let mut store = SchemaStore::new();
+        let fp_int = store
+            .register(schema_int.clone())
+            .expect("register int schema");
+        let fp_long = store
+            .register(schema_long.clone())
+            .expect("register long schema");
+        (store, fp_int, fp_long, schema_int, schema_long)
+    }
+
+    fn make_prefix(fp: Fingerprint) -> Vec<u8> {
+        match fp {
+            Fingerprint::Rabin(v) => {
+                let mut out = Vec::with_capacity(2 + 8);
+                out.extend_from_slice(&SINGLE_OBJECT_MAGIC);
+                out.extend_from_slice(&v.to_le_bytes());
+                out
+            }
+        }
+    }
+
+    fn make_decoder(store: &SchemaStore, fp: Fingerprint, reader_schema: &AvroSchema) -> Decoder {
+        ReaderBuilder::new()
+            .with_batch_size(8)
+            .with_reader_schema(reader_schema.clone())
+            .with_writer_schema_store(store.clone())
+            .with_active_fingerprint(fp)
+            .build_decoder()
+            .expect("decoder")
+    }
+
+    #[test]
+    fn test_schema_store_register_lookup() {
+        let schema_int = make_record_schema(PrimitiveType::Int);
+        let schema_long = make_record_schema(PrimitiveType::Long);
+        let mut store = SchemaStore::new();
+        let fp_int = store.register(schema_int.clone()).unwrap();
+        let fp_long = store.register(schema_long.clone()).unwrap();
+        assert_eq!(store.lookup(&fp_int).cloned(), Some(schema_int));
+        assert_eq!(store.lookup(&fp_long).cloned(), Some(schema_long));
+        assert_eq!(store.fingerprint_algorithm(), FingerprintAlgorithm::Rabin);
+    }
+
+    #[test]
+    fn test_unknown_fingerprint_is_error() {
+        let (store, fp_int, _fp_long, schema_int, _schema_long) = make_two_schema_store();
+        let unknown_fp = Fingerprint::Rabin(0xDEAD_BEEF_DEAD_BEEF);
+        let prefix = make_prefix(unknown_fp);
+        let mut decoder = make_decoder(&store, fp_int, &schema_int);
+        let err = decoder.decode(&prefix).expect_err("decode should error");
+        let msg = err.to_string();
+        assert!(
+            msg.contains("Unknown fingerprint"),
+            "unexpected message: {msg}"
+        );
+    }
+
+    #[test]
+    fn test_missing_initial_fingerprint_error() {
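+        // The buffer fed below is a bare Avro datum with no 0xC3 0x01 prefix; because
+        // the decoder was built from a schema store, it must reject such a first message.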
+        let (store, _fp_int, _fp_long, schema_int, _schema_long) = make_two_schema_store();
+        let mut decoder = ReaderBuilder::new()
+            .with_batch_size(8)
+            .with_reader_schema(schema_int.clone())
+            .with_writer_schema_store(store)
+            .build_decoder()
+            .unwrap();
+        let buf = [0x02u8, 0x00u8];
+        let err = decoder.decode(&buf).expect_err("decode should error");
+        let msg = err.to_string();
+        assert!(
+            msg.contains("Expected single-object encoding fingerprint"),
+            "unexpected message: {msg}"
+        );
+    }
+
+    #[test]
+    fn test_handle_prefix_no_schema_store() {
+        let (store, fp_int, _fp_long, schema_int, _schema_long) = make_two_schema_store();
+        let mut decoder = make_decoder(&store, fp_int, &schema_int);
+        decoder.expect_prefix = false;
+        let res = decoder
+            .handle_prefix(&SINGLE_OBJECT_MAGIC[..])
+            .expect("handle_prefix");
+        assert!(res.is_none(), "Expected None when expect_prefix is false");
+    }
+
+    #[test]
+    fn test_handle_prefix_incomplete_magic() {
+        let (store, fp_int, _fp_long, schema_int, _schema_long) = make_two_schema_store();
+        let mut decoder = make_decoder(&store, fp_int, &schema_int);
+        let buf = &SINGLE_OBJECT_MAGIC[..1];
+        let res = decoder.handle_prefix(buf).unwrap();
+        assert_eq!(res, Some(0));
+        assert!(decoder.pending_schema.is_none());
+    }
+
+    #[test]
+    fn test_handle_prefix_magic_mismatch() {
+        let (store, fp_int, _fp_long, schema_int, _schema_long) = make_two_schema_store();
+        let mut decoder = make_decoder(&store, fp_int, &schema_int);
+        let buf = [0xFFu8, 0x00u8, 0x01u8];
+        let res = decoder.handle_prefix(&buf).unwrap();
+        assert!(res.is_none());
+    }
+
+    #[test]
+    fn test_handle_prefix_incomplete_fingerprint() {
+        let (store, fp_int, fp_long, schema_int, _schema_long) = make_two_schema_store();
+        let mut decoder = make_decoder(&store, fp_int, &schema_int);
+        let long_bytes = match fp_long {
+            Fingerprint::Rabin(v) => v.to_le_bytes(),
+        };
+        let mut buf = Vec::from(SINGLE_OBJECT_MAGIC);
+        buf.extend_from_slice(&long_bytes[..4]);
+        let res = decoder.handle_prefix(&buf).unwrap();
+        assert_eq!(res, Some(0));
+        assert!(decoder.pending_schema.is_none());
+    }
+
+    #[test]
+    fn test_handle_prefix_valid_prefix_switches_schema() {
+        let (store, fp_int, fp_long, schema_int, schema_long) = make_two_schema_store();
+        let mut decoder = make_decoder(&store, fp_int, &schema_int);
+        let writer_schema_long = schema_long.schema().unwrap();
+        let root_long = AvroFieldBuilder::new(&writer_schema_long).build().unwrap();
+        let long_decoder =
+            RecordDecoder::try_new_with_options(root_long.data_type(), decoder.utf8_view).unwrap();
+        let _ = decoder.cache.insert(fp_long, long_decoder);
+        let mut buf = Vec::from(SINGLE_OBJECT_MAGIC);
+        let Fingerprint::Rabin(v) = fp_long;
+        buf.extend_from_slice(&v.to_le_bytes());
+        let consumed = decoder.handle_prefix(&buf).unwrap().unwrap();
+        assert_eq!(consumed, buf.len());
+        assert!(decoder.pending_schema.is_some());
+        assert_eq!(decoder.pending_schema.as_ref().unwrap().0, fp_long);
+    }
+
     #[test]
     fn test_utf8view_support() {
         let schema_json = r#"{
@@ -793,28 +1148,31 @@ mod test {
             },
         ];
         for test in tests {
-            let schema_s2: crate::schema::Schema = serde_json::from_str(test.schema).unwrap();
+            let avro_schema = AvroSchema::new(test.schema.to_string());
+            let mut store = SchemaStore::new();
+            let fp = store.register(avro_schema.clone()).unwrap();
+            let prefix = make_prefix(fp);
             let record_val = "some_string";
-            let mut body = vec![];
+            let mut body = prefix;
             body.push((record_val.len() as u8) << 1);
             body.extend_from_slice(record_val.as_bytes());
-            let mut reader_placeholder = Cursor::new(&[] as &[u8]);
-            let builder = ReaderBuilder::new()
+            let decoder_res = ReaderBuilder::new()
                 .with_batch_size(1)
-                .with_schema(schema_s2);
-            let decoder_result = builder.build_decoder(&mut reader_placeholder);
-            let decoder = match decoder_result {
-                Ok(decoder) => decoder,
+                .with_writer_schema_store(store)
+                .with_active_fingerprint(fp)
+                .build_decoder();
+            let decoder = match decoder_res {
+                Ok(d) => d,
                 Err(e) => {
                     if let Some(expected) = test.expected_error {
                         assert!(
                             e.to_string().contains(expected),
-                            "Test '{}' failed: unexpected error message at build.\nExpected to contain: '{expected}'\nActual: '{e}'",
-                            test.name,
+                            "Test '{}' failed at build – expected '{expected}', got '{e}'",
+                            test.name
                         );
                         continue;
                     } else {
-                        panic!("Test '{}' failed at decoder build: {e}", test.name);
+                        panic!("Test '{}' failed during build: {e}", test.name);
                     }
                 }
             };
@@ -831,32 +1189,23 @@ mod test {
                 let expected_array = Arc::new(StringArray::from(vec![record_val]));
                 let expected_batch =
                     RecordBatch::try_new(expected_schema, vec![expected_array]).unwrap();
-                assert_eq!(batch, expected_batch, "Test '{}' failed", test.name);
-                assert_eq!(
-                    batch.schema().field(0).name(),
-                    "f2",
-                    "Test '{}' failed",
-                    test.name
-                );
+                assert_eq!(batch, expected_batch, "Test '{}'", test.name);
             }
             (Err(e), Some(expected)) => {
                 assert!(
                     e.to_string().contains(expected),
-                    "Test '{}' failed: unexpected error message at decode.\nExpected to contain: '{expected}'\nActual: '{e}'",
-                    test.name,
+                    "Test '{}' – expected error containing '{expected}', got '{e}'",
+                    test.name
                 );
             }
-            (Ok(batches), Some(expected)) => {
+            (Ok(_), Some(expected)) => {
                 panic!(
-                    "Test '{}' was expected to fail with '{expected}', but it succeeded with: {:?}",
-                    test.name, batches
+                    "Test '{}' expected failure ('{expected}') but succeeded",
+                    test.name
                 );
             }
             (Err(e), None) => {
-                panic!(
-                    "Test '{}' was not expected to fail, but it did with '{e}'",
-                    test.name
-                );
+                panic!("Test '{}' unexpectedly failed with '{e}'", test.name);
             }
         }
     }