Commit 082b6a1

[SPARK-27736][Core][SHUFFLE] Improve handling of FetchFailures caused by ExternalShuffleService losing track of executor registrations
1 parent 0a70951 commit 082b6a1

7 files changed: +168 −7 lines changed

common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockHandler.java
Lines changed: 5 additions & 0 deletions

@@ -150,6 +150,11 @@ protected void handleMessage(
       int numRemovedBlocks = blockManager.removeBlocks(msg.appId, msg.execId, msg.blockIds);
       callback.onSuccess(new BlocksRemoved(numRemovedBlocks).toByteBuffer());

+    } else if (msgObj instanceof AreExecutorsRegistered) {
+      AreExecutorsRegistered msg = (AreExecutorsRegistered) msgObj;
+      checkAuth(client, msg.appId);
+      callback.onSuccess(blockManager.areExecutorsRegistered(msg.appId, msg.execIds));
+
     } else {
       throw new UnsupportedOperationException("Unexpected message: " + msgObj);
     }

common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java
Lines changed: 12 additions & 0 deletions

@@ -128,6 +128,18 @@ public void fetchBlocks(
     }
   }

+  public ByteBuffer queryExecutorStatus(
+      String host,
+      int port,
+      String[] execIds) throws IOException, InterruptedException {
+    checkInit();
+    logger.debug("Query executor statuses in External shuffle service from {}:{}", host, port);
+    TransportClient client = clientFactory.createClient(host, port);
+    ByteBuffer queryExecutorStatusMsg = new AreExecutorsRegistered(appId, execIds).toByteBuffer();
+    int timeout = conf.connectionTimeoutMs() + conf.ioRetryWaitTimeMs();
+    return client.sendRpcSync(queryExecutorStatusMsg, timeout);
+  }
+
   @Override
   public MetricSet shuffleMetrics() {
     checkInit();
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
Lines changed: 14 additions & 0 deletions

@@ -18,6 +18,7 @@
 package org.apache.spark.network.shuffle;

 import java.io.*;
+import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.util.*;
 import java.util.concurrent.ConcurrentMap;

@@ -164,6 +165,19 @@ public void registerExecutor(
     executors.put(fullId, executorInfo);
   }

+  /**
+   * Check whether the given executors are registered (one byte per executor: 1 or 0).
+   */
+  public ByteBuffer areExecutorsRegistered(String appId, String[] execIds) {
+    byte[] result = new byte[execIds.length];
+    for (int i = 0; i < execIds.length; i++) {
+      if (executors.containsKey(new AppExecId(appId, execIds[i]))) {
+        result[i] = 1;
+      }
+    }
+    return ByteBuffer.wrap(result);
+  }
+
   /**
    * Obtains a FileSegmentManagedBuffer from a single block (shuffleId, mapId, reduceId).
    */
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/AreExecutorsRegistered.java (new file)
Lines changed: 81 additions & 0 deletions

@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle.protocol;
+
+import com.google.common.base.Objects;
+import io.netty.buffer.ByteBuf;
+import org.apache.spark.network.protocol.Encoders;
+
+import java.util.Arrays;
+
+// Needed by ScalaDoc. See SPARK-7726
+
+/** Request to query whether the given executors are registered. */
+public class AreExecutorsRegistered extends BlockTransferMessage {
+  public final String appId;
+  public final String[] execIds;
+
+  public AreExecutorsRegistered(String appId, String[] execIds) {
+    this.appId = appId;
+    this.execIds = execIds;
+  }
+
+  @Override
+  protected Type type() { return Type.ARE_EXECUTORS_REGISTERED; }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(appId) * 41 + Arrays.hashCode(execIds);
+  }
+
+  @Override
+  public String toString() {
+    return Objects.toStringHelper(this)
+      .add("appId", appId)
+      .add("execIds", Arrays.toString(execIds))
+      .toString();
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (other != null && other instanceof AreExecutorsRegistered) {
+      AreExecutorsRegistered o = (AreExecutorsRegistered) other;
+      return Objects.equal(appId, o.appId)
+        && Arrays.equals(execIds, o.execIds);
+    }
+    return false;
+  }
+
+  @Override
+  public int encodedLength() {
+    return Encoders.Strings.encodedLength(appId)
+      + Encoders.StringArrays.encodedLength(execIds);
+  }
+
+  @Override
+  public void encode(ByteBuf buf) {
+    Encoders.Strings.encode(buf, appId);
+    Encoders.StringArrays.encode(buf, execIds);
+  }
+
+  public static AreExecutorsRegistered decode(ByteBuf buf) {
+    String appId = Encoders.Strings.decode(buf);
+    String[] execIds = Encoders.StringArrays.decode(buf);
+    return new AreExecutorsRegistered(appId, execIds);
+  }
+}
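A quick round-trip check of the encode/decode pair above (illustrative only, not part of the commit; the message contents are made up), using Netty's Unpooled to size the buffer from encodedLength():

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

import org.apache.spark.network.shuffle.protocol.AreExecutorsRegistered;

public class RoundTripSketch {
  public static void main(String[] args) {
    AreExecutorsRegistered msg =
      new AreExecutorsRegistered("app-example", new String[] {"1", "2"});
    ByteBuf buf = Unpooled.buffer(msg.encodedLength());
    msg.encode(buf);                     // writes appId, then the execId array
    AreExecutorsRegistered decoded = AreExecutorsRegistered.decode(buf);
    assert msg.equals(decoded);          // equals() compares appId and execIds
    buf.release();
  }
}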

common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java
Lines changed: 2 additions & 1 deletion

@@ -47,7 +47,7 @@ public abstract class BlockTransferMessage implements Encodable {
   public enum Type {
     OPEN_BLOCKS(0), UPLOAD_BLOCK(1), REGISTER_EXECUTOR(2), STREAM_HANDLE(3), REGISTER_DRIVER(4),
     HEARTBEAT(5), UPLOAD_BLOCK_STREAM(6), REMOVE_BLOCKS(7), BLOCKS_REMOVED(8),
-    FETCH_SHUFFLE_BLOCKS(9);
+    FETCH_SHUFFLE_BLOCKS(9), ARE_EXECUTORS_REGISTERED(10);

     private final byte id;

@@ -76,6 +76,7 @@ public static BlockTransferMessage fromByteBuffer(ByteBuffer msg) {
       case 7: return RemoveBlocks.decode(buf);
       case 8: return BlocksRemoved.decode(buf);
       case 9: return FetchShuffleBlocks.decode(buf);
+      case 10: return AreExecutorsRegistered.decode(buf);
       default: throw new IllegalArgumentException("Unknown message type: " + type);
     }
   }
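On the wire, toByteBuffer() on the message prepends the type byte and the decode switch above dispatches on it, so the new case 10 is what routes a received buffer back to AreExecutorsRegistered.decode. A small sketch (illustrative, not part of the commit), assuming the decoder is exposed as BlockTransferMessage.Decoder as in the surrounding file:

import java.nio.ByteBuffer;

import org.apache.spark.network.shuffle.protocol.AreExecutorsRegistered;
import org.apache.spark.network.shuffle.protocol.BlockTransferMessage;

public class WireDispatchSketch {
  public static void main(String[] args) {
    ByteBuffer wire =
      new AreExecutorsRegistered("app-example", new String[] {"1"}).toByteBuffer();
    // The leading byte is ARE_EXECUTORS_REGISTERED's id (10), so the decoder
    // hands the rest of the buffer to AreExecutorsRegistered.decode.
    BlockTransferMessage received = BlockTransferMessage.Decoder.fromByteBuffer(wire);
    assert received instanceof AreExecutorsRegistered;
  }
}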

core/src/main/scala/org/apache/spark/MapOutputTracker.scala
Lines changed: 27 additions & 0 deletions

@@ -152,6 +152,15 @@ private class ShuffleStatus(numPartitions: Int) {
     removeOutputsByFilter(x => x.executorId == execId)
   }

+  /**
+   * Removes all map outputs associated with the specified executors. Note that this will also
+   * remove outputs which are served by an external shuffle server (if one exists), as they are
+   * still registered with these execIds.
+   */
+  def removeOutputsOnExecutors(execIds: Seq[String]): Unit = withWriteLock {
+    removeOutputsByFilter(x => execIds.contains(x.executorId))
+  }
+
   /**
    * Removes all shuffle outputs which satisfies the filter. Note that this will also
    * remove outputs which are served by an external shuffle server (if one exists).

@@ -518,6 +527,14 @@ private[spark] class MapOutputTrackerMaster(
     incrementEpoch()
   }

+  /**
+   * Get all executors registered on this host.
+   */
+  def getExecutorsRegisteredOnHost(host: String): Array[String] = {
+    shuffleStatuses.valuesIterator.flatMap(_.mapStatuses.map(_.location).filter(_.host == host)
+      .map(_.executorId)).toSet.toArray
+  }
+
   /**
    * Removes all shuffle outputs associated with this executor. Note that this will also remove
    * outputs which are served by an external shuffle server (if one exists), as they are still

@@ -528,6 +545,16 @@
     incrementEpoch()
   }

+  /**
+   * Removes all shuffle outputs associated with these executors. Note that this will also remove
+   * outputs which are served by an external shuffle server (if one exists), as they are still
+   * registered with these execIds.
+   */
+  def removeOutputsOnExecutors(execIds: Seq[String]): Unit = {
+    shuffleStatuses.valuesIterator.foreach { _.removeOutputsOnExecutors(execIds) }
+    incrementEpoch()
+  }
+
   /** Check if the given shuffle is being tracked */
   def containsShuffle(shuffleId: Int): Boolean = shuffleStatuses.contains(shuffleId)

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
Lines changed: 27 additions & 6 deletions

@@ -35,6 +35,7 @@ import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics}
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config
 import org.apache.spark.internal.config.Tests.TEST_NO_STAGE_RETRY
+import org.apache.spark.network.shuffle.ExternalBlockStoreClient
 import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.partial.{ApproximateActionListener, ApproximateEvaluator, PartialResult}
 import org.apache.spark.rdd.{DeterministicLevel, RDD, RDDCheckpointData}

@@ -1668,8 +1669,9 @@

         // TODO: mark the executor as failed only if there were lots of fetch failures on it
         if (bmAddress != null) {
+          val executorNotRegistered = failureMessage.contains("Executor is not registered")
           val hostToUnregisterOutputs = if (env.blockManager.externalShuffleServiceEnabled &&
-              unRegisterOutputOnHostOnFetchFailure) {
+              (unRegisterOutputOnHostOnFetchFailure || executorNotRegistered)) {
             // We had a fetch failure with the external shuffle service, so we
             // assume all shuffle data on the node is bad.
             Some(bmAddress.host)

@@ -1682,7 +1684,8 @@
             execId = bmAddress.executorId,
             fileLost = true,
             hostToUnregisterOutputs = hostToUnregisterOutputs,
-            maybeEpoch = Some(task.epoch))
+            maybeEpoch = Some(task.epoch),
+            unRegisterOutputOnHostOnFetchFailure)
         }
       }

@@ -1838,18 +1841,36 @@
       execId: String,
       fileLost: Boolean,
       hostToUnregisterOutputs: Option[String],
-      maybeEpoch: Option[Long] = None): Unit = {
+      maybeEpoch: Option[Long] = None,
+      unRegisterOutputOnHost: Boolean = false): Unit = {
     val currentEpoch = maybeEpoch.getOrElse(mapOutputTracker.getEpoch)
     if (!failedEpoch.contains(execId) || failedEpoch(execId) < currentEpoch) {
       failedEpoch(execId) = currentEpoch
       logInfo("Executor lost: %s (epoch %d)".format(execId, currentEpoch))
       blockManagerMaster.removeExecutor(execId)
       if (fileLost) {
-        hostToUnregisterOutputs match {
-          case Some(host) =>
+        (hostToUnregisterOutputs, unRegisterOutputOnHost) match {
+          case (Some(host), false) =>
+            val execIdsOnHost = mapOutputTracker.getExecutorsRegisteredOnHost(host)
+            val port = SparkEnv.get.conf.get(config.SHUFFLE_SERVICE_PORT)
+            try {
+              val status = SparkEnv.get.blockManager.blockStoreClient
+                .asInstanceOf[ExternalBlockStoreClient]
+                .queryExecutorStatus(host, port, execIdsOnHost).array()
+              if (status.size == execIdsOnHost.size) {
+                val execIdsNotRegistered = execIdsOnHost.zip(status).filter(_._2 == 0).map(_._1)
+                logInfo("Shuffle files lost for executors: %s (epoch %d)"
+                  .format(execIdsNotRegistered.mkString(", "), currentEpoch))
+                mapOutputTracker.removeOutputsOnExecutors(execIdsNotRegistered)
+              }
+            } catch {
+              case e: Exception =>
+                logInfo("Exception thrown when querying executor status", e)
+            }
+          case (Some(host), true) =>
             logInfo("Shuffle files lost for host: %s (epoch %d)".format(host, currentEpoch))
             mapOutputTracker.removeOutputsOnHost(host)
-          case None =>
+          case (None, _) =>
             logInfo("Shuffle files lost for executor: %s (epoch %d)".format(execId, currentEpoch))
             mapOutputTracker.removeOutputsOnExecutor(execId)
         }