From 39cb6de1daa898e0b8948fc77649b4544b2d9c56 Mon Sep 17 00:00:00 2001 From: ViggoC Date: Sun, 29 Jun 2025 23:00:33 +0800 Subject: [PATCH 1/6] Convert Option> to Vec --- datafusion/core/src/physical_planner.rs | 11 +++--- .../core/tests/execution/logical_plan.rs | 2 +- datafusion/expr/src/expr.rs | 35 +++++++++---------- datafusion/expr/src/expr_fn.rs | 2 +- datafusion/expr/src/planner.rs | 2 +- datafusion/expr/src/test/function_stub.rs | 10 +++--- datafusion/expr/src/udaf.rs | 15 ++++---- .../src/approx_percentile_cont.rs | 2 +- datafusion/functions-aggregate/src/count.rs | 2 +- .../functions-aggregate/src/first_last.rs | 8 ++--- datafusion/functions-aggregate/src/macros.rs | 4 +-- datafusion/functions-aggregate/src/planner.rs | 2 +- .../optimizer/src/analyzer/type_coercion.rs | 14 ++++---- .../optimizer/src/common_subexpr_eliminate.rs | 2 +- .../simplify_expressions/expr_simplifier.rs | 4 +-- .../src/single_distinct_to_groupby.rs | 14 ++++---- .../proto/src/logical_plan/from_proto.rs | 5 +-- datafusion/proto/src/logical_plan/to_proto.rs | 5 +-- .../tests/cases/roundtrip_logical_plan.rs | 6 ++-- datafusion/sql/src/expr/function.rs | 7 ++-- datafusion/sql/src/unparser/expr.rs | 19 +++++----- .../consumer/expr/aggregate_function.rs | 2 +- .../consumer/rel/aggregate_rel.rs | 7 ++-- .../producer/expr/aggregate_function.rs | 12 +++---- 24 files changed, 88 insertions(+), 104 deletions(-) diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index 90cc0b572fef..bd25e7181b56 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -1651,13 +1651,14 @@ pub fn create_aggregate_expr_with_name_and_maybe_filter( == NullTreatment::IgnoreNulls; let (agg_expr, filter, order_bys) = { - let order_bys = match order_by { - Some(exprs) => create_physical_sort_exprs( - exprs, + let order_bys = if !order_by.is_empty() { + create_physical_sort_exprs( + &order_by, logical_input_schema, execution_props, - )?, - None => vec![], + )? + } else { + vec![] }; let agg_expr = diff --git a/datafusion/core/tests/execution/logical_plan.rs b/datafusion/core/tests/execution/logical_plan.rs index f5a8a30e0130..da8f9807225d 100644 --- a/datafusion/core/tests/execution/logical_plan.rs +++ b/datafusion/core/tests/execution/logical_plan.rs @@ -68,7 +68,7 @@ async fn count_only_nulls() -> Result<()> { args: vec![input_col_ref], distinct: false, filter: None, - order_by: None, + order_by: vec![], null_treatment: None, }, })], diff --git a/datafusion/expr/src/expr.rs b/datafusion/expr/src/expr.rs index c50268d99676..332cb8ec74c9 100644 --- a/datafusion/expr/src/expr.rs +++ b/datafusion/expr/src/expr.rs @@ -28,7 +28,7 @@ use crate::expr_fn::binary_expr; use crate::function::WindowFunctionSimplification; use crate::logical_plan::Subquery; use crate::Volatility; -use crate::{udaf, ExprSchemable, Operator, Signature, WindowFrame, WindowUDF}; +use crate::{ExprSchemable, Operator, Signature, WindowFrame, WindowUDF}; use arrow::datatypes::{DataType, Field, FieldRef}; use datafusion_common::cse::{HashNode, NormalizeEq, Normalizeable}; @@ -994,7 +994,7 @@ pub struct AggregateFunctionParams { /// Optional filter pub filter: Option>, /// Optional ordering - pub order_by: Option>, + pub order_by: Vec, pub null_treatment: Option, } @@ -1005,7 +1005,7 @@ impl AggregateFunction { args: Vec, distinct: bool, filter: Option>, - order_by: Option>, + order_by: Vec, null_treatment: Option, ) -> Self { Self { @@ -1179,22 +1179,22 @@ impl Exists { #[derive(Clone, PartialEq, Eq, Hash, Debug)] pub struct AggregateUDF { /// The function - pub fun: Arc, + pub fun: Arc, /// List of expressions to feed to the functions as arguments pub args: Vec, /// Optional filter pub filter: Option>, /// Optional ORDER BY applied prior to aggregating - pub order_by: Option>, + pub order_by: Vec, } impl AggregateUDF { /// Create a new AggregateUDF expression pub fn new( - fun: Arc, + fun: Arc, args: Vec, filter: Option>, - order_by: Option>, + order_by: Vec, ) -> Self { Self { fun, @@ -2303,18 +2303,15 @@ impl NormalizeEq for Expr { (None, None) => true, _ => false, } - && match (self_order_by, other_order_by) { - (Some(self_order_by), Some(other_order_by)) => self_order_by - .iter() - .zip(other_order_by.iter()) - .all(|(a, b)| { - a.asc == b.asc - && a.nulls_first == b.nulls_first - && a.expr.normalize_eq(&b.expr) - }), - (None, None) => true, - _ => false, - } + && self_order_by + .iter() + .zip(other_order_by.iter()) + .all(|(a, b)| { + a.asc == b.asc + && a.nulls_first == b.nulls_first + && a.expr.normalize_eq(&b.expr) + }) + && self_order_by.len() == other_order_by.len() } (Expr::WindowFunction(left), Expr::WindowFunction(other)) => { let WindowFunction { diff --git a/datafusion/expr/src/expr_fn.rs b/datafusion/expr/src/expr_fn.rs index e8885ed6b724..001dfce59ccd 100644 --- a/datafusion/expr/src/expr_fn.rs +++ b/datafusion/expr/src/expr_fn.rs @@ -821,7 +821,7 @@ impl ExprFuncBuilder { let fun_expr = match fun { ExprFuncKind::Aggregate(mut udaf) => { - udaf.params.order_by = order_by; + udaf.params.order_by = order_by.unwrap_or_default(); udaf.params.filter = filter.map(Box::new); udaf.params.distinct = distinct; udaf.params.null_treatment = null_treatment; diff --git a/datafusion/expr/src/planner.rs b/datafusion/expr/src/planner.rs index 4c03f919312e..067c7a94279f 100644 --- a/datafusion/expr/src/planner.rs +++ b/datafusion/expr/src/planner.rs @@ -294,7 +294,7 @@ pub struct RawAggregateExpr { pub args: Vec, pub distinct: bool, pub filter: Option>, - pub order_by: Option>, + pub order_by: Vec, pub null_treatment: Option, } diff --git a/datafusion/expr/src/test/function_stub.rs b/datafusion/expr/src/test/function_stub.rs index f310f31be352..8f8e84c0d111 100644 --- a/datafusion/expr/src/test/function_stub.rs +++ b/datafusion/expr/src/test/function_stub.rs @@ -60,7 +60,7 @@ pub fn sum(expr: Expr) -> Expr { vec![expr], false, None, - None, + vec![], None, )) } @@ -73,7 +73,7 @@ pub fn count(expr: Expr) -> Expr { vec![expr], false, None, - None, + vec![], None, )) } @@ -86,7 +86,7 @@ pub fn avg(expr: Expr) -> Expr { vec![expr], false, None, - None, + vec![], None, )) } @@ -282,7 +282,7 @@ pub fn min(expr: Expr) -> Expr { vec![expr], false, None, - None, + vec![], None, )) } @@ -363,7 +363,7 @@ pub fn max(expr: Expr) -> Expr { vec![expr], false, None, - None, + vec![], None, )) } diff --git a/datafusion/expr/src/udaf.rs b/datafusion/expr/src/udaf.rs index d1bf45ce2fe8..0e97c79f4fca 100644 --- a/datafusion/expr/src/udaf.rs +++ b/datafusion/expr/src/udaf.rs @@ -158,7 +158,7 @@ impl AggregateUDF { args, false, None, - None, + vec![], None, )) } @@ -394,7 +394,7 @@ where /// fn get_doc() -> &'static Documentation { /// &DOCUMENTATION /// } -/// +/// /// /// Implement the AggregateUDFImpl trait for GeoMeanUdf /// impl AggregateUDFImpl for GeoMeanUdf { /// fn as_any(&self) -> &dyn Any { self } @@ -415,7 +415,7 @@ where /// ]) /// } /// fn documentation(&self) -> Option<&Documentation> { -/// Some(get_doc()) +/// Some(get_doc()) /// } /// } /// @@ -474,7 +474,7 @@ pub trait AggregateUDFImpl: Debug + Send + Sync { schema_name.write_fmt(format_args!(" FILTER (WHERE {filter})"))?; }; - if let Some(order_by) = order_by { + if !order_by.is_empty() { let clause = match self.is_ordered_set_aggregate() { true => "WITHIN GROUP", false => "ORDER BY", @@ -519,7 +519,7 @@ pub trait AggregateUDFImpl: Debug + Send + Sync { schema_name.write_fmt(format_args!(" FILTER (WHERE {filter})"))?; }; - if let Some(order_by) = order_by { + if !order_by.is_empty() { schema_name.write_fmt(format_args!( " ORDER BY [{}]", schema_name_from_sorts(order_by)? @@ -608,10 +608,11 @@ pub trait AggregateUDFImpl: Debug + Send + Sync { if let Some(fe) = filter { display_name.write_fmt(format_args!(" FILTER (WHERE {fe})"))?; } - if let Some(ob) = order_by { + if !order_by.is_empty() { display_name.write_fmt(format_args!( " ORDER BY [{}]", - ob.iter() + order_by + .iter() .map(|o| format!("{o}")) .collect::>() .join(", ") diff --git a/datafusion/functions-aggregate/src/approx_percentile_cont.rs b/datafusion/functions-aggregate/src/approx_percentile_cont.rs index 9b0d62e936bc..55c8c847ad0a 100644 --- a/datafusion/functions-aggregate/src/approx_percentile_cont.rs +++ b/datafusion/functions-aggregate/src/approx_percentile_cont.rs @@ -69,7 +69,7 @@ pub fn approx_percentile_cont( args, false, None, - Some(vec![order_by]), + vec![order_by], None, )) } diff --git a/datafusion/functions-aggregate/src/count.rs b/datafusion/functions-aggregate/src/count.rs index d1fe410321f6..09904bbad6ec 100644 --- a/datafusion/functions-aggregate/src/count.rs +++ b/datafusion/functions-aggregate/src/count.rs @@ -73,7 +73,7 @@ pub fn count_distinct(expr: Expr) -> Expr { vec![expr], true, None, - None, + vec![], None, )) } diff --git a/datafusion/functions-aggregate/src/first_last.rs b/datafusion/functions-aggregate/src/first_last.rs index 42c0a57fbf28..be418fd1de9a 100644 --- a/datafusion/functions-aggregate/src/first_last.rs +++ b/datafusion/functions-aggregate/src/first_last.rs @@ -55,8 +55,8 @@ create_func!(FirstValue, first_value_udaf); create_func!(LastValue, last_value_udaf); /// Returns the first value in a group of values. -pub fn first_value(expression: Expr, order_by: Option>) -> Expr { - if let Some(order_by) = order_by { +pub fn first_value(expression: Expr, order_by: Vec) -> Expr { + if !order_by.is_empty() { first_value_udaf() .call(vec![expression]) .order_by(order_by) @@ -69,8 +69,8 @@ pub fn first_value(expression: Expr, order_by: Option>) -> Expr { } /// Returns the last value in a group of values. -pub fn last_value(expression: Expr, order_by: Option>) -> Expr { - if let Some(order_by) = order_by { +pub fn last_value(expression: Expr, order_by: Vec) -> Expr { + if !order_by.is_empty() { last_value_udaf() .call(vec![expression]) .order_by(order_by) diff --git a/datafusion/functions-aggregate/src/macros.rs b/datafusion/functions-aggregate/src/macros.rs index 18f27c3c4ae3..6c6bf7283889 100644 --- a/datafusion/functions-aggregate/src/macros.rs +++ b/datafusion/functions-aggregate/src/macros.rs @@ -28,7 +28,7 @@ macro_rules! make_udaf_expr { vec![$($arg),*], false, None, - None, + vec![], None, )) } @@ -52,7 +52,7 @@ macro_rules! make_udaf_expr_and_func { args, false, None, - None, + vec![], None, )) } diff --git a/datafusion/functions-aggregate/src/planner.rs b/datafusion/functions-aggregate/src/planner.rs index f0e37f6b1dbe..78c1e469a951 100644 --- a/datafusion/functions-aggregate/src/planner.rs +++ b/datafusion/functions-aggregate/src/planner.rs @@ -103,7 +103,7 @@ impl ExprPlanner for AggregateFunctionPlanner { vec![Expr::Literal(COUNT_STAR_EXPANSION, None)], distinct, filter, - order_by, + order_by.into_iter().map(|s| s.into()).collect(), null_treatment, )); diff --git a/datafusion/optimizer/src/analyzer/type_coercion.rs b/datafusion/optimizer/src/analyzer/type_coercion.rs index b5a3e9a2d585..a98b0fdcc3d3 100644 --- a/datafusion/optimizer/src/analyzer/type_coercion.rs +++ b/datafusion/optimizer/src/analyzer/type_coercion.rs @@ -1597,7 +1597,7 @@ mod test { vec![lit(10i64)], false, None, - None, + vec![], None, )); let plan = LogicalPlan::Projection(Projection::try_new(vec![udaf], empty)?); @@ -1632,7 +1632,7 @@ mod test { vec![lit("10")], false, None, - None, + vec![], None, )); @@ -1651,7 +1651,7 @@ mod test { vec![lit(12f64)], false, None, - None, + vec![], None, )); let plan = LogicalPlan::Projection(Projection::try_new(vec![agg_expr], empty)?); @@ -1670,7 +1670,7 @@ mod test { vec![cast(col("a"), DataType::Float64)], false, None, - None, + vec![], None, )); let plan = LogicalPlan::Projection(Projection::try_new(vec![agg_expr], empty)?); @@ -1692,7 +1692,7 @@ mod test { vec![lit("1")], false, None, - None, + vec![], None, )); let err = Projection::try_new(vec![agg_expr], empty) @@ -1727,7 +1727,7 @@ mod test { let empty = empty_with_type(DataType::Int64); let plan = LogicalPlan::Projection(Projection::try_new(vec![expr], empty)?); assert_analyzed_plan_eq!( - plan, + plan, @r" Projection: a IN ([CAST(Int32(1) AS Int64), CAST(Int8(4) AS Int64), Int64(8)]) EmptyRelation @@ -1744,7 +1744,7 @@ mod test { })); let plan = LogicalPlan::Projection(Projection::try_new(vec![expr], empty)?); assert_analyzed_plan_eq!( - plan, + plan, @r" Projection: CAST(a AS Decimal128(24, 4)) IN ([CAST(Int32(1) AS Decimal128(24, 4)), CAST(Int8(4) AS Decimal128(24, 4)), CAST(Int64(8) AS Decimal128(24, 4))]) EmptyRelation diff --git a/datafusion/optimizer/src/common_subexpr_eliminate.rs b/datafusion/optimizer/src/common_subexpr_eliminate.rs index 6a49e5d22087..88d51e1adea3 100644 --- a/datafusion/optimizer/src/common_subexpr_eliminate.rs +++ b/datafusion/optimizer/src/common_subexpr_eliminate.rs @@ -909,7 +909,7 @@ mod test { vec![inner], false, None, - None, + vec![], None, )) }; diff --git a/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs b/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs index 2be7a2b0bd6e..2acd25bdf228 100644 --- a/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs +++ b/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs @@ -4324,7 +4324,7 @@ mod tests { vec![], false, None, - None, + vec![], None, )); @@ -4338,7 +4338,7 @@ mod tests { vec![], false, None, - None, + vec![], None, )); diff --git a/datafusion/optimizer/src/single_distinct_to_groupby.rs b/datafusion/optimizer/src/single_distinct_to_groupby.rs index 50783a214342..e9a23c7c4dc5 100644 --- a/datafusion/optimizer/src/single_distinct_to_groupby.rs +++ b/datafusion/optimizer/src/single_distinct_to_groupby.rs @@ -79,7 +79,7 @@ fn is_single_distinct_agg(aggr_expr: &[Expr]) -> Result { }, }) = expr { - if filter.is_some() || order_by.is_some() { + if filter.is_some() || !order_by.is_empty() { return Ok(false); } aggregate_count += 1; @@ -200,7 +200,7 @@ impl OptimizerRule for SingleDistinctToGroupBy { vec![col(SINGLE_DISTINCT_ALIAS)], false, // intentional to remove distinct here None, - None, + vec![], None, ))) // if the aggregate function is not distinct, we need to rewrite it like two phase aggregation @@ -213,7 +213,7 @@ impl OptimizerRule for SingleDistinctToGroupBy { args, false, None, - None, + vec![], None, )) .alias(&alias_str), @@ -223,7 +223,7 @@ impl OptimizerRule for SingleDistinctToGroupBy { vec![col(&alias_str)], false, None, - None, + vec![], None, ))) } @@ -296,7 +296,7 @@ mod tests { vec![expr], true, None, - None, + vec![], None, )) } @@ -627,7 +627,7 @@ mod tests { vec![col("a")], false, Some(Box::new(col("a").gt(lit(5)))), - None, + vec![], None, )); let plan = LogicalPlanBuilder::from(table_scan) @@ -678,7 +678,7 @@ mod tests { vec![col("a")], false, None, - Some(vec![col("a").sort(true, false)]), + vec![col("a").sort(true, false)], None, )); let plan = LogicalPlanBuilder::from(table_scan) diff --git a/datafusion/proto/src/logical_plan/from_proto.rs b/datafusion/proto/src/logical_plan/from_proto.rs index 66ef0ebfe361..6c5b348698c7 100644 --- a/datafusion/proto/src/logical_plan/from_proto.rs +++ b/datafusion/proto/src/logical_plan/from_proto.rs @@ -576,10 +576,7 @@ pub fn parse_expr( parse_exprs(&pb.args, registry, codec)?, pb.distinct, parse_optional_expr(pb.filter.as_deref(), registry, codec)?.map(Box::new), - match pb.order_by.len() { - 0 => None, - _ => Some(parse_sorts(&pb.order_by, registry, codec)?), - }, + parse_sorts(&pb.order_by, registry, codec)?, None, ))) } diff --git a/datafusion/proto/src/logical_plan/to_proto.rs b/datafusion/proto/src/logical_plan/to_proto.rs index b14ad7aadf58..43afaa0fbe65 100644 --- a/datafusion/proto/src/logical_plan/to_proto.rs +++ b/datafusion/proto/src/logical_plan/to_proto.rs @@ -374,10 +374,7 @@ pub fn serialize_expr( Some(e) => Some(Box::new(serialize_expr(e.as_ref(), codec)?)), None => None, }, - order_by: match order_by { - Some(e) => serialize_sorts(e, codec)?, - None => vec![], - }, + order_by: serialize_sorts(order_by, codec)?, fun_definition: (!buf.is_empty()).then_some(buf), }, ))), diff --git a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs index 993cc6f87ca3..12c05cfcb595 100644 --- a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs @@ -960,8 +960,8 @@ async fn roundtrip_expr_api() -> Result<()> { array_replace_all(make_array(vec![lit(1), lit(2), lit(3)]), lit(2), lit(4)), count(lit(1)), count_distinct(lit(1)), - first_value(lit(1), None), - first_value(lit(1), Some(vec![lit(2).sort(true, true)])), + first_value(lit(1), vec![]), + first_value(lit(1), vec![lit(2).sort(true, true)]), functions_window::nth_value::first_value(lit(1)), functions_window::nth_value::last_value(lit(1)), functions_window::nth_value::nth_value(lit(1), 1), @@ -2181,7 +2181,7 @@ fn roundtrip_aggregate_udf() { vec![lit(1.0_f64)], false, Some(Box::new(lit(true))), - None, + vec![], None, )); diff --git a/datafusion/sql/src/expr/function.rs b/datafusion/sql/src/expr/function.rs index 6b23bd5c2c0f..14d9a5e50a4a 100644 --- a/datafusion/sql/src/expr/function.rs +++ b/datafusion/sql/src/expr/function.rs @@ -406,21 +406,20 @@ impl SqlToRel<'_, S> { .chain(args) .collect::>(); } - (!within_group.is_empty()).then_some(within_group) + within_group } else { let order_by = if !order_by.is_empty() { order_by } else { within_group }; - let order_by = self.order_by_to_sort_expr( + self.order_by_to_sort_expr( order_by, schema, planner_context, true, None, - )?; - (!order_by.is_empty()).then_some(order_by) + )? }; let filter: Option> = filter diff --git a/datafusion/sql/src/unparser/expr.rs b/datafusion/sql/src/unparser/expr.rs index 1e39d7186d13..933302e95bfb 100644 --- a/datafusion/sql/src/unparser/expr.rs +++ b/datafusion/sql/src/unparser/expr.rs @@ -322,16 +322,15 @@ impl Unparser<'_> { Some(filter) => Some(Box::new(self.expr_to_sql_inner(filter)?)), None => None, }; - let within_group = if agg.func.is_ordered_set_aggregate() { - order_by - .as_ref() - .unwrap_or(&Vec::new()) - .iter() - .map(|sort_expr| self.sort_to_sql(sort_expr)) - .collect::>>()? - } else { - Vec::new() - }; + let within_group: Vec = + if agg.func.is_ordered_set_aggregate() { + order_by + .iter() + .map(|sort_expr| self.sort_to_sql(sort_expr)) + .collect::>>()? + } else { + Vec::new() + }; Ok(ast::Expr::Function(Function { name: ObjectName::from(vec![Ident { value: func_name.to_string(), diff --git a/datafusion/substrait/src/logical_plan/consumer/expr/aggregate_function.rs b/datafusion/substrait/src/logical_plan/consumer/expr/aggregate_function.rs index 114fe1e7aecd..62e140acc27b 100644 --- a/datafusion/substrait/src/logical_plan/consumer/expr/aggregate_function.rs +++ b/datafusion/substrait/src/logical_plan/consumer/expr/aggregate_function.rs @@ -30,7 +30,7 @@ pub async fn from_substrait_agg_func( f: &AggregateFunction, input_schema: &DFSchema, filter: Option>, - order_by: Option>, + order_by: Vec, distinct: bool, ) -> datafusion::common::Result> { let Some(fn_signature) = consumer diff --git a/datafusion/substrait/src/logical_plan/consumer/rel/aggregate_rel.rs b/datafusion/substrait/src/logical_plan/consumer/rel/aggregate_rel.rs index 9421bb17c162..1967088f3a8f 100644 --- a/datafusion/substrait/src/logical_plan/consumer/rel/aggregate_rel.rs +++ b/datafusion/substrait/src/logical_plan/consumer/rel/aggregate_rel.rs @@ -89,12 +89,9 @@ pub async fn from_aggregate_rel( _ => false, }; let order_by = if !f.sorts.is_empty() { - Some( - from_substrait_sorts(consumer, &f.sorts, input.schema()) - .await?, - ) + from_substrait_sorts(consumer, &f.sorts, input.schema()).await? } else { - None + vec![] }; from_substrait_agg_func( diff --git a/datafusion/substrait/src/logical_plan/producer/expr/aggregate_function.rs b/datafusion/substrait/src/logical_plan/producer/expr/aggregate_function.rs index 0619b497532d..1e79897a1b77 100644 --- a/datafusion/substrait/src/logical_plan/producer/expr/aggregate_function.rs +++ b/datafusion/substrait/src/logical_plan/producer/expr/aggregate_function.rs @@ -43,14 +43,10 @@ pub fn from_aggregate_function( null_treatment: _null_treatment, }, } = agg_fn; - let sorts = if let Some(order_by) = order_by { - order_by - .iter() - .map(|expr| to_substrait_sort_field(producer, expr, schema)) - .collect::>>()? - } else { - vec![] - }; + let sorts = order_by + .iter() + .map(|expr| to_substrait_sort_field(producer, expr, schema)) + .collect::>>()?; let mut arguments: Vec = vec![]; for arg in args { arguments.push(FunctionArgument { From 9b6768ff7622ab77a0d4c3cb4ec95c95d8f77046 Mon Sep 17 00:00:00 2001 From: "chenweiguo.vc" Date: Mon, 30 Jun 2025 21:03:26 +0800 Subject: [PATCH 2/6] clippy --- datafusion/core/src/physical_planner.rs | 2 +- datafusion/functions-aggregate/src/planner.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index bd25e7181b56..47e04f761b36 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -1653,7 +1653,7 @@ pub fn create_aggregate_expr_with_name_and_maybe_filter( let (agg_expr, filter, order_bys) = { let order_bys = if !order_by.is_empty() { create_physical_sort_exprs( - &order_by, + order_by, logical_input_schema, execution_props, )? diff --git a/datafusion/functions-aggregate/src/planner.rs b/datafusion/functions-aggregate/src/planner.rs index 78c1e469a951..f0e37f6b1dbe 100644 --- a/datafusion/functions-aggregate/src/planner.rs +++ b/datafusion/functions-aggregate/src/planner.rs @@ -103,7 +103,7 @@ impl ExprPlanner for AggregateFunctionPlanner { vec![Expr::Literal(COUNT_STAR_EXPANSION, None)], distinct, filter, - order_by.into_iter().map(|s| s.into()).collect(), + order_by, null_treatment, )); From d6b523c5e1dc4a590012a074dc50783c4ce328ce Mon Sep 17 00:00:00 2001 From: "chenweiguo.vc" Date: Mon, 30 Jun 2025 21:12:37 +0800 Subject: [PATCH 3/6] fix comment --- datafusion/core/src/physical_planner.rs | 14 +++++--------- .../src/logical_plan/consumer/rel/aggregate_rel.rs | 7 ++----- 2 files changed, 7 insertions(+), 14 deletions(-) diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index 47e04f761b36..e4bf2f598520 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -1651,15 +1651,11 @@ pub fn create_aggregate_expr_with_name_and_maybe_filter( == NullTreatment::IgnoreNulls; let (agg_expr, filter, order_bys) = { - let order_bys = if !order_by.is_empty() { - create_physical_sort_exprs( - order_by, - logical_input_schema, - execution_props, - )? - } else { - vec![] - }; + let order_bys = create_physical_sort_exprs( + order_by, + logical_input_schema, + execution_props, + )?; let agg_expr = AggregateExprBuilder::new(func.to_owned(), physical_args.to_vec()) diff --git a/datafusion/substrait/src/logical_plan/consumer/rel/aggregate_rel.rs b/datafusion/substrait/src/logical_plan/consumer/rel/aggregate_rel.rs index 1967088f3a8f..c919bd038936 100644 --- a/datafusion/substrait/src/logical_plan/consumer/rel/aggregate_rel.rs +++ b/datafusion/substrait/src/logical_plan/consumer/rel/aggregate_rel.rs @@ -88,11 +88,8 @@ pub async fn from_aggregate_rel( _ if f.invocation == AggregationInvocation::All as i32 => false, _ => false, }; - let order_by = if !f.sorts.is_empty() { - from_substrait_sorts(consumer, &f.sorts, input.schema()).await? - } else { - vec![] - }; + let order_by = + from_substrait_sorts(consumer, &f.sorts, input.schema()).await?; from_substrait_agg_func( consumer, From 5dd64e8c8dea33c512dd1c44b1ff733a8ea56075 Mon Sep 17 00:00:00 2001 From: "chenweiguo.vc" Date: Mon, 30 Jun 2025 22:12:02 +0800 Subject: [PATCH 4/6] fix doc --- datafusion/expr/src/expr.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/expr/src/expr.rs b/datafusion/expr/src/expr.rs index 332cb8ec74c9..7866e7db1622 100644 --- a/datafusion/expr/src/expr.rs +++ b/datafusion/expr/src/expr.rs @@ -1175,7 +1175,7 @@ impl Exists { /// User Defined Aggregate Function /// -/// See [`udaf::AggregateUDF`] for more information. +/// See [`crate::udaf::AggregateUDF`] for more information. #[derive(Clone, PartialEq, Eq, Hash, Debug)] pub struct AggregateUDF { /// The function From 2748fabf4fa74c5eac214f99be30c561e8b2fb5f Mon Sep 17 00:00:00 2001 From: "chenweiguo.vc" Date: Wed, 2 Jul 2025 00:50:31 +0800 Subject: [PATCH 5/6] change back to Expr --- datafusion/expr/src/expr.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/expr/src/expr.rs b/datafusion/expr/src/expr.rs index 7866e7db1622..a5e1f1c0e205 100644 --- a/datafusion/expr/src/expr.rs +++ b/datafusion/expr/src/expr.rs @@ -1185,7 +1185,7 @@ pub struct AggregateUDF { /// Optional filter pub filter: Option>, /// Optional ORDER BY applied prior to aggregating - pub order_by: Vec, + pub order_by: Vec, } impl AggregateUDF { @@ -1194,7 +1194,7 @@ impl AggregateUDF { fun: Arc, args: Vec, filter: Option>, - order_by: Vec, + order_by: Vec, ) -> Self { Self { fun, From 0c496a36742583dcf82c31f0d2b16fd81b925e68 Mon Sep 17 00:00:00 2001 From: "chenweiguo.vc" Date: Wed, 2 Jul 2025 00:54:25 +0800 Subject: [PATCH 6/6] remove redundant check --- .../functions-aggregate/src/first_last.rs | 32 +++++++------------ 1 file changed, 12 insertions(+), 20 deletions(-) diff --git a/datafusion/functions-aggregate/src/first_last.rs b/datafusion/functions-aggregate/src/first_last.rs index be418fd1de9a..9d3bf01dc066 100644 --- a/datafusion/functions-aggregate/src/first_last.rs +++ b/datafusion/functions-aggregate/src/first_last.rs @@ -56,30 +56,22 @@ create_func!(LastValue, last_value_udaf); /// Returns the first value in a group of values. pub fn first_value(expression: Expr, order_by: Vec) -> Expr { - if !order_by.is_empty() { - first_value_udaf() - .call(vec![expression]) - .order_by(order_by) - .build() - // guaranteed to be `Expr::AggregateFunction` - .unwrap() - } else { - first_value_udaf().call(vec![expression]) - } + first_value_udaf() + .call(vec![expression]) + .order_by(order_by) + .build() + // guaranteed to be `Expr::AggregateFunction` + .unwrap() } /// Returns the last value in a group of values. pub fn last_value(expression: Expr, order_by: Vec) -> Expr { - if !order_by.is_empty() { - last_value_udaf() - .call(vec![expression]) - .order_by(order_by) - .build() - // guaranteed to be `Expr::AggregateFunction` - .unwrap() - } else { - last_value_udaf().call(vec![expression]) - } + last_value_udaf() + .call(vec![expression]) + .order_by(order_by) + .build() + // guaranteed to be `Expr::AggregateFunction` + .unwrap() } #[user_doc(