[SPARK-15313][SQL] EmbedSerializerInFilter rule should keep exprIds of output of surrounded SerializeFromObject. #13096
Conversation
```
// Adds an extra Project here, to preserve the output expr id of `SerializeFromObject`.
// We will remove it later in RemoveAliasOnlyProject rule.
val objAttrs = filter.output.zip(s.output).map {
  case (fout, sout) =>
```
It seems putting the `case` on the same line is preferred (`map { case (fout, sout) =>`), per scala-style-guide#pattern-matching.
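For illustration, a minimal sketch of the two styles with hypothetical identifiers (not code from this PR):

```scala
// Single-case pattern match with `case` on its own line.
val labelsVerbose = Seq(("a", 1), ("b", 2)).map {
  case (name, count) =>
    s"$name=$count"
}

// Style preferred by the guide: the lone `case` starts on the same line as the opening brace.
val labelsPreferred = Seq(("a", 1), ("b", 2)).map { case (name, count) =>
  s"$name=$count"
}
```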
@HyukjinKwon Thank you, I'll fix it.
Test build #58553 has finished for PR 13096 at commit
Test build #58555 has finished for PR 13096 at commit
Can you add the JIRA ticket somewhere as an inline comment in the test case and in the analyzer code?
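For illustration, one possible wording of such an inline comment next to the rule's new code (the exact comment added by the PR may differ):

```scala
// SPARK-15313: preserve the exprIds of the surrounded SerializeFromObject's output here;
// otherwise downstream operators fail to bind attributes (e.g. "Couldn't find _1#420 ...").
```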
cc @cloud-fan
```
// Adds an extra Project here, to preserve the output expr id of `SerializeFromObject`.
// We will remove it later in RemoveAliasOnlyProject rule.
val objAttrs = filter.output.zip(s.output).map { case (fout, sout) =>
```
I'd say these are not object attributes; maybe we should just name it `attrs`.
LGTM, thanks for finding this bug!
Alright I'm going to merge this in master/2.0. Thanks.
## What changes were proposed in this pull request?
The following code:
```
val ds = Seq(("a", 1), ("b", 2), ("c", 3)).toDS()
ds.filter(_._1 == "b").select(expr("_1").as[String]).foreach(println(_))
```
throws an Exception:
```
org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Binding attribute, tree: _1#420
at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:50)
at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:88)
at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:87)
...
Cause: java.lang.RuntimeException: Couldn't find _1#420 in [_1#416,_2#417]
at scala.sys.package$.error(package.scala:27)
at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1$$anonfun$applyOrElse$1.apply(BoundAttribute.scala:94)
at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1$$anonfun$applyOrElse$1.apply(BoundAttribute.scala:88)
at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:49)
at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:88)
at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:87)
...
```
This is because the `EmbedSerializerInFilter` rule drops the `exprId`s of the output of the surrounded `SerializeFromObject`.
The analyzed and optimized plans for the above example are as follows:
```
== Analyzed Logical Plan ==
_1: string
Project [_1#420]
+- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, scala.Tuple2]._1, true) AS _1#420,input[0, scala.Tuple2]._2 AS _2#421]
+- Filter <function1>.apply
+- DeserializeToObject newInstance(class scala.Tuple2), obj#419: scala.Tuple2
+- LocalRelation [_1#416,_2#417], [[0,1800000001,1,61],[0,1800000001,2,62],[0,1800000001,3,63]]
== Optimized Logical Plan ==
!Project [_1#420]
+- Filter <function1>.apply
+- LocalRelation [_1#416,_2#417], [[0,1800000001,1,61],[0,1800000001,2,62],[0,1800000001,3,63]]
```
This PR fixes the `EmbedSerializerInFilter` rule to keep the `exprId`s of the output of the surrounded `SerializeFromObject`.
The plans after this patch are as follows:
```
== Analyzed Logical Plan ==
_1: string
Project [_1#420]
+- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, scala.Tuple2]._1, true) AS _1#420,input[0, scala.Tuple2]._2 AS _2#421]
+- Filter <function1>.apply
+- DeserializeToObject newInstance(class scala.Tuple2), obj#419: scala.Tuple2
+- LocalRelation [_1#416,_2#417], [[0,1800000001,1,61],[0,1800000001,2,62],[0,1800000001,3,63]]
== Optimized Logical Plan ==
Project [_1#416]
+- Filter <function1>.apply
+- LocalRelation [_1#416,_2#417], [[0,1800000001,1,61],[0,1800000001,2,62],[0,1800000001,3,63]]
```
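For reference, a minimal sketch of the shape of the fix implied by the diff excerpts above. Only the comment and the `zip` over `filter.output` and `s.output` appear in the excerpt; the `Alias` body and its `exprId` argument are assumptions here:

```scala
// Sketch only; assumes Catalyst imports such as
// org.apache.spark.sql.catalyst.expressions.Alias and
// org.apache.spark.sql.catalyst.plans.logical.Project.
// `s` is the surrounded SerializeFromObject and `filter` is the new Filter built over its child.
val objAttrs = filter.output.zip(s.output).map { case (fout, sout) =>
  // Re-expose the filter's attribute under the exprId of SerializeFromObject's output,
  // so the user's Project [_1#420] still binds after the serializer is removed.
  Alias(fout, fout.name)(exprId = sout.exprId)
}
// The alias-only Project is removed later by the RemoveAliasOnlyProject rule.
Project(objAttrs, filter)
```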
## How was this patch tested?
Existing tests, plus a new test to check that `filter` and then `select` works.
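A sketch of what such a test might look like; the test name and the use of the suite's `checkDataset` helper are assumptions, so the exact test added by this PR may differ:

```scala
// Hypothetical sketch; assumes a suite extending QueryTest with SharedSQLContext,
// where spark.implicits._ and the checkDataset helper are in scope.
import org.apache.spark.sql.functions.expr

test("SPARK-15313: filter and then select on a Dataset") {
  val ds = Seq(("a", 1), ("b", 2), ("c", 3)).toDS()
  // Before this fix, the select after filter failed during attribute binding.
  checkDataset(ds.filter(_._1 == "b").select(expr("_1").as[String]), "b")
}
```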
Author: Takuya UESHIN <[email protected]>
Closes #13096 from ueshin/issues/SPARK-15313.
(cherry picked from commit d5e1c5a)
Signed-off-by: Reynold Xin <[email protected]>