47 changes: 45 additions & 2 deletions arrow-avro/src/codec.rs
@@ -851,6 +851,41 @@ impl<'a> Maker<'a> {
(Schema::Union(writer_variants), Schema::Union(reader_variants)) => {
self.resolve_nullable_union(writer_variants, reader_variants, namespace)
}
// If both sides are the same complex kind (non-record), adopt the reader type.
// This aligns with the Avro spec: arrays, maps, enums, and fixed resolve against
// the reader schema, so for identical shapes we can simply parse the reader schema.
(Schema::Complex(ComplexType::Array(_)), Schema::Complex(ComplexType::Array(_)))
| (Schema::Complex(ComplexType::Map(_)), Schema::Complex(ComplexType::Map(_)))
| (Schema::Complex(ComplexType::Fixed(_)), Schema::Complex(ComplexType::Fixed(_)))
| (Schema::Complex(ComplexType::Enum(_)), Schema::Complex(ComplexType::Enum(_))) => {
self.parse_type(reader_schema, namespace)
}
// Named-type references (equal on both sides) – parse reader side.
(Schema::TypeName(TypeName::Ref(_)), Schema::TypeName(TypeName::Ref(_)))
| (
Schema::Type(Type {
r#type: TypeName::Ref(_),
..
}),
Schema::Type(Type {
r#type: TypeName::Ref(_),
..
}),
)
| (
Schema::TypeName(TypeName::Ref(_)),
Schema::Type(Type {
r#type: TypeName::Ref(_),
..
}),
)
| (
Schema::Type(Type {
r#type: TypeName::Ref(_),
..
}),
Schema::TypeName(TypeName::Ref(_)),
) => self.parse_type(reader_schema, namespace),
_ => Err(ArrowError::NotYetImplemented(
"Other resolutions not yet implemented".to_string(),
)),
@@ -955,7 +990,7 @@ impl<'a> Maker<'a> {
// Prepare outputs
let mut reader_fields: Vec<AvroField> = Vec::with_capacity(reader_record.fields.len());
let mut writer_to_reader: Vec<Option<usize>> = vec![None; writer_record.fields.len()];
let mut skip_fields: Vec<Option<AvroDataType>> = vec![None; writer_record.fields.len()];
//let mut default_fields: Vec<usize> = Vec::new();
// Build reader fields and mapping
for (reader_idx, r_field) in reader_record.fields.iter().enumerate() {
@@ -975,6 +1010,14 @@
));
}
}
// Any writer fields not mapped should be skipped
for (writer_idx, writer_field) in writer_record.fields.iter().enumerate() {
if writer_to_reader[writer_idx].is_none() {
// Parse writer field type to know how to skip data
let writer_dt = self.parse_type(&writer_field.r#type, writer_ns)?;
skip_fields[writer_idx] = Some(writer_dt);
}
}
// Remaining writer-only field skip handling will be implemented in a follow-up PR
// Build resolved record AvroDataType
let resolved = AvroDataType::new_with_resolution(
@@ -984,7 +1027,7 @@
Some(ResolutionInfo::Record(ResolvedRecord {
writer_to_reader: Arc::from(writer_to_reader),
default_fields: Arc::default(),
skip_fields: Arc::from(skip_fields),
})),
);
// Register a resolved record by reader name+namespace for potential named type refs
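For context, here is a minimal sketch (not part of this diff) of how the resolution above surfaces through the reader API: when the reader schema lists only a subset of the writer's record fields, the unmapped writer fields end up in `skip_fields` and are skipped during decoding. The `ReaderBuilder::new()` constructor, the `arrow_avro::schema::AvroSchema` import path, and the schema JSON below (including the record name) are assumptions based on the test helpers later in this diff.

```rust
use std::fs::File;
use std::io::BufReader;
use arrow_avro::reader::ReaderBuilder;
use arrow_avro::schema::AvroSchema;

// Read only `double_col` from an Avro file whose writer schema contains many
// more fields; every other writer field is resolved into `skip_fields` and
// skipped while decoding.
fn read_double_col(path: &str) -> arrow_array::RecordBatch {
    // Hypothetical reader schema keeping a single writer field; the record
    // name should match the writer schema's record name.
    let reader_schema = AvroSchema::new(
        r#"{"type":"record","name":"topLevelRecord","fields":[
              {"name":"double_col","type":["double","null"]}]}"#
            .to_string(),
    );
    let file = File::open(path).expect("open avro file");
    let reader = ReaderBuilder::new()
        .with_reader_schema(reader_schema)
        .build(BufReader::new(file))
        .expect("build reader");
    let schema = reader.schema();
    let batches = reader.collect::<Result<Vec<_>, _>>().expect("decode batches");
    arrow::compute::concat_batches(&schema, &batches).expect("concat batches")
}
```

This mirrors the `read_alltypes_with_reader_schema` helper used by the new tests below, just with the projected reader schema written out inline.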
130 changes: 129 additions & 1 deletion arrow-avro/src/reader/mod.rs
@@ -863,12 +863,39 @@ mod test {
.with_reader_schema(reader_schema)
.build(BufReader::new(file))
.unwrap();

let schema = reader.schema();
let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
arrow::compute::concat_batches(&schema, &batches).unwrap()
}

fn make_reader_schema_with_selected_fields_in_order(
path: &str,
selected: &[&str],
) -> AvroSchema {
let mut root = load_writer_schema_json(path);
assert_eq!(root["type"], "record", "writer schema must be a record");
let writer_fields = root
.get("fields")
.and_then(|f| f.as_array())
.expect("record has fields");
let mut field_map: HashMap<String, Value> = HashMap::with_capacity(writer_fields.len());
for f in writer_fields {
if let Some(name) = f.get("name").and_then(|n| n.as_str()) {
field_map.insert(name.to_string(), f.clone());
}
}
let mut new_fields = Vec::with_capacity(selected.len());
for name in selected {
let f = field_map
.get(*name)
.unwrap_or_else(|| panic!("field '{name}' not found in writer schema"))
.clone();
new_fields.push(f);
}
root["fields"] = Value::Array(new_fields);
AvroSchema::new(root.to_string())
}

#[test]
fn test_alltypes_schema_promotion_mixed() {
let files = [
@@ -1537,6 +1564,107 @@
assert!(batch.column(0).as_any().is::<StringViewArray>());
}

#[test]
fn test_alltypes_skip_writer_fields_keep_double_only() {
let file = arrow_test_data("avro/alltypes_plain.avro");
let reader_schema =
make_reader_schema_with_selected_fields_in_order(&file, &["double_col"]);
let batch = read_alltypes_with_reader_schema(&file, reader_schema);
let expected = RecordBatch::try_from_iter_with_nullable([(
"double_col",
Arc::new(Float64Array::from_iter_values(
(0..8).map(|x| (x % 2) as f64 * 10.1),
)) as _,
true,
)])
.unwrap();
assert_eq!(batch, expected);
}

#[test]
fn test_alltypes_skip_writer_fields_reorder_and_skip_many() {
let file = arrow_test_data("avro/alltypes_plain.avro");
let reader_schema =
make_reader_schema_with_selected_fields_in_order(&file, &["timestamp_col", "id"]);
let batch = read_alltypes_with_reader_schema(&file, reader_schema);
let expected = RecordBatch::try_from_iter_with_nullable([
(
"timestamp_col",
Arc::new(
TimestampMicrosecondArray::from_iter_values([
1235865600000000, // 2009-03-01T00:00:00.000
1235865660000000, // 2009-03-01T00:01:00.000
1238544000000000, // 2009-04-01T00:00:00.000
1238544060000000, // 2009-04-01T00:01:00.000
1233446400000000, // 2009-02-01T00:00:00.000
1233446460000000, // 2009-02-01T00:01:00.000
1230768000000000, // 2009-01-01T00:00:00.000
1230768060000000, // 2009-01-01T00:01:00.000
])
.with_timezone("+00:00"),
) as _,
true,
),
(
"id",
Arc::new(Int32Array::from(vec![4, 5, 6, 7, 2, 3, 0, 1])) as _,
true,
),
])
.unwrap();
assert_eq!(batch, expected);
}

#[test]
fn test_skippable_types_project_each_field_individually() {
let path = "test/data/skippable_types.avro";
let full = read_file(path, 1024, false);
let schema_full = full.schema();
let num_rows = full.num_rows();
let writer_json = load_writer_schema_json(path);
assert_eq!(
writer_json["type"], "record",
"writer schema must be a record"
);
let fields_json = writer_json
.get("fields")
.and_then(|f| f.as_array())
.expect("record has fields");
assert_eq!(
schema_full.fields().len(),
fields_json.len(),
"full read column count vs writer fields"
);
for (idx, f) in fields_json.iter().enumerate() {
let name = f
.get("name")
.and_then(|n| n.as_str())
.unwrap_or_else(|| panic!("field at index {idx} has no name"));
let reader_schema = make_reader_schema_with_selected_fields_in_order(path, &[name]);
let projected = read_alltypes_with_reader_schema(path, reader_schema);
assert_eq!(
projected.num_columns(),
1,
"projected batch should contain exactly the selected column '{name}'"
);
assert_eq!(
projected.num_rows(),
num_rows,
"row count mismatch for projected column '{name}'"
);
let field = schema_full.field(idx).clone();
let col = full.column(idx).clone();
let expected =
RecordBatch::try_new(Arc::new(Schema::new(vec![field])), vec![col]).unwrap();
// Equality means: (1) read the right column values; and (2) all other
// writer fields were skipped correctly for this projection (no misalignment).
assert_eq!(
projected, expected,
"projected column '{name}' mismatch vs full read column"
);
}
}

#[test]
fn test_read_zero_byte_avro_file() {
let batch = read_file("test/data/zero_byte.avro", 3, false);