Skip to content

Commit 366e7ef

Browse files
author
Huaxin Gao
committed
fix error
1 parent d061f06 commit 366e7ef

File tree

9 files changed

+48
-136
lines changed

9 files changed

+48
-136
lines changed

datafusion/core/src/physical_planner.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -246,7 +246,7 @@ fn create_physical_name(e: &Expr, is_first_expr: bool) -> Result<String> {
246246
args,
247247
filter,
248248
order_by,
249-
null_treatment,
249+
..
250250
}) => match func_def {
251251
AggregateFunctionDefinition::BuiltIn(..) => {
252252
create_function_physical_name(func_def.name(), *distinct, args)

datafusion/optimizer/src/analyzer/type_coercion.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -367,7 +367,14 @@ impl TreeNodeRewriter for TypeCoercionRewriter {
367367
&self.schema,
368368
&fun.signature(),
369369
)?;
370-
let expr = Expr::AggregateFunction(expr::AggregateFunction::new(fun, new_expr, distinct, filter, order_by, null_treatment));
370+
let expr = Expr::AggregateFunction(expr::AggregateFunction::new(
371+
fun,
372+
new_expr,
373+
distinct,
374+
filter,
375+
order_by,
376+
null_treatment,
377+
));
371378
Ok(expr)
372379
}
373380
AggregateFunctionDefinition::UDF(fun) => {

datafusion/optimizer/src/single_distinct_to_groupby.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ fn is_single_distinct_agg(plan: &LogicalPlan) -> Result<bool> {
7676
args,
7777
filter,
7878
order_by,
79-
null_treatment,
79+
..
8080
}) = expr
8181
{
8282
if filter.is_some() || order_by.is_some() {

datafusion/physical-expr/src/aggregate/build_in.rs

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -360,13 +360,16 @@ pub fn create_aggregate_expr(
360360
(AggregateFunction::Median, true) => {
361361
return not_impl_err!("MEDIAN(DISTINCT) aggregations are not available");
362362
}
363-
(AggregateFunction::FirstValue, _) => Arc::new(expressions::FirstValue::new(
364-
input_phy_exprs[0].clone(),
365-
name,
366-
input_phy_types[0].clone(),
367-
ordering_req.to_vec(),
368-
ordering_types,
369-
).ignore_null(ignore_nulls)),
363+
(AggregateFunction::FirstValue, _) => Arc::new(
364+
expressions::FirstValue::new(
365+
input_phy_exprs[0].clone(),
366+
name,
367+
input_phy_types[0].clone(),
368+
ordering_req.to_vec(),
369+
ordering_types,
370+
)
371+
.ignore_null(ignore_nulls),
372+
),
370373
(AggregateFunction::LastValue, _) => Arc::new(expressions::LastValue::new(
371374
input_phy_exprs[0].clone(),
372375
name,
@@ -1309,7 +1312,15 @@ mod tests {
13091312
"Invalid or wrong number of arguments passed to aggregate: '{name}'"
13101313
);
13111314
}
1312-
create_aggregate_expr(fun, distinct, &coerced_phy_exprs, &[], input_schema, name, false)
1315+
create_aggregate_expr(
1316+
fun,
1317+
distinct,
1318+
&coerced_phy_exprs,
1319+
&[],
1320+
input_schema,
1321+
name,
1322+
false,
1323+
)
13131324
}
13141325

13151326
// Returns the coerced exprs for each `input_exprs`.

datafusion/physical-expr/src/aggregate/first_last.rs

Lines changed: 1 addition & 108 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ impl FirstValue {
6868
}
6969
}
7070

71-
pub fn ignore_null(mut self, ignore_null: bool) -> Self{
71+
pub fn ignore_null(mut self, ignore_null: bool) -> Self {
7272
self.ignore_null = ignore_null;
7373
self
7474
}
@@ -846,111 +846,4 @@ mod tests {
846846

847847
Ok(())
848848
}
849-
850-
#[test]
851-
fn first_ignore_null() -> Result<()> {
852-
let a: ArrayRef = Arc::new(Int32Array::from(vec![
853-
None,
854-
Some(2),
855-
None,
856-
None,
857-
Some(3),
858-
Some(9),
859-
]));
860-
let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
861-
862-
let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![a])?;
863-
let a_expr = col("a", &schema)?;
864-
865-
let agg1 = Arc::new(FirstValue::new(
866-
a_expr.clone(),
867-
"first1",
868-
DataType::Int32,
869-
vec![],
870-
vec![],
871-
).ignore_null(true));
872-
let first1 = aggregate(&batch, agg1)?;
873-
assert_eq!(first1, ScalarValue::Int32(Some(2)));
874-
875-
let agg2 = Arc::new(FirstValue::new(
876-
a_expr.clone(),
877-
"first1",
878-
DataType::Int32,
879-
vec![],
880-
vec![],
881-
));
882-
let first2 = aggregate(&batch, agg2)?;
883-
assert_eq!(first2, ScalarValue::Int32(None));
884-
885-
Ok(())
886-
}
887-
888-
#[test]
889-
fn first_ignore_null_with_sort() -> Result<()> {
890-
let a: ArrayRef = Arc::new(Int32Array::from(vec![
891-
Some(12),
892-
None,
893-
None,
894-
None,
895-
Some(10),
896-
Some(9),
897-
]));
898-
let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
899-
900-
let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![a])?;
901-
let a_expr = col("a", &schema)?;
902-
903-
let option_desc = SortOptions {
904-
descending: false,
905-
nulls_first: true,
906-
};
907-
let sort_expr_a = vec![PhysicalSortExpr {
908-
expr: a_expr.clone(),
909-
options: option_desc,
910-
}];
911-
912-
let agg1 = Arc::new(FirstValue::new(
913-
a_expr.clone(),
914-
"first1",
915-
DataType::Int32,
916-
sort_expr_a.clone(),
917-
vec![DataType::Int32],
918-
).ignore_null(true));
919-
let first1 = aggregate(&batch, agg1)?;
920-
assert_eq!(first1, ScalarValue::Int32(Some(9)));
921-
922-
let agg2 = Arc::new(FirstValue::new(
923-
a_expr.clone(),
924-
"first2",
925-
DataType::Int32,
926-
sort_expr_a.clone(),
927-
vec![DataType::Int32],
928-
));
929-
let first2 = aggregate(&batch, agg2)?;
930-
assert_eq!(first2, ScalarValue::Int32(None));
931-
932-
Ok(())
933-
}
934-
935-
pub fn aggregate(
936-
batch: &RecordBatch,
937-
agg: Arc<dyn AggregateExpr>,
938-
) -> Result<ScalarValue> {
939-
let mut accum = agg.create_accumulator()?;
940-
let mut expr = agg.expressions();
941-
if let Some(ordering_req) = agg.order_bys() {
942-
expr.extend(ordering_req.iter().map(|item| item.expr.clone()));
943-
}
944-
945-
let values = expr
946-
.iter()
947-
.map(|e| {
948-
e.evaluate(batch)
949-
.and_then(|v| v.into_array(batch.num_rows()))
950-
})
951-
.collect::<Result<Vec<_>>>()?;
952-
953-
accum.update_batch(&values)?;
954-
accum.evaluate()
955-
}
956849
}

datafusion/physical-expr/src/expressions/mod.rs

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -193,9 +193,16 @@ pub(crate) mod tests {
193193
.unwrap();
194194

195195
let schema = Schema::new(vec![Field::new("a", coerced[0].clone(), true)]);
196-
let agg =
197-
create_aggregate_expr(&function, distinct, &[input], &[], &schema, "agg", false)
198-
.unwrap();
196+
let agg = create_aggregate_expr(
197+
&function,
198+
distinct,
199+
&[input],
200+
&[],
201+
&schema,
202+
"agg",
203+
false,
204+
)
205+
.unwrap();
199206

200207
let result = aggregate(&batch, agg).unwrap();
201208
assert_eq!(expected, result);

datafusion/sql/src/expr/function.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,12 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
194194
.map(Box::new);
195195

196196
return Ok(Expr::AggregateFunction(expr::AggregateFunction::new(
197-
fun, args, distinct, filter, order_by, null_treatment,
197+
fun,
198+
args,
199+
distinct,
200+
filter,
201+
order_by,
202+
null_treatment,
198203
)));
199204
};
200205

datafusion/sql/src/expr/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -216,7 +216,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
216216
agg_func.distinct,
217217
agg_func.filter.clone(),
218218
agg_func.order_by.clone(),
219-
agg_func.null_treatment.clone(),
219+
agg_func.null_treatment,
220220
)), true)
221221
},
222222
_ => (expr, false),

datafusion/sql/tests/sql_integration.rs

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,7 @@ use arrow_schema::*;
2424
use sqlparser::dialect::{Dialect, GenericDialect, HiveDialect, MySqlDialect};
2525

2626
use datafusion_common::{
27-
assert_contains, config::ConfigOptions, DataFusionError, Result, ScalarValue,
28-
TableReference,
27+
config::ConfigOptions, DataFusionError, Result, ScalarValue, TableReference,
2928
};
3029
use datafusion_common::{plan_err, ParamValues};
3130
use datafusion_expr::{
@@ -1288,16 +1287,6 @@ fn select_simple_aggregate_repeated_aggregate_with_unique_aliases() {
12881287
);
12891288
}
12901289

1291-
#[test]
1292-
fn select_simple_aggregate_respect_nulls() {
1293-
let sql = "SELECT MIN(age) RESPECT NULLS FROM person";
1294-
let err = logical_plan(sql).expect_err("query should have failed");
1295-
1296-
assert_contains!(
1297-
err.strip_backtrace(),
1298-
"This feature is not implemented: Null treatment in aggregate functions is not supported: RESPECT NULLS"
1299-
);
1300-
}
13011290
#[test]
13021291
fn select_from_typed_string_values() {
13031292
quick_test(

0 commit comments

Comments
 (0)