Skip to content

Conversation

@viirya
Copy link
Member

@viirya viirya commented Jul 19, 2019

What changes were proposed in this pull request?

In SPARK-15370, We checked the expression at the root of the correlated subquery, in order to fix count bug. If a PythonUDF in in the checking path, evaluating it causes the failure as we can't statically evaluate PythonUDF. The Python UDF test added at SPARK-28277 shows this issue.

If we can statically evaluate the expression, we intercept NULL values coming from the outer join and replace them with the value that the subquery's expression like before, if it is not, we replace them with the PythonUDF expression, with statically evaluated parameters.

After this, the last query in udf-except.sql which throws java.lang.UnsupportedOperationException can be run:

SELECT t1.k
FROM   t1
WHERE  t1.v <= (SELECT   udf(max(udf(t2.v)))
                FROM     t2
                WHERE    udf(t2.k) = udf(t1.k))
MINUS
SELECT t1.k
FROM   t1
WHERE  udf(t1.v) >= (SELECT   min(udf(t2.v))
                FROM     t2
                WHERE    t2.k = t1.k)
-- !query 2 schema
struct<k:string>
-- !query 2 output
two

Note that this issue is also for other non-foldable expressions, like rand. As like PythonUDF, we can't call eval on this kind of expressions in optimization. The evaluation needs to defer to query runtime.

How was this patch tested?

Added tests.

@SparkQA
Copy link

SparkQA commented Jul 19, 2019

Test build #107915 has finished for PR 25204 at commit 725304c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Copy link
Member

HyukjinKwon commented Jul 20, 2019

Looks making sense to me from a cursory look. I will take a closer look if this doesn't get merged or reviewed. cc @cloud-fan too.

}
Option(rewrittenExpr.eval())
if (rewrittenExpr.find(_.isInstanceOf[PythonUDF]).isDefined) {
// SPARK-28441: `PythonUDF` can't be statically evaluated.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

many expressions can't be statically evaluated, why only special-case python udf?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This issue was found by PythonUDF. I think of covering all unevaluable expressions here, but not sure if it is too aggressive.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you think we should cover all unevaluable?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How do you define "can't be statically evaluated"? Do you mean !expr.foldable?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PythonUDF is Unevaluable. So you can't call eval on it.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

here it fakes the empty input case, and evaluate the expressions in subquery. So it doesn't require foldable.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can't call Expression.eval(null) if it's not foldable, otherwise exception may be thrown:

  1. AttributeReference.eval(null) fails with NPE
  2. Nondeterministic.eval(null) fails because it needs to be initialized first

Whatever hack we use, I'd expect it makes the expression foldable.

Copy link
Member Author

@viirya viirya Jul 22, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For 1, AttributeReference was replaced with pre-evaluated value, if it comes from aggregate function. It uses default value. It fakes empty input case. Or null, if it is not.

For 2, I think it is potential issue.

Yeah, here the hack looks like foldable expression. It simulates empty input.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems necessary to me to check foldable before calling .eval(), otherwise there is no guarantee that .eval() can success.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. Use foldable here to check.

}
}
Option(rewrittenExpr.eval())
if (!rewrittenExpr.foldable) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shall we apply the check in more places? evalAggOnZeroTups also calls eval() directly.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes. this is not possible for PythonUDF, but it is potential for other not foldable expression.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so it is not covered by added test. Let me add test for it...

.asInstanceOf[Boolean]
if (exprResult) bindings else Map.empty
}
evalPlan(child)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldn't we evaluate the filter condition?

@SparkQA
Copy link

SparkQA commented Jul 22, 2019

Test build #108005 has finished for PR 25204 at commit 7972d7c.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Option(rewrittenExpr.eval())

// Removes Alias over given expression, because Alias is not foldable.
if (!removeAlias(rewrittenExpr).foldable) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seems like we can move the following code into a common method?

@viirya viirya changed the title [SPARK-28441][SQL][Python] Fix error when PythonUDF is used in correlated scalar subquery [SPARK-28441][SQL][Python] Fix error when non-foldable expression is used in correlated scalar subquery Jul 23, 2019
} else {
val exprVal = rewrittenExpr.eval()
if (exprVal == null) {
None
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you know why we need to return None here instead of a null literal?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it uses None to make checking bindings easier.

In other way, to use null literal, Option[Expression] can be changed to Expression in methods like evalSubqueryOnZeroTups, evalPlan. Then we check bindings by literal instead of None. Good thing is we can write Literal.create(rewrittenExpr.eval(), expr.dataType), instead of checking null. Looks like just a choice problem.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tryEvalExpr?

bindings
} else {
val bindExpr = bindingExpr(condition, bindings)
.getOrElse(Literal.create(false, BooleanType))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For filter condition, null is the same as false. This is one place that makes me think bindingExpr should return Option[Expression].

If this is the only place, I think it's simpler to always return expression, and handle null especially here.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok. I may try this way tomorrow.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, this works. Looks good as it's simple.

* This replaces original expression id used in attributes and aliases in expression.
*/
private def replaceOldExprId(
orgExprId: ExprId,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

orgExprId -> oldExprId ?

// We replace original expression id with a new one. The added Alias column
// must use expr id of original output. If we don't replace old expr id in the
// query, the added Project in potential Project-Filter-Project can be removed
// by removeProjectBeforeFilter in ColumnPruning.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shall we fix removeProjectBeforeFilter to only remove attribute-only projects?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Worth trying, right now not sure if any other thing will be affected.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tried locally. Added subquery tests are passed. We can see if Jenkins passes.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok. Seems fine. Jenkins passes.

@SparkQA
Copy link

SparkQA commented Jul 23, 2019

Test build #108059 has finished for PR 25204 at commit 110a39e.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Jul 23, 2019

Test build #108057 has finished for PR 25204 at commit 33441a3.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Jul 24, 2019

Test build #108109 has finished for PR 25204 at commit 0158d85.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Jul 24, 2019

Test build #108112 has finished for PR 25204 at commit 2dd29c1.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

/**
* This replaces original expression id used in attributes and aliases in expression.
*/
private def replaceOldExprId(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can remove this.

Row(1) :: Row(1) ::Row(null) :: Row(null) :: Row(6) :: Nil)
}

test("SPARK-28441: COUNT bug in subquery in subquery in subquery with non-foldable expr") {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

if p2.outputSet.subsetOf(child.outputSet) =>
if p2.outputSet.subsetOf(child.outputSet) &&
// We only remove attribute-only project.
p2.projectList.forall(_.isInstanceOf[AttributeReference]) =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure about this change. This may cause serious perf regression

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How can we remove project that's not attribute-only?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd say it was wrong previously, but if a project's output has same expr IDs with its child, it's usually attribute-only.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mmmmh... I may be missing something, but I'd imagine a case like this:

select a, b from
(select a, b, very_expensive_operation as c from ... where a = 1)

Before this change, would be optimized as:

select a, b from
(select a, b from ... where a = 1)

while after it is not. Am I wrong?

Copy link
Member Author

@viirya viirya Jul 25, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In above case, it has a Alias in project list, so it's not an attribute-only project. And I think it also create new attr c, so p2.outputSet.subsetOf(child.outputSet) is not met too.

I think the rules in ColumnPruning will trim very_expensive_operation in the end.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see now, sorry. Why do we need this? Seems an unrelated change to the fix in this PR, isn't it?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh, the issue was seen in previous comment 33441a3. It was overwritten now.

We added a column for count bug. The column checks a always-true leading column alwaysTrueExpr, returns special value if alwaysTrueExpr is null, to simulate empty input case.

This column reuses expr id of original output in the subquery. In non-foldable expression case, the added column in a potential Project-Filter-Project, will be trimmed by removeProjectBeforeFilter, because the second project meets p2.outputSet.subsetOf(child.outputSet).

My original fix is to create an expr id. Replace original expr id with new one in the subquery. Looks complicated. This seems a simple fix, and looks reasonable.

newExpression.asInstanceOf[E]
}

private def removeAlias(expr: Expression): Expression = expr match {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what if there are several aliases? Shall we use CleanupAliases instead?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We track expressions from aggregate expressions as root. I think aliases should be continuous on top. Using CleanupAliases is also good, at least we don't need adding new method.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, sorry, this is recursive too, but I think it is good to avoid a new method. Thanks.

checkAnswer(
sql("select l.a from l where " +
"(select case when udf(count(*)) = 1 then null else udf(count(*)) end as cnt " +
"from r where l.a = r.c) = 0"),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we use multi-line string to write long SQL? Let's also upper case the keywords.

"""
|select l.b, (select (r.c + udf(count(*))) is null
|from r
|where l.a = r.c group by r.c) from l
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's format the SQL in a more readable way. For this particular example

select
  l.b,
  (
    select (r.c + udf(count(*))) is null
    from r
    where l.a = r.c
    group by r.c
  )
from l

@cloud-fan
Copy link
Contributor

the fix looks good, some comments about the tests. Thanks for catching and fixing this nasty bug!

registerTestUDF(pythonTestUDF, spark)

checkAnswer(
sql("""SELECT
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: AFAIK the multi-line string should be written as

"""
  |line1
  |line2
"""

not

"""line1
  |line2
"""

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed.

@SparkQA
Copy link

SparkQA commented Jul 25, 2019

Test build #108165 has finished for PR 25204 at commit 9aea844.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Jul 25, 2019

Test build #108166 has finished for PR 25204 at commit 1f6b717.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Jul 25, 2019

Test build #108175 has finished for PR 25204 at commit d7d023d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

|FROM l WHERE
| (
| SELECT udf(count(*)) + udf(sum(r.d)
| )
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The indentation is wrong, it should be

WHERE
  (
    SELECT udf(count(*)) + udf(sum(r.d))
    FROM r WHERE l.a = r.c
  ) = 0

val df = sql("""
|SELECT
| l.a
| FROM l WHERE
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no indentation here.

sql("""
|SELECT l.a FROM l
|WHERE (
| SELECT cntPlusOne + 1 AS cntPlusTwo FROM (
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shall we be consistent with 2 space indentation?

@viirya
Copy link
Member Author

viirya commented Jul 26, 2019

@cloud-fan thanks for identifying the style issue. Fixed.

@SparkQA
Copy link

SparkQA commented Jul 26, 2019

Test build #108215 has finished for PR 25204 at commit fd29677.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Copy link
Contributor

thanks, merging to master!

@cloud-fan cloud-fan closed this in 558dd23 Jul 27, 2019
@viirya
Copy link
Member Author

viirya commented Jul 27, 2019

thanks @cloud-fan @HyukjinKwon @mgaido91

import IntegratedUDFTestUtils._

val pythonTestUDF = TestPythonUDF(name = "udf")
registerTestUDF(pythonTestUDF, spark)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BTW, we should add assume(shouldTestPythonUDFs). Maybe it's not a biggie in general but it can matter in other venders' testing base. For instance, if somebody launches a test in a minimal docker image, it might make the tests failed suddenly.

This skipping stuff isn't completely new in our test base. See TestUtils.testCommandAvailable for instance.

@HyukjinKwon
Copy link
Member

@huaxingao, can you make a followup of SPARK-28277 to re-enable?

-- Except operation that will be replaced by left anti join
--- [SPARK-28441] udf(max(udf(column))) throws java.lang.UnsupportedOperationException: Cannot evaluate expression: udf(null)
--- SELECT t1.k
--- FROM t1
--- WHERE t1.v <= (SELECT udf(max(udf(t2.v)))
--- FROM t2
--- WHERE udf(t2.k) = udf(t1.k))
--- MINUS
--- SELECT t1.k
--- FROM t1
--- WHERE udf(t1.v) >= (SELECT min(udf(t2.v))
--- FROM t2
--- WHERE t2.k = t1.k);

@HyukjinKwon
Copy link
Member

LGTM too

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

6 participants