


@eejbyfeldt eejbyfeldt commented Oct 18, 2023

### What changes were proposed in this pull request?

Fixes a correctness issue in 3.5.0. The problem seems to be that when AQEShuffleRead does a coalesced read, it can return a HashPartitioning with the coalesced number of partitions. This causes a correctness bug, as the partitioning is not compatible for joins with other HashPartitioning instances even though the number of partitions matches. This is resolved in this patch by introducing CoalescedHashPartitioning and making AQEShuffleRead return that instead.

The fix was suggested by @cloud-fan

> AQEShuffleRead should probably return a different partitioning, e.g. CoalescedHashPartitioning. It still satisfies ClusteredDistribution, so Aggregate is fine and there will be no shuffle. For joins, two CoalescedHashPartitionings are compatible if they have the same original partition number and coalesce boundaries, and CoalescedHashPartitioning is not compatible with HashPartitioning.
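
A minimal sketch in Scala of the idea behind the new partitioning (simplified and illustrative, not the exact patched source; `CoalescedBoundary` and `compatibleWith` here only mirror the compatibility rule described above):

```scala
import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning

// One coalesced partition covers a contiguous range of original reducer ids.
case class CoalescedBoundary(startReducerIndex: Int, endReducerIndex: Int)

// Wraps the original hash partitioning together with the coalesce boundaries.
case class CoalescedHashPartitioning(
    from: HashPartitioning,
    partitions: Seq[CoalescedBoundary]) {

  // A coalesced read exposes the reduced partition count ...
  val numPartitions: Int = partitions.length

  // ... but join compatibility is decided on the original partitioning and
  // the exact coalesce boundaries, never on numPartitions alone. A plain
  // HashPartitioning with the same numPartitions is NOT compatible.
  def compatibleWith(other: CoalescedHashPartitioning): Boolean =
    from == other.from && partitions == other.partitions
}
```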

### Why are the changes needed?

Correctness bug.

### Does this PR introduce _any_ user-facing change?

Yes, fixed correctness issue.

### How was this patch tested?

New and existing unit tests.

### Was this patch authored or co-authored using generative AI tooling?

No

@eejbyfeldt eejbyfeldt changed the base branch from branch-3.5 to master October 18, 2023 13:58
@eejbyfeldt
Contributor Author

Tagging @cloud-fan and @ulysses-you since they created PRs in this area and might know a better way of fixing the bug.

@github-actions github-actions bot added the SQL label Oct 18, 2023
@ulysses-you
Contributor

I think the issue is that we propagate a coalesced shuffle exchange through InMemoryTableScanExec, and then EnsureRequirements uses the coalesced shuffle exchange to create the shuffle exchange on the other side.
However, the shuffle exchanges are actually not compatible: one side's shuffle is from HashPartitioning(200) coalesced to HashPartitioning(10), while the other side's shuffle is HashPartitioning(10). This causes the join data issue.

                      Scan
                       |
                   Shuffle(200)
                       |
  Scan           AQEShuffleRead(10)
   |                   |
Shuffle(10)   InMemoryTableScanExec
    \            /
         Join    

BTW, if you set spark.sql.shuffle.partitions=5, I think this issue should be resolved.

There are two code places related to this issue:

  1. AQEShuffleRead always assumes the coalesced partitioning is unchanged and just refreshes the partition number. I think this is based on the assumption that all the initial shuffle partition numbers are the same, but that does not hold: EnsureRequirements supports shouldConsiderMinParallelism, which can cause different initial shuffle partition numbers within one query execution.
  2. InMemoryTableScanExec propagates the output partitioning, and it introduces one more query execution, which also breaks the assumption of AQEShuffleRead (see the reproduction sketch after this list).
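
A hedged reproduction sketch of the scenario in the diagram above (column names, row counts, and configs are illustrative, not taken from the PR's tests):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .config("spark.sql.adaptive.enabled", "true")
  .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
  .config("spark.sql.shuffle.partitions", "200")
  .getOrCreate()
import spark.implicits._

// Cached side: the aggregate's final shuffle (200 partitions) may be coalesced
// by AQE, and InMemoryTableScanExec then reports the coalesced partitioning.
val cached = spark.range(0, 1000000)
  .select(($"id" % 100).as("key"))
  .groupBy("key").count()
  .cache()
cached.count() // materialize the cache

// Other side: EnsureRequirements shuffles it to match the *coalesced* count,
// yielding a HashPartitioning with equal numPartitions that can still route
// equal keys to different partitions than the coalesced read.
val other = spark.range(0, 100).select($"id".as("key"))

// On affected versions this join could produce incorrect results.
cached.join(other, "key").count()
```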

@cloud-fan
Contributor

AQEShuffleRead should probably return a different partitioning, e.g. CoalescedHashPartitioning. It still satisfies ClusteredDistribution, so Aggregate is fine and there will be no shuffle. For joins, two CoalescedHashPartitionings are compatible if they have the same original partition number and coalesce boundaries, and CoalescedHashPartitioning is not compatible with HashPartitioning.

@eejbyfeldt
Contributor Author

eejbyfeldt commented Oct 20, 2023

@cloud-fan I took a stab at implementing your suggestion, but the reproduction of the bug still fails. So either I made some mistake or missed some other part of the code that needs to be updated. It would be great if you could provide some feedback.

@cloud-fan
Contributor

@ulysses-you can you take a look when you have time?


@ulysses-you ulysses-you left a comment


Looks fine to me, cc @cloud-fan

@eejbyfeldt eejbyfeldt changed the title [SPARK-45592][SQL][WIP] Correctness issue in AQE with InMemoryTableScanExec [SPARK-45592][SQL] Correctness issue in AQE with InMemoryTableScanExec Oct 23, 2023
@eejbyfeldt
Contributor Author

> We don't need this hack anymore and can safely remove the if branch.

Is the suggestion to do that in this PR or is it better to do it in a follow up?

@cloud-fan
Contributor

@eejbyfeldt nvm, I made a mistake. This is for coalesce; we can add a new partitioning for skew join handling (split and replicate partitions). It's unrelated to this PR and we can do it later.

override val numPartitions: Int = partitions.length

override def toString: String = from.toString
override def sql: String = from.sql

After a second thought, why do we need to hide CoalescedHashPartitioning? Can we run some example queries and check EXPLAIN and SQL web UI?
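
One hedged way to check, assuming a running `spark` session (the query shapes are illustrative):

```scala
import spark.implicits._

spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")

// A small aggregate-then-join query whose first shuffle AQE is likely to coalesce.
val left   = spark.range(0, 100000).select(($"id" % 10).as("key")).groupBy("key").count()
val right  = spark.range(0, 10).select($"id".as("key"))
val joined = left.join(right, "key")

joined.collect()            // run it so AQE finalizes the adaptive plan
joined.explain("formatted") // inspect how the (coalesced) partitioning is rendered
```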


@cloud-fan cloud-fan left a comment


LGTM with only one minor comment

@maryannxue
Contributor

@eejbyfeldt Can you briefly describe the triggering condition of this bug? Does it only occur when coalescing happens to produce exactly the same number of partitions as the other side of the join?

In the meantime, I'm wondering if it would be better to:

  1. not coalesce for the top/last shuffle of the physical plan of InMemoryTableScan
  2. have coalesce rule deal with InMemoryTableScan from the caller side (user of the cache)

This PR, just to address the correctness issue, only needs to do (1). And we can do (2) (a little trickier, I suppose) for performance improvement.

@maryannxue
Contributor

Synced with @cloud-fan offline; (2) in the above suggestion wouldn't work. Let's go ahead with the current fix.

@cloud-fan
Contributor

The failed streaming test is unrelated, and my last comment is quite minor, so let's merge this first to fix the correctness bug. Thanks for your great work!

@cloud-fan cloud-fan closed this in 2be03d8 Oct 31, 2023
cloud-fan pushed a commit that referenced this pull request Oct 31, 2023
Fixes a correctness issue in 3.5.0. The problem seems to be that when AQEShuffleRead does a coalesced read, it can return a HashPartitioning with the coalesced number of partitions. This causes a correctness bug, as the partitioning is not compatible for joins with other HashPartitioning instances even though the number of partitions matches. This is resolved in this patch by introducing CoalescedHashPartitioning and making AQEShuffleRead return that instead.

The fix was suggested by cloud-fan

> AQEShuffleRead should probably return a different partitioning, e.g. CoalescedHashPartitioning. It still satisfies ClusteredDistribution, so Aggregate is fine and there will be no shuffle. For joins, two CoalescedHashPartitionings are compatible if they have the same original partition number and coalesce boundaries, and CoalescedHashPartitioning is not compatible with HashPartitioning.

Correctness bug.

Yes, fixed correctness issue.

New and existing unit test.

No

Closes #43435 from eejbyfeldt/SPARK-45592.

Authored-by: Emil Ejbyfeldt <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit 2be03d8)
Signed-off-by: Wenchen Fan <[email protected]>
@dongjoon-hyun
Member

Thank you, @eejbyfeldt and all.

eejbyfeldt pushed a commit to eejbyfeldt/spark that referenced this pull request Nov 9, 2023
dongjoon-hyun pushed a commit that referenced this pull request Nov 12, 2023
…MemoryTableScanExec

### What changes were proposed in this pull request?

This backports #43435 (SPARK-45592) to the 3.4 branch, because the issue was already reported there as SPARK-45282, though it required enabling some extra configuration to hit the bug.

### Why are the changes needed?

Fix correctness issue.

### Does this PR introduce _any_ user-facing change?

Yes, it fixes a correctness issue.

### How was this patch tested?

New tests based on the reproduction example in SPARK-45282

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #43729 from eejbyfeldt/SPARK-45282.

Authored-by: Emil Ejbyfeldt <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
@JJACOB0806

Hello, is there a timeline for the 3.5.1 release? We are facing this issue in 3.5.0 and would like to know when the next stable version will be rolled out.

@deepakcv

deepakcv commented Jan 12, 2024

Hi, is there a tentative timeline for releasing spark-3.5.1 with these changes?

cloud-fan pushed a commit that referenced this pull request Feb 7, 2024
### What changes were proposed in this pull request?

#43435 and #43760 fix a correctness issue that is triggered when AQE is applied to a cached query plan, specifically when AQE coalesces the final result stage of the cached plan.

The current semantics of `spark.sql.optimizer.canChangeCachedPlanOutputPartitioning`
([source code](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala#L403-L411)) are:

- when true, we enable AQE but disable coalescing the final stage (default)
- when false, we disable AQE

But let’s revisit the semantics of this config: for the caller, the only thing that matters is whether we change the output partitioning of the cached plan, and we should try to apply AQE whenever possible. Thus we want to modify the semantics of `spark.sql.optimizer.canChangeCachedPlanOutputPartitioning`:

- when true, we enable AQE and allow coalescing the final stage: this might lead to a perf regression, because it introduces an extra shuffle
- when false, we enable AQE but disable coalescing the final stage (this is actually the `true` semantic of the old behavior)

Also, to keep the default behavior unchanged, we might want to flip the default value of `spark.sql.optimizer.canChangeCachedPlanOutputPartitioning` to `false`.
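
Under the new semantics, a minimal sketch of how a user might choose (the config key is real; which value your version defaults to should be verified):

```scala
// Preserve the cached plan's output partitioning: AQE still runs, but the
// final stage of the cached plan is not coalesced (safe for downstream
// consumers that rely on the original partitioning).
spark.conf.set("spark.sql.optimizer.canChangeCachedPlanOutputPartitioning", "false")

// Allow AQE to coalesce the final stage of the cached plan as well; this can
// shrink small partitions but may force an extra shuffle in a consumer that
// needed the original partitioning.
spark.conf.set("spark.sql.optimizer.canChangeCachedPlanOutputPartitioning", "true")
```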

### Why are the changes needed?

To allow AQE coalesce final stage in SQL cached plan. Also make the semantic of `spark.sql.optimizer.canChangeCachedPlanOutputPartitioning` more reasonable.

### Does this PR introduce _any_ user-facing change?

### How was this patch tested?

Updated UTs.

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #45054 from liuzqt/SPARK-46995.

Authored-by: Ziqi Liu <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
szehon-ho pushed a commit to szehon-ho/spark that referenced this pull request Feb 7, 2024
…MemoryTableScanExec

turboFei pushed a commit to turboFei/spark that referenced this pull request Nov 6, 2025