Skip to content

Commit f5d88d1

Browse files
authored
Support serialization/deserialization for custom physical exprs in proto (#11387)
* Add PhysicalExtensionExprNode * regen proto * Add ser/de extension expr logic * Add test and fix clippy lint
1 parent 08fa444 commit f5d88d1

File tree

7 files changed

+330
-3
lines changed

7 files changed

+330
-3
lines changed

datafusion/proto/proto/datafusion.proto

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -836,6 +836,8 @@ message PhysicalExprNode {
836836
// was PhysicalDateTimeIntervalExprNode date_time_interval_expr = 17;
837837

838838
PhysicalLikeExprNode like_expr = 18;
839+
840+
PhysicalExtensionExprNode extension = 19;
839841
}
840842
}
841843

@@ -942,6 +944,11 @@ message PhysicalNegativeNode {
942944
PhysicalExprNode expr = 1;
943945
}
944946

947+
message PhysicalExtensionExprNode {
948+
bytes expr = 1;
949+
repeated PhysicalExprNode inputs = 2;
950+
}
951+
945952
message FilterExecNode {
946953
PhysicalPlanNode input = 1;
947954
PhysicalExprNode expr = 2;

datafusion/proto/src/generated/pbjson.rs

Lines changed: 124 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/proto/src/generated/prost.rs

Lines changed: 11 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/proto/src/physical_plan/from_proto.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -394,6 +394,14 @@ pub fn parse_physical_expr(
394394
codec,
395395
)?,
396396
)),
397+
ExprType::Extension(extension) => {
398+
let inputs: Vec<Arc<dyn PhysicalExpr>> = extension
399+
.inputs
400+
.iter()
401+
.map(|e| parse_physical_expr(e, registry, input_schema, codec))
402+
.collect::<Result<_>>()?;
403+
(codec.try_decode_expr(extension.expr.as_slice(), &inputs)?) as _
404+
}
397405
};
398406

399407
Ok(pexpr)

datafusion/proto/src/physical_plan/mod.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2018,6 +2018,22 @@ pub trait PhysicalExtensionCodec: Debug + Send + Sync {
20182018
fn try_encode_udf(&self, _node: &ScalarUDF, _buf: &mut Vec<u8>) -> Result<()> {
20192019
Ok(())
20202020
}
2021+
2022+
fn try_decode_expr(
2023+
&self,
2024+
_buf: &[u8],
2025+
_inputs: &[Arc<dyn PhysicalExpr>],
2026+
) -> Result<Arc<dyn PhysicalExpr>> {
2027+
not_impl_err!("PhysicalExtensionCodec is not provided")
2028+
}
2029+
2030+
fn try_encode_expr(
2031+
&self,
2032+
_node: Arc<dyn PhysicalExpr>,
2033+
_buf: &mut Vec<u8>,
2034+
) -> Result<()> {
2035+
not_impl_err!("PhysicalExtensionCodec is not provided")
2036+
}
20212037
}
20222038

20232039
#[derive(Debug)]

datafusion/proto/src/physical_plan/to_proto.rs

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -495,7 +495,24 @@ pub fn serialize_physical_expr(
495495
))),
496496
})
497497
} else {
498-
internal_err!("physical_plan::to_proto() unsupported expression {value:?}")
498+
let mut buf: Vec<u8> = vec![];
499+
match codec.try_encode_expr(Arc::clone(&value), &mut buf) {
500+
Ok(_) => {
501+
let inputs: Vec<protobuf::PhysicalExprNode> = value
502+
.children()
503+
.into_iter()
504+
.map(|e| serialize_physical_expr(Arc::clone(e), codec))
505+
.collect::<Result<_>>()?;
506+
Ok(protobuf::PhysicalExprNode {
507+
expr_type: Some(protobuf::physical_expr_node::ExprType::Extension(
508+
protobuf::PhysicalExtensionExprNode { expr: buf, inputs },
509+
)),
510+
})
511+
}
512+
Err(e) => internal_err!(
513+
"Unsupported physical expr and extension codec failed with [{e}]. Expr: {value:?}"
514+
),
515+
}
499516
}
500517
}
501518

0 commit comments

Comments
 (0)