Skip to content

Commit 4e25be4

Browse files
committed
add it
1 parent 6d287ef commit 4e25be4

File tree

3 files changed

+56
-9
lines changed

3 files changed

+56
-9
lines changed

crates/integrations/datafusion/src/physical_plan/expr_to_predicate.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -222,8 +222,8 @@ mod tests {
222222

223223
fn create_test_schema() -> DFSchema {
224224
let arrow_schema = Schema::new(vec![
225-
Field::new("foo", DataType::Int32, false),
226-
Field::new("bar", DataType::Utf8, false),
225+
Field::new("foo", DataType::Int32, true),
226+
Field::new("bar", DataType::Utf8, true),
227227
]);
228228
DFSchema::try_from_qualified_schema("my_table", &arrow_schema).unwrap()
229229
}

crates/integrations/datafusion/src/physical_plan/scan.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -139,10 +139,13 @@ impl DisplayAs for IcebergTableScan {
139139
) -> std::fmt::Result {
140140
write!(
141141
f,
142-
"IcebergTableScan projection:[{}]",
142+
"IcebergTableScan projection:[{}] predicate:[{}]",
143143
self.projection
144144
.clone()
145-
.map_or(String::new(), |v| v.join(","))
145+
.map_or(String::new(), |v| v.join(",")),
146+
self.predicates
147+
.clone()
148+
.map_or(String::from(""), |p| format!("{}", p))
146149
)
147150
}
148151
}

crates/integrations/datafusion/tests/integration_datafusion_test.rs

Lines changed: 49 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -204,10 +204,7 @@ async fn test_table_projection() -> Result<()> {
204204
.unwrap();
205205
assert_eq!(2, s.len());
206206
// the first row is logical_plan, the second row is physical_plan
207-
assert_eq!(
208-
"IcebergTableScan projection:[foo1,foo2,foo3]",
209-
s.value(1).trim()
210-
);
207+
assert!(s.value(1).contains("projection:[foo1,foo2,foo3]"));
211208

212209
// datafusion doesn't support query foo3.s_foo1, use foo3 instead
213210
let records = table_df
@@ -226,7 +223,54 @@ async fn test_table_projection() -> Result<()> {
226223
.downcast_ref::<StringArray>()
227224
.unwrap();
228225
assert_eq!(2, s.len());
229-
assert_eq!("IcebergTableScan projection:[foo1,foo3]", s.value(1).trim());
226+
assert!(s
227+
.value(1)
228+
.contains("IcebergTableScan projection:[foo1,foo3]"));
230229

231230
Ok(())
232231
}
232+
233+
#[tokio::test]
234+
async fn test_table_predict_pushdown() -> Result<()> {
235+
let iceberg_catalog = get_iceberg_catalog();
236+
let namespace = NamespaceIdent::new("ns".to_string());
237+
set_test_namespace(&iceberg_catalog, &namespace).await?;
238+
239+
let schema = Schema::builder()
240+
.with_schema_id(0)
241+
.with_fields(vec![
242+
NestedField::required(1, "foo", Type::Primitive(PrimitiveType::Int)).into(),
243+
NestedField::optional(2, "bar", Type::Primitive(PrimitiveType::String)).into(),
244+
])
245+
.build()?;
246+
let creation = get_table_creation(temp_path(), "t1", Some(schema))?;
247+
iceberg_catalog.create_table(&namespace, creation).await?;
248+
249+
let client = Arc::new(iceberg_catalog);
250+
let catalog = Arc::new(IcebergCatalogProvider::try_new(client).await?);
251+
252+
let ctx = SessionContext::new();
253+
ctx.register_catalog("catalog", catalog);
254+
let records = ctx
255+
.sql("select * from catalog.ns.t1 where (foo > 1 and length(bar) = 1 ) or bar is null")
256+
.await
257+
.unwrap()
258+
.explain(false, false)
259+
.unwrap()
260+
.collect()
261+
.await
262+
.unwrap();
263+
assert_eq!(1, records.len());
264+
let record = &records[0];
265+
// the first column is plan_type, the second column plan string.
266+
let s = record
267+
.column(1)
268+
.as_any()
269+
.downcast_ref::<StringArray>()
270+
.unwrap();
271+
assert_eq!(2, s.len());
272+
// the first row is logical_plan, the second row is physical_plan
273+
let expected = "predicate:[(foo > 1) OR (bar IS NULL)]";
274+
assert!(s.value(1).trim().contains(expected));
275+
Ok(())
276+
}

0 commit comments

Comments
 (0)