Commit d9ea5af

Address comments

Parent: c3b009f

File tree: 3 files changed (+17, -6 lines)

common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/FetchShuffleBlocks.java

Lines changed: 1 addition & 0 deletions
@@ -70,6 +70,7 @@ public String toString() {
       .add("shuffleId", shuffleId)
       .add("mapIds", Arrays.toString(mapIds))
       .add("reduceIds", Arrays.deepToString(reduceIds))
+      .add("batchFetchEnabled", batchFetchEnabled)
       .toString();
   }
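
For orientation, a hedged sketch of why the extra field matters: the rendered toString is what server-side diagnostics print (for example the IllegalStateException in the next file), so omitting batchFetchEnabled hides the one flag that distinguishes a batch request from a regular one. The Scala case class below is a hypothetical stand-in, not the real FetchShuffleBlocks class or its constructor.

// Illustrative sketch only; field names mirror the protocol message, the class itself is made up.
case class FetchShuffleBlocksSketch(
    shuffleId: Int,
    mapIds: Array[Int],
    reduceIds: Array[Array[Int]],
    batchFetchEnabled: Boolean) {
  override def toString: String =
    s"FetchShuffleBlocks(shuffleId=$shuffleId" +
      s", mapIds=${mapIds.mkString("[", ", ", "]")}" +
      s", reduceIds=${reduceIds.map(_.mkString("[", ", ", "]")).mkString("[", ", ", "]")}" +
      s", batchFetchEnabled=$batchFetchEnabled)"
}

// Example: a batch request for reduce partitions [0, 3) of map output 2 now renders as
// FetchShuffleBlocks(shuffleId=0, mapIds=[2], reduceIds=[[0, 3]], batchFetchEnabled=true)
println(FetchShuffleBlocksSketch(0, Array(2), Array(Array(0, 3)), batchFetchEnabled = true))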

core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala

Lines changed: 2 additions & 2 deletions
@@ -73,8 +73,8 @@ class NettyBlockRpcServer(
         } else {
           val startAndEndId = fetchShuffleBlocks.reduceIds(index)
           if (startAndEndId.length != 2) {
-            throw new IllegalStateException(s"Invalid shuffle fetch request: " +
-              s"$fetchShuffleBlocks")
+            throw new IllegalStateException(s"Invalid shuffle fetch request when batch mode " +
+              s"is enabled: $fetchShuffleBlocks")
           }
           Array(blockManager.getBlockData(
             ShuffleBlockBatchId(
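
A hedged sketch of the convention the check above enforces (illustrative code and names, not the actual NettyBlockRpcServer logic): when batchFetchEnabled is true, each entry of reduceIds is expected to be a two-element [startReduceId, endReduceId) pair that maps to a single ShuffleBlockBatchId, so any other shape is a malformed request and is rejected with the clarified message.

// Illustrative only: validates the per-map reduce ranges of a batch fetch request.
// The block-id string rendered below is an assumption, not the exact ShuffleBlockBatchId format.
def validateBatchRanges(shuffleId: Int, reduceIds: Array[Array[Int]]): Array[String] = {
  reduceIds.zipWithIndex.map { case (startAndEndId, index) =>
    if (startAndEndId.length != 2) {
      throw new IllegalStateException(s"Invalid shuffle fetch request when batch mode " +
        s"is enabled: map index $index carries ${startAndEndId.mkString("[", ", ", "]")}")
    }
    // One batch block covering reduce partitions [start, end) of this map output.
    s"shuffleBatch_${shuffleId}_${index}_${startAndEndId(0)}_${startAndEndId(1)}"
  }
}

// validateBatchRanges(0, Array(Array(0, 3), Array(3, 6)))
//   => Array("shuffleBatch_0_0_0_3", "shuffleBatch_0_1_3_6")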

core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala

Lines changed: 14 additions & 4 deletions
@@ -21,7 +21,6 @@ import org.apache.spark._
 import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.serializer.SerializerManager
-import org.apache.spark.shuffle.sort.BypassMergeSortShuffleHandle
 import org.apache.spark.storage.{BlockManager, ShuffleBlockFetcherIterator}
 import org.apache.spark.util.CompletionIterator
 import org.apache.spark.util.collection.ExternalSorter
@@ -48,10 +47,21 @@ private[spark] class BlockStoreShuffleReader[K, C](
     val compressed = conf.get(config.SHUFFLE_COMPRESS)
     val featureEnabled = conf.get(config.SHUFFLE_FETCH_CONTINUOUS_BLOCKS_IN_BATCH)
     val serializerRelocatable = dep.serializer.supportsRelocationOfSerializedObjects
+    val codecConcatenation = if (compressed) {
+      CompressionCodec.supportsConcatenationOfSerializedStreams(CompressionCodec.createCodec(conf))
+    } else {
+      true
+    }
 
-    featureEnabled && endPartition - startPartition > 1 &&
-      serializerRelocatable && (!compressed || CompressionCodec
-      .supportsConcatenationOfSerializedStreams(CompressionCodec.createCodec(conf)))
+    val res = featureEnabled && endPartition - startPartition > 1 &&
+      serializerRelocatable && (!compressed || codecConcatenation)
+    if (featureEnabled && !res) {
+      logWarning("The feature tag of continuous shuffle block fetching is set to true, but " +
+        "we can not enable the feature because other conditions are not satisfied. " +
+        s"Shuffle compress: $compressed, serializer relocatable: $serializerRelocatable, " +
+        s"codec concatenation: $codecConcatenation")
+    }
+    res
   }
 
   /** Read the combined key-values for this reduce task */
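
The refactor makes the batch-fetch decision debuggable: the result is computed once, and if the feature flag is on but another prerequisite fails, a warning names the failing condition. Below is a minimal standalone sketch of that decision under assumed parameter names; in the real code the values come from SparkConf and the shuffle dependency.

// Illustrative sketch of the eligibility check; parameters stand in for the values the
// real code reads from SparkConf and the shuffle dependency.
def canFetchContinuousBlocksInBatch(
    featureEnabled: Boolean,
    startPartition: Int,
    endPartition: Int,
    compressed: Boolean,
    serializerRelocatable: Boolean,
    codecConcatenation: Boolean): Boolean = {
  val res = featureEnabled &&
    endPartition - startPartition > 1 &&  // batching only helps across multiple reduce partitions
    serializerRelocatable &&              // serialized records must tolerate being concatenated
    (!compressed || codecConcatenation)   // compressed streams must support concatenation
  if (featureEnabled && !res) {
    // The real code calls logWarning; println keeps the sketch dependency-free.
    println(s"Batch fetch requested but not enabled: compressed=$compressed, " +
      s"serializerRelocatable=$serializerRelocatable, codecConcatenation=$codecConcatenation")
  }
  res
}

// canFetchContinuousBlocksInBatch(featureEnabled = true, startPartition = 0, endPartition = 1,
//   compressed = true, serializerRelocatable = true, codecConcatenation = true)
//   => false (only a single reduce partition is requested), and the warning is printed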
