
Commit 169415f

xuanyuanking authored and cloud-fan committed
[SPARK-30025][CORE] Continuous shuffle block fetching should be disabled by default when the old fetch protocol is used
### What changes were proposed in this pull request?

Disable continuous shuffle block fetching when the old fetch protocol is in use.

### Why are the changes needed?

The new feature of continuous shuffle block fetching depends on the latest version of the shuffle fetch protocol. We should keep this constraint in `BlockStoreShuffleReader.fetchContinuousBlocksInBatch`.

### Does this PR introduce any user-facing change?

Users will no longer get an exception related to continuous shuffle block fetching when an old version of the external shuffle service is used.

### How was this patch tested?

Existing UT.

Closes #26663 from xuanyuanking/SPARK-30025.

Authored-by: Yuanjian Li <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
1 parent 03ac1b7 commit 169415f

File tree

3 files changed: +12 −5 lines changed


core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala

Lines changed: 6 additions & 2 deletions
```diff
@@ -56,8 +56,12 @@ class NettyBlockRpcServer(
     message match {
       case openBlocks: OpenBlocks =>
         val blocksNum = openBlocks.blockIds.length
-        val blocks = for (i <- (0 until blocksNum).view)
-          yield blockManager.getLocalBlockData(BlockId.apply(openBlocks.blockIds(i)))
+        val blocks = (0 until blocksNum).map { i =>
+          val blockId = BlockId.apply(openBlocks.blockIds(i))
+          assert(!blockId.isInstanceOf[ShuffleBlockBatchId],
+            "Continuous shuffle block fetching only works for new fetch protocol.")
+          blockManager.getLocalBlockData(blockId)
+        }
         val streamId = streamManager.registerStream(appId, blocks.iterator.asJava,
           client.getChannel)
         logTrace(s"Registered streamId $streamId with $blocksNum buffers")
```
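The guard added above rejects batch block ids on the old `OpenBlocks` path, which can only serve single blocks. The shape of that check can be sketched in isolation; the types below are hypothetical stand-ins for Spark's `BlockId` hierarchy, not the real classes:

```scala
// Simplified stand-ins for Spark's block id types (illustration only).
sealed trait BlockId
case class ShuffleBlockId(shuffleId: Int, mapId: Long, reduceId: Int) extends BlockId
case class ShuffleBlockBatchId(
    shuffleId: Int, mapId: Long, startReduceId: Int, endReduceId: Int) extends BlockId

// A batch id arriving on the old fetch path means the client enabled
// batching against a server that cannot honor it, so fail fast.
def checkOldProtocolBlock(blockId: BlockId): BlockId = {
  assert(!blockId.isInstanceOf[ShuffleBlockBatchId],
    "Continuous shuffle block fetching only works for new fetch protocol.")
  blockId
}

// A single shuffle block passes through untouched.
val ok = checkOldProtocolBlock(ShuffleBlockId(0, 1L, 2))

// A batch id trips the assertion.
val rejected =
  try { checkOldProtocolBlock(ShuffleBlockBatchId(0, 1L, 0, 4)); false }
  catch { case _: AssertionError => true }
```

With the reader-side change below, well-configured clients should never hit this assertion; it exists to surface protocol mismatches early instead of returning corrupt data.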

core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala

Lines changed: 4 additions & 2 deletions
```diff
@@ -50,14 +50,16 @@ private[spark] class BlockStoreShuffleReader[K, C](
     } else {
       true
     }
+    val useOldFetchProtocol = conf.get(config.SHUFFLE_USE_OLD_FETCH_PROTOCOL)
 
     val doBatchFetch = shouldBatchFetch && serializerRelocatable &&
-      (!compressed || codecConcatenation)
+      (!compressed || codecConcatenation) && !useOldFetchProtocol
     if (shouldBatchFetch && !doBatchFetch) {
       logDebug("The feature tag of continuous shuffle block fetching is set to true, but " +
         "we can not enable the feature because other conditions are not satisfied. " +
         s"Shuffle compress: $compressed, serializer relocatable: $serializerRelocatable, " +
-        s"codec concatenation: $codecConcatenation.")
+        s"codec concatenation: $codecConcatenation, use old shuffle fetch protocol: " +
+        s"$useOldFetchProtocol.")
     }
     doBatchFetch
   }
```
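The decision logic in the diff above is a conjunction of independent conditions, and this patch adds one more veto: the old fetch protocol. A standalone sketch of that gating (names mirror the diff, but this is an illustration, not Spark's actual method, which reads these values from the runtime config):

```scala
// Minimal sketch of the batch-fetch decision. Every condition must hold;
// the old fetch protocol alone is now enough to disable batching.
def fetchContinuousBlocksInBatch(
    shouldBatchFetch: Boolean,
    serializerRelocatable: Boolean,
    compressed: Boolean,
    codecConcatenation: Boolean,
    useOldFetchProtocol: Boolean): Boolean = {
  val doBatchFetch = shouldBatchFetch && serializerRelocatable &&
    (!compressed || codecConcatenation) && !useOldFetchProtocol
  if (shouldBatchFetch && !doBatchFetch) {
    // Stand-in for Spark's logDebug: report which condition blocked batching.
    println(s"Batch fetch requested but disabled: compressed=$compressed, " +
      s"relocatable=$serializerRelocatable, codecConcatenation=$codecConcatenation, " +
      s"useOldFetchProtocol=$useOldFetchProtocol")
  }
  doBatchFetch
}

// With the new protocol and the other conditions satisfied, batching stays on...
val enabled = fetchContinuousBlocksInBatch(true, true, false, false, false)

// ...but switching to the old protocol turns it off, with a debug message.
val disabled = fetchContinuousBlocksInBatch(true, true, false, false, true)
```

Gating silently (with a debug log) rather than throwing matches the PR's intent: users on an old external shuffle service fall back to per-block fetching instead of seeing an exception.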

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 2 additions & 1 deletion
```diff
@@ -367,7 +367,8 @@ object SQLConf {
       "reduce IO and improve performance. Note, multiple continuous blocks exist in single " +
       s"fetch request only happen when '${ADAPTIVE_EXECUTION_ENABLED.key}' and " +
       s"'${REDUCE_POST_SHUFFLE_PARTITIONS_ENABLED.key}' is enabled, this feature also depends " +
-      "on a relocatable serializer and the concatenation support codec in use.")
+      "on a relocatable serializer, the concatenation support codec in use and the new version" +
+      "shuffle fetch protocol.")
     .booleanConf
     .createWithDefault(true)
```