[SPARK-41812][SPARK-41823][CONNECT][SQL][PYTHON] Resolve ambiguous columns issue in Join
#39925
Conversation
Force-pushed 77ca731 to a6fde92.
cc @cloud-fan
connector/connect/common/src/main/protobuf/spark/connect/relations.proto (outdated; resolved)
...connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala (outdated; resolved)
connector/connect/common/src/main/protobuf/spark/connect/relations.proto (outdated; resolved)
python/pyspark/sql/connect/plan.py (outdated)
why this refactoring in this pr?
here and below?
Another question is why plan is a better var name compared to rel? Because it's a Relation not a Plan?
both plan and rel are fine to me, I just want to make the naming consistent.
Force-pushed 1f9e2ee to 57631b0.
shall we mention it's per-client global?
sure
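The "per-client global" counter discussed above might look like the following sketch. This is illustrative only: the class name `PlanIdGenerator` and its shape are hypothetical, not the actual PySpark Connect implementation.

```python
import itertools
import threading

class PlanIdGenerator:
    """Hypothetical per-client plan-id source: each client owns one
    monotonically increasing counter, so ids are unique within a client,
    while two different clients may hand out the same numbers."""

    def __init__(self) -> None:
        self._counter = itertools.count()
        self._lock = threading.Lock()

    def next_id(self) -> int:
        # itertools.count is not documented as thread-safe, so guard it.
        with self._lock:
            return next(self._counter)

# Each client gets its own generator, hence "per-client global".
client_a, client_b = PlanIdGenerator(), PlanIdGenerator()
ids_a = [client_a.next_id() for _ in range(3)]  # [0, 1, 2]
ids_b = [client_b.next_id() for _ in range(2)]  # [0, 1] -- independent of client A
```

Uniqueness only has to hold within one client, because the server tags plans per session; two clients reusing id 0 is harmless.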
can this really happen?
I don't think so; this check was added just because of `self._plan`'s type: `self._plan: Optional[plan.LogicalPlan]`
why is the plan optional? this is not related to this PR and we can address it later.
don't know why. Let's fix it later
is this the only place returning ColumnReference ?
functions.col also returns a ColumnReference, but without a plan_id
will be great if we can reduce code duplication somehow
like having a private version of def col which takes an extra plan id.
nice, will update
let's add some comments to explain this special branch.
sure
if an attribute has a plan id, I think it should never be resolved to an outer reference
ok. I think we can narrow the case to UnresolvedAttribute -> AttributeReference
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ColumnResolutionHelper.scala (outdated; resolved)
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala (outdated; resolved)
Force-pushed 99fbd08 to e5f1d21.
we don't need to do this check. Even if the attribute reference is dangling, we should still use it and fail later. You can check the behavior of normal dataframe using df1.select(df2.col)
if we want to fail this case:
```
>>> df1 = spark.range(0, 10)
>>> df2 = spark.range(0, 10)
>>> df1
DataFrame[id: bigint]
>>> df1.select(df2.id)
DataFrame[id: bigint]
>>> df1.select(df2.id).show()
+---+
| id|
+---+
| 0|
| 1|
| 2|
| 3|
| 4|
| 5|
| 6|
| 7|
| 8|
| 9|
+---+
```
I guess we should make it fail if the matching node can not be found?
yea, what's more:
```
df1 = ....  # columns [a, b, c]
df2 = df1.select("a")
df2.select(df1.b)
```
this should fail as well, with a missing attribute error.
yes, add a new test for it.
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala (outdated; resolved)
Force-pushed b9afcae to 4d79c6a.
Force-pushed 23d076e to 9fa9d4b.
commonNaturalJoinProcessing resolves Join to Project(Join) and discards the plan id. Make this change to hold the plan id; otherwise the following case will fail because the subplan can not be found:
```
left = spark.createDataFrame([Row(a=1)])
right = spark.createDataFrame([Row(a=1)])
df = left.join(right, on="a", how="left_outer")
df.withColumn("b", udf(lambda x: "x")(df.a))  # <- can not resolve `df.a`
```
add tests for df1.select(df2.col) cc @cloud-fan
we need to fail here in order to fail invalid cases like df1.select(df2.col)
should we reset the plan id to the new join or the new project?
I'm not 100% sure about this.
it seems that setting the plan id to new Join also works
shall we update the code inside commonNaturalJoinProcessing and retain the id in Join? That seems more natural.
got it
this line can be removed. the next line covers it.
what does the match do? can we simply write plan.resolve(u.nameParts, conf.resolver)? It seems wrong to limit the result to be AttributeReference, which rejects nested cols.
got it, will update
Force-pushed 7893c70 to 68968c0.
…lumns issue in `Join`

### What changes were proposed in this pull request?

In the Python client:
- generate a `plan_id` for each proto plan (it's up to the client to guarantee uniqueness);
- attach the `plan_id` to the column created by `DataFrame[col_name]` or `DataFrame.col_name`;
- note that `F.col(col_name)` doesn't have a `plan_id`.

In the Connect planner:
- attach the `plan_id` to `UnresolvedAttribute`s and `LogicalPlan`s via `TreeNodeTag`.

In the analyzer:
- for an `UnresolvedAttribute` with a `plan_id`, search for the matching node in the plan, and resolve it with the found node if possible.

**Out of scope:**
- resolving `self-join`;
- adding a `DetectAmbiguousSelfJoin`-like rule for detection.

### Why are the changes needed?

Fix a bug; before this PR:
```
df1.join(df2, df1["value"] == df2["value"])  <- fails: can not resolve `value`
df1.join(df2, df1["value"] == df2["value"]).select(df1.value)  <- fails: can not resolve `value`
df1.select(df2.value)  <- should fail, but runs as `df1.select(df1.value)` and returns incorrect results
```

### Does this PR introduce _any_ user-facing change?

yes

### How was this patch tested?

added tests, enabled tests

Closes #39925 from zhengruifeng/connect_plan_id.

Authored-by: Ruifeng Zheng <[email protected]>
Signed-off-by: Ruifeng Zheng <[email protected]>
(cherry picked from commit 167bbca)
Signed-off-by: Ruifeng Zheng <[email protected]>
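The analyzer-side resolution described above — find the subplan whose tag matches the attribute's `plan_id`, then resolve the column against that subplan, failing on a dangling id or a missing attribute — can be illustrated with a toy tree search. This is a pure-Python sketch under simplified assumptions (`PlanNode`, `resolve`, and string outputs are all invented here), not the Catalyst code.

```python
from typing import Optional

class PlanNode:
    """Toy logical-plan node carrying an optional plan_id tag."""
    def __init__(self, name, children=(), plan_id=None, output=()):
        self.name = name
        self.children = list(children)
        self.plan_id = plan_id          # stands in for the TreeNodeTag
        self.output = set(output)       # column names this node can resolve

def find_by_plan_id(root: PlanNode, plan_id: int) -> Optional[PlanNode]:
    # Depth-first search for the node tagged with the requested plan_id.
    if root.plan_id == plan_id:
        return root
    for child in root.children:
        found = find_by_plan_id(child, plan_id)
        if found is not None:
            return found
    return None

def resolve(root: PlanNode, column: str, plan_id: int) -> str:
    node = find_by_plan_id(root, plan_id)
    if node is None:
        # dangling plan_id, e.g. df1.select(df2.col) with unrelated plans
        raise ValueError(f"can not find the plan with plan_id={plan_id}")
    if column not in node.output:
        # matching node found, but it can't produce this column
        raise ValueError(f"missing attribute `{column}` in plan {node.name}")
    return f"{node.name}.{column}"

# df1 JOIN df2: the ambiguous `value` is disambiguated by plan_id.
df1 = PlanNode("df1", plan_id=1, output={"value"})
df2 = PlanNode("df2", plan_id=2, output={"value"})
join = PlanNode("join", children=[df1, df2])
assert resolve(join, "value", plan_id=2) == "df2.value"
```

This mirrors why `df1.join(df2, df1["value"] == df2["value"])` becomes resolvable — each side of the condition points at a distinct tagged subplan — while the invalid cases raise instead of silently picking the wrong column.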
merged into master/3.4, thank you all!
### What changes were proposed in this pull request?

This is the scala version of #39925. We introduce a plan_id that is used both for each plan created by the scala client, and by the columns created when calling `Dataframe.col(..)` and `Dataframe.apply(..)`. This way we can later properly resolve the columns created for a specific Dataframe.

### Why are the changes needed?

Joining columns created using `Dataframe.apply(...)` does not work when the column names are ambiguous. We should be able to figure out where a column comes from when it is created like this.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Updated golden files. Added a test case to ClientE2ETestSuite.

Closes #40156 from hvanhovell/SPARK-41823.

Authored-by: Herman van Hovell <[email protected]>
Signed-off-by: Herman van Hovell <[email protected]>
…in the `PLAN_ID_TAG`

### What changes were proposed in this pull request?

Make the rule `ExtractWindowExpressions` retain the `PLAN_ID_TAG`.

### Why are the changes needed?

In #39925, we introduced a new mechanism to resolve an expression against a specified plan. However, sometimes the plan ID might be discarded by some analyzer rules, and then some expressions can not be correctly resolved; this issue is the main blocker of Pandas API on Spark (PS) on Connect.

### Does this PR introduce _any_ user-facing change?

yes, a lot of Pandas APIs enabled

### How was this patch tested?

Enabled UTs

Closes #42086 from zhengruifeng/ps_connect_analyze_window.

Authored-by: Ruifeng Zheng <[email protected]>
Signed-off-by: Ruifeng Zheng <[email protected]>