Skip to content

Commit 681ab7e

Browse files
committed
feat: use visitor to allow struct fields to be used in eq deletes
1 parent 1f687c7 commit 681ab7e

File tree

2 files changed

+224
-58
lines changed

2 files changed

+224
-58
lines changed

crates/iceberg/src/arrow/caching_delete_file_loader.rs

Lines changed: 224 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,10 @@
1717

1818
use std::collections::{HashMap, HashSet};
1919
use std::ops::Not;
20+
use std::sync::Arc;
2021

21-
use arrow_array::{Array, Int64Array, StringArray};
22+
use arrow_array::{Array, ArrayRef, Int64Array, StringArray, StructArray};
2223
use futures::{StreamExt, TryStreamExt};
23-
use itertools::Itertools;
2424
use tokio::sync::oneshot::{Receiver, channel};
2525

2626
use super::delete_filter::DeleteFilter;
@@ -31,7 +31,11 @@ use crate::expr::Predicate::AlwaysTrue;
3131
use crate::expr::{Predicate, Reference};
3232
use crate::io::FileIO;
3333
use crate::scan::{ArrowRecordBatchStream, FileScanTaskDeleteFile};
34-
use crate::spec::{DataContentType, Datum, SchemaRef};
34+
use crate::spec::{
35+
DataContentType, Datum, ListType, MapType, NestedField, NestedFieldRef, PartnerAccessor,
36+
PrimitiveType, Schema, SchemaRef, SchemaWithPartnerVisitor, StructType, Type,
37+
visit_schema_with_partner,
38+
};
3539
use crate::{Error, ErrorKind, Result};
3640

3741
#[derive(Clone, Debug)]
@@ -321,6 +325,8 @@ impl CachingDeleteFileLoader {
321325
equality_ids: HashSet<i32>,
322326
) -> Result<Predicate> {
323327
let mut result_predicate = AlwaysTrue;
328+
let mut batch_schema_iceberg: Option<Schema> = None;
329+
let accessor = EqDelRecordBatchPartnerAccessor;
324330

325331
while let Some(record_batch) = stream.next().await {
326332
let record_batch = record_batch?;
@@ -329,47 +335,26 @@ impl CachingDeleteFileLoader {
329335
return Ok(AlwaysTrue);
330336
}
331337

332-
let batch_schema_arrow = record_batch.schema();
333-
let batch_schema_iceberg = arrow_schema_to_schema(batch_schema_arrow.as_ref())?;
334-
335-
let mut datum_columns_with_names: Vec<_> = record_batch
336-
.columns()
337-
.iter()
338-
.zip(batch_schema_iceberg.as_struct().fields())
339-
// only use columns that are in the set of equality_ids for this delete file
340-
.filter(|(field, value)| equality_ids.contains(&value.id))
341-
.map(|(column, field)| {
342-
let lit_vec = arrow_primitive_to_literal(column, &field.field_type)?;
343-
344-
let primitive_type = field.field_type.as_primitive_type().ok_or(Error::new(
345-
ErrorKind::Unexpected,
346-
"field is not a primitive type",
347-
))?;
348-
349-
let datum_iterator: Box<dyn ExactSizeIterator<Item = Result<Option<Datum>>>> =
350-
Box::new(lit_vec.into_iter().map(move |c| {
351-
c.map(|literal| {
352-
literal
353-
.as_primitive_literal()
354-
.map(|primitive_literal| {
355-
Datum::new(primitive_type.clone(), primitive_literal)
356-
})
357-
.ok_or(Error::new(
358-
ErrorKind::Unexpected,
359-
"failed to convert to primitive literal",
360-
))
361-
})
362-
.transpose()
363-
}));
364-
365-
Ok::<_, Error>((datum_iterator, field.name.to_string()))
366-
})
367-
.try_collect()?;
338+
let schema = match &batch_schema_iceberg {
339+
Some(schema) => schema,
340+
None => {
341+
let schema = arrow_schema_to_schema(record_batch.schema().as_ref())?;
342+
batch_schema_iceberg = Some(schema);
343+
batch_schema_iceberg.as_ref().unwrap()
344+
}
345+
};
346+
347+
let root_array: ArrayRef = Arc::new(StructArray::from(record_batch));
368348

369-
// consume all the iterators in lockstep, creating per-row predicates that get combined
370-
// into a single final predicate
349+
let mut processor = EqDelColumnProcessor::new(&equality_ids);
350+
visit_schema_with_partner(schema, &root_array, &mut processor, &accessor)?;
371351

372-
// (2025-06-12) can't use `is_empty` as it depends on unstable library feature `exact_size_is_empty`
352+
let mut datum_columns_with_names = processor.finish()?;
353+
if datum_columns_with_names.is_empty() {
354+
continue;
355+
}
356+
357+
// Process the collected columns in lockstep
373358
#[allow(clippy::len_zero)]
374359
while datum_columns_with_names[0].0.len() > 0 {
375360
let mut row_predicate = AlwaysTrue;
@@ -390,13 +375,169 @@ impl CachingDeleteFileLoader {
390375
}
391376
}
392377

378+
struct EqDelColumnProcessor<'a> {
379+
equality_ids: &'a HashSet<i32>,
380+
collected_columns: Vec<(ArrayRef, String, Type)>,
381+
}
382+
383+
impl<'a> EqDelColumnProcessor<'a> {
384+
fn new(equality_ids: &'a HashSet<i32>) -> Self {
385+
Self {
386+
equality_ids,
387+
collected_columns: Vec::with_capacity(equality_ids.len()),
388+
}
389+
}
390+
391+
#[allow(clippy::type_complexity)]
392+
fn finish(
393+
self,
394+
) -> Result<
395+
Vec<(
396+
Box<dyn ExactSizeIterator<Item = Result<Option<Datum>>>>,
397+
String,
398+
)>,
399+
> {
400+
self.collected_columns
401+
.into_iter()
402+
.map(|(array, field_name, field_type)| {
403+
let primitive_type = field_type
404+
.as_primitive_type()
405+
.ok_or_else(|| {
406+
Error::new(ErrorKind::Unexpected, "field is not a primitive type")
407+
})?
408+
.clone();
409+
410+
let lit_vec = arrow_primitive_to_literal(&array, &field_type)?;
411+
let datum_iterator: Box<dyn ExactSizeIterator<Item = Result<Option<Datum>>>> =
412+
Box::new(lit_vec.into_iter().map(move |c| {
413+
c.map(|literal| {
414+
literal
415+
.as_primitive_literal()
416+
.map(|primitive_literal| {
417+
Datum::new(primitive_type.clone(), primitive_literal)
418+
})
419+
.ok_or(Error::new(
420+
ErrorKind::Unexpected,
421+
"failed to convert to primitive literal",
422+
))
423+
})
424+
.transpose()
425+
}));
426+
427+
Ok((datum_iterator, field_name))
428+
})
429+
.collect::<Result<Vec<_>>>()
430+
}
431+
}
432+
433+
impl SchemaWithPartnerVisitor<ArrayRef> for EqDelColumnProcessor<'_> {
434+
type T = ();
435+
436+
fn schema(&mut self, _schema: &Schema, _partner: &ArrayRef, _value: ()) -> Result<()> {
437+
Ok(())
438+
}
439+
440+
fn field(&mut self, field: &NestedFieldRef, partner: &ArrayRef, _value: ()) -> Result<()> {
441+
if self.equality_ids.contains(&field.id) && field.field_type.as_primitive_type().is_some() {
442+
self.collected_columns.push((
443+
partner.clone(),
444+
field.name.clone(),
445+
field.field_type.as_ref().clone(),
446+
));
447+
}
448+
Ok(())
449+
}
450+
451+
fn r#struct(
452+
&mut self,
453+
_struct: &StructType,
454+
_partner: &ArrayRef,
455+
_results: Vec<()>,
456+
) -> Result<()> {
457+
Ok(())
458+
}
459+
460+
fn list(&mut self, _list: &ListType, _partner: &ArrayRef, _value: ()) -> Result<()> {
461+
Ok(())
462+
}
463+
464+
fn map(
465+
&mut self,
466+
_map: &MapType,
467+
_partner: &ArrayRef,
468+
_key_value: (),
469+
_value: (),
470+
) -> Result<()> {
471+
Ok(())
472+
}
473+
474+
fn primitive(&mut self, _primitive: &PrimitiveType, _partner: &ArrayRef) -> Result<()> {
475+
Ok(())
476+
}
477+
}
478+
479+
struct EqDelRecordBatchPartnerAccessor;
480+
481+
impl PartnerAccessor<ArrayRef> for EqDelRecordBatchPartnerAccessor {
482+
fn struct_partner<'a>(&self, schema_partner: &'a ArrayRef) -> Result<&'a ArrayRef> {
483+
Ok(schema_partner)
484+
}
485+
486+
fn field_partner<'a>(
487+
&self,
488+
struct_partner: &'a ArrayRef,
489+
field: &NestedField,
490+
) -> Result<&'a ArrayRef> {
491+
let Some(struct_array) = struct_partner.as_any().downcast_ref::<StructArray>() else {
492+
return Err(Error::new(
493+
ErrorKind::Unexpected,
494+
"Expected struct array for field extraction",
495+
));
496+
};
497+
498+
// Find the field by name within the struct
499+
for (i, field_def) in struct_array.fields().iter().enumerate() {
500+
if field_def.name() == &field.name {
501+
return Ok(struct_array.column(i));
502+
}
503+
}
504+
505+
Err(Error::new(
506+
ErrorKind::Unexpected,
507+
format!("Field {} not found in parent struct", field.name),
508+
))
509+
}
510+
511+
fn list_element_partner<'a>(&self, _list_partner: &'a ArrayRef) -> Result<&'a ArrayRef> {
512+
Err(Error::new(
513+
ErrorKind::FeatureUnsupported,
514+
"List columns are unsupported in equality deletes",
515+
))
516+
}
517+
518+
fn map_key_partner<'a>(&self, _map_partner: &'a ArrayRef) -> Result<&'a ArrayRef> {
519+
Err(Error::new(
520+
ErrorKind::FeatureUnsupported,
521+
"Map columns are unsupported in equality deletes",
522+
))
523+
}
524+
525+
fn map_value_partner<'a>(&self, _map_partner: &'a ArrayRef) -> Result<&'a ArrayRef> {
526+
Err(Error::new(
527+
ErrorKind::FeatureUnsupported,
528+
"Map columns are unsupported in equality deletes",
529+
))
530+
}
531+
}
532+
393533
#[cfg(test)]
394534
mod tests {
395535
use std::collections::HashMap;
396536
use std::fs::File;
397537
use std::sync::Arc;
398538

399-
use arrow_array::{ArrayRef, Int64Array, RecordBatch, StringArray};
539+
use arrow_array::{ArrayRef, Int32Array, Int64Array, RecordBatch, StringArray, StructArray};
540+
use arrow_schema::{DataType, Field, Fields};
400541
use parquet::arrow::{ArrowWriter, PARQUET_FIELD_ID_META_KEY};
401542
use parquet::basic::Compression;
402543
use parquet::file::properties::WriterProperties;
@@ -419,7 +560,7 @@ mod tests {
419560
.await
420561
.expect("could not get batch stream");
421562

422-
let eq_ids = HashSet::from_iter(vec![2, 3, 4]);
563+
let eq_ids = HashSet::from_iter(vec![2, 3, 4, 6]);
423564

424565
let parsed_eq_delete = CachingDeleteFileLoader::parse_equality_deletes_record_batch_stream(
425566
record_batch_stream,
@@ -429,11 +570,19 @@ mod tests {
429570
.expect("error parsing batch stream");
430571
println!("{}", parsed_eq_delete);
431572

432-
let expected = "(((y != 1) OR (z != 100)) OR (a != \"HELP\")) AND (((y != 2) OR (z IS NOT NULL)) OR (a IS NOT NULL))".to_string();
573+
let expected = "((((y != 1) OR (z != 100)) OR (a != \"HELP\")) OR (sa != 4)) AND ((((y != 2) OR (z IS NOT NULL)) OR (a IS NOT NULL)) OR (sa != 5))".to_string();
433574

434575
assert_eq!(parsed_eq_delete.to_string(), expected);
435576
}
436577

578+
/// Create a simple field with metadata.
579+
fn simple_field(name: &str, ty: DataType, nullable: bool, value: &str) -> Field {
580+
arrow_schema::Field::new(name, ty, nullable).with_metadata(HashMap::from([(
581+
PARQUET_FIELD_ID_META_KEY.to_string(),
582+
value.to_string(),
583+
)]))
584+
}
585+
437586
fn setup_write_equality_delete_file_1(table_location: &str) -> String {
438587
let col_y_vals = vec![1, 2];
439588
let col_y = Arc::new(Int64Array::from(col_y_vals)) as ArrayRef;
@@ -444,24 +593,42 @@ mod tests {
444593
let col_a_vals = vec![Some("HELP"), None];
445594
let col_a = Arc::new(StringArray::from(col_a_vals)) as ArrayRef;
446595

596+
let col_s = Arc::new(StructArray::from(vec![
597+
(
598+
Arc::new(simple_field("sa", DataType::Int32, false, "6")),
599+
Arc::new(Int32Array::from(vec![4, 5])) as ArrayRef,
600+
),
601+
(
602+
Arc::new(simple_field("sb", DataType::Utf8, true, "7")),
603+
Arc::new(StringArray::from(vec![Some("x"), None])) as ArrayRef,
604+
),
605+
]));
606+
447607
let equality_delete_schema = {
608+
let struct_field = DataType::Struct(Fields::from(vec![
609+
simple_field("sa", DataType::Int32, false, "6"),
610+
simple_field("sb", DataType::Utf8, true, "7"),
611+
]));
612+
448613
let fields = vec![
449-
arrow_schema::Field::new("y", arrow_schema::DataType::Int64, true).with_metadata(
450-
HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "2".to_string())]),
451-
),
452-
arrow_schema::Field::new("z", arrow_schema::DataType::Int64, true).with_metadata(
453-
HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "3".to_string())]),
454-
),
455-
arrow_schema::Field::new("a", arrow_schema::DataType::Utf8, true).with_metadata(
456-
HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "4".to_string())]),
457-
),
614+
Field::new("y", arrow_schema::DataType::Int64, true).with_metadata(HashMap::from(
615+
[(PARQUET_FIELD_ID_META_KEY.to_string(), "2".to_string())],
616+
)),
617+
Field::new("z", arrow_schema::DataType::Int64, true).with_metadata(HashMap::from(
618+
[(PARQUET_FIELD_ID_META_KEY.to_string(), "3".to_string())],
619+
)),
620+
Field::new("a", arrow_schema::DataType::Utf8, true).with_metadata(HashMap::from([
621+
(PARQUET_FIELD_ID_META_KEY.to_string(), "4".to_string()),
622+
])),
623+
simple_field("s", struct_field, false, "5"),
458624
];
459625
Arc::new(arrow_schema::Schema::new(fields))
460626
};
461627

462-
let equality_deletes_to_write =
463-
RecordBatch::try_new(equality_delete_schema.clone(), vec![col_y, col_z, col_a])
464-
.unwrap();
628+
let equality_deletes_to_write = RecordBatch::try_new(equality_delete_schema.clone(), vec![
629+
col_y, col_z, col_a, col_s,
630+
])
631+
.unwrap();
465632

466633
let path = format!("{}/equality-deletes-1.parquet", &table_location);
467634

crates/iceberg/src/scan/context.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,6 @@ impl PlanContext {
191191

192192
// TODO: Ideally we could ditch this intermediate Vec as we return an iterator.
193193
let mut filtered_mfcs = vec![];
194-
195194
for manifest_file in manifest_files {
196195
let tx = if manifest_file.content == ManifestContentType::Deletes {
197196
delete_file_tx.clone()

0 commit comments

Comments
 (0)