
Commit bbf585d

merge the range reader and split the mappers based on data size
1 parent 5f56f89 commit bbf585d

File tree

10 files changed: +137 −175 lines


core/src/main/scala/org/apache/spark/MapOutputTracker.scala

Lines changed: 29 additions & 74 deletions
@@ -343,30 +343,17 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
   /**
    * Called from executors to get the server URIs and output sizes for each shuffle block that
    * needs to be read from a given range of map output partitions (startPartition is included but
-   * endPartition is excluded from the range) and is produced by a specific mapper.
+   * endPartition is excluded from the range) and is produced by
+   * a range mapper (startMapId, endMapId, startMapId is included and the endMapId is excluded).
    *
    * @return A sequence of 2-item tuples, where the first item in the tuple is a BlockManagerId,
    *         and the second item is a sequence of (shuffle block id, shuffle block size, map index)
    *         tuples describing the shuffle blocks that are stored at that block manager.
    */
-  def getMapSizesByMapIndex(
+  def getMapSizesByRange(
       shuffleId: Int,
-      mapIndex: Int,
       startPartition: Int,
-      endPartition: Int): Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])]
-
-  /**
-   * Called from executors to get the server URIs and output sizes for each shuffle block that
-   * needs to be read from a specific map output partitions (partitionIndex) and is
-   * produced by a range mapper (startMapId, endMapId)
-   *
-   * @return A sequence of 2-item tuples, where the first item in the tuple is a BlockManagerId,
-   *         and the second item is a sequence of (shuffle block id, shuffle block size, map index)
-   *         tuples describing the shuffle blocks that are stored at that block manager.
-   */
-  def getMapSizesByRangeMapIndex(
-      shuffleId: Int,
-      partitionIndex: Int,
+      endPartition: Int,
       startMapId: Int,
       endMapId: Int): Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])]

@@ -767,44 +754,25 @@ private[spark] class MapOutputTrackerMaster(
     }
   }
 
-  override def getMapSizesByMapIndex(
-      shuffleId: Int,
-      mapIndex: Int,
-      startPartition: Int,
-      endPartition: Int): Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = {
-    logDebug(s"Fetching outputs for shuffle $shuffleId, mapIndex $mapIndex" +
-      s"partitions $startPartition-$endPartition")
-    shuffleStatuses.get(shuffleId) match {
-      case Some (shuffleStatus) =>
-        shuffleStatus.withMapStatuses { statuses =>
-          MapOutputTracker.convertMapStatuses(
-            shuffleId,
-            startPartition,
-            endPartition,
-            statuses,
-            Some(mapIndex))
-        }
-      case None =>
-        Iterator.empty
-    }
-  }
-
-  override def getMapSizesByRangeMapIndex(
-      shuffleId: Int,
-      partitionIndex: Int,
-      startMapId: Int,
-      endMapId: Int): Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = {
+  override def getMapSizesByRange(
+      shuffleId: Int,
+      startPartition: Int,
+      endPartition: Int,
+      startMapId: Int,
+      endMapId: Int): Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = {
     shuffleStatuses.get(shuffleId) match {
       case Some(shuffleStatus) =>
         shuffleStatus.withMapStatuses { statuses =>
           MapOutputTracker.convertMapStatuses(
-            shuffleId, partitionIndex, statuses, startMapId, endMapId)
+            shuffleId, startPartition, endPartition, statuses, startMapId, endMapId)
         }
       case None =>
         Iterator.empty
     }
   }
 
+
+
   override def stop(): Unit = {
     mapOutputRequests.offer(PoisonPill)
     threadpool.shutdown()
@@ -850,33 +818,16 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr
     }
   }
 
-  override def getMapSizesByMapIndex(
+  override def getMapSizesByRange(
       shuffleId: Int,
-      mapIndex: Int,
       startPartition: Int,
-      endPartition: Int): Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = {
-    logDebug(s"Fetching outputs for shuffle $shuffleId, mapIndex $mapIndex" +
-      s"partitions $startPartition-$endPartition")
-    val statuses = getStatuses(shuffleId, conf)
-    try {
-      MapOutputTracker.convertMapStatuses(shuffleId, startPartition, endPartition,
-        statuses, Some(mapIndex))
-    } catch {
-      case e: MetadataFetchFailedException =>
-        // We experienced a fetch failure so our mapStatuses cache is outdated; clear it:
-        mapStatuses.clear()
-        throw e
-    }
-  }
-
-  override def getMapSizesByRangeMapIndex(
-      shuffleId: Int,
-      partitionIndex: Int,
+      endPartition: Int,
       startMapId: Int,
       endMapId: Int): Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = {
     val statuses = getStatuses(shuffleId, conf)
     try {
-      MapOutputTracker.convertMapStatuses(shuffleId, partitionIndex, statuses, startMapId, endMapId)
+      MapOutputTracker.convertMapStatuses(
+        shuffleId, startPartition, endPartition, statuses, startMapId, endMapId)
     } catch {
       case e: MetadataFetchFailedException =>
         // We experienced a fetch failure so our mapStatuses cache is outdated; clear it:
@@ -1069,7 +1020,7 @@ private[spark] object MapOutputTracker extends Logging {
   }
 
   /**
-   * Given an array of map statuses, a specific map output partitions and a range
+   * Given an array of map statuses, a range map output partitions and a range
    * mappers (startMapId, endMapId), returns a sequence that, for each block manager ID,
    * lists the shuffle block IDs and corresponding shuffle
    * block sizes stored at that block manager.
@@ -1079,7 +1030,8 @@ private[spark] object MapOutputTracker extends Logging {
    * throws a FetchFailedException.
    *
    * @param shuffleId Identifier for the shuffle
-   * @param partitionIndex Specific of map output partition ID
+   * @param startPartition Start map output partition ID
+   * @param endPartition End map output partition ID
    * @param statuses List of map statuses, indexed by map partition index.
    * @param startMapId Start Map ID
    * @param endMapId End map ID
@@ -1089,7 +1041,8 @@ private[spark] object MapOutputTracker extends Logging {
    */
  def convertMapStatuses(
      shuffleId: Int,
-     partitionIndex: Int,
+     startPartition: Int,
+     endPartition: Int,
      statuses: Array[MapStatus],
      startMapId: Int,
      endMapId: Int): Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = {
@@ -1100,12 +1053,14 @@ private[spark] object MapOutputTracker extends Logging {
       if (status == null) {
         val errorMessage = s"Missing an output location for shuffle $shuffleId"
         logError(errorMessage)
-        throw new MetadataFetchFailedException(shuffleId, partitionIndex, errorMessage)
+        throw new MetadataFetchFailedException(shuffleId, startPartition, errorMessage)
       } else {
-        val size = status.getSizeForBlock(partitionIndex)
-        if (size != 0) {
-          splitsByAddress.getOrElseUpdate(status.location, ListBuffer()) +=
-            ((ShuffleBlockId(shuffleId, status.mapId, partitionIndex), size, mapIndex))
+        for (part <- startPartition until endPartition) {
+          val size = status.getSizeForBlock(part)
+          if (size != 0) {
+            splitsByAddress.getOrElseUpdate(status.location, ListBuffer()) +=
+              ((ShuffleBlockId(shuffleId, status.mapId, part), size, mapIndex))
+          }
         }
       }
     }
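
For reference, a minimal, self-contained sketch of what the merged lookup now computes: walk the mapper range [startMapId, endMapId) and, for each mapper, the reduce-partition range [startPartition, endPartition), keeping only non-empty blocks and grouping them by the location that stores them. The Status and Block case classes are simplified stand-ins, not Spark's MapStatus/BlockManagerId/ShuffleBlockId types, and the outer loop over the mapper range is assumed from the new signature (only the inner per-partition loop appears in the hunk above).

import scala.collection.mutable.{HashMap, ListBuffer}

// Simplified stand-ins for MapStatus, BlockManagerId and ShuffleBlockId (not Spark types).
final case class Status(location: String, mapId: Long, blockSizes: Array[Long])
final case class Block(shuffleId: Int, mapId: Long, reduceId: Int)

// Walk mappers [startMapId, endMapId) and partitions [startPartition, endPartition),
// collecting non-empty blocks grouped by the address that hosts them.
def convertMapStatusesSketch(
    shuffleId: Int,
    startPartition: Int,
    endPartition: Int,
    statuses: Array[Status],
    startMapId: Int,
    endMapId: Int): Iterator[(String, Seq[(Block, Long, Int)])] = {
  val splitsByAddress = new HashMap[String, ListBuffer[(Block, Long, Int)]]
  for (mapIndex <- startMapId until endMapId) {
    val status = statuses(mapIndex)
    require(status != null, s"Missing an output location for shuffle $shuffleId")
    for (part <- startPartition until endPartition) {
      val size = status.blockSizes(part)
      if (size != 0) {
        splitsByAddress.getOrElseUpdate(status.location, ListBuffer()) +=
          ((Block(shuffleId, status.mapId, part), size, mapIndex))
      }
    }
  }
  splitsByAddress.iterator.map { case (loc, blocks) => (loc, blocks.toSeq) }
}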

core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala

Lines changed: 6 additions & 16 deletions
@@ -55,26 +55,16 @@ private[spark] trait ShuffleManager {
       metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C]
 
   /**
-   * Get a reader for a range of reduce partitions (startPartition to endPartition-1, inclusive)
-   * that are produced by one specific mapper. Called on executors by reduce tasks.
-   */
-  def getReaderForOneMapper[K, C](
-      handle: ShuffleHandle,
-      mapIndex: Int,
-      startPartition: Int,
-      endPartition: Int,
-      context: TaskContext,
-      metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C]
-
-  /**
-   * Get a reader for the specific partitionIndex in map output statistics that are
-   * produced by range mappers. Called on executors by reduce tasks.
+   * Get a reader for a range of reduce partitions (startPartition to endPartition-1, inclusive) to
+   * read from map output (startMapId to endMapId - 1, inclusive).
+   * Called on executors by reduce tasks.
    */
-  def getReaderForRangeMapper[K, C](
+  def getReaderForRange[K, C](
       handle: ShuffleHandle,
-      partitionIndex: Int,
       startMapId: Int,
       endMapId: Int,
+      startPartition: Int,
+      endPartition: Int,
       context: TaskContext,
       metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C]
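
As a rough guide to the migration, the two removed reader methods collapse into the single range form: a one-mapper read becomes a mapper range of length one, and a one-partition read becomes a partition range of length one. The trait below is a hypothetical stub used only to illustrate that argument mapping; it is not the Spark ShuffleManager API, and the helper names are invented for the example.

// Hypothetical stub, not Spark's ShuffleManager trait: it only shows how calls to the
// removed methods map onto the merged getReaderForRange signature.
trait RangeReaderSketch {
  def getReaderForRange(
      startMapId: Int,
      endMapId: Int,
      startPartition: Int,
      endPartition: Int): String

  // Former getReaderForOneMapper(mapIndex, startPartition, endPartition):
  // one mapper is the half-open map range [mapIndex, mapIndex + 1).
  def readerForOneMapper(mapIndex: Int, startPartition: Int, endPartition: Int): String =
    getReaderForRange(mapIndex, mapIndex + 1, startPartition, endPartition)

  // Former getReaderForRangeMapper(partitionIndex, startMapId, endMapId):
  // one reduce partition is the half-open partition range [partitionIndex, partitionIndex + 1).
  def readerForOnePartition(partitionIndex: Int, startMapId: Int, endMapId: Int): String =
    getReaderForRange(startMapId, endMapId, partitionIndex, partitionIndex + 1)
}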

core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala

Lines changed: 5 additions & 17 deletions
@@ -131,32 +131,20 @@ private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager
       shouldBatchFetch = canUseBatchFetch(startPartition, endPartition, context))
   }
 
-  override def getReaderForOneMapper[K, C](
+  override def getReaderForRange[K, C](
       handle: ShuffleHandle,
-      mapIndex: Int,
+      startMapId: Int,
+      endMapId: Int,
       startPartition: Int,
       endPartition: Int,
       context: TaskContext,
       metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C] = {
-    val blocksByAddress = SparkEnv.get.mapOutputTracker.getMapSizesByMapIndex(
-      handle.shuffleId, mapIndex, startPartition, endPartition)
+    val blocksByAddress = SparkEnv.get.mapOutputTracker.getMapSizesByRange(
+      handle.shuffleId, startPartition, endPartition, startMapId, endMapId)
     new BlockStoreShuffleReader(
       handle.asInstanceOf[BaseShuffleHandle[K, _, C]], blocksByAddress, context, metrics,
       shouldBatchFetch = canUseBatchFetch(startPartition, endPartition, context))
-  }
 
-  override def getReaderForRangeMapper[K, C](
-      handle: ShuffleHandle,
-      partitionIndex: Int,
-      startMapId: Int,
-      endMapId: Int,
-      context: TaskContext,
-      metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C] = {
-    val blocksByAddress = SparkEnv.get.mapOutputTracker.getMapSizesByRangeMapIndex(
-      handle.shuffleId, partitionIndex, startMapId, endMapId)
-    new BlockStoreShuffleReader(
-      handle.asInstanceOf[BaseShuffleHandle[K, _, C]], blocksByAddress, context, metrics,
-      shouldBatchFetch = canUseBatchFetch(partitionIndex, partitionIndex + 1, context))
   }
 
   /** Get a writer for a given partition. Called on executors by map tasks. */

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 0 additions & 9 deletions
@@ -423,13 +423,6 @@ object SQLConf {
     .longConf
     .createWithDefault(64 * 1024 * 1024L)
 
-  val ADAPTIVE_EXECUTION_SKEWED_PARTITION_MAX_SPLITS =
-    buildConf("spark.sql.adaptive.skewedPartitionMaxSplits")
-      .doc("Configures the maximum number of task to handle a skewed partition in adaptive skewed" +
-        "join.")
-      .intConf
-      .createWithDefault(5)
-
   val NON_EMPTY_PARTITION_RATIO_FOR_BROADCAST_JOIN =
     buildConf("spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin")
       .doc("The relation with a non-empty partition ratio lower than this config will not be " +

@@ -2213,8 +2206,6 @@ class SQLConf extends Serializable with Logging {
   def adaptiveSkewedSizeThreshold: Long =
     getConf(ADAPTIVE_EXECUTION_SKEWED_PARTITION_SIZE_THRESHOLD)
 
-  def adaptiveSkewedMaxSplits: Int = getConf(ADAPTIVE_EXECUTION_SKEWED_PARTITION_MAX_SPLITS)
-
   def minBatchesToRetain: Int = getConf(MIN_BATCHES_TO_RETAIN)
 
   def maxBatchesToRetainInMemory: Int = getConf(MAX_BATCHES_TO_RETAIN_IN_MEMORY)
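
Dropping the spark.sql.adaptive.skewedPartitionMaxSplits cap matches the commit message: rather than capping the number of splits, the skewed side is split into mapper ranges driven by data size. The helper below is a hypothetical sketch of that idea, not code from this commit; the byte threshold is assumed to come from a size config such as the remaining ADAPTIVE_EXECUTION_SKEWED_PARTITION_SIZE_THRESHOLD.

// Hypothetical sketch: split a skewed partition's mappers into half-open ranges whose
// accumulated size reaches a byte threshold, instead of using a fixed number of splits.
def splitMappersBySize(mapSizes: Seq[Long], targetSize: Long): Seq[(Int, Int)] = {
  val ranges = scala.collection.mutable.ArrayBuffer.empty[(Int, Int)]
  var start = 0
  var acc = 0L
  for (i <- mapSizes.indices) {
    acc += mapSizes(i)
    if (acc >= targetSize) {
      ranges += ((start, i + 1)) // half-open map range [start, i + 1)
      start = i + 1
      acc = 0L
    }
  }
  if (start < mapSizes.length) ranges += ((start, mapSizes.length))
  ranges.toSeq
}

// Example: splitMappersBySize(Seq(10, 30, 25, 5, 40), 40) == Seq((0, 2), (2, 5))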

sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/LocalShuffledRowRDD.scala

Lines changed: 2 additions & 1 deletion
@@ -80,9 +80,10 @@ class LocalShuffledRowRDD(
     // as well as the `tempMetrics` for basic shuffle metrics.
     val sqlMetricsReporter = new SQLShuffleReadMetricsReporter(tempMetrics, metrics)
 
-    val reader = SparkEnv.get.shuffleManager.getReaderForOneMapper(
+    val reader = SparkEnv.get.shuffleManager.getReaderForRange(
       dependency.shuffleHandle,
       mapIndex,
+      mapIndex + 1,
       0,
       numReducers,
       context,
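
The local shuffle read above keeps its old meaning under the new API: each task still reads everything one mapper produced, expressed as the map range [mapIndex, mapIndex + 1) crossed with the full reduce range [0, numReducers). A small illustration in plain Scala (not Spark code):

// Enumerate the (mapIndex, reducePartition) block coordinates covered by the range
// form used above: one mapper, all reduce partitions.
def localReadBlocks(mapIndex: Int, numReducers: Int): Seq[(Int, Int)] =
  (0 until numReducers).map(part => (mapIndex, part))

// Example: localReadBlocks(3, 4) == Seq((3, 0), (3, 1), (3, 2), (3, 3))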
