From 9f35502d39c5611c3849e7c0739f3b60d7ba23b7 Mon Sep 17 00:00:00 2001
From: Connor Sanders
Date: Mon, 25 Aug 2025 12:35:29 -0500
Subject: [PATCH 1/3] Enable `arrow-avro` to handle writer-only fields during
 schema resolution.

- Added skipping logic for writer-only fields in `RecordDecoder`.
- Introduced `ResolvedRuntime` for runtime decoding adjustments.
- Updated tests to validate the skipping functionality.
- Refactored block-wise processing into a shared `process_blockwise` helper
  so array and map blocks can be skipped without decoding their items.

---
 arrow-avro/src/codec.rs         |  12 +-
 arrow-avro/src/reader/mod.rs    |  80 ++++-
 arrow-avro/src/reader/record.rs | 540 ++++++++++++++++++++++++++++++--
 3 files changed, 596 insertions(+), 36 deletions(-)

diff --git a/arrow-avro/src/codec.rs b/arrow-avro/src/codec.rs
index 89a66ddbaa85..dad2a5e42464 100644
--- a/arrow-avro/src/codec.rs
+++ b/arrow-avro/src/codec.rs
@@ -955,7 +955,7 @@ impl<'a> Maker<'a> {
         // Prepare outputs
         let mut reader_fields: Vec<AvroField> = Vec::with_capacity(reader_record.fields.len());
         let mut writer_to_reader: Vec<Option<usize>> = vec![None; writer_record.fields.len()];
-        //let mut skip_fields: Vec<Option<AvroDataType>> = vec![None; writer_record.fields.len()];
+        let mut skip_fields: Vec<Option<AvroDataType>> = vec![None; writer_record.fields.len()];
         //let mut default_fields: Vec<usize> = Vec::new();
         // Build reader fields and mapping
         for (reader_idx, r_field) in reader_record.fields.iter().enumerate() {
@@ -975,6 +975,14 @@ impl<'a> Maker<'a> {
                 ));
             }
         }
+        // Any writer fields not mapped should be skipped
+        for (writer_idx, writer_field) in writer_record.fields.iter().enumerate() {
+            if writer_to_reader[writer_idx].is_none() {
+                // Parse writer field type to know how to skip data
+                let writer_dt = self.parse_type(&writer_field.r#type, writer_ns)?;
+                skip_fields[writer_idx] = Some(writer_dt);
+            }
+        }
         // Implement writer-only fields to skip in Follow-up PR here
         // Build resolved record AvroDataType
         let resolved = AvroDataType::new_with_resolution(
@@ -984,7 +992,7 @@ impl<'a> Maker<'a> {
             Some(ResolutionInfo::Record(ResolvedRecord {
                 writer_to_reader: Arc::from(writer_to_reader),
                 default_fields: Arc::default(),
-                skip_fields: Arc::default(),
+                skip_fields: Arc::from(skip_fields),
             })),
         );
         // Register a resolved record by reader name+namespace for potential named type refs
diff --git a/arrow-avro/src/reader/mod.rs b/arrow-avro/src/reader/mod.rs
index 3f2daff0a3b1..41aa01e6da4d 100644
--- a/arrow-avro/src/reader/mod.rs
+++ b/arrow-avro/src/reader/mod.rs
@@ -863,12 +863,39 @@ mod test {
             .with_reader_schema(reader_schema)
             .build(BufReader::new(file))
             .unwrap();
-
         let schema = reader.schema();
         let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
         arrow::compute::concat_batches(&schema, &batches).unwrap()
     }
 
+    fn make_reader_schema_with_selected_fields_in_order(
+        path: &str,
+        selected: &[&str],
+    ) -> AvroSchema {
+        let mut root = load_writer_schema_json(path);
+        assert_eq!(root["type"], "record", "writer schema must be a record");
+        let writer_fields = root
+            .get("fields")
+            .and_then(|f| f.as_array())
+            .expect("record has fields");
+        let mut field_map: HashMap<String, Value> = HashMap::with_capacity(writer_fields.len());
+        for f in writer_fields {
+            if let Some(name) = f.get("name").and_then(|n| n.as_str()) {
+                field_map.insert(name.to_string(), f.clone());
+            }
+        }
+        let mut new_fields = Vec::with_capacity(selected.len());
+        for name in selected {
+            let f = field_map
+                .get(*name)
+                .unwrap_or_else(|| panic!("field '{name}' not found in writer schema"))
+                .clone();
+            new_fields.push(f);
+        }
+        root["fields"] = Value::Array(new_fields);
+        AvroSchema::new(root.to_string())
+    }
+
     #[test]
     fn test_alltypes_schema_promotion_mixed() {
         let files = [
@@ -1537,6 +1564,57 @@ mod test {
         assert!(batch.column(0).as_any().is::());
     }
 
+    #[test]
+    fn test_alltypes_skip_writer_fields_keep_double_only() {
+        let file = arrow_test_data("avro/alltypes_plain.avro");
+        let reader_schema =
+            make_reader_schema_with_selected_fields_in_order(&file, &["double_col"]);
+        let batch = read_alltypes_with_reader_schema(&file, reader_schema);
+        let expected = RecordBatch::try_from_iter_with_nullable([(
+            "double_col",
+            Arc::new(Float64Array::from_iter_values(
+                (0..8).map(|x| (x % 2) as f64 * 10.1),
+            )) as _,
+            true,
+        )])
+        .unwrap();
+        assert_eq!(batch, expected);
+    }
+
+    #[test]
+    fn test_alltypes_skip_writer_fields_reorder_and_skip_many() {
+        let file = arrow_test_data("avro/alltypes_plain.avro");
+        let reader_schema =
+            make_reader_schema_with_selected_fields_in_order(&file, &["timestamp_col", "id"]);
+        let batch = read_alltypes_with_reader_schema(&file, reader_schema);
+        let expected = RecordBatch::try_from_iter_with_nullable([
+            (
+                "timestamp_col",
+                Arc::new(
+                    TimestampMicrosecondArray::from_iter_values([
+                        1235865600000000, // 2009-03-01T00:00:00.000
+                        1235865660000000, // 2009-03-01T00:01:00.000
+                        1238544000000000, // 2009-04-01T00:00:00.000
+                        1238544060000000, // 2009-04-01T00:01:00.000
+                        1233446400000000, // 2009-02-01T00:00:00.000
+                        1233446460000000, // 2009-02-01T00:01:00.000
+                        1230768000000000, // 2009-01-01T00:00:00.000
+                        1230768060000000, // 2009-01-01T00:01:00.000
+                    ])
+                    .with_timezone("+00:00"),
+                ) as _,
+                true,
+            ),
+            (
+                "id",
+                Arc::new(Int32Array::from(vec![4, 5, 6, 7, 2, 3, 0, 1])) as _,
+                true,
+            ),
+        ])
+        .unwrap();
+        assert_eq!(batch, expected);
+    }
+
     #[test]
     fn test_read_zero_byte_avro_file() {
         let batch = read_file("test/data/zero_byte.avro", 3, false);
diff --git a/arrow-avro/src/reader/record.rs b/arrow-avro/src/reader/record.rs
index 46f09cd0aa2a..e76122be61ad 100644
--- a/arrow-avro/src/reader/record.rs
+++ b/arrow-avro/src/reader/record.rs
@@ -70,6 +70,15 @@ pub(crate) struct RecordDecoder {
     schema: SchemaRef,
     fields: Vec<Decoder>,
     use_utf8view: bool,
+    resolved: Option<ResolvedRuntime>,
+}
+
+#[derive(Debug)]
+struct ResolvedRuntime {
+    /// writer field index -> reader field index (or None if writer-only)
+    writer_to_reader: Arc<[Option<usize>]>,
+    /// per-writer-field skipper (Some only when writer-only)
+    skip_decoders: Vec<Option<Skipper>>,
 }
 
 impl RecordDecoder {
@@ -101,14 +110,35 @@ impl RecordDecoder {
         data_type: &AvroDataType,
         use_utf8view: bool,
     ) -> Result<Self, ArrowError> {
-        match Decoder::try_new(data_type)? {
-            Decoder::Record(fields, encodings) => Ok(Self {
-                schema: Arc::new(ArrowSchema::new(fields)),
-                fields: encodings,
-                use_utf8view,
-            }),
-            encoding => Err(ArrowError::ParseError(format!(
-                "Expected record got {encoding:?}"
+        match data_type.codec() {
+            Codec::Struct(reader_fields) => {
+                // Build Arrow schema fields and per-child decoders
+                let mut arrow_fields = Vec::with_capacity(reader_fields.len());
+                let mut encodings = Vec::with_capacity(reader_fields.len());
+                for avro_field in reader_fields.iter() {
+                    arrow_fields.push(avro_field.field());
+                    encodings.push(Decoder::try_new(avro_field.data_type())?);
+                }
+                // If this record carries resolution metadata, prepare top-level runtime helpers
+                let resolved = match data_type.resolution.as_ref() {
+                    Some(ResolutionInfo::Record(rec)) => {
+                        let skip_decoders = build_skip_decoders(&rec.skip_fields)?;
+                        Some(ResolvedRuntime {
+                            writer_to_reader: rec.writer_to_reader.clone(),
+                            skip_decoders,
+                        })
+                    }
+                    _ => None,
+                };
+                Ok(Self {
+                    schema: Arc::new(ArrowSchema::new(arrow_fields)),
+                    fields: encodings,
+                    use_utf8view,
+                    resolved,
+                })
+            }
+            other => Err(ArrowError::ParseError(format!(
+                "Expected record got {other:?}"
             ))),
         }
     }
@@ -121,9 +151,25 @@ impl RecordDecoder {
     /// Decode `count` records from `buf`
     pub(crate) fn decode(&mut self, buf: &[u8], count: usize) -> Result<usize, ArrowError> {
         let mut cursor = AvroCursor::new(buf);
-        for _ in 0..count {
-            for field in &mut self.fields {
-                field.decode(&mut cursor)?;
+        match self.resolved.as_mut() {
+            Some(runtime) => {
+                // Top-level resolved record: read writer fields in writer order,
+                // project into reader fields, and skip writer-only fields
+                for _ in 0..count {
+                    decode_with_resolution(
+                        &mut cursor,
+                        &mut self.fields,
+                        &runtime.writer_to_reader,
+                        &mut runtime.skip_decoders,
+                    )?;
+                }
+            }
+            None => {
+                for _ in 0..count {
+                    for field in &mut self.fields {
+                        field.decode(&mut cursor)?;
+                    }
+                }
             }
         }
         Ok(cursor.position())
@@ -136,11 +182,30 @@ impl RecordDecoder {
             .iter_mut()
             .map(|x| x.flush(None))
             .collect::<Result<Vec<_>, _>>()?;
-
         RecordBatch::try_new(self.schema.clone(), arrays)
     }
 }
 
+fn decode_with_resolution(
+    buf: &mut AvroCursor<'_>,
+    encodings: &mut [Decoder],
+    writer_to_reader: &[Option<usize>],
+    skippers: &mut [Option<Skipper>],
+) -> Result<(), ArrowError> {
+    for (w_idx, (target, skipper_opt)) in writer_to_reader.iter().zip(skippers).enumerate() {
+        match (*target, skipper_opt.as_mut()) {
+            (Some(r_idx), _) => encodings[r_idx].decode(buf)?,
+            (None, Some(sk)) => sk.skip(buf)?,
+            (None, None) => {
+                return Err(ArrowError::SchemaError(format!(
+                    "No skipper available for writer-only field at index {w_idx}",
+                )));
+            }
+        }
+    }
+    Ok(())
+}
+
 #[derive(Debug)]
 enum Decoder {
     Null(usize),
@@ -183,6 +248,13 @@ enum Decoder {
     Decimal128(usize, Option<usize>, Option<usize>, Decimal128Builder),
     Decimal256(usize, Option<usize>, Option<usize>, Decimal256Builder),
     Nullable(Nullability, NullBufferBuilder, Box<Decoder>),
+    /// Resolved record that needs writer->reader projection and skipping writer-only fields
+    RecordResolved {
+        fields: Fields,
+        encodings: Vec<Decoder>,
+        writer_to_reader: Arc<[Option<usize>]>,
+        skip_decoders: Vec<Option<Skipper>>,
+    },
 }
 
 impl Decoder {
@@ -307,7 +379,17 @@ impl Decoder {
                     arrow_fields.push(avro_field.field());
                     encodings.push(encoding);
                 }
-                Self::Record(arrow_fields.into(), encodings)
+                if let Some(ResolutionInfo::Record(rec)) = data_type.resolution.as_ref() {
+                    let skip_decoders = build_skip_decoders(&rec.skip_fields)?;
+                    Self::RecordResolved {
+                        fields: arrow_fields.into(),
+                        encodings,
+                        writer_to_reader: rec.writer_to_reader.clone(),
+                        skip_decoders,
+                    }
+                } else {
+                    Self::Record(arrow_fields.into(), encodings)
+                }
             }
             (Codec::Map(child), _) => {
                 let val_field = child.field_with_name("value").with_nullable(true);
                 let map_field = Arc::new(ArrowField::new(
                     "entries",
                     DataType::Struct(Fields::from(vec![
@@ -384,6 +466,9 @@ impl Decoder {
                 null_buffer.append(false);
                 inner.append_null();
             }
+            Self::RecordResolved { encodings, .. } => {
+                encodings.iter_mut().for_each(|e| e.append_null());
+            }
         }
     }
 
@@ -485,7 +570,7 @@ impl Decoder {
                     Nullability::NullSecond => branch == 0,
                 };
                 if is_not_null {
-                    // It is mportant to decode before appending to null buffer in case of decode error
+                    // It is important to decode before appending to null buffer in case of decode error
                     encoding.decode(buf)?;
                     nb.append(true);
                 } else {
@@ -493,6 +578,14 @@ impl Decoder {
                     nb.append(false);
                 }
             }
+            Self::RecordResolved {
+                encodings,
+                writer_to_reader,
+                skip_decoders,
+                ..
+            } => {
+                decode_with_resolution(buf, encodings, writer_to_reader, skip_decoders)?;
+            }
         }
         Ok(())
     }
@@ -641,50 +734,79 @@ impl Decoder {
                     .map_err(|e| ArrowError::ParseError(e.to_string()))?;
                 Arc::new(vals)
             }
+            Self::RecordResolved {
+                fields, encodings, ..
+            } => {
+                let arrays = encodings
+                    .iter_mut()
+                    .map(|x| x.flush(None))
+                    .collect::<Result<Vec<_>, _>>()?;
+                Arc::new(StructArray::new(fields.clone(), arrays, nulls))
+            }
         })
     }
 }
 
+#[derive(Debug, Copy, Clone)]
+enum NegativeBlockBehavior {
+    ProcessItems,
+    SkipBySize,
+}
+
+#[inline]
+fn skip_blocks(
+    buf: &mut AvroCursor<'_>,
+    mut skip_item: impl FnMut(&mut AvroCursor<'_>) -> Result<(), ArrowError>,
+    _skip_negative_block_by_size: bool,
+) -> Result<usize, ArrowError> {
+    process_blockwise(
+        buf,
+        move |c| skip_item(c),
+        NegativeBlockBehavior::SkipBySize,
+    )
+}
+
 #[inline]
 fn read_blocks(
     buf: &mut AvroCursor,
     decode_entry: impl FnMut(&mut AvroCursor) -> Result<(), ArrowError>,
 ) -> Result<usize, ArrowError> {
-    read_blockwise_items(buf, true, decode_entry)
+    process_blockwise(buf, decode_entry, NegativeBlockBehavior::ProcessItems)
 }
 
 #[inline]
-fn read_blockwise_items(
-    buf: &mut AvroCursor,
-    read_size_after_negative: bool,
-    mut decode_fn: impl FnMut(&mut AvroCursor) -> Result<(), ArrowError>,
+fn process_blockwise(
+    buf: &mut AvroCursor<'_>,
+    mut on_item: impl FnMut(&mut AvroCursor<'_>) -> Result<(), ArrowError>,
+    negative_behavior: NegativeBlockBehavior,
 ) -> Result<usize, ArrowError> {
     let mut total = 0usize;
     loop {
-        // Read the block count
-        //  positive = that many items
-        //  negative = that many items + read block size
-        // See: https://avro.apache.org/docs/1.11.1/specification/#maps
         let block_count = buf.get_long()?;
         match block_count.cmp(&0) {
             Ordering::Equal => break,
             Ordering::Less => {
-                // If block_count is negative, read the absolute value of count,
-                // then read the block size as a long and discard
                 let count = (-block_count) as usize;
-                if read_size_after_negative {
-                    let _size_in_bytes = buf.get_long()?;
-                }
-                for _ in 0..count {
-                    decode_fn(buf)?;
+                // A negative count is followed by a long of the size in bytes
+                let size_in_bytes = buf.get_long()? as usize;
+                match negative_behavior {
+                    NegativeBlockBehavior::ProcessItems => {
+                        // Process items one-by-one after reading size
+                        for _ in 0..count {
+                            on_item(buf)?;
+                        }
+                    }
+                    NegativeBlockBehavior::SkipBySize => {
+                        // Skip the entire block payload at once
+                        let _ = buf.get_fixed(size_in_bytes)?;
+                    }
                 }
                 total += count;
             }
             Ordering::Greater => {
-                // If block_count is positive, decode that many items
                 let count = block_count as usize;
-                for _i in 0..count {
-                    decode_fn(buf)?;
+                for _ in 0..count {
+                    on_item(buf)?;
                 }
                 total += count;
             }
@@ -736,6 +858,166 @@ fn sign_extend_to<const N: usize>(raw: &[u8]) -> Result<[u8; N], ArrowError> {
     Ok(arr)
 }
 
+/// Lightweight skipping decoder for writer-only fields
+#[derive(Debug)]
+enum Skipper {
+    Null,
+    Boolean,
+    Int32,
+    Int64,
+    Float32,
+    Float64,
+    Bytes,
+    String,
+    Date32,
+    TimeMillis,
+    TimeMicros,
+    TimestampMillis,
+    TimestampMicros,
+    Fixed(usize),
+    Decimal(Option<usize>),
+    UuidString,
+    Enum,
+    DurationFixed12,
+    List(Box<Skipper>),
+    Map(Box<Skipper>),
+    Struct(Vec<Skipper>),
+    Nullable(Nullability, Box<Skipper>),
+}
+
+impl Skipper {
+    fn from_avro(dt: &AvroDataType) -> Result<Self, ArrowError> {
+        let mut base = match dt.codec() {
+            Codec::Null => Self::Null,
+            Codec::Boolean => Self::Boolean,
+            Codec::Int32 | Codec::Date32 | Codec::TimeMillis => Self::Int32,
+            Codec::Int64 => Self::Int64,
+            Codec::TimeMicros => Self::TimeMicros,
+            Codec::TimestampMillis(_) => Self::TimestampMillis,
+            Codec::TimestampMicros(_) => Self::TimestampMicros,
+            Codec::Float32 => Self::Float32,
+            Codec::Float64 => Self::Float64,
+            Codec::Binary => Self::Bytes,
+            Codec::Utf8 | Codec::Utf8View => Self::String,
+            Codec::Fixed(sz) => Self::Fixed(*sz as usize),
+            Codec::Decimal(_, _, size) => Self::Decimal(*size),
+            Codec::Uuid => Self::UuidString, // encoded as string
+            Codec::Enum(_) => Self::Enum,
+            Codec::List(item) => Self::List(Box::new(Skipper::from_avro(item)?)),
+            Codec::Struct(fields) => Self::Struct(
+                fields
+                    .iter()
+                    .map(|f| Skipper::from_avro(f.data_type()))
+                    .collect::<Result<_, _>>()?,
+            ),
+            Codec::Map(values) => Self::Map(Box::new(Skipper::from_avro(values)?)),
+            Codec::Interval => Self::DurationFixed12,
+            _ => {
+                return Err(ArrowError::NotYetImplemented(format!(
+                    "Skipper not implemented for codec {:?}",
+                    dt.codec()
+                )));
+            }
+        };
+        if let Some(n) = dt.nullability() {
+            base = Self::Nullable(n, Box::new(base));
+        }
+        Ok(base)
+    }
+
+    fn skip(&mut self, buf: &mut AvroCursor<'_>) -> Result<(), ArrowError> {
+        match self {
+            Self::Null => Ok(()),
+            Self::Boolean => {
+                buf.get_bool()?;
+                Ok(())
+            }
+            Self::Int32 | Self::Date32 | Self::TimeMillis => {
+                buf.get_int()?;
+                Ok(())
+            }
+            Self::Int64 | Self::TimeMicros | Self::TimestampMillis | Self::TimestampMicros => {
+                buf.get_long()?;
+                Ok(())
+            }
+            Self::Float32 => {
+                buf.get_float()?;
+                Ok(())
+            }
+            Self::Float64 => {
+                buf.get_double()?;
+                Ok(())
+            }
+            Self::Bytes | Self::String | Self::UuidString => {
+                buf.get_bytes()?;
+                Ok(())
+            }
+            Self::Fixed(sz) => {
+                buf.get_fixed(*sz)?;
+                Ok(())
+            }
+            Self::Decimal(size) => {
+                if let Some(s) = size {
+                    buf.get_fixed(*s)
+                } else {
+                    buf.get_bytes()
+                }?;
+                Ok(())
+            }
+            Self::Enum => {
+                buf.get_int()?;
+                Ok(())
+            }
+            Self::DurationFixed12 => {
+                buf.get_fixed(12)?;
+                Ok(())
+            }
+            Self::List(item) => {
+                skip_blocks(buf, |c| item.skip(c), true)?;
+                Ok(())
+            }
+            Self::Map(value) => {
+                skip_blocks(
+                    buf,
+                    |c| {
+                        c.get_bytes()?; // key
+                        value.skip(c)
+                    },
+                    true,
+                )?;
+                Ok(())
+            }
+            Self::Struct(fields) => {
+                for f in fields.iter_mut() {
+                    f.skip(buf)?
+                }
+                Ok(())
+            }
+            Self::Nullable(order, inner) => {
+                let branch = buf.read_vlq()?;
+                let is_not_null = match *order {
+                    Nullability::NullFirst => branch != 0,
+                    Nullability::NullSecond => branch == 0,
+                };
+                if is_not_null {
+                    inner.skip(buf)?;
+                }
+                Ok(())
+            }
+        }
+    }
+}
+
+#[inline]
+fn build_skip_decoders(
+    skip_fields: &[Option<AvroDataType>],
+) -> Result<Vec<Option<Skipper>>, ArrowError> {
+    skip_fields
+        .iter()
+        .map(|opt| opt.as_ref().map(Skipper::from_avro).transpose())
+        .collect()
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
@@ -1471,4 +1753,196 @@ mod tests {
         assert!(int_array.is_null(0)); // row1 is null
         assert_eq!(int_array.value(1), 42); // row3 value is 42
     }
+
+    fn make_record_resolved_decoder(
+        reader_fields: &[(&str, DataType, bool)],
+        writer_to_reader: Vec<Option<usize>>,
+        skip_decoders: Vec<Option<Skipper>>,
+    ) -> Decoder {
+        let mut field_refs: Vec<FieldRef> = Vec::with_capacity(reader_fields.len());
+        let mut encodings: Vec<Decoder> = Vec::with_capacity(reader_fields.len());
+        for (name, dt, nullable) in reader_fields {
+            field_refs.push(Arc::new(ArrowField::new(*name, dt.clone(), *nullable)));
+            let enc = match dt {
+                DataType::Int32 => Decoder::Int32(Vec::new()),
+                DataType::Int64 => Decoder::Int64(Vec::new()),
+                DataType::Utf8 => {
+                    Decoder::String(OffsetBufferBuilder::new(DEFAULT_CAPACITY), Vec::new())
+                }
+                other => panic!("Unsupported test reader field type: {other:?}"),
+            };
+            encodings.push(enc);
+        }
+        let fields: Fields = field_refs.into();
+        Decoder::RecordResolved {
+            fields,
+            encodings,
+            writer_to_reader: Arc::from(writer_to_reader),
+            skip_decoders,
+        }
+    }
+
+    #[test]
+    fn test_skip_writer_trailing_field_int32() {
+        let mut dec = make_record_resolved_decoder(
+            &[("id", arrow_schema::DataType::Int32, false)],
+            vec![Some(0), None],
+            vec![None, Some(super::Skipper::Int32)],
+        );
+        let mut data = Vec::new();
+        data.extend_from_slice(&encode_avro_int(7));
+        data.extend_from_slice(&encode_avro_int(999));
+        let mut cur = AvroCursor::new(&data);
+        dec.decode(&mut cur).unwrap();
+        assert_eq!(cur.position(), data.len());
+        let arr = dec.flush(None).unwrap();
+        let struct_arr = arr.as_any().downcast_ref::<StructArray>().unwrap();
+        assert_eq!(struct_arr.len(), 1);
+        let id = struct_arr
+            .column_by_name("id")
+            .unwrap()
+            .as_any()
+            .downcast_ref::<Int32Array>()
+            .unwrap();
+        assert_eq!(id.value(0), 7);
+    }
+
+    #[test]
+    fn test_skip_writer_middle_field_string() {
+        let mut dec = make_record_resolved_decoder(
+            &[
+                ("id", DataType::Int32, false),
+                ("score", DataType::Int64, false),
+            ],
+            vec![Some(0), None, Some(1)],
+            vec![None, Some(Skipper::String), None],
+        );
+        let mut data = Vec::new();
+        data.extend_from_slice(&encode_avro_int(42));
+        data.extend_from_slice(&encode_avro_bytes(b"abcdef"));
+        data.extend_from_slice(&encode_avro_long(1000));
+        let mut cur = AvroCursor::new(&data);
+        dec.decode(&mut cur).unwrap();
+        assert_eq!(cur.position(), data.len());
+        let arr = dec.flush(None).unwrap();
+        let s = arr.as_any().downcast_ref::<StructArray>().unwrap();
+        let id = s
+            .column_by_name("id")
+            .unwrap()
+            .as_any()
+            .downcast_ref::<Int32Array>()
+            .unwrap();
+        let score = s
+            .column_by_name("score")
+            .unwrap()
+            .as_any()
+            .downcast_ref::<Int64Array>()
+            .unwrap();
+        assert_eq!(id.value(0), 42);
+        assert_eq!(score.value(0), 1000);
+    }
+
+    #[test]
+    fn test_skip_writer_array_with_negative_block_count_fast() {
+        let mut dec = make_record_resolved_decoder(
+            &[("id", DataType::Int32, false)],
+            vec![None, Some(0)],
+            vec![Some(super::Skipper::List(Box::new(Skipper::Int32))), None],
+        );
+        let mut array_payload = Vec::new();
+        array_payload.extend_from_slice(&encode_avro_int(1));
+        array_payload.extend_from_slice(&encode_avro_int(2));
+        array_payload.extend_from_slice(&encode_avro_int(3));
+        let mut data = Vec::new();
+        data.extend_from_slice(&encode_avro_long(-3));
+        data.extend_from_slice(&encode_avro_long(array_payload.len() as i64));
+        data.extend_from_slice(&array_payload);
+        data.extend_from_slice(&encode_avro_long(0));
+        data.extend_from_slice(&encode_avro_int(5));
+        let mut cur = AvroCursor::new(&data);
+        dec.decode(&mut cur).unwrap();
+        assert_eq!(cur.position(), data.len());
+        let arr = dec.flush(None).unwrap();
+        let s = arr.as_any().downcast_ref::<StructArray>().unwrap();
+        let id = s
+            .column_by_name("id")
+            .unwrap()
+            .as_any()
+            .downcast_ref::<Int32Array>()
+            .unwrap();
+        assert_eq!(id.len(), 1);
+        assert_eq!(id.value(0), 5);
+    }
+
+    #[test]
+    fn test_skip_writer_map_with_negative_block_count_fast() {
+        let mut dec = make_record_resolved_decoder(
+            &[("id", DataType::Int32, false)],
+            vec![None, Some(0)],
+            vec![Some(Skipper::Map(Box::new(Skipper::Int32))), None],
+        );
+        let mut entries = Vec::new();
+        entries.extend_from_slice(&encode_avro_bytes(b"k1"));
+        entries.extend_from_slice(&encode_avro_int(10));
+        entries.extend_from_slice(&encode_avro_bytes(b"k2"));
+        entries.extend_from_slice(&encode_avro_int(20));
+        let mut data = Vec::new();
+        data.extend_from_slice(&encode_avro_long(-2));
+        data.extend_from_slice(&encode_avro_long(entries.len() as i64));
+        data.extend_from_slice(&entries);
+        data.extend_from_slice(&encode_avro_long(0));
+        data.extend_from_slice(&encode_avro_int(123));
+        let mut cur = AvroCursor::new(&data);
+        dec.decode(&mut cur).unwrap();
+        assert_eq!(cur.position(), data.len());
+        let arr = dec.flush(None).unwrap();
+        let s = arr.as_any().downcast_ref::<StructArray>().unwrap();
+        let id = s
+            .column_by_name("id")
+            .unwrap()
+            .as_any()
+            .downcast_ref::<Int32Array>()
+            .unwrap();
+        assert_eq!(id.len(), 1);
+        assert_eq!(id.value(0), 123);
+    }
+
+    #[test]
+    fn test_skip_writer_nullable_field_union_nullfirst() {
+        let mut dec = make_record_resolved_decoder(
+            &[("id", DataType::Int32, false)],
+            vec![None, Some(0)],
+            vec![
+                Some(super::Skipper::Nullable(
+                    Nullability::NullFirst,
+                    Box::new(super::Skipper::Int32),
+                )),
+                None,
+            ],
+        );
+        let mut row1 = Vec::new();
+        row1.extend_from_slice(&encode_avro_long(0));
+        row1.extend_from_slice(&encode_avro_int(5));
+        let mut row2 = Vec::new();
+        row2.extend_from_slice(&encode_avro_long(1));
+        row2.extend_from_slice(&encode_avro_int(123));
+        row2.extend_from_slice(&encode_avro_int(7));
+        let mut cur1 = AvroCursor::new(&row1);
+        let mut cur2 = AvroCursor::new(&row2);
+        dec.decode(&mut cur1).unwrap();
+        dec.decode(&mut cur2).unwrap();
+        assert_eq!(cur1.position(), row1.len());
+        assert_eq!(cur2.position(), row2.len());
+        let arr = dec.flush(None).unwrap();
+        let s = arr.as_any().downcast_ref::<StructArray>().unwrap();
+        let id = s
+            .column_by_name("id")
+            .unwrap()
+            .as_any()
+            .downcast_ref::<Int32Array>()
+            .unwrap();
+        assert_eq!(id.len(), 2);
+        assert_eq!(id.value(0), 5);
+        assert_eq!(id.value(1), 7);
+    }
 }

From 1ed7efad0f190a07d55c8319a04c7ce5e982bbd9 Mon Sep 17 00:00:00 2001
From: Connor Sanders
Date: Tue, 26 Aug 2025 16:09:26 -0500
Subject: [PATCH 2/3] Address PR Comments

---
 arrow-avro/src/codec.rs                   | 35 +++++++++++++++
 arrow-avro/src/reader/mod.rs              | 50 ++++++++++++++++++++++
 arrow-avro/src/reader/record.rs           | 26 ++++++-----
 arrow-avro/test/data/skippable_types.avro | Bin 0 -> 3234 bytes
 4 files changed, 101 insertions(+), 10 deletions(-)
 create mode 100644 arrow-avro/test/data/skippable_types.avro

diff --git a/arrow-avro/src/codec.rs b/arrow-avro/src/codec.rs
index dad2a5e42464..bf2ee6deab6d 100644
--- a/arrow-avro/src/codec.rs
+++ b/arrow-avro/src/codec.rs
@@ -851,6 +851,41 @@ impl<'a> Maker<'a> {
             (Schema::Union(writer_variants), Schema::Union(reader_variants)) => {
                 self.resolve_nullable_union(writer_variants, reader_variants, namespace)
             }
+            // If both sides are the same complex kind (non-record), adopt the reader type.
+            // This aligns with the Avro spec: arrays, maps, and enums resolve recursively;
+            // for identical shapes we can just parse the reader schema.
+            (Schema::Complex(ComplexType::Array(_)), Schema::Complex(ComplexType::Array(_)))
+            | (Schema::Complex(ComplexType::Map(_)), Schema::Complex(ComplexType::Map(_)))
+            | (Schema::Complex(ComplexType::Fixed(_)), Schema::Complex(ComplexType::Fixed(_)))
+            | (Schema::Complex(ComplexType::Enum(_)), Schema::Complex(ComplexType::Enum(_))) => {
+                self.parse_type(reader_schema, namespace)
+            }
+            // Named-type references (equal on both sides): parse the reader side.
+            (Schema::TypeName(TypeName::Ref(_)), Schema::TypeName(TypeName::Ref(_)))
+            | (
+                Schema::Type(Type {
+                    r#type: TypeName::Ref(_),
+                    ..
+                }),
+                Schema::Type(Type {
+                    r#type: TypeName::Ref(_),
+                    ..
+                }),
+            )
+            | (
+                Schema::TypeName(TypeName::Ref(_)),
+                Schema::Type(Type {
+                    r#type: TypeName::Ref(_),
+                    ..
+                }),
+            )
+            | (
+                Schema::Type(Type {
+                    r#type: TypeName::Ref(_),
+                    ..
+                }),
+                Schema::TypeName(TypeName::Ref(_)),
+            ) => self.parse_type(reader_schema, namespace),
             _ => Err(ArrowError::NotYetImplemented(
                 "Other resolutions not yet implemented".to_string(),
             )),
diff --git a/arrow-avro/src/reader/mod.rs b/arrow-avro/src/reader/mod.rs
index 41aa01e6da4d..5df1a766ad45 100644
--- a/arrow-avro/src/reader/mod.rs
+++ b/arrow-avro/src/reader/mod.rs
@@ -1615,6 +1615,56 @@ mod test {
         assert_eq!(batch, expected);
     }
 
+    #[test]
+    fn test_skippable_types_project_each_field_individually() {
+        let path = "test/data/skippable_types.avro";
+        let full = read_file(path, 1024, false);
+        let schema_full = full.schema();
+        let num_rows = full.num_rows();
+        let writer_json = load_writer_schema_json(path);
+        assert_eq!(
+            writer_json["type"], "record",
+            "writer schema must be a record"
+        );
+        let fields_json = writer_json
+            .get("fields")
+            .and_then(|f| f.as_array())
+            .expect("record has fields");
+        assert_eq!(
+            schema_full.fields().len(),
+            fields_json.len(),
+            "full read column count vs writer fields"
+        );
+        for (idx, f) in fields_json.iter().enumerate() {
+            let name = f
+                .get("name")
+                .and_then(|n| n.as_str())
+                .unwrap_or_else(|| panic!("field at index {idx} has no name"));
+            let reader_schema = make_reader_schema_with_selected_fields_in_order(path, &[name]);
+            let projected = read_alltypes_with_reader_schema(path, reader_schema);
+            assert_eq!(
+                projected.num_columns(),
+                1,
+                "projected batch should contain exactly the selected column '{name}'"
+            );
+            assert_eq!(
+                projected.num_rows(),
+                num_rows,
+                "row count mismatch for projected column '{name}'"
+            );
+            let field = schema_full.field(idx).clone();
+            let col = full.column(idx).clone();
+            let expected =
+                RecordBatch::try_new(Arc::new(Schema::new(vec![field])), vec![col]).unwrap();
+            // Equality means: (1) we read the right column values; and (2) all other
+            // writer fields were skipped correctly for this projection (no misalignment).
+            assert_eq!(
+                projected, expected,
+                "projected column '{name}' mismatch vs full read column"
+            );
+        }
+    }
+
     #[test]
     fn test_read_zero_byte_avro_file() {
         let batch = read_file("test/data/zero_byte.avro", 3, false);
diff --git a/arrow-avro/src/reader/record.rs b/arrow-avro/src/reader/record.rs
index e76122be61ad..3266021ce75e 100644
--- a/arrow-avro/src/reader/record.rs
+++ b/arrow-avro/src/reader/record.rs
@@ -392,7 +392,7 @@ impl Decoder {
                 }
             }
             (Codec::Map(child), _) => {
-                let val_field = child.field_with_name("value").with_nullable(true);
+                let val_field = child.field_with_name("value");
                 let map_field = Arc::new(ArrowField::new(
                     "entries",
                     DataType::Struct(Fields::from(vec![
@@ -683,14 +683,16 @@ impl Decoder {
                     )));
                 }
            }
-            let entries_struct = StructArray::new(
-                Fields::from(vec![
-                    Arc::new(ArrowField::new("key", DataType::Utf8, false)),
-                    Arc::new(ArrowField::new("value", val_arr.data_type().clone(), true)),
-                ]),
-                vec![Arc::new(key_arr), val_arr],
-                None,
-            );
+            let entries_fields = match map_field.data_type() {
+                DataType::Struct(fields) => fields.clone(),
+                other => {
+                    return Err(ArrowError::InvalidArgumentError(format!(
+                        "Map entries field must be a Struct, got {other:?}"
+                    )))
+                }
+            };
+            let entries_struct =
+                StructArray::new(entries_fields, vec![Arc::new(key_arr), val_arr], None);
             let map_arr = MapArray::new(map_field.clone(), moff, entries_struct, nulls, false);
             Arc::new(map_arr)
         }
@@ -858,7 +860,11 @@ fn sign_extend_to<const N: usize>(raw: &[u8]) -> Result<[u8; N], ArrowError> {
     Ok(arr)
 }
 
-/// Lightweight skipping decoder for writer-only fields
+/// Lightweight skipper for non-projected writer fields
+/// (fields present in the writer schema but omitted by the reader/projection);
+/// per Avro 1.11.1 schema resolution these fields are ignored.
+///
+/// <https://avro.apache.org/docs/1.11.1/specification/#schema-resolution>
 #[derive(Debug)]
 enum Skipper {
     Null,
diff --git a/arrow-avro/test/data/skippable_types.avro b/arrow-avro/test/data/skippable_types.avro
new file mode 100644
index 0000000000000000000000000000000000000000..b0518e0056b5ae3ecea7bb50e6a474fe676e9b82
GIT binary patch
literal 3234
zcmb_eU1%It6ux^iS=O|qYOAk?g;F0nb!K*Uc2|VPG_eVYmYAXtmgUaOopj9X%rdjv
zW)lKy#bWS5i!~?(eNhoXklKf$V1s=r2u1M0VDQ1d^hvEsD|ur9vtiHsg7e&V#ih6
z2T^U)uE#Z*Fsfxa_Cj0@q)|P?a>xKYUmVrcod!!gjv2_Kx@xmdy|}6`j%s>!*BlgS
zAdl+mq#pO*Ef#RVc($AcQzI7C2TBM=+0_W&b+bwO0;pFf)!H2b2IPAWH$)x~vtce#Jy!lMwf=eUR6PX-q5vY#T!y-d
LbPUVQq3x{DuX+#3mvproZz-Sc&M?dHa*j^eK+8ewF~%^2D|*%*-Qqp21zNsNh$wJ
zV2y^E#BO_Z;;z81ue+$-s3sH&j(h8L#IYRLPpDZ{9n0T$+1aT{5FMSJntBFaCZ0Y%
zl`Z#i*>FjnQJ&x{))bR%8{c2ME2MZ8tJ
UI=!E&-#}Yki>RtIg(by+++8k9q-9`-;SC4Fj0H9eCrj*s07(3t;|@Zy3?LW-3%)%
z@G$6VzBl?J1VC@F_&YqbH%w9{{SDLMx0}&_+h2Vp4rYJ_8O}&T7bdWL$H|UL`V&CU0IK6>TW^!WL++-oD3hsVh^GcyuN~f-mquyuz
;^Aq77E~=KsJyJvBKyiyUR4SjFEJ}3>A50R6qI6Cn0+qoLMV;7G~zJ9OIXECG&*X+
9P%PUa~$Sek>gwmaonPa!5_CMA;k6ChXH^Lq13lqt*>X;%jMN8=k|Xxb72mljZ4f|
>(dX+NKMbVX{aI}}+C=6M#t&v3
+iD`5S*EUonLpfXi_kV>V9(zn4J`7F!|kXyM(!UI#*dA9@E<@&J!bV5TzvK
HC=^(IuE{@VgWDV7ts5A+Zo7kp1zvGPrfmPCpI~Jgf$JpSjQ^D8s>J=C

From: Connor Sanders
Date: Sat, 30 Aug 2025 14:45:38 -0500
Subject: [PATCH 3/3] Cleaned up `skip_blocks` method.
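
Removed the now-unused `_skip_negative_block_by_size` parameter from
`skip_blocks`, restored the explanatory block-count comments in
`process_blockwise`, and simplified the `Skipper::List` / `Skipper::Map`
call sites to match.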
---
 arrow-avro/src/reader/record.rs | 30 ++++++++++++++++--------------
 1 file changed, 16 insertions(+), 14 deletions(-)

diff --git a/arrow-avro/src/reader/record.rs b/arrow-avro/src/reader/record.rs
index e6124e88cbc7..e219efabb937 100644
--- a/arrow-avro/src/reader/record.rs
+++ b/arrow-avro/src/reader/record.rs
@@ -756,9 +756,8 @@ enum NegativeBlockBehavior {
 
 #[inline]
 fn skip_blocks(
-    buf: &mut AvroCursor<'_>,
-    mut skip_item: impl FnMut(&mut AvroCursor<'_>) -> Result<(), ArrowError>,
-    _skip_negative_block_by_size: bool,
+    buf: &mut AvroCursor,
+    mut skip_item: impl FnMut(&mut AvroCursor) -> Result<(), ArrowError>,
 ) -> Result<usize, ArrowError> {
     process_blockwise(
         buf,
@@ -777,16 +776,22 @@ fn read_blocks(
 
 #[inline]
 fn process_blockwise(
-    buf: &mut AvroCursor<'_>,
-    mut on_item: impl FnMut(&mut AvroCursor<'_>) -> Result<(), ArrowError>,
+    buf: &mut AvroCursor,
+    mut on_item: impl FnMut(&mut AvroCursor) -> Result<(), ArrowError>,
     negative_behavior: NegativeBlockBehavior,
 ) -> Result<usize, ArrowError> {
     let mut total = 0usize;
     loop {
+        // Read the block count
+        //  positive = that many items
+        //  negative = that many items + read block size
+        // See: https://avro.apache.org/docs/1.11.1/specification/#maps
         let block_count = buf.get_long()?;
         match block_count.cmp(&0) {
             Ordering::Equal => break,
             Ordering::Less => {
+                // If block_count is negative, read the absolute value of count,
+                // then read the block size as a long and discard
                 let count = (-block_count) as usize;
                 // A negative count is followed by a long of the size in bytes
                 let size_in_bytes = buf.get_long()? as usize;
@@ -805,6 +810,7 @@ fn process_blockwise(
                 total += count;
             }
             Ordering::Greater => {
+                // If block_count is positive, decode that many items
                 let count = block_count as usize;
                 for _ in 0..count {
                     on_item(buf)?;
@@ -978,18 +984,14 @@ impl Skipper {
                 Ok(())
             }
             Self::List(item) => {
-                skip_blocks(buf, |c| item.skip(c), true)?;
+                skip_blocks(buf, |c| item.skip(c))?;
                 Ok(())
             }
             Self::Map(value) => {
-                skip_blocks(
-                    buf,
-                    |c| {
-                        c.get_bytes()?; // key
-                        value.skip(c)
-                    },
-                    true,
-                )?;
+                skip_blocks(buf, |c| {
+                    c.get_bytes()?; // key
+                    value.skip(c)
+                })?;
                 Ok(())
             }
             Self::Struct(fields) => {
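
Note on the encoding behind the skip fast path: the Avro specification (linked
from the comments above) allows array and map blocks to be written with a
negative item count followed by the block's byte size, which is what lets
`NegativeBlockBehavior::SkipBySize` jump an entire block with one `get_fixed`
call instead of decoding items. Below is a minimal, self-contained sketch of
that rule; the helper names (`read_long`, `skip_array_by_size`) are
illustrative only and are not part of the patch.

// Decode one Avro long: a little-endian base-128 varint holding a
// zigzag-encoded value, advancing `*pos` past the bytes consumed.
fn read_long(buf: &[u8], pos: &mut usize) -> Option<i64> {
    let mut value: u64 = 0;
    let mut shift: u32 = 0;
    loop {
        let byte = *buf.get(*pos)?;
        *pos += 1;
        value |= u64::from(byte & 0x7F) << shift;
        if byte & 0x80 == 0 {
            break;
        }
        shift += 7;
    }
    // Zigzag decode: (n >> 1) ^ -(n & 1)
    Some(((value >> 1) as i64) ^ -((value & 1) as i64))
}

// Skip one Avro array without decoding its items, assuming the writer
// emitted every block in the negative-count form: (-count, byte_size, payload).
fn skip_array_by_size(buf: &[u8], pos: &mut usize) -> Option<()> {
    loop {
        let block_count = read_long(buf, pos)?;
        if block_count == 0 {
            return Some(()); // a zero block count terminates the array
        }
        if block_count < 0 {
            // Negative count: the next long is the payload size in bytes,
            // so the whole block can be jumped without touching its items.
            let size = read_long(buf, pos)? as usize;
            let end = pos.checked_add(size).filter(|&e| e <= buf.len())?;
            *pos = end;
        } else {
            // Positive count: only the item count is known, so each item
            // would have to be skipped type by type (the slower path).
            return None;
        }
    }
}

Positive-count blocks carry no byte length, which is why the series keeps a
per-type `Skipper` for the item-by-item path and reserves the `get_fixed`
jump for negative-count blocks.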