@viirya
Member

@viirya viirya commented Jul 11, 2019

What changes were proposed in this pull request?

Currently, a Filter predicate that uses a PythonUDF cannot be pushed down into a join condition. Such a predicate should be eligible for pushdown. For PythonUDFs that cannot be evaluated in a join condition, PullOutPythonUDFInJoinCondition will pull them out later.
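To illustrate the kind of rewrite involved, here is a toy sketch in plain Scala. It is not Catalyst code: `Expr`, `Plan`, `PyUdf`, and `pushIntoJoin` are hypothetical names made up for this illustration, and the real optimizer rules cover many more cases than this one pattern.

```scala
// Toy model only: these types are hypothetical and are NOT Spark's Catalyst classes.
object PushdownSketch {
  sealed trait Expr { def refs: Set[String] }
  final case class Col(name: String) extends Expr { def refs: Set[String] = Set(name) }
  final case class PyUdf(name: String, arg: Expr) extends Expr { def refs: Set[String] = arg.refs }
  final case class Eq(l: Expr, r: Expr) extends Expr { def refs: Set[String] = l.refs ++ r.refs }

  sealed trait Plan { def output: Set[String] }
  final case class Scan(output: Set[String]) extends Plan
  final case class Join(left: Plan, right: Plan, cond: Option[Expr]) extends Plan {
    def output: Set[String] = left.output ++ right.output
  }
  final case class Filter(pred: Expr, child: Plan) extends Plan {
    def output: Set[String] = child.output
  }

  // Move a filter predicate that references columns from both join sides
  // into the (previously empty) join condition. Anything else is left alone.
  def pushIntoJoin(plan: Plan): Plan = plan match {
    case Filter(pred, Join(l, r, None))
        if pred.refs.exists(l.output) && pred.refs.exists(r.output) =>
      Join(l, r, Some(pred))
    case other => other
  }

  def main(args: Array[String]): Unit = {
    val left = Scan(Set("a", "b"))
    val right = Scan(Set("c", "d"))
    // Mirrors the shape of udf(a) === udf(c) on top of a cross join.
    val pred = Eq(PyUdf("udf", Col("a")), PyUdf("udf", Col("c")))
    println(pushIntoJoin(Filter(pred, Join(left, right, None))))
  }
}
```

In this toy model the UDF predicate is treated like any other predicate that spans both sides, which is the behavior the description above asks for. Pulling the UDF calls back out for evaluation is a separate step and is not modeled here.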

For example:

val pythonTestUDF = TestPythonUDF(name = "udf")

val left = Seq((1, 2), (2, 3)).toDF("a", "b")
val right = Seq((1, 2), (3, 4)).toDF("c", "d")
val df = left.crossJoin(right).where(pythonTestUDF($"a") === pythonTestUDF($"c"))

Query plan before the PR:

== Physical Plan ==
*(3) Project [a#2121, b#2122, c#2132, d#2133]
+- *(3) Filter (pythonUDF0#2142 = pythonUDF1#2143)
   +- BatchEvalPython [udf(a#2121), udf(c#2132)], [pythonUDF0#2142, pythonUDF1#2143]
      +- BroadcastNestedLoopJoin BuildRight, Cross
         :- *(1) Project [_1#2116 AS a#2121, _2#2117 AS b#2122]
         :  +- LocalTableScan [_1#2116, _2#2117]
         +- BroadcastExchange IdentityBroadcastMode
            +- *(2) Project [_1#2127 AS c#2132, _2#2128 AS d#2133]
               +- LocalTableScan [_1#2127, _2#2128]

Query plan after the PR:

== Physical Plan ==
*(3) Project [a#2121, b#2122, c#2132, d#2133]
+- *(3) BroadcastHashJoin [pythonUDF0#2142], [pythonUDF0#2143], Cross, BuildRight
   :- BatchEvalPython [udf(a#2121)], [pythonUDF0#2142]
   :  +- *(1) Project [_1#2116 AS a#2121, _2#2117 AS b#2122]
   :     +- LocalTableScan [_1#2116, _2#2117]
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[2, string, true]))
      +- BatchEvalPython [udf(c#2132)], [pythonUDF0#2143]
         +- *(2) Project [_1#2127 AS c#2132, _2#2128 AS d#2133]
            +- LocalTableScan [_1#2127, _2#2128]

After this PR, the join can use BroadcastHashJoin instead of BroadcastNestedLoopJoin, and each PythonUDF is evaluated on its own side of the join rather than on the joined output.
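The difference between the two plan shapes can be sketched in plain Scala, without Spark. This is only an illustration: `udf` is a hypothetical stand-in that converts its argument to a string and counts its own invocations, and `nestedLoopThenFilter` and `hashJoin` are made-up names that mimic the shape of the plans above, not Spark operators.

```scala
// Toy illustration only; nothing here is Spark API.
object JoinSketch {
  type Row = (Int, Int)

  // Hypothetical stand-in for the Python UDF; counts how often it is invoked.
  var udfCalls = 0
  def udf(x: Int): String = { udfCalls += 1; x.toString }

  // Shape of the plan before the PR: cross join first,
  // then evaluate both UDFs on every joined pair and filter.
  def nestedLoopThenFilter(left: Seq[Row], right: Seq[Row]): Seq[(Row, Row)] =
    for {
      l <- left
      r <- right
      if udf(l._1) == udf(r._1)
    } yield (l, r)

  // Shape of the plan after the PR: evaluate the UDF once per input row
  // on each side, then hash join on the UDF results.
  def hashJoin(left: Seq[Row], right: Seq[Row]): Seq[(Row, Row)] = {
    val built: Map[String, Seq[Row]] = right.groupBy(r => udf(r._1)) // build side
    for {
      l <- left
      r <- built.getOrElse(udf(l._1), Seq.empty) // probe side
    } yield (l, r)
  }

  def main(args: Array[String]): Unit = {
    val left = Seq((1, 2), (2, 3))
    val right = Seq((1, 2), (3, 4))

    udfCalls = 0
    val before = nestedLoopThenFilter(left, right)
    println(s"nested loop: $before, udf calls = $udfCalls") // 8 calls: 4 pairs x 2 UDFs

    udfCalls = 0
    val after = hashJoin(left, right)
    println(s"hash join:   $after, udf calls = $udfCalls") // 4 calls: 2 rows per side
  }
}
```

Both functions return the same single matching pair for the example data. In this toy version, the number of stand-in UDF calls grows with the size of the cross product in the first shape and with the total number of input rows in the second.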

How was this patch tested?

Added tests.

@SparkQA

SparkQA commented Jul 11, 2019

Test build #107503 has finished for PR 25106 at commit 4cca813.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Member

retest this please

@SparkQA

SparkQA commented Jul 11, 2019

Test build #107525 has finished for PR 25106 at commit 4cca813.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya
Member Author

viirya commented Jul 12, 2019

cc @HyukjinKwon @cloud-fan

@HyukjinKwon
Member

Looks fine to me from a cursory look. If it's not merged, I will take a closer look within a couple of days.

@HyukjinKwon
Member

Merged to master.

vinodkc pushed a commit to vinodkc/spark that referenced this pull request Jul 18, 2019
…down to join

Closes apache#25106 from viirya/pythonudf-join-condition.

Authored-by: Liang-Chi Hsieh <[email protected]>
Signed-off-by: HyukjinKwon <[email protected]>
@viirya viirya deleted the pythonudf-join-condition branch December 27, 2023 18:37