Skip to content

Commit de1dae6

Browse files
committed
HDDS-726. Ozone Client should update SCM to move the container out of allocation path in case a write transaction fails. Contributed by Shashikant Banerjee.
1 parent 80b77de commit de1dae6

File tree

40 files changed

+872
-240
lines changed

40 files changed

+872
-240
lines changed

hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -214,14 +214,14 @@ public ContainerCommandResponseProto sendCommand(
214214

215215
@Override
216216
public XceiverClientReply sendCommand(
217-
ContainerCommandRequestProto request, List<UUID> excludeDns)
217+
ContainerCommandRequestProto request, List<DatanodeDetails> excludeDns)
218218
throws IOException {
219219
Preconditions.checkState(HddsUtils.isReadOnly(request));
220220
return sendCommandWithRetry(request, excludeDns);
221221
}
222222

223223
private XceiverClientReply sendCommandWithRetry(
224-
ContainerCommandRequestProto request, List<UUID> excludeDns)
224+
ContainerCommandRequestProto request, List<DatanodeDetails> excludeDns)
225225
throws IOException {
226226
ContainerCommandResponseProto responseProto = null;
227227

@@ -231,24 +231,24 @@ private XceiverClientReply sendCommandWithRetry(
231231
// TODO: cache the correct leader info in here, so that any subsequent calls
232232
// should first go to leader
233233
List<DatanodeDetails> dns = pipeline.getNodes();
234-
DatanodeDetails datanode = null;
235234
List<DatanodeDetails> healthyDns =
236235
excludeDns != null ? dns.stream().filter(dnId -> {
237-
for (UUID excludeId : excludeDns) {
238-
if (dnId.getUuid().equals(excludeId)) {
236+
for (DatanodeDetails excludeId : excludeDns) {
237+
if (dnId.equals(excludeId)) {
239238
return false;
240239
}
241240
}
242241
return true;
243242
}).collect(Collectors.toList()) : dns;
243+
XceiverClientReply reply = new XceiverClientReply(null);
244244
for (DatanodeDetails dn : healthyDns) {
245245
try {
246246
LOG.debug("Executing command " + request + " on datanode " + dn);
247247
// In case the command gets retried on a 2nd datanode,
248248
// sendCommandAsyncCall will create a new channel and async stub
249249
// in case these don't exist for the specific datanode.
250+
reply.addDatanode(dn);
250251
responseProto = sendCommandAsync(request, dn).getResponse().get();
251-
datanode = dn;
252252
if (responseProto.getResult() == ContainerProtos.Result.SUCCESS) {
253253
break;
254254
}
@@ -264,8 +264,8 @@ private XceiverClientReply sendCommandWithRetry(
264264
}
265265

266266
if (responseProto != null) {
267-
return new XceiverClientReply(
268-
CompletableFuture.completedFuture(responseProto), datanode.getUuid());
267+
reply.setResponse(CompletableFuture.completedFuture(responseProto));
268+
return reply;
269269
} else {
270270
throw new IOException(
271271
"Failed to execute command " + request + " on the pipeline "
@@ -382,11 +382,11 @@ private void reconnect(DatanodeDetails dn, String encodedToken)
382382
}
383383

384384
@Override
385-
public long watchForCommit(long index, long timeout)
385+
public XceiverClientReply watchForCommit(long index, long timeout)
386386
throws InterruptedException, ExecutionException, TimeoutException,
387387
IOException {
388388
// there is no notion of watch for commit index in standalone pipeline
389-
return 0;
389+
return null;
390390
};
391391

392392
public long getReplicatedMinCommitIndex() {

hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java

Lines changed: 49 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@
1818

1919
package org.apache.hadoop.hdds.scm;
2020

21-
import com.google.common.base.Preconditions;
2221
import org.apache.hadoop.hdds.HddsUtils;
22+
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
2323
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
2424
import org.apache.hadoop.hdds.security.x509.SecurityConfig;
2525

@@ -59,6 +59,7 @@
5959
import java.util.concurrent.TimeoutException;
6060
import java.util.concurrent.atomic.AtomicReference;
6161
import java.util.concurrent.ConcurrentHashMap;
62+
import java.util.stream.Collectors;
6263

6364
/**
6465
* An abstract implementation of {@link XceiverClientSpi} using Ratis.
@@ -91,7 +92,7 @@ public static XceiverClientRatis newXceiverClientRatis(
9192
private final GrpcTlsConfig tlsConfig;
9293

9394
// Map to track commit index at every server
94-
private final ConcurrentHashMap<String, Long> commitInfoMap;
95+
private final ConcurrentHashMap<UUID, Long> commitInfoMap;
9596

9697
// create a separate RaftClient for watchForCommit API
9798
private RaftClient watchClient;
@@ -118,15 +119,16 @@ private void updateCommitInfosMap(
118119
// of the servers
119120
if (commitInfoMap.isEmpty()) {
120121
commitInfoProtos.forEach(proto -> commitInfoMap
121-
.put(proto.getServer().getAddress(), proto.getCommitIndex()));
122+
.put(RatisHelper.toDatanodeId(proto.getServer()),
123+
proto.getCommitIndex()));
122124
// In case the commit is happening 2 way, just update the commitIndex
123125
// for the servers which have been successfully updating the commit
124126
// indexes. This is important because getReplicatedMinCommitIndex()
125127
// should always return the min commit index out of the nodes which have
126128
// been replicating data successfully.
127129
} else {
128130
commitInfoProtos.forEach(proto -> commitInfoMap
129-
.computeIfPresent(proto.getServer().getAddress(),
131+
.computeIfPresent(RatisHelper.toDatanodeId(proto.getServer()),
130132
(address, index) -> {
131133
index = proto.getCommitIndex();
132134
return index;
@@ -218,15 +220,23 @@ public long getReplicatedMinCommitIndex() {
218220
return minIndex.isPresent() ? minIndex.getAsLong() : 0;
219221
}
220222

223+
private void addDatanodetoReply(UUID address, XceiverClientReply reply) {
224+
DatanodeDetails.Builder builder = DatanodeDetails.newBuilder();
225+
builder.setUuid(address.toString());
226+
reply.addDatanode(builder.build());
227+
}
228+
221229
@Override
222-
public long watchForCommit(long index, long timeout)
230+
public XceiverClientReply watchForCommit(long index, long timeout)
223231
throws InterruptedException, ExecutionException, TimeoutException,
224232
IOException {
225233
long commitIndex = getReplicatedMinCommitIndex();
234+
XceiverClientReply clientReply = new XceiverClientReply(null);
226235
if (commitIndex >= index) {
227236
// return the min commit index till which the log has been replicated to
228237
// all servers
229-
return commitIndex;
238+
clientReply.setLogIndex(commitIndex);
239+
return clientReply;
230240
}
231241
LOG.debug("commit index : {} watch timeout : {}", index, timeout);
232242
// create a new RaftClient instance for watch request
@@ -250,26 +260,30 @@ public long watchForCommit(long index, long timeout)
250260
// TODO : need to remove the code to create the new RaftClient instance
251261
// here once the watch request bypassing sliding window in Raft Client
252262
// gets fixed.
253-
watchClient =
254-
RatisHelper.newRaftClient(rpcType, getPipeline(), retryPolicy,
263+
watchClient = RatisHelper
264+
.newRaftClient(rpcType, getPipeline(), retryPolicy,
255265
maxOutstandingRequests, tlsConfig);
256266
reply = watchClient
257267
.sendWatchAsync(index, RaftProtos.ReplicationLevel.MAJORITY_COMMITTED)
258268
.get(timeout, TimeUnit.MILLISECONDS);
259-
Optional<RaftProtos.CommitInfoProto>
260-
proto = reply.getCommitInfos().stream().min(Comparator.comparing(
261-
RaftProtos.CommitInfoProto :: getCommitIndex));
262-
Preconditions.checkState(proto.isPresent());
263-
String address = proto.get().getServer().getAddress();
264-
// since 3 way commit has failed, the updated map from now on will
265-
// only store entries for those datanodes which have had successful
266-
// replication.
267-
commitInfoMap.remove(address);
268-
LOG.info(
269-
"Could not commit " + index + " to all the nodes. Server " + address
270-
+ " has failed." + " Committed by majority.");
269+
List<RaftProtos.CommitInfoProto> commitInfoProtoList =
270+
reply.getCommitInfos().stream()
271+
.filter(i -> i.getCommitIndex() < index)
272+
.collect(Collectors.toList());
273+
commitInfoProtoList.parallelStream().forEach(proto -> {
274+
UUID address = RatisHelper.toDatanodeId(proto.getServer());
275+
addDatanodetoReply(address, clientReply);
276+
// since 3 way commit has failed, the updated map from now on will
277+
// only store entries for those datanodes which have had successful
278+
// replication.
279+
commitInfoMap.remove(address);
280+
LOG.info(
281+
"Could not commit " + index + " to all the nodes. Server " + address
282+
+ " has failed." + " Committed by majority.");
283+
});
271284
}
272-
return index;
285+
clientReply.setLogIndex(index);
286+
return clientReply;
273287
}
274288

275289
/**
@@ -296,17 +310,28 @@ public XceiverClientReply sendCommandAsync(
296310
RaftRetryFailureException raftRetryFailureException =
297311
reply.getRetryFailureException();
298312
if (raftRetryFailureException != null) {
313+
// in case of raft retry failure, the raft client is
314+
// not able to connect to the leader hence the pipeline
315+
// cannot be used, but this instance of RaftClient will be closed
316+
// and refreshed again. In case the client cannot connect to
317+
// leader, getClient call will fail.
318+
319+
// No need to set the failed Server ID here. Ozone client
320+
// will directly exclude this pipeline in next allocate block
321+
// to SCM as in this case, it is the raft client which is not
322+
// able to connect to leader in the pipeline, though the
323+
// pipeline can still be functional.
299324
throw new CompletionException(raftRetryFailureException);
300325
}
301326
ContainerCommandResponseProto response =
302327
ContainerCommandResponseProto
303328
.parseFrom(reply.getMessage().getContent());
329+
UUID serverId = RatisHelper.toDatanodeId(reply.getReplierId());
304330
if (response.getResult() == ContainerProtos.Result.SUCCESS) {
305331
updateCommitInfosMap(reply.getCommitInfos());
306-
asyncReply.setLogIndex(reply.getLogIndex());
307-
asyncReply.setDatanode(
308-
RatisHelper.toDatanodeId(reply.getReplierId()));
309332
}
333+
asyncReply.setLogIndex(reply.getLogIndex());
334+
addDatanodetoReply(serverId, asyncReply);
310335
return response;
311336
} catch (InvalidProtocolBufferException e) {
312337
throw new CompletionException(e);

hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@
4242
import java.util.ArrayList;
4343
import java.util.Arrays;
4444
import java.util.List;
45-
import java.util.UUID;
4645
import java.util.concurrent.ExecutionException;
4746

4847
/**
@@ -290,7 +289,7 @@ private synchronized void readChunkFromContainer() throws IOException {
290289
XceiverClientReply reply;
291290
ReadChunkResponseProto readChunkResponse = null;
292291
final ChunkInfo chunkInfo = chunks.get(chunkIndex);
293-
List<UUID> excludeDns = null;
292+
List<DatanodeDetails> excludeDns = null;
294293
ByteString byteString;
295294
List<DatanodeDetails> dnList = xceiverClient.getPipeline().getNodes();
296295
while (true) {
@@ -334,7 +333,7 @@ private synchronized void readChunkFromContainer() throws IOException {
334333
if (excludeDns == null) {
335334
excludeDns = new ArrayList<>();
336335
}
337-
excludeDns.add(reply.getDatanode());
336+
excludeDns.addAll(reply.getDatanodes());
338337
if (excludeDns.size() == dnList.size()) {
339338
throw ioe;
340339
}

hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java

Lines changed: 36 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package org.apache.hadoop.hdds.scm.storage;
2020
import com.google.common.base.Preconditions;
21+
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
2122
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
2223
import org.apache.hadoop.hdds.scm.XceiverClientReply;
2324
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
@@ -41,6 +42,7 @@
4142
import java.io.OutputStream;
4243
import java.nio.Buffer;
4344
import java.nio.ByteBuffer;
45+
import java.util.Collections;
4446
import java.util.UUID;
4547
import java.util.List;
4648
import java.util.ArrayList;
@@ -102,14 +104,17 @@ public class BlockOutputStream extends OutputStream {
102104
// by all servers
103105
private long totalAckDataLength;
104106

105-
// list to hold up all putBlock futures
106-
private List<CompletableFuture<ContainerProtos.ContainerCommandResponseProto>>
107-
futureList;
107+
// future map to hold all putBlock futures
108+
private ConcurrentHashMap<Long,
109+
CompletableFuture<ContainerProtos.ContainerCommandResponseProto>>
110+
futureMap;
108111
// map from putBlock logIndex to flushedDataLength.
109112
private ConcurrentHashMap<Long, Long> commitIndex2flushedDataMap;
110113

111114
private int currentBufferIndex;
112115

116+
private List<DatanodeDetails> failedServers;
117+
113118
/**
114119
* Creates a new BlockOutputStream.
115120
*
@@ -157,10 +162,11 @@ public BlockOutputStream(BlockID blockID, String key,
157162
responseExecutor = Executors.newSingleThreadExecutor();
158163
commitIndex2flushedDataMap = new ConcurrentHashMap<>();
159164
totalAckDataLength = 0;
160-
futureList = new ArrayList<>();
165+
futureMap = new ConcurrentHashMap<>();
161166
totalDataFlushedLength = 0;
162167
currentBufferIndex = 0;
163168
writtenDataLength = 0;
169+
failedServers = Collections.emptyList();
164170
}
165171

166172
public BlockID getBlockID() {
@@ -182,6 +188,9 @@ private long computeBufferData() {
182188
return dataLength;
183189
}
184190

191+
public List<DatanodeDetails> getFailedServers() {
192+
return failedServers;
193+
}
185194

186195
@Override
187196
public void write(int b) throws IOException {
@@ -299,7 +308,7 @@ private void updateFlushIndex(long index) {
299308
Preconditions.checkState(commitIndex2flushedDataMap.containsKey(index));
300309
totalAckDataLength = commitIndex2flushedDataMap.remove(index);
301310
LOG.debug("Total data successfully replicated: " + totalAckDataLength);
302-
futureList.remove(0);
311+
futureMap.remove(totalAckDataLength);
303312
// Flush has been committed to required servers successful.
304313
// just swap the bufferList head and tail after clearing.
305314
ByteBuffer currentBuffer = bufferList.remove(0);
@@ -320,7 +329,7 @@ private void updateFlushIndex(long index) {
320329
private void handleFullBuffer() throws IOException {
321330
try {
322331
checkOpen();
323-
if (!futureList.isEmpty()) {
332+
if (!futureMap.isEmpty()) {
324333
waitOnFlushFutures();
325334
}
326335
} catch (InterruptedException | ExecutionException e) {
@@ -362,9 +371,22 @@ private void adjustBuffersOnException() {
362371
private void watchForCommit(long commitIndex) throws IOException {
363372
checkOpen();
364373
Preconditions.checkState(!commitIndex2flushedDataMap.isEmpty());
374+
long index;
365375
try {
366-
long index =
376+
XceiverClientReply reply =
367377
xceiverClient.watchForCommit(commitIndex, watchTimeout);
378+
if (reply == null) {
379+
index = 0;
380+
} else {
381+
List<DatanodeDetails> dnList = reply.getDatanodes();
382+
if (!dnList.isEmpty()) {
383+
if (failedServers.isEmpty()) {
384+
failedServers = new ArrayList<>();
385+
}
386+
failedServers.addAll(dnList);
387+
}
388+
index = reply.getLogIndex();
389+
}
368390
adjustBuffers(index);
369391
} catch (TimeoutException | InterruptedException | ExecutionException e) {
370392
LOG.warn("watchForCommit failed for index " + commitIndex, e);
@@ -392,8 +414,7 @@ ContainerCommandResponseProto> handlePartialFlush()
392414
try {
393415
validateResponse(e);
394416
} catch (IOException sce) {
395-
future.completeExceptionally(sce);
396-
return e;
417+
throw new CompletionException(sce);
397418
}
398419
// if the ioException is not set, putBlock is successful
399420
if (ioException == null) {
@@ -422,7 +443,7 @@ ContainerCommandResponseProto> handlePartialFlush()
422443
throw new IOException(
423444
"Unexpected Storage Container Exception: " + e.toString(), e);
424445
}
425-
futureList.add(flushFuture);
446+
futureMap.put(flushPos, flushFuture);
426447
return flushFuture;
427448
}
428449

@@ -516,8 +537,8 @@ public void close() throws IOException {
516537

517538
private void waitOnFlushFutures()
518539
throws InterruptedException, ExecutionException {
519-
CompletableFuture<Void> combinedFuture = CompletableFuture
520-
.allOf(futureList.toArray(new CompletableFuture[futureList.size()]));
540+
CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(
541+
futureMap.values().toArray(new CompletableFuture[futureMap.size()]));
521542
// wait for all the transactions to complete
522543
combinedFuture.get();
523544
}
@@ -553,10 +574,10 @@ public void cleanup(boolean invalidateClient) {
553574
}
554575
xceiverClientManager = null;
555576
xceiverClient = null;
556-
if (futureList != null) {
557-
futureList.clear();
577+
if (futureMap != null) {
578+
futureMap.clear();
558579
}
559-
futureList = null;
580+
futureMap = null;
560581
if (commitIndex2flushedDataMap != null) {
561582
commitIndex2flushedDataMap.clear();
562583
}

0 commit comments

Comments
 (0)