[SPARK-46741][SQL] Cache Table with CTE should work when CTE in plan expression subquery #53526
Conversation
ping @cloud-fan, could you take a look? Also added a corresponding unit test case and verified the behavior before the change.
@AngersZhuuuu can you re-trigger the failed CI jobs?
Done.
-object NormalizeCTEIds extends Rule[LogicalPlan]{
+object NormalizeCTEIds extends Rule[LogicalPlan] {
   override def apply(plan: LogicalPlan): LogicalPlan = {
     val curId = new java.util.concurrent.atomic.AtomicLong()
shouldn't we have unique CTE ids per query? then we need a new AtomicLong instance per apply invocation.
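For illustration, a minimal sketch of the shape under discussion, with the counter created inside apply as the reviewer suggests. The rewrite body (the transformUp and the copy calls) is my assumption for readability, not the PR's actual code:

import java.util.concurrent.atomic.AtomicLong

import org.apache.spark.sql.catalyst.plans.logical.{CTERelationDef, CTERelationRef, LogicalPlan, WithCTE}
import org.apache.spark.sql.catalyst.rules.Rule

object NormalizeCTEIds extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = {
    // A fresh counter per apply() invocation: normalized ids restart at 0 for
    // every query, so equivalent plans always normalize to the same ids.
    val curId = new AtomicLong()
    plan transformDown {
      case withCTE: WithCTE =>
        // One id map per WithCTE -- the line questioned later in this thread.
        val defIdToNewId = withCTE.cteDefs.map(_.id)
          .map((_, curId.getAndIncrement())).toMap
        withCTE transformUp {
          case d: CTERelationDef => d.copy(id = defIdToNewId(d.id))
          case r: CTERelationRef =>
            r.copy(cteId = defIdToNewId.getOrElse(r.cteId, r.cteId))
        }
    }
  }
}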
I know this, but directly changing to transformDownWithSubqueries makes the UT for SPARK-51109 fail. For this query:
test("SPARK-51109: CTE in subquery expression as grouping column") {
withTable("t") {
Seq(1 -> 1).toDF("c1", "c2").write.saveAsTable("t")
withView("v") {
sql(
"""
|CREATE VIEW v AS
|WITH r AS (SELECT c1 + c2 AS c FROM t)
|SELECT * FROM r
|""".stripMargin)
checkAnswer(
sql("SELECT (SELECT max(c) FROM v WHERE c > id) FROM range(1) GROUP BY 1"),
Row(2)
)
}
}
}
The plan is normalized from
Aggregate [scalar-subquery#15 [id#16L]], [scalar-subquery#15 [id#16L] AS scalarsubquery(id)#21]
: :- Aggregate [max(c#18) AS max(c)#20]
: : +- Filter (cast(c#18 as bigint) > outer(id#16L))
: : +- SubqueryAlias spark_catalog.default.v
: : +- View (`spark_catalog`.`default`.`v`, [c#18])
: : +- Project [cast(c#17 as int) AS c#18]
: : +- WithCTE
: : :- CTERelationDef 1, false
: : : +- SubqueryAlias r
: : : +- Project [(c1#12 + c2#13) AS c#17]
: : : +- SubqueryAlias spark_catalog.default.t
: : : +- Relation spark_catalog.default.t[c1#12,c2#13] parquet
: : +- Project [c#17]
: : +- SubqueryAlias r
: : +- CTERelationRef 1, true, [c#17], false, false
: +- Aggregate [max(c#26) AS max(c)#27]
: +- Filter (cast(c#26 as bigint) > outer(id#16L))
: +- SubqueryAlias spark_catalog.default.v
: +- View (`spark_catalog`.`default`.`v`, [c#26])
: +- Project [cast(c#25 as int) AS c#26]
: +- WithCTE
: :- CTERelationDef 1, false
: : +- SubqueryAlias r
: : +- Project [(c1#22 + c2#23) AS c#24]
: : +- SubqueryAlias spark_catalog.default.t
: : +- Relation spark_catalog.default.t[c1#22,c2#23] parquet
: +- Project [c#25]
: +- SubqueryAlias r
: +- CTERelationRef 1, true, [c#25], false, false
+- Range (0, 1, step=1)
to
Aggregate [scalar-subquery#15 [id#16L]], [scalar-subquery#15 [id#16L] AS scalarsubquery(id)#21]
: :- Aggregate [max(c#18) AS max(c)#20]
: : +- Filter (cast(c#18 as bigint) > outer(id#16L))
: : +- SubqueryAlias spark_catalog.default.v
: : +- View (`spark_catalog`.`default`.`v`, [c#18])
: : +- Project [cast(c#17 as int) AS c#18]
: : +- WithCTE
: : :- CTERelationDef 0, false
: : : +- SubqueryAlias r
: : : +- Project [(c1#12 + c2#13) AS c#17]
: : : +- SubqueryAlias spark_catalog.default.t
: : : +- Relation spark_catalog.default.t[c1#12,c2#13] parquet
: : +- Project [c#17]
: : +- SubqueryAlias r
: : +- CTERelationRef 0, true, [c#17], false, false
: +- Aggregate [max(c#26) AS max(c)#27]
: +- Filter (cast(c#26 as bigint) > outer(id#16L))
: +- SubqueryAlias spark_catalog.default.v
: +- View (`spark_catalog`.`default`.`v`, [c#26])
: +- Project [cast(c#25 as int) AS c#26]
: +- WithCTE
: :- CTERelationDef 1, false
: : +- SubqueryAlias r
: : +- Project [(c1#22 + c2#23) AS c#24]
: : +- SubqueryAlias spark_catalog.default.t
: : +- Relation spark_catalog.default.t[c1#22,c2#23] parquet
: +- Project [c#25]
: +- SubqueryAlias r
: +- CTERelationRef 1, true, [c#25], false, false
+- Range (0, 1, step=1)
Within the same plan, the normalized CTE ids of the two copies of this subquery now differ (0 vs. 1), so the copies no longer compare as equal and the check throws:
[info] is not a valid aggregate expression: [SCALAR_SUBQUERY_IS_IN_GROUP_BY_OR_AGGREGATE_FUNCTION] The correlated scalar subquery '"scalarsubquery(id)"' is neither present in GROUP BY, nor in an aggregate function.
[info] Add it to GROUP BY using ordinal position or wrap it in `first()` (or `first_value`) if you don't care which value you get. SQLSTATE: 0A000; line 1 pos 7
[info] Previous schema:scalarsubquery(id)#21
I am still working out how to fix this.
This means we should handle the case when CTE def IDs can be duplicated. In such cases, we should not generate new IDs blindly.
val defIdToNewId = withCTE.cteDefs.map(_.id).map((_, curId.getAndIncrement())).toMap
We need to fix this line. The id map should be per apply invocation, not per WithCTE.
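Concretely, the suggested change amounts to something like this hypothetical contrast, where defIdToNewId becomes a mutable map created once in apply, next to curId:

// Per WithCTE (current): a def id duplicated across subquery copies is
// assigned a different new id on each encounter.
val defIdToNewId = withCTE.cteDefs.map(_.id).map((_, curId.getAndIncrement())).toMap

// Per apply invocation (suggested): the map outlives any single WithCTE, so a
// def id seen again in another subquery copy reuses its first assignment.
withCTE.cteDefs.foreach(d => defIdToNewId.getOrElseUpdate(d.id, curId.getAndIncrement()))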
Or use a global map in one traversal: #53333 (comment)
ping @cloud-fan, how about the current version?
This reverts commit 4b7e1d8.
-    plan transformDown {
+    val defIdToNewId = new HashMap[Long, Long]()
+    plan transformDownWithSubqueries {
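Assembled, the single-traversal shape this diff suggests might look like the following sketch. It is my reconstruction, not the merged code, and it assumes a WithCTE node is always visited before the defs and refs beneath it:

import java.util.concurrent.atomic.AtomicLong

import scala.collection.mutable.HashMap

import org.apache.spark.sql.catalyst.plans.logical.{CTERelationDef, CTERelationRef, LogicalPlan, WithCTE}
import org.apache.spark.sql.catalyst.rules.Rule

object NormalizeCTEIds extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = {
    val curId = new AtomicLong()
    // One map for the whole traversal, shared across the main plan and all
    // subqueries, so duplicated def ids normalize consistently everywhere.
    val defIdToNewId = new HashMap[Long, Long]()
    plan transformDownWithSubqueries {
      case w: WithCTE =>
        // Register ids for this WithCTE's defs before their refs are visited;
        // getOrElseUpdate keeps the first assignment for a duplicated def id.
        w.cteDefs.foreach(d => defIdToNewId.getOrElseUpdate(d.id, curId.getAndIncrement()))
        w
      case d: CTERelationDef => d.copy(id = defIdToNewId(d.id))
      case r: CTERelationRef => r.copy(cteId = defIdToNewId.getOrElse(r.cteId, r.cteId))
    }
  }
}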
Here I tried foreachWithSubqueries, but met a strange exception. @cloud-fan
peter-toth left a comment:
Only a minor nit.
thanks, merging to master/4.1!
Closes #53526 from AngersZhuuuu/SPARK-46741-FOLLOWUP.
Lead-authored-by: Angerszhuuuu <[email protected]>
Co-authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit d65ee81)
Signed-off-by: Wenchen Fan <[email protected]>
What changes were proposed in this pull request?
Follow up on the comment at #53333 (comment).
Why are the changes needed?
Support all cases.
Does this PR introduce any user-facing change?
No
How was this patch tested?
UT
Was this patch authored or co-authored using generative AI tooling?
No