[SPARK-48628][CORE] Add task peak on/off heap memory metrics #47192
Conversation
core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala (outdated review thread, resolved)
JoshRosen left a comment:
I am broadly supportive of this change:
The existing peakExecutionMemory metric is fairly inconsistent in its coverage and misses many important sources of allocation. It was originally added while the MemoryManager abstractions were being developed and was never fully updated in light of that new abstraction. It also predated support for off-heap memory. For all of these reasons, I'm supportive of deprecating and replacing it.
I left a couple of minor nit suggestions, including a suggestion on how we can more explicitly call out the distinction between the old and new metrics in the Scaladocs. I am supportive of deprecating and removing the old metric in favor of these new ones.
jiangxb1987 left a comment:
LGTM
```java
if (mode == MemoryMode.OFF_HEAP) {
  peakOffHeapMemory = Math.max(peakOffHeapMemory,
    memoryManager.getOffHeapExecutionMemoryUsageForTask(taskAttemptId));
```
This introduces extra lock synchronization on the underlying memory pool. I wonder if we could instead compute the latest peak memory with a simple calculation, like peakMemory - releasedMemory + gotMemory.
Theoretically we can maintain both currentMem and peakMem within TaskMemoryManager so that we don’t need to ask memoryManager, but on the other hand memoryManager is designed to maintain per-task mem usage so by doing this we kinda maintain this in two places. @JoshRosen WDYT
Although it's true that there might be a bit of redundancy in counting in both places, it seems like there may be reasonable performance justifications for introducing such redundancy.
I don't think it will end up being that much additional code:
Within TaskMemoryManager, I think we'd just need to add a pair of long counter fields, one for on-heap and another for off-heap, then increment them in acquireExecutionMemory and decrement them in releaseExecutionMemory (since those are narrow waists).
Maybe we should give that a try and see how much net code it ends up adding?
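To make the proposal above concrete, here is a rough standalone sketch (not Spark's actual TaskMemoryManager code; all names are illustrative) of the counter-pair idea: one current/peak pair per memory mode, updated only at the acquire/release narrow waists. Thread safety is deliberately left out here, since that is exactly what the following comments discuss.

```java
// Illustrative sketch only: a pair of current/peak counters per memory mode,
// updated at the acquire/release narrow waists. Not the actual Spark code.
enum MemoryMode { ON_HEAP, OFF_HEAP }

class PeakCounterSketch {
  private long currentOnHeap = 0L, peakOnHeap = 0L;
  private long currentOffHeap = 0L, peakOffHeap = 0L;

  // Would be called from acquireExecutionMemory with the bytes actually granted.
  void recordAcquire(MemoryMode mode, long got) {
    if (mode == MemoryMode.OFF_HEAP) {
      currentOffHeap += got;
      peakOffHeap = Math.max(peakOffHeap, currentOffHeap);
    } else {
      currentOnHeap += got;
      peakOnHeap = Math.max(peakOnHeap, currentOnHeap);
    }
  }

  // Would be called from releaseExecutionMemory with the bytes given back.
  void recordRelease(MemoryMode mode, long released) {
    if (mode == MemoryMode.OFF_HEAP) {
      currentOffHeap -= released;
    } else {
      currentOnHeap -= released;
    }
  }

  long peakOnHeapMemory() { return peakOnHeap; }
  long peakOffHeapMemory() { return peakOffHeap; }
}
```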
I noticed that releaseExecutionMemory is not locked, so we need to synchronize on these counters. But I suppose that would be better than synchronizing at the MemoryManager granularity.
```java
}

if (mode == MemoryMode.OFF_HEAP) {
  synchronized (offHeapMemoryLock) {
```
The whole function acquireExecutionMemory is under the protection of synchronized (this), and the released memory can be obtained from trySpillAndAcquire(): `long released = consumerToSpill.spill(requested, requestingConsumer);`. So I don't think we need an extra lock here.
acquireExecutionMemory is synchronized but releaseExecutionMemory is not synchronized.
While we maintain the current memory in both places, we can either:
- synchronized (this) on releaseExecutionMemory, or
- add another lock, for smaller lock granularity (a rough sketch of this option follows)
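A rough sketch of the second option (illustrative names, not the actual patch, although the diff quoted above does show a synchronized (offHeapMemoryLock) block): a dedicated lock object guards the counters, so the release path does not have to take the TaskMemoryManager's own monitor.

```java
// Illustrative sketch of the "separate lock" option: a dedicated lock object
// guards the off-heap counters so releaseExecutionMemory does not need to
// synchronize on the whole TaskMemoryManager.
class OffHeapPeakSketch {
  private final Object offHeapMemoryLock = new Object();
  private long currentOffHeapMemory = 0L;
  private long peakOffHeapMemory = 0L;

  // Hook for the acquire path (itself synchronized on the manager).
  void onOffHeapAcquire(long got) {
    synchronized (offHeapMemoryLock) {
      currentOffHeapMemory += got;
      peakOffHeapMemory = Math.max(peakOffHeapMemory, currentOffHeapMemory);
    }
  }

  // Hook for the release path, which is not otherwise synchronized.
  void onOffHeapRelease(long released) {
    synchronized (offHeapMemoryLock) {
      currentOffHeapMemory -= released;
    }
  }

  long peakOffHeapExecutionMemory() {
    synchronized (offHeapMemoryLock) {
      return peakOffHeapMemory;
    }
  }
}
```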
Got it.
Actually, could we calculate `consumers.map(_.used).sum + got` as the peak memory at the end of acquireExecutionMemory?
I think that's doable. That way we don't even need to maintain the current memory; instead we update the peak memory after each acquireExecutionMemory call.
Updated the code, please take another look
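For context, a rough sketch of what the "sum the consumers at the end of each acquire" version looks like (illustrative names, not the actual patch). Note that, as the revert discussion further down explains, this is O(n) in the number of consumers per acquireExecutionMemory call.

```java
// Illustrative sketch: no running counter; recompute the task's current usage
// from all consumers at the end of each acquire and fold it into the peak.
import java.util.ArrayList;
import java.util.List;

class ConsumerSumSketch {
  static final class Consumer {
    long used; // bytes currently held by this consumer
  }

  private final List<Consumer> consumers = new ArrayList<>();
  private long peakMemory = 0L;

  synchronized long acquire(Consumer consumer, long got) {
    if (!consumers.contains(consumer)) {
      consumers.add(consumer);
    }
    consumer.used += got;
    // Equivalent of consumers.map(_.used).sum: a linear scan over consumers.
    long current = 0L;
    for (Consumer c : consumers) {
      current += c.used;
    }
    peakMemory = Math.max(peakMemory, current);
    return got;
  }

  synchronized long peakExecutionMemory() { return peakMemory; }
}
```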
I have not looked at this PR in detail, but we already have
Hi @mridulm I think we have executor-level on/off heap execution memory metrics, but not task/stage-level ones. (I might be wrong... feel free to point me to the relevant code path)
Seeing this test failure; doesn't seem to be relevant...
Take a look at peakExecutionMemory within spark-core.
We should be exposing the new metrics as part of the API - both at the task level and at the stage level (distributions, for example).
```scala
/**
 * Peak off heap execution memory as tracked by TaskMemoryManager.
 */
def peakOffHeapExecutionMemory: Long = _peakOffHeapExecutionMemory.sum
```
Discuss:
Is it required that peakExecutionMemory <= peakOnHeapExecutionMemory + peakOffHeapExecutionMemory?
Any cases where this might get violated?
I am trying to reason about completeness of these metrics (given we want to eventually deprecate the existing one).
I expect the above to hold, but want to make sure I am not missing anything.
+CC @JoshRosen
peakExecutionMemory <= peakOnHeapExecutionMemory + peakOffHeapExecutionMemory?
I think yes, because TaskMemoryManager.acquireExecutionMemory is the only narrow waist for any execution memory acquisition, and we maintain the memory there.
In contrast, the legacy peakExecutionMemory is maintained in some operators (join, agg, sort), which is entirely up to the operator implementation.
+1, I agree that the peakExecutionMemory <= peakOnHeapExecutionMemory + peakOffHeapExecutionMemory should hold:
If we trace through the existing callers of incPeakExecutionMemory it looks like all of the usages flow from counts that correspond to the acquireExecutionMemory waist.
We should definitely expose this to the API. But can we land this core change first and then make the API/UI changes in follow-up PRs? Actually I've created a sub-task https://issues.apache.org/jira/browse/SPARK-48788 for that.
Hi @JoshRosen @mridulm do you mind taking another look at this PR?
LGTM
Merged to master, thanks @mridulm @JoshRosen @Ngone51 for review!
```scala
// TODO: SPARK-48789: the naming is confusing since this does not really reflect the whole
// execution memory. We'd better deprecate this once we have a replacement.
def peakExecutionMemory: Long = _peakExecutionMemory.sum
```
How about we change its implementation to be peakOnHeapExecutionMemory + peakOffHeapExecutionMemory? The current implementation doesn't make much sense due to https://github.com/apache/spark/pull/47192/files#r1692144786
Yes, I think we can plan this breaking change for Spark 4.0.
peakOnHeapExecutionMemory and peakOffHeapExecutionMemory can peak at different times, so we can't replace it with their sum.
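A small made-up numerical example of the point above: if the two modes peak at different times, the sum of the individual peaks overstates the true combined peak.

```java
// Made-up numbers: on-heap peaks first, off-heap peaks later.
public class PeakSumExample {
  public static void main(String[] args) {
    long[] onHeap  = {100, 10};  // MB at two points in time
    long[] offHeap = {10, 100};
    long peakOn = 0, peakOff = 0, peakTotal = 0;
    for (int t = 0; t < onHeap.length; t++) {
      peakOn = Math.max(peakOn, onHeap[t]);
      peakOff = Math.max(peakOff, offHeap[t]);
      peakTotal = Math.max(peakTotal, onHeap[t] + offHeap[t]);
    }
    System.out.println(peakOn + peakOff); // 200: sum of the individual peaks
    System.out.println(peakTotal);        // 110: actual peak of the total
  }
}
```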
Oh I see, makes sense. Let's leave it then.
dongjoon-hyun left a comment:
Hi @liuzqt, @JoshRosen, @cloud-fan, @jiangxb1987, @Ngone51, @mridulm.
This commit seems to cause a regression in some cases.
Specifically, ExternalAppendOnlyUnsafeRowArrayBenchmark is severely affected by this commit and has been failing in CIs since then because it almost hangs.
I also verified that it's the same locally.
Although I tried to do a follow-up PR to provide a quick fix, I created a reverting PR for now. It would be great if this PR lands again properly with the relevant micro-benchmark results. Otherwise, any follow-up PR with
@dongjoon-hyun thanks for the catch, I'll investigate the regression and try to fix it.
### What changes were proposed in this pull request?
This reverts commit 717a6da.

### Why are the changes needed?
To fix a performance regression. During the regular performance audit,
- #47743

`ExternalAppendOnlyUnsafeRowArrayBenchmark` detected a performance regression caused by SPARK-48628.
- #47192

### Does this PR introduce _any_ user-facing change?
No. This is not released yet.

### How was this patch tested?
Pass the CIs.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #47747 from dongjoon-hyun/SPARK-48628.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
Thank you so much, @liuzqt!
Thanks for the details @dongjoon-hyun!
### What changes were proposed in this pull request?
Add task on/off heap execution memory in `TaskMetrics`, tracked in `TaskMemoryManager`, **assuming `acquireExecutionMemory` is the only narrow waist for acquiring execution memory.**

### Why are the changes needed?
Currently there are no task on/off heap execution memory metrics. There is a [peakExecutionMemory](https://github.com/apache/spark/blob/3cd35f8cb6462051c621cf49de54b9c5692aae1d/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala#L114) metric, however, its semantic is confusing: it only covers the execution memory used by shuffle/join/aggregate/sort, which is accumulated in specific operators and thus does not really reflect the real execution memory. Therefore it's necessary to add these two metrics.

Also I created two follow-up sub-tickets:
- https://issues.apache.org/jira/browse/SPARK-48788 : accumulate task metrics in stage, and display in Spark UI
- https://issues.apache.org/jira/browse/SPARK-48789 : deprecate `peakExecutionMemory` once we have a replacement for it.

The ultimate goal is to have these two metrics ready (as accumulated stage metrics in the Spark UI as well) and deprecate `peakExecutionMemory`.

### Does this PR introduce _any_ user-facing change?
Supposedly no. But the two follow-up sub-tickets will have user-facing changes: new metrics exposed to the Spark UI, and old metric deprecation.

### How was this patch tested?
New test.

### Was this patch authored or co-authored using generative AI tooling?
NO

Closes apache#47192 from liuzqt/SPARK-48628.

Authored-by: Ziqi Liu <[email protected]>
Signed-off-by: Xingbo Jiang <[email protected]>
### What changes were proposed in this pull request?
This PR is trying to revive #47192, which was [reverted](#47747) due to a regression in `ExternalAppendOnlyUnsafeRowArrayBenchmark`.

**Root cause**

We eventually decided to aggregate peak memory usage from all consumers on each `acquireExecutionMemory` invocation (see [this discussion](#47192 (comment))), which is O(n) complexity where `n` is the number of consumers. `ExternalAppendOnlyUnsafeRowArrayBenchmark` is implemented in a way that all iterations are run in a single task context, so the number of consumers explodes. Notice that `TaskMemoryManager.consumers` is never cleaned up during its whole lifecycle, and `TaskMemoryManager.acquireExecutionMemory` is a very frequent operation, so doing a linear-complexity (in terms of the number of consumers) operation there might not be a good choice. This benchmark might be a corner case, but it's still possible to have a large number of consumers in a large query plan.

I fall back to the previous implementation: maintain current execution memory with an extra lock. cc Ngone51

#### Benchmark result
- [ExternalAppendOnlyUnsafeRowArrayBenchmark-results](https://github.com/liuzqt/spark/actions/runs/10415213026)
- [ExternalAppendOnlyUnsafeRowArrayBenchmark-jdk21-results](https://github.com/liuzqt/spark/actions/runs/10414246805)

### Why are the changes needed?

### Does this PR introduce _any_ user-facing change?
NO

### How was this patch tested?
New unit tests.

### Was this patch authored or co-authored using generative AI tooling?
NO

Closes #47776 from liuzqt/SPARK-48628.

Authored-by: Ziqi Liu <[email protected]>
Signed-off-by: Josh Rosen <[email protected]>
What changes were proposed in this pull request?
Add task on/off heap execution memory in TaskMetrics, tracked in TaskMemoryManager, assuming acquireExecutionMemory is the only narrow waist for acquiring execution memory.

Why are the changes needed?
Currently there are no task on/off heap execution memory metrics.
There is a peakExecutionMemory metric, however, its semantic is confusing: it only covers the execution memory used by shuffle/join/aggregate/sort, which is accumulated in specific operators and thus does not really reflect the real execution memory.
Therefore it's necessary to add these two metrics.
Also I created two follow-up sub-tickets:
- https://issues.apache.org/jira/browse/SPARK-48788 : accumulate task metrics in stage, and display in Spark UI
- https://issues.apache.org/jira/browse/SPARK-48789 : deprecate peakExecutionMemory once we have a replacement for it.
The ultimate goal is to have these two metrics ready (as accumulated stage metrics in the Spark UI as well) and deprecate peakExecutionMemory.

Does this PR introduce any user-facing change?
Supposedly no. But the two follow-up sub-tickets will have user-facing changes: new metrics exposed to the Spark UI, and old metric deprecation.

How was this patch tested?
New test.

Was this patch authored or co-authored using generative AI tooling?
NO