Skip to content

Commit 9463ce2

Browse files
committed
[SPARK-39835][SQL] Fix EliminateSorts remove global sort below the local sort
Correct the `EliminateSorts` follows: - If the upper sort is global then we can remove the global or local sort recursively. - If the upper sort is local then we can only remove the local sort recursively. If a global sort below locol sort, we should not remove the global sort becuase the output partitioning can be affected. This issue is going to worse since we pull out the V1 Write sort to logcial side. yes, bug fix add test Closes #37250 from ulysses-you/remove-sort. Authored-by: ulysses-you <[email protected]> Signed-off-by: Wenchen Fan <[email protected]>
1 parent 421918d commit 9463ce2

File tree

2 files changed

+37
-7
lines changed

2 files changed

+37
-7
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1332,7 +1332,7 @@ object EliminateSorts extends Rule[LogicalPlan] {
13321332
_.containsPattern(SORT))(applyLocally)
13331333

13341334
private val applyLocally: PartialFunction[LogicalPlan, LogicalPlan] = {
1335-
case Sort(_, _, child) if child.maxRows.exists(_ <= 1L) => recursiveRemoveSort(child)
1335+
case Sort(_, _, child) if child.maxRows.exists(_ <= 1L) => recursiveRemoveSort(child, true)
13361336
case s @ Sort(orders, _, child) if orders.isEmpty || orders.exists(_.child.foldable) =>
13371337
val newOrders = orders.filterNot(_.child.foldable)
13381338
if (newOrders.isEmpty) {
@@ -1342,28 +1342,42 @@ object EliminateSorts extends Rule[LogicalPlan] {
13421342
}
13431343
case Sort(orders, false, child) if SortOrder.orderingSatisfies(child.outputOrdering, orders) =>
13441344
applyLocally.lift(child).getOrElse(child)
1345-
case s @ Sort(_, _, child) => s.copy(child = recursiveRemoveSort(child))
1345+
case s @ Sort(_, global, child) => s.copy(child = recursiveRemoveSort(child, global))
13461346
case j @ Join(originLeft, originRight, _, cond, _) if cond.forall(_.deterministic) =>
1347-
j.copy(left = recursiveRemoveSort(originLeft), right = recursiveRemoveSort(originRight))
1347+
j.copy(left = recursiveRemoveSort(originLeft, true),
1348+
right = recursiveRemoveSort(originRight, true))
13481349
case g @ Aggregate(_, aggs, originChild) if isOrderIrrelevantAggs(aggs) =>
1349-
g.copy(child = recursiveRemoveSort(originChild))
1350+
g.copy(child = recursiveRemoveSort(originChild, true))
13501351
}
13511352

1352-
private def recursiveRemoveSort(plan: LogicalPlan): LogicalPlan = {
1353+
/**
1354+
* If the upper sort is global then we can remove the global or local sort recursively.
1355+
* If the upper sort is local then we can only remove the local sort recursively.
1356+
*/
1357+
private def recursiveRemoveSort(
1358+
plan: LogicalPlan,
1359+
canRemoveGlobalSort: Boolean): LogicalPlan = {
13531360
if (!plan.containsPattern(SORT)) {
13541361
return plan
13551362
}
13561363
plan match {
1357-
case Sort(_, _, child) => recursiveRemoveSort(child)
1364+
case Sort(_, global, child) if canRemoveGlobalSort || !global =>
1365+
recursiveRemoveSort(child, canRemoveGlobalSort)
13581366
case other if canEliminateSort(other) =>
1359-
other.withNewChildren(other.children.map(recursiveRemoveSort))
1367+
other.withNewChildren(other.children.map(c => recursiveRemoveSort(c, canRemoveGlobalSort)))
1368+
case other if canEliminateGlobalSort(other) =>
1369+
other.withNewChildren(other.children.map(c => recursiveRemoveSort(c, true)))
13601370
case _ => plan
13611371
}
13621372
}
13631373

13641374
private def canEliminateSort(plan: LogicalPlan): Boolean = plan match {
13651375
case p: Project => p.projectList.forall(_.deterministic)
13661376
case f: Filter => f.condition.deterministic
1377+
case _ => false
1378+
}
1379+
1380+
private def canEliminateGlobalSort(plan: LogicalPlan): Boolean = plan match {
13671381
case r: RepartitionByExpression => r.partitionExpressions.forall(_.deterministic)
13681382
case _: Repartition => true
13691383
case _ => false

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -432,4 +432,20 @@ class EliminateSortsSuite extends AnalysisTest {
432432
Optimize.execute(testRelation.limit(Literal(1)).orderBy('a.asc).orderBy('a.asc)).analyze,
433433
testRelation.limit(Literal(1)).analyze)
434434
}
435+
436+
test("SPARK-39835: Fix EliminateSorts remove global sort below the local sort") {
437+
// global -> local
438+
val plan = testRelation.orderBy($"a".asc).sortBy($"c".asc).analyze
439+
comparePlans(Optimize.execute(plan), plan)
440+
441+
// global -> global -> local
442+
val plan2 = testRelation.orderBy($"a".asc).orderBy($"b".asc).sortBy($"c".asc).analyze
443+
val expected2 = testRelation.orderBy($"b".asc).sortBy($"c".asc).analyze
444+
comparePlans(Optimize.execute(plan2), expected2)
445+
446+
// local -> global -> local
447+
val plan3 = testRelation.sortBy($"a".asc).orderBy($"b".asc).sortBy($"c".asc).analyze
448+
val expected3 = testRelation.orderBy($"b".asc).sortBy($"c".asc).analyze
449+
comparePlans(Optimize.execute(plan3), expected3)
450+
}
435451
}

0 commit comments

Comments
 (0)