Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 31 additions & 57 deletions datafusion/functions/src/core/coalesce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,13 @@
// specific language governing permissions and limitations
// under the License.

use arrow::array::{new_null_array, BooleanArray};
use arrow::compute::kernels::zip::zip;
use arrow::compute::{and, is_not_null, is_null};
use arrow::datatypes::{DataType, Field, FieldRef};
use datafusion_common::{exec_err, internal_err, Result};
use datafusion_common::{exec_err, internal_err, plan_err, Result};
use datafusion_expr::binary::try_type_union_resolution;
use datafusion_expr::conditional_expressions::CaseBuilder;
use datafusion_expr::simplify::{ExprSimplifyResult, SimplifyInfo};
use datafusion_expr::{
ColumnarValue, Documentation, ReturnFieldArgs, ScalarFunctionArgs,
ColumnarValue, Documentation, Expr, ReturnFieldArgs, ScalarFunctionArgs,
};
use datafusion_expr::{ScalarUDFImpl, Signature, Volatility};
use datafusion_macros::user_doc;
Expand Down Expand Up @@ -95,61 +94,36 @@ impl ScalarUDFImpl for CoalesceFunc {
Ok(Field::new(self.name(), return_type, nullable).into())
}

/// coalesce evaluates to the first value which is not NULL
fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
let args = args.args;
// do not accept 0 arguments.
fn simplify(
&self,
args: Vec<Expr>,
_info: &dyn SimplifyInfo,
) -> Result<ExprSimplifyResult> {
if args.is_empty() {
return exec_err!(
"coalesce was called with {} arguments. It requires at least 1.",
args.len()
);
return plan_err!("coalesce must have at least one argument");
}

let return_type = args[0].data_type();
let mut return_array = args.iter().filter_map(|x| match x {
ColumnarValue::Array(array) => Some(array.len()),
_ => None,
});

if let Some(size) = return_array.next() {
// start with nulls as default output
let mut current_value = new_null_array(&return_type, size);
let mut remainder = BooleanArray::from(vec![true; size]);

for arg in args {
match arg {
ColumnarValue::Array(ref array) => {
let to_apply = and(&remainder, &is_not_null(array.as_ref())?)?;
current_value = zip(&to_apply, array, &current_value)?;
remainder = and(&remainder, &is_null(array)?)?;
}
ColumnarValue::Scalar(value) => {
if value.is_null() {
continue;
} else {
let last_value = value.to_scalar()?;
current_value = zip(&remainder, &last_value, &current_value)?;
break;
}
}
}
if remainder.iter().all(|x| x == Some(false)) {
break;
}
}
Ok(ColumnarValue::Array(current_value))
} else {
let result = args
.iter()
.filter_map(|x| match x {
ColumnarValue::Scalar(s) if !s.is_null() => Some(x.clone()),
_ => None,
})
.next()
.unwrap_or_else(|| args[0].clone());
Ok(result)
if args.len() == 1 {
return Ok(ExprSimplifyResult::Simplified(
args.into_iter().next().unwrap(),
));
}

let n = args.len();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is quite a clever implementation.

However, I worry it may cause problems for comet which uses physical evaluation directly

@comphead or @andygrove do you know if comet uses the COALESCE implementation directly?

let (init, last_elem) = args.split_at(n - 1);
let whens = init
.iter()
.map(|x| x.clone().is_not_null())
.collect::<Vec<_>>();
let cases = init.to_vec();
Ok(ExprSimplifyResult::Simplified(
CaseBuilder::new(None, whens, cases, Some(Box::new(last_elem[0].clone())))
.end()?,
))
}

/// coalesce evaluates to the first value which is not NULL
fn invoke_with_args(&self, _args: ScalarFunctionArgs) -> Result<ColumnarValue> {
internal_err!("coalesce should have been simplified to case")
}

fn short_circuits(&self) -> bool {
Expand Down
16 changes: 11 additions & 5 deletions datafusion/sqllogictest/test_files/select.slt
Original file line number Diff line number Diff line change
Expand Up @@ -1656,10 +1656,10 @@ query TT
explain select coalesce(1, y/x), coalesce(2, y/x) from t;
----
logical_plan
01)Projection: coalesce(Int64(1), CAST(t.y / t.x AS Int64)), coalesce(Int64(2), CAST(t.y / t.x AS Int64))
01)Projection: CASE WHEN Boolean(true) THEN Int64(1) ELSE CAST(t.y / t.x AS Int64) END AS coalesce(Int64(1),t.y / t.x), CASE WHEN Boolean(true) THEN Int64(2) ELSE CAST(t.y / t.x AS Int64) END AS coalesce(Int64(2),t.y / t.x)
02)--TableScan: t projection=[x, y]
physical_plan
01)ProjectionExec: expr=[coalesce(1, CAST(y@1 / x@0 AS Int64)) as coalesce(Int64(1),t.y / t.x), coalesce(2, CAST(y@1 / x@0 AS Int64)) as coalesce(Int64(2),t.y / t.x)]
01)ProjectionExec: expr=[CASE WHEN true THEN 1 ELSE CAST(y@1 / x@0 AS Int64) END as coalesce(Int64(1),t.y / t.x), CASE WHEN true THEN 2 ELSE CAST(y@1 / x@0 AS Int64) END as coalesce(Int64(2),t.y / t.x)]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems like this should be able to be simplifed even more -- CASE WHEN true THEN ... should go to 1

I filed a ticket to track this idea:

02)--DataSourceExec: partitions=1, partition_sizes=[1]

query TT
Expand All @@ -1686,11 +1686,17 @@ physical_plan
02)--ProjectionExec: expr=[y@1 = 0 as __common_expr_1, x@0 as x, y@1 as y]
03)----DataSourceExec: partitions=1, partition_sizes=[1]

# due to the reason describe in https://github.com/apache/datafusion/issues/8927,
# the following queries will fail
query error
query II
select coalesce(1, y/x), coalesce(2, y/x) from t;
----
1 2
1 2
1 2
1 2
1 2

# due to the reason describe in https://github.com/apache/datafusion/issues/8927,
# the following queries will fail
query error
SELECT y > 0 and 1 / y < 1, x > 0 and y > 0 and 1 / y < 1 / x from t;

Expand Down