From 72d04a713bf6a4bb0424be8050748b703fa79f25 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Tue, 29 Jul 2025 15:56:54 -0700 Subject: [PATCH 01/13] use field name to find field pos when field id is unavailable --- crates/iceberg/src/arrow/value.rs | 13 +++++++++++-- crates/iceberg/src/spec/schema/visitor.rs | 4 ++-- 2 files changed, 13 insertions(+), 4 deletions(-) diff --git a/crates/iceberg/src/arrow/value.rs b/crates/iceberg/src/arrow/value.rs index 993e927145..c604bcd55a 100644 --- a/crates/iceberg/src/arrow/value.rs +++ b/crates/iceberg/src/arrow/value.rs @@ -429,7 +429,7 @@ impl SchemaWithPartnerVisitor for ArrowArrayToIcebergStructConverter { pub struct ArrowArrayAccessor; impl PartnerAccessor for ArrowArrayAccessor { - fn struct_parner<'a>(&self, schema_partner: &'a ArrayRef) -> Result<&'a ArrayRef> { + fn struct_partner<'a>(&self, schema_partner: &'a ArrayRef) -> Result<&'a ArrayRef> { if !matches!(schema_partner.data_type(), DataType::Struct(_)) { return Err(Error::new( ErrorKind::DataInvalid, @@ -463,10 +463,19 @@ impl PartnerAccessor for ArrowArrayAccessor { .map(|id| id == field.id) .unwrap_or(false) }) + .or_else(|| { + struct_array + .fields() + .iter() + .position(|arrow_field| arrow_field.name().clone() == field.name) + }) .ok_or_else(|| { Error::new( ErrorKind::DataInvalid, - format!("Field id {} not found in struct array", field.id), + format!( + "Field with id={} or name={} not found in struct array", + field.id, field.name + ), ) })?; diff --git a/crates/iceberg/src/spec/schema/visitor.rs b/crates/iceberg/src/spec/schema/visitor.rs index ebb9b86bba..50f7c04caa 100644 --- a/crates/iceberg/src/spec/schema/visitor.rs +++ b/crates/iceberg/src/spec/schema/visitor.rs @@ -190,7 +190,7 @@ pub trait SchemaWithPartnerVisitor

{ /// Accessor used to get child partner from parent partner. pub trait PartnerAccessor

{ /// Get the struct partner from schema partner. - fn struct_parner<'a>(&self, schema_partner: &'a P) -> Result<&'a P>; + fn struct_partner<'a>(&self, schema_partner: &'a P) -> Result<&'a P>; /// Get the field partner from struct partner. fn field_partner<'a>(&self, struct_partner: &'a P, field: &NestedField) -> Result<&'a P>; /// Get the list element partner from list partner. @@ -274,7 +274,7 @@ pub fn visit_schema_with_partner, A: PartnerAc ) -> Result { let result = visit_struct_with_partner( &schema.r#struct, - accessor.struct_parner(partner)?, + accessor.struct_partner(partner)?, visitor, accessor, )?; From 091804043089fdef60e4726ea285d8d6f6264fc8 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Tue, 29 Jul 2025 16:41:55 -0700 Subject: [PATCH 02/13] add ut --- crates/iceberg/src/arrow/value.rs | 83 +++++++++++++++++++++++++++++++ 1 file changed, 83 insertions(+) diff --git a/crates/iceberg/src/arrow/value.rs b/crates/iceberg/src/arrow/value.rs index c604bcd55a..6635249bf8 100644 --- a/crates/iceberg/src/arrow/value.rs +++ b/crates/iceberg/src/arrow/value.rs @@ -908,6 +908,89 @@ mod test { assert_eq!(result, vec![None; 0]); } + #[test] + fn test_field_name_fallback_when_id_unavailable() { + // Create an Arrow struct array with fields that don't have field IDs in metadata + let int32_array = Int32Array::from(vec![Some(1), Some(2), None]); + let string_array = StringArray::from(vec![Some("hello"), Some("world"), None]); + + let struct_array = + Arc::new(StructArray::from(vec![ + ( + // Field without field ID metadata - should fallback to name matching + Arc::new(Field::new("field_a", DataType::Int32, true)), + Arc::new(int32_array) as ArrayRef, + ), + ( + // Field with wrong field ID metadata - should fallback to name matching + Arc::new(Field::new("field_b", DataType::Utf8, true).with_metadata( + HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "999".to_string())]), + )), + Arc::new(string_array) as ArrayRef, + ), + ])) as ArrayRef; + + // Create Iceberg struct type with field IDs that don't match the Arrow metadata + let iceberg_struct_type = StructType::new(vec![ + Arc::new(NestedField::optional( + 1, // Different ID than what's in Arrow metadata (or no metadata) + "field_a", // Same name as Arrow field + Type::Primitive(PrimitiveType::Int), + )), + Arc::new(NestedField::optional( + 2, // Different ID than what's in Arrow metadata (999) + "field_b", // Same name as Arrow field + Type::Primitive(PrimitiveType::String), + )), + ]); + + // This should succeed by falling back to field name matching + let result = arrow_struct_to_literal(&struct_array, &iceberg_struct_type).unwrap(); + + assert_eq!(result, vec![ + Some(Literal::Struct(Struct::from_iter(vec![ + Some(Literal::int(1)), + Some(Literal::string("hello".to_string())), + ]))), + Some(Literal::Struct(Struct::from_iter(vec![ + Some(Literal::int(2)), + Some(Literal::string("world".to_string())), + ]))), + Some(Literal::Struct(Struct::from_iter(vec![None, None,]))), + ]); + } + + #[test] + fn test_field_not_found_error() { + // Test that we get an appropriate error when neither field ID nor name matches + + let int32_array = Int32Array::from(vec![Some(1), Some(2)]); + + let struct_array = Arc::new(StructArray::from(vec![( + Arc::new(Field::new("arrow_field_name", DataType::Int32, true)), + Arc::new(int32_array) as ArrayRef, + )])) as ArrayRef; + + // Create Iceberg struct type with field that doesn't match by ID or name + let iceberg_struct_type = StructType::new(vec![Arc::new(NestedField::optional( + 10, + "different_field_name", // Different name than Arrow field + Type::Primitive(PrimitiveType::Int), + ))]); + + // This should fail with an appropriate error message + let result = arrow_struct_to_literal(&struct_array, &iceberg_struct_type); + + assert!(result.is_err()); + let error = result.unwrap_err(); + assert_eq!(error.kind(), ErrorKind::DataInvalid); + assert!( + error.message().contains( + "Field with id=10 or name=different_field_name not found in struct array" + ) + ); + } + #[test] fn test_complex_nested() { // complex nested type for test From c6bc506f77fa7f3efc75ef6b86662d82766fbb36 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Wed, 30 Jul 2025 16:35:41 -0700 Subject: [PATCH 03/13] lol --- crates/iceberg/src/arrow/value.rs | 16 +++++----------- 1 file changed, 5 insertions(+), 11 deletions(-) diff --git a/crates/iceberg/src/arrow/value.rs b/crates/iceberg/src/arrow/value.rs index 6635249bf8..6d25cbb7b0 100644 --- a/crates/iceberg/src/arrow/value.rs +++ b/crates/iceberg/src/arrow/value.rs @@ -459,15 +459,9 @@ impl PartnerAccessor for ArrowArrayAccessor { .fields() .iter() .position(|arrow_field| { + // match by ID if available, otherwise try matching by name get_field_id(arrow_field) - .map(|id| id == field.id) - .unwrap_or(false) - }) - .or_else(|| { - struct_array - .fields() - .iter() - .position(|arrow_field| arrow_field.name().clone() == field.name) + .map_or(arrow_field.name() == &field.name, |id| id == field.id) }) .ok_or_else(|| { Error::new( @@ -922,9 +916,9 @@ mod test { Arc::new(int32_array) as ArrayRef, ), ( - // Field with wrong field ID metadata - should fallback to name matching + // Field with the correct field ID metadata Arc::new(Field::new("field_b", DataType::Utf8, true).with_metadata( - HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "999".to_string())]), + HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "2".to_string())]), )), Arc::new(string_array) as ArrayRef, ), @@ -938,7 +932,7 @@ mod test { Type::Primitive(PrimitiveType::Int), )), Arc::new(NestedField::optional( - 2, // Different ID than what's in Arrow metadata (999) + 2, // Same ID "field_b", // Same name as Arrow field Type::Primitive(PrimitiveType::String), )), From 8f79616c226d2a7d5dab685ef9ad544e6eef70f7 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Fri, 1 Aug 2025 16:35:53 -0700 Subject: [PATCH 04/13] Update crates/iceberg/src/arrow/value.rs Co-authored-by: Florian Valeye --- crates/iceberg/src/arrow/value.rs | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/crates/iceberg/src/arrow/value.rs b/crates/iceberg/src/arrow/value.rs index 6d25cbb7b0..5bfca9d92c 100644 --- a/crates/iceberg/src/arrow/value.rs +++ b/crates/iceberg/src/arrow/value.rs @@ -467,8 +467,15 @@ impl PartnerAccessor for ArrowArrayAccessor { Error::new( ErrorKind::DataInvalid, format!( - "Field with id={} or name={} not found in struct array", - field.id, field.name + "Field with id={} or name={} not found in struct array. Available fields: [{}]", + field.id, + field.name, + struct_array + .fields() + .iter() + .map(|f| f.name().as_str()) + .collect::>() + .join(", ") ), ) })?; From 1b2dbcb1d44499ac703c508faa7be11c553f9480 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Mon, 4 Aug 2025 15:55:31 -0700 Subject: [PATCH 05/13] having fun with schema --- .../iceberg/src/arrow/nan_val_cnt_visitor.rs | 3 +- crates/iceberg/src/arrow/value.rs | 141 ++++++++---------- 2 files changed, 65 insertions(+), 79 deletions(-) diff --git a/crates/iceberg/src/arrow/nan_val_cnt_visitor.rs b/crates/iceberg/src/arrow/nan_val_cnt_visitor.rs index 6b75c011cb..9616300f13 100644 --- a/crates/iceberg/src/arrow/nan_val_cnt_visitor.rs +++ b/crates/iceberg/src/arrow/nan_val_cnt_visitor.rs @@ -156,7 +156,8 @@ impl NanValueCountVisitor { /// Compute nan value counts in given schema and record batch pub fn compute(&mut self, schema: SchemaRef, batch: RecordBatch) -> Result<()> { - let arrow_arr_partner_accessor = ArrowArrayAccessor {}; + let arrow_arr_partner_accessor = + ArrowArrayAccessor::new_with_table_schema(schema.as_ref())?; let struct_arr = Arc::new(StructArray::from(batch)) as ArrayRef; visit_struct_with_partner( diff --git a/crates/iceberg/src/arrow/value.rs b/crates/iceberg/src/arrow/value.rs index 5bfca9d92c..e4a6c105f9 100644 --- a/crates/iceberg/src/arrow/value.rs +++ b/crates/iceberg/src/arrow/value.rs @@ -21,12 +21,12 @@ use arrow_array::{ LargeListArray, LargeStringArray, ListArray, MapArray, StringArray, StructArray, Time64MicrosecondArray, TimestampMicrosecondArray, TimestampNanosecondArray, }; -use arrow_schema::DataType; +use arrow_schema::{DataType, Schema as ArrowSchema}; use uuid::Uuid; -use super::get_field_id; +use super::{get_field_id, schema_to_arrow_schema}; use crate::spec::{ - ListType, Literal, Map, MapType, NestedField, PartnerAccessor, PrimitiveType, + ListType, Literal, Map, MapType, NestedField, PartnerAccessor, PrimitiveType, Schema, SchemaWithPartnerVisitor, Struct, StructType, visit_struct_with_partner, }; use crate::{Error, ErrorKind, Result}; @@ -426,7 +426,24 @@ impl SchemaWithPartnerVisitor for ArrowArrayToIcebergStructConverter { } /// Partner type representing accessing and walking arrow arrays alongside iceberg schema -pub struct ArrowArrayAccessor; +pub struct ArrowArrayAccessor { + arrow_schema: Option, +} + +impl ArrowArrayAccessor { + /// Creates a new instance of ArrowArrayAccessor without arrow schema fallback + pub fn new() -> Result { + Ok(Self { arrow_schema: None }) + } + + /// Creates a new instance of ArrowArrayAccessor with arrow schema converted from table schema + /// for field ID resolution fallback + pub fn new_with_table_schema(table_schema: &Schema) -> Result { + Ok(Self { + arrow_schema: Some(schema_to_arrow_schema(table_schema)?), + }) + } +} impl PartnerAccessor for ArrowArrayAccessor { fn struct_partner<'a>(&self, schema_partner: &'a ArrayRef) -> Result<&'a ArrayRef> { @@ -459,9 +476,13 @@ impl PartnerAccessor for ArrowArrayAccessor { .fields() .iter() .position(|arrow_field| { - // match by ID if available, otherwise try matching by name - get_field_id(arrow_field) - .map_or(arrow_field.name() == &field.name, |id| id == field.id) + get_field_id(arrow_field).map_or(false, |id| id == field.id) + || self + .arrow_schema + .as_ref() + .and_then(|schema| schema.field_with_name(&field.name).ok()) + .and_then(|field_from_schema| get_field_id(field_from_schema).ok()) + .map_or(false, |id| id == field.id) }) .ok_or_else(|| { Error::new( @@ -559,7 +580,7 @@ pub fn arrow_struct_to_literal( ty, struct_array, &mut ArrowArrayToIcebergStructConverter, - &ArrowArrayAccessor, + &ArrowArrayAccessor::new()?, ) } @@ -910,86 +931,50 @@ mod test { } #[test] - fn test_field_name_fallback_when_id_unavailable() { - // Create an Arrow struct array with fields that don't have field IDs in metadata - let int32_array = Int32Array::from(vec![Some(1), Some(2), None]); - let string_array = StringArray::from(vec![Some("hello"), Some("world"), None]); + fn test_field_id_fallback_with_arrow_schema() { + // Create an Arrow struct array with a field that doesn't have field ID in metadata + let int32_array = Int32Array::from(vec![Some(42), Some(43), None]); - let struct_array = - Arc::new(StructArray::from(vec![ - ( - // Field without field ID metadata - should fallback to name matching - Arc::new(Field::new("field_a", DataType::Int32, true)), - Arc::new(int32_array) as ArrayRef, - ), - ( - // Field with the correct field ID metadata - Arc::new(Field::new("field_b", DataType::Utf8, true).with_metadata( - HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "2".to_string())]), - )), - Arc::new(string_array) as ArrayRef, - ), - ])) as ArrayRef; + // Create the struct array with a field that has no field ID metadata + let struct_array = Arc::new(StructArray::from(vec![( + Arc::new(Field::new("field_a", DataType::Int32, true)), // No field ID metadata + Arc::new(int32_array) as ArrayRef, + )])) as ArrayRef; - // Create Iceberg struct type with field IDs that don't match the Arrow metadata - let iceberg_struct_type = StructType::new(vec![ - Arc::new(NestedField::optional( - 1, // Different ID than what's in Arrow metadata (or no metadata) - "field_a", // Same name as Arrow field + // Create an Iceberg schema with field ID + let iceberg_schema = Schema::builder() + .with_schema_id(1) + .with_fields(vec![Arc::new(NestedField::optional( + 100, // Field ID that we'll look for + "field_a", Type::Primitive(PrimitiveType::Int), - )), - Arc::new(NestedField::optional( - 2, // Same ID - "field_b", // Same name as Arrow field - Type::Primitive(PrimitiveType::String), - )), - ]); - - // This should succeed by falling back to field name matching - let result = arrow_struct_to_literal(&struct_array, &iceberg_struct_type).unwrap(); - - assert_eq!(result, vec![ - Some(Literal::Struct(Struct::from_iter(vec![ - Some(Literal::int(1)), - Some(Literal::string("hello".to_string())), - ]))), - Some(Literal::Struct(Struct::from_iter(vec![ - Some(Literal::int(2)), - Some(Literal::string("world".to_string())), - ]))), - Some(Literal::Struct(Struct::from_iter(vec![None, None,]))), - ]); - } + ))]) + .build() + .unwrap(); - #[test] - fn test_field_not_found_error() { - // Test that we get an appropriate error when neither field ID nor name matches + // Create an ArrowArrayAccessor with the table schema for fallback + let accessor = ArrowArrayAccessor::new_with_table_schema(&iceberg_schema).unwrap(); - let int32_array = Int32Array::from(vec![Some(1), Some(2)]); + // Create a nested field to look up + let field = NestedField::optional(100, "field_a", Type::Primitive(PrimitiveType::Int)); - let struct_array = Arc::new(StructArray::from(vec![( - Arc::new(Field::new("arrow_field_name", DataType::Int32, true)), - Arc::new(int32_array) as ArrayRef, - )])) as ArrayRef; + // This should succeed by using the arrow_schema fallback + let result = accessor.field_partner(&struct_array, &field); - // Create Iceberg struct type with field that doesn't match by ID or name - let iceberg_struct_type = StructType::new(vec![Arc::new(NestedField::optional( - 10, - "different_field_name", // Different name than Arrow field - Type::Primitive(PrimitiveType::Int), - ))]); + // Verify that the field was found + assert!(result.is_ok()); - // This should fail with an appropriate error message - let result = arrow_struct_to_literal(&struct_array, &iceberg_struct_type); + // Verify that the field has the expected value + let array_ref = result.unwrap(); + let int_array = array_ref.as_any().downcast_ref::().unwrap(); + assert_eq!(int_array.value(0), 42); + assert_eq!(int_array.value(1), 43); + assert!(int_array.is_null(2)); + // Now try with an accessor without arrow_schema - this should fail + let accessor_without_schema = ArrowArrayAccessor::new().unwrap(); + let result = accessor_without_schema.field_partner(&struct_array, &field); assert!(result.is_err()); - let error = result.unwrap_err(); - assert_eq!(error.kind(), ErrorKind::DataInvalid); - assert!( - error.message().contains( - "Field with id=10 or name=different_field_name not found in struct array" - ) - ); } #[test] From 3dc59c21e32bc6643da294858abec06cff26d51b Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Mon, 4 Aug 2025 16:40:09 -0700 Subject: [PATCH 06/13] clippy is strict --- crates/iceberg/src/arrow/value.rs | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/crates/iceberg/src/arrow/value.rs b/crates/iceberg/src/arrow/value.rs index e4a6c105f9..aa70d29c96 100644 --- a/crates/iceberg/src/arrow/value.rs +++ b/crates/iceberg/src/arrow/value.rs @@ -476,13 +476,12 @@ impl PartnerAccessor for ArrowArrayAccessor { .fields() .iter() .position(|arrow_field| { - get_field_id(arrow_field).map_or(false, |id| id == field.id) + get_field_id(arrow_field).is_ok_and(|id| id == field.id) || self - .arrow_schema - .as_ref() - .and_then(|schema| schema.field_with_name(&field.name).ok()) - .and_then(|field_from_schema| get_field_id(field_from_schema).ok()) - .map_or(false, |id| id == field.id) + .arrow_schema + .as_ref() + .and_then(|schema| schema.field_with_name(&field.name).ok()) + .and_then(|field_from_schema| get_field_id(field_from_schema).ok()) == Some(field.id) }) .ok_or_else(|| { Error::new( From 78181265c08eecc70c65930f840af81764487bd5 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Mon, 4 Aug 2025 21:46:20 -0700 Subject: [PATCH 07/13] I write bugs, I fix bugs --- crates/iceberg/src/arrow/value.rs | 29 ++++++++++++++++++++--------- 1 file changed, 20 insertions(+), 9 deletions(-) diff --git a/crates/iceberg/src/arrow/value.rs b/crates/iceberg/src/arrow/value.rs index aa70d29c96..f6c5a84c5a 100644 --- a/crates/iceberg/src/arrow/value.rs +++ b/crates/iceberg/src/arrow/value.rs @@ -443,6 +443,21 @@ impl ArrowArrayAccessor { arrow_schema: Some(schema_to_arrow_schema(table_schema)?), }) } + + /// Check if an arrow field matches the target field ID, either directly or through schema lookup + fn arrow_field_matches_id(&self, arrow_field: &arrow_schema::Field, target_id: i32) -> bool { + // First try direct match via field metadata + if let Ok(id) = get_field_id(arrow_field) { + id == target_id + } else { + // Only if direct match fails, try fallback via schema lookup + self.arrow_schema + .as_ref() + .and_then(|schema| schema.field_with_name(arrow_field.name()).ok()) + .and_then(|field_from_schema| get_field_id(field_from_schema).ok()) + .is_some_and(|id| id == target_id) + } + } } impl PartnerAccessor for ArrowArrayAccessor { @@ -468,21 +483,17 @@ impl PartnerAccessor for ArrowArrayAccessor { .ok_or_else(|| { Error::new( ErrorKind::DataInvalid, - "The struct partner is not a struct array", + format!( + "The struct partner is not a struct array, partner: {:?}", + struct_partner + ), ) })?; let field_pos = struct_array .fields() .iter() - .position(|arrow_field| { - get_field_id(arrow_field).is_ok_and(|id| id == field.id) - || self - .arrow_schema - .as_ref() - .and_then(|schema| schema.field_with_name(&field.name).ok()) - .and_then(|field_from_schema| get_field_id(field_from_schema).ok()) == Some(field.id) - }) + .position(|arrow_field| self.arrow_field_matches_id(arrow_field, field.id)) .ok_or_else(|| { Error::new( ErrorKind::DataInvalid, From 8729155cc4e09c16f8078be78d8c78e9f2805e30 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Wed, 6 Aug 2025 22:02:37 -0700 Subject: [PATCH 08/13] Update crates/iceberg/src/spec/schema/visitor.rs Co-authored-by: Renjie Liu --- crates/iceberg/src/spec/schema/visitor.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/iceberg/src/spec/schema/visitor.rs b/crates/iceberg/src/spec/schema/visitor.rs index 50f7c04caa..d4d3d484f3 100644 --- a/crates/iceberg/src/spec/schema/visitor.rs +++ b/crates/iceberg/src/spec/schema/visitor.rs @@ -192,7 +192,7 @@ pub trait PartnerAccessor

{ /// Get the struct partner from schema partner. fn struct_partner<'a>(&self, schema_partner: &'a P) -> Result<&'a P>; /// Get the field partner from struct partner. - fn field_partner<'a>(&self, struct_partner: &'a P, field: &NestedField) -> Result<&'a P>; + fn field_partner<'a>(&self, struct_partner: &'a P, field: &NestedField, field_pos: usize) -> Result<&'a P>; /// Get the list element partner from list partner. fn list_element_partner<'a>(&self, list_partner: &'a P) -> Result<&'a P>; /// Get the map key partner from map partner. From 54bc5df49b7474076be879c1909d8a6e54835585 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Tue, 12 Aug 2025 09:55:54 -0700 Subject: [PATCH 09/13] Revert "Update crates/iceberg/src/spec/schema/visitor.rs" This reverts commit 8729155cc4e09c16f8078be78d8c78e9f2805e30. --- crates/iceberg/src/spec/schema/visitor.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/iceberg/src/spec/schema/visitor.rs b/crates/iceberg/src/spec/schema/visitor.rs index d4d3d484f3..50f7c04caa 100644 --- a/crates/iceberg/src/spec/schema/visitor.rs +++ b/crates/iceberg/src/spec/schema/visitor.rs @@ -192,7 +192,7 @@ pub trait PartnerAccessor

{ /// Get the struct partner from schema partner. fn struct_partner<'a>(&self, schema_partner: &'a P) -> Result<&'a P>; /// Get the field partner from struct partner. - fn field_partner<'a>(&self, struct_partner: &'a P, field: &NestedField, field_pos: usize) -> Result<&'a P>; + fn field_partner<'a>(&self, struct_partner: &'a P, field: &NestedField) -> Result<&'a P>; /// Get the list element partner from list partner. fn list_element_partner<'a>(&self, list_partner: &'a P) -> Result<&'a P>; /// Get the map key partner from map partner. From a33869d5edaa215d8d2b0998171bb3cd1f52953a Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Tue, 12 Aug 2025 10:27:50 -0700 Subject: [PATCH 10/13] match mode --- crates/iceberg/src/arrow/value.rs | 64 ++++++++++++++----------------- 1 file changed, 28 insertions(+), 36 deletions(-) diff --git a/crates/iceberg/src/arrow/value.rs b/crates/iceberg/src/arrow/value.rs index f6c5a84c5a..c9f7f72839 100644 --- a/crates/iceberg/src/arrow/value.rs +++ b/crates/iceberg/src/arrow/value.rs @@ -21,7 +21,7 @@ use arrow_array::{ LargeListArray, LargeStringArray, ListArray, MapArray, StringArray, StructArray, Time64MicrosecondArray, TimestampMicrosecondArray, TimestampNanosecondArray, }; -use arrow_schema::{DataType, Schema as ArrowSchema}; +use arrow_schema::{DataType, FieldRef, Schema as ArrowSchema}; use uuid::Uuid; use super::{get_field_id, schema_to_arrow_schema}; @@ -425,38 +425,40 @@ impl SchemaWithPartnerVisitor for ArrowArrayToIcebergStructConverter { } } +/// todo doc +pub enum FieldMatchMode { + Id, + Name, +} + +impl FieldMatchMode { + pub fn match_field(&self, arrow_field: &FieldRef, iceberg_field: &NestedField) -> bool { + match self { + FieldMatchMode::Id => get_field_id(arrow_field) + .map(|id| id == iceberg_field.id) + .unwrap_or(false), + FieldMatchMode::Name => arrow_field.name() == &iceberg_field.name, + } + } +} + /// Partner type representing accessing and walking arrow arrays alongside iceberg schema pub struct ArrowArrayAccessor { - arrow_schema: Option, + match_mode: FieldMatchMode, } impl ArrowArrayAccessor { /// Creates a new instance of ArrowArrayAccessor without arrow schema fallback - pub fn new() -> Result { - Ok(Self { arrow_schema: None }) + pub fn new() -> Self { + Self { + match_mode: FieldMatchMode::Id, + } } /// Creates a new instance of ArrowArrayAccessor with arrow schema converted from table schema /// for field ID resolution fallback - pub fn new_with_table_schema(table_schema: &Schema) -> Result { - Ok(Self { - arrow_schema: Some(schema_to_arrow_schema(table_schema)?), - }) - } - - /// Check if an arrow field matches the target field ID, either directly or through schema lookup - fn arrow_field_matches_id(&self, arrow_field: &arrow_schema::Field, target_id: i32) -> bool { - // First try direct match via field metadata - if let Ok(id) = get_field_id(arrow_field) { - id == target_id - } else { - // Only if direct match fails, try fallback via schema lookup - self.arrow_schema - .as_ref() - .and_then(|schema| schema.field_with_name(arrow_field.name()).ok()) - .and_then(|field_from_schema| get_field_id(field_from_schema).ok()) - .is_some_and(|id| id == target_id) - } + pub fn new_with_match_mode(match_mode: FieldMatchMode) -> Self { + Self { match_mode } } } @@ -493,21 +495,11 @@ impl PartnerAccessor for ArrowArrayAccessor { let field_pos = struct_array .fields() .iter() - .position(|arrow_field| self.arrow_field_matches_id(arrow_field, field.id)) + .position(|arrow_field| self.match_mode.match_field(arrow_field, field)) .ok_or_else(|| { Error::new( ErrorKind::DataInvalid, - format!( - "Field with id={} or name={} not found in struct array. Available fields: [{}]", - field.id, - field.name, - struct_array - .fields() - .iter() - .map(|f| f.name().as_str()) - .collect::>() - .join(", ") - ), + format!("Field id {} not found in struct array", field.id), ) })?; @@ -590,7 +582,7 @@ pub fn arrow_struct_to_literal( ty, struct_array, &mut ArrowArrayToIcebergStructConverter, - &ArrowArrayAccessor::new()?, + &ArrowArrayAccessor::new(), ) } From 3928ac5ac4ca19a374826903f83d4a32844e22e0 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Tue, 12 Aug 2025 12:53:56 -0700 Subject: [PATCH 11/13] enters match mode --- .../iceberg/src/arrow/nan_val_cnt_visitor.rs | 12 +- crates/iceberg/src/arrow/value.rs | 226 ++++++++++++++---- .../src/writer/file_writer/parquet_writer.rs | 27 ++- 3 files changed, 217 insertions(+), 48 deletions(-) diff --git a/crates/iceberg/src/arrow/nan_val_cnt_visitor.rs b/crates/iceberg/src/arrow/nan_val_cnt_visitor.rs index 9616300f13..e514457887 100644 --- a/crates/iceberg/src/arrow/nan_val_cnt_visitor.rs +++ b/crates/iceberg/src/arrow/nan_val_cnt_visitor.rs @@ -25,7 +25,7 @@ use arrow_array::{ArrayRef, Float32Array, Float64Array, RecordBatch, StructArray use arrow_schema::DataType; use crate::Result; -use crate::arrow::ArrowArrayAccessor; +use crate::arrow::{ArrowArrayAccessor, FieldMatchMode}; use crate::spec::{ ListType, MapType, NestedFieldRef, PrimitiveType, Schema, SchemaRef, SchemaWithPartnerVisitor, StructType, visit_struct_with_partner, @@ -71,6 +71,7 @@ macro_rules! count_float_nans { pub struct NanValueCountVisitor { /// Stores field ID to NaN value count mapping pub nan_value_counts: HashMap, + match_mode: FieldMatchMode, } impl SchemaWithPartnerVisitor for NanValueCountVisitor { @@ -149,15 +150,20 @@ impl SchemaWithPartnerVisitor for NanValueCountVisitor { impl NanValueCountVisitor { /// Creates new instance of NanValueCountVisitor pub fn new() -> Self { + Self::new_with_match_mode(FieldMatchMode::Id) + } + + /// Creates new instance of NanValueCountVisitor with explicit match mode + pub fn new_with_match_mode(match_mode: FieldMatchMode) -> Self { Self { nan_value_counts: HashMap::new(), + match_mode, } } /// Compute nan value counts in given schema and record batch pub fn compute(&mut self, schema: SchemaRef, batch: RecordBatch) -> Result<()> { - let arrow_arr_partner_accessor = - ArrowArrayAccessor::new_with_table_schema(schema.as_ref())?; + let arrow_arr_partner_accessor = ArrowArrayAccessor::new_with_match_mode(self.match_mode); let struct_arr = Arc::new(StructArray::from(batch)) as ArrayRef; visit_struct_with_partner( diff --git a/crates/iceberg/src/arrow/value.rs b/crates/iceberg/src/arrow/value.rs index c9f7f72839..a1747c846b 100644 --- a/crates/iceberg/src/arrow/value.rs +++ b/crates/iceberg/src/arrow/value.rs @@ -21,12 +21,12 @@ use arrow_array::{ LargeListArray, LargeStringArray, ListArray, MapArray, StringArray, StructArray, Time64MicrosecondArray, TimestampMicrosecondArray, TimestampNanosecondArray, }; -use arrow_schema::{DataType, FieldRef, Schema as ArrowSchema}; +use arrow_schema::{DataType, FieldRef}; use uuid::Uuid; -use super::{get_field_id, schema_to_arrow_schema}; +use super::get_field_id; use crate::spec::{ - ListType, Literal, Map, MapType, NestedField, PartnerAccessor, PrimitiveType, Schema, + ListType, Literal, Map, MapType, NestedField, PartnerAccessor, PrimitiveType, SchemaWithPartnerVisitor, Struct, StructType, visit_struct_with_partner, }; use crate::{Error, ErrorKind, Result}; @@ -425,13 +425,26 @@ impl SchemaWithPartnerVisitor for ArrowArrayToIcebergStructConverter { } } -/// todo doc +/// Defines how Arrow fields are matched with Iceberg fields when converting data. +/// +/// This enum provides two strategies for matching fields: +/// - `Id`: Match fields by their ID, which is stored in Arrow field metadata. +/// - `Name`: Match fields by their name, ignoring the field ID. +/// +/// The ID matching mode is the default and preferred approach as it's more robust +/// against schema evolution where field names might change but IDs remain stable. +/// The name matching mode can be useful in scenarios where field IDs are not available +/// or when working with systems that don't preserve field IDs. +#[derive(Clone, Copy, Debug)] pub enum FieldMatchMode { + /// Match fields by their ID stored in Arrow field metadata Id, + /// Match fields by their name, ignoring field IDs Name, } impl FieldMatchMode { + /// Determines if an Arrow field matches an Iceberg field based on the matching mode. pub fn match_field(&self, arrow_field: &FieldRef, iceberg_field: &NestedField) -> bool { match self { FieldMatchMode::Id => get_field_id(arrow_field) @@ -448,15 +461,14 @@ pub struct ArrowArrayAccessor { } impl ArrowArrayAccessor { - /// Creates a new instance of ArrowArrayAccessor without arrow schema fallback + /// Creates a new instance of ArrowArrayAccessor with the default ID matching mode pub fn new() -> Self { Self { match_mode: FieldMatchMode::Id, } } - /// Creates a new instance of ArrowArrayAccessor with arrow schema converted from table schema - /// for field ID resolution fallback + /// Creates a new instance of ArrowArrayAccessor with the specified matching mode pub fn new_with_match_mode(match_mode: FieldMatchMode) -> Self { Self { match_mode } } @@ -933,50 +945,180 @@ mod test { } #[test] - fn test_field_id_fallback_with_arrow_schema() { - // Create an Arrow struct array with a field that doesn't have field ID in metadata - let int32_array = Int32Array::from(vec![Some(42), Some(43), None]); - - // Create the struct array with a field that has no field ID metadata - let struct_array = Arc::new(StructArray::from(vec![( - Arc::new(Field::new("field_a", DataType::Int32, true)), // No field ID metadata - Arc::new(int32_array) as ArrayRef, - )])) as ArrayRef; - - // Create an Iceberg schema with field ID - let iceberg_schema = Schema::builder() - .with_schema_id(1) - .with_fields(vec![Arc::new(NestedField::optional( - 100, // Field ID that we'll look for - "field_a", - Type::Primitive(PrimitiveType::Int), - ))]) - .build() - .unwrap(); + fn test_find_field_by_id() { + // Create Arrow arrays for the nested structure + let field_a_array = Int32Array::from(vec![Some(42), Some(43), None]); + let field_b_array = StringArray::from(vec![Some("value1"), Some("value2"), None]); + + // Create the nested struct array with field IDs in metadata + let nested_struct_array = + Arc::new(StructArray::from(vec![ + ( + Arc::new(Field::new("field_a", DataType::Int32, true).with_metadata( + HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "1".to_string())]), + )), + Arc::new(field_a_array) as ArrayRef, + ), + ( + Arc::new(Field::new("field_b", DataType::Utf8, true).with_metadata( + HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "2".to_string())]), + )), + Arc::new(field_b_array) as ArrayRef, + ), + ])) as ArrayRef; - // Create an ArrowArrayAccessor with the table schema for fallback - let accessor = ArrowArrayAccessor::new_with_table_schema(&iceberg_schema).unwrap(); + let field_c_array = Int32Array::from(vec![Some(100), Some(200), None]); - // Create a nested field to look up - let field = NestedField::optional(100, "field_a", Type::Primitive(PrimitiveType::Int)); + // Create the top-level struct array with field IDs in metadata + let struct_array = Arc::new(StructArray::from(vec![ + ( + Arc::new( + Field::new( + "nested_struct", + DataType::Struct(Fields::from(vec![ + Field::new("field_a", DataType::Int32, true).with_metadata( + HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "1".to_string(), + )]), + ), + Field::new("field_b", DataType::Utf8, true).with_metadata( + HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "2".to_string(), + )]), + ), + ])), + true, + ) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "3".to_string(), + )])), + ), + nested_struct_array, + ), + ( + Arc::new(Field::new("field_c", DataType::Int32, true).with_metadata( + HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "4".to_string())]), + )), + Arc::new(field_c_array) as ArrayRef, + ), + ])) as ArrayRef; - // This should succeed by using the arrow_schema fallback - let result = accessor.field_partner(&struct_array, &field); + // Create an ArrowArrayAccessor with ID matching mode + let accessor = ArrowArrayAccessor::new_with_match_mode(FieldMatchMode::Id); - // Verify that the field was found - assert!(result.is_ok()); + // Test finding fields by ID + let nested_field = NestedField::optional( + 3, + "nested_struct", + Type::Struct(StructType::new(vec![ + Arc::new(NestedField::optional( + 1, + "field_a", + Type::Primitive(PrimitiveType::Int), + )), + Arc::new(NestedField::optional( + 2, + "field_b", + Type::Primitive(PrimitiveType::String), + )), + ])), + ); + let nested_partner = accessor + .field_partner(&struct_array, &nested_field) + .unwrap(); - // Verify that the field has the expected value - let array_ref = result.unwrap(); - let int_array = array_ref.as_any().downcast_ref::().unwrap(); + // Verify we can access the nested field + let field_a = NestedField::optional(1, "field_a", Type::Primitive(PrimitiveType::Int)); + let field_a_partner = accessor.field_partner(nested_partner, &field_a).unwrap(); + + // Verify the field has the expected value + let int_array = field_a_partner + .as_any() + .downcast_ref::() + .unwrap(); assert_eq!(int_array.value(0), 42); assert_eq!(int_array.value(1), 43); assert!(int_array.is_null(2)); + } + + #[test] + fn test_find_field_by_name() { + // Create Arrow arrays for the nested structure + let field_a_array = Int32Array::from(vec![Some(42), Some(43), None]); + let field_b_array = StringArray::from(vec![Some("value1"), Some("value2"), None]); + + // Create the nested struct array WITHOUT field IDs in metadata + let nested_struct_array = Arc::new(StructArray::from(vec![ + ( + Arc::new(Field::new("field_a", DataType::Int32, true)), + Arc::new(field_a_array) as ArrayRef, + ), + ( + Arc::new(Field::new("field_b", DataType::Utf8, true)), + Arc::new(field_b_array) as ArrayRef, + ), + ])) as ArrayRef; + + let field_c_array = Int32Array::from(vec![Some(100), Some(200), None]); + + // Create the top-level struct array WITHOUT field IDs in metadata + let struct_array = Arc::new(StructArray::from(vec![ + ( + Arc::new(Field::new( + "nested_struct", + DataType::Struct(Fields::from(vec![ + Field::new("field_a", DataType::Int32, true), + Field::new("field_b", DataType::Utf8, true), + ])), + true, + )), + nested_struct_array, + ), + ( + Arc::new(Field::new("field_c", DataType::Int32, true)), + Arc::new(field_c_array) as ArrayRef, + ), + ])) as ArrayRef; + + // Create an ArrowArrayAccessor with Name matching mode + let accessor = ArrowArrayAccessor::new_with_match_mode(FieldMatchMode::Name); + + // Test finding fields by name + let nested_field = NestedField::optional( + 3, + "nested_struct", + Type::Struct(StructType::new(vec![ + Arc::new(NestedField::optional( + 1, + "field_a", + Type::Primitive(PrimitiveType::Int), + )), + Arc::new(NestedField::optional( + 2, + "field_b", + Type::Primitive(PrimitiveType::String), + )), + ])), + ); + let nested_partner = accessor + .field_partner(&struct_array, &nested_field) + .unwrap(); + + // Verify we can access the nested field by name + let field_a = NestedField::optional(1, "field_a", Type::Primitive(PrimitiveType::Int)); + let field_a_partner = accessor.field_partner(nested_partner, &field_a).unwrap(); - // Now try with an accessor without arrow_schema - this should fail - let accessor_without_schema = ArrowArrayAccessor::new().unwrap(); - let result = accessor_without_schema.field_partner(&struct_array, &field); - assert!(result.is_err()); + // Verify the field has the expected value + let int_array = field_a_partner + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(int_array.value(0), 42); + assert_eq!(int_array.value(1), 43); + assert!(int_array.is_null(2)); } #[test] diff --git a/crates/iceberg/src/writer/file_writer/parquet_writer.rs b/crates/iceberg/src/writer/file_writer/parquet_writer.rs index db410f47e3..4f65d64be4 100644 --- a/crates/iceberg/src/writer/file_writer/parquet_writer.rs +++ b/crates/iceberg/src/writer/file_writer/parquet_writer.rs @@ -37,8 +37,8 @@ use thrift::protocol::TOutputProtocol; use super::location_generator::{FileNameGenerator, LocationGenerator}; use super::{FileWriter, FileWriterBuilder}; use crate::arrow::{ - ArrowFileReader, DEFAULT_MAP_FIELD_NAME, NanValueCountVisitor, get_parquet_stat_max_as_datum, - get_parquet_stat_min_as_datum, + ArrowFileReader, DEFAULT_MAP_FIELD_NAME, FieldMatchMode, NanValueCountVisitor, + get_parquet_stat_max_as_datum, get_parquet_stat_min_as_datum, }; use crate::io::{FileIO, FileWrite, OutputFile}; use crate::spec::{ @@ -55,6 +55,7 @@ use crate::{Error, ErrorKind, Result}; pub struct ParquetWriterBuilder { props: WriterProperties, schema: SchemaRef, + match_mode: FieldMatchMode, file_io: FileIO, location_generator: T, @@ -70,10 +71,30 @@ impl ParquetWriterBuilder { file_io: FileIO, location_generator: T, file_name_generator: F, + ) -> Self { + Self::new_with_match_mode( + props, + schema, + FieldMatchMode::Id, + file_io, + location_generator, + file_name_generator, + ) + } + + /// Create a new `ParquetWriterBuilder` with custom match mode + pub fn new_with_match_mode( + props: WriterProperties, + schema: SchemaRef, + match_mode: FieldMatchMode, + file_io: FileIO, + location_generator: T, + file_name_generator: F, ) -> Self { Self { props, schema, + match_mode, file_io, location_generator, file_name_generator, @@ -96,7 +117,7 @@ impl FileWriterBuilder for ParquetWr writer_properties: self.props, current_row_num: 0, out_file, - nan_value_count_visitor: NanValueCountVisitor::new(), + nan_value_count_visitor: NanValueCountVisitor::new_with_match_mode(self.match_mode), }) } } From 7636cd5058b150a58940d52de2b7b906752caedb Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Tue, 12 Aug 2025 12:56:36 -0700 Subject: [PATCH 12/13] name matching in write exec --- crates/integrations/datafusion/src/physical_plan/write.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/crates/integrations/datafusion/src/physical_plan/write.rs b/crates/integrations/datafusion/src/physical_plan/write.rs index a8d0b110af..4b8ef1ab11 100644 --- a/crates/integrations/datafusion/src/physical_plan/write.rs +++ b/crates/integrations/datafusion/src/physical_plan/write.rs @@ -35,7 +35,7 @@ use datafusion::physical_plan::{ execute_input_stream, }; use futures::StreamExt; -use iceberg::arrow::schema_to_arrow_schema; +use iceberg::arrow::{FieldMatchMode, schema_to_arrow_schema}; use iceberg::spec::{ DataFileFormat, PROPERTY_DEFAULT_FILE_FORMAT, PROPERTY_DEFAULT_FILE_FORMAT_DEFAULT, PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES, PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT, @@ -232,9 +232,10 @@ impl ExecutionPlan for IcebergWriteExec { } // Create data file writer builder - let parquet_file_writer_builder = ParquetWriterBuilder::new( + let parquet_file_writer_builder = ParquetWriterBuilder::new_with_match_mode( WriterProperties::default(), self.table.metadata().current_schema().clone(), + FieldMatchMode::Name, self.table.file_io().clone(), DefaultLocationGenerator::new(self.table.metadata().clone()) .map_err(to_datafusion_error)?, From 54d11aac79907e46d9fed92d9179edef5c991911 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Tue, 12 Aug 2025 13:08:05 -0700 Subject: [PATCH 13/13] clippy --- crates/iceberg/src/arrow/value.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/crates/iceberg/src/arrow/value.rs b/crates/iceberg/src/arrow/value.rs index a1747c846b..9ddd941fa4 100644 --- a/crates/iceberg/src/arrow/value.rs +++ b/crates/iceberg/src/arrow/value.rs @@ -474,6 +474,12 @@ impl ArrowArrayAccessor { } } +impl Default for ArrowArrayAccessor { + fn default() -> Self { + Self::new() + } +} + impl PartnerAccessor for ArrowArrayAccessor { fn struct_partner<'a>(&self, schema_partner: &'a ArrayRef) -> Result<&'a ArrayRef> { if !matches!(schema_partner.data_type(), DataType::Struct(_)) {