Skip to content

Commit 2c52d00

Browse files
authored
HDDS-2032. Ozone client should retry writes in case of any ratis/stateMachine exceptions. Contributed by Shashikant Banerjee (#1420).
1 parent 5dd859a commit 2c52d00

File tree

2 files changed

+19
-19
lines changed

2 files changed

+19
-19
lines changed

hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/HddsClientUtils.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
import org.apache.hadoop.hdds.protocolPB.SCMSecurityProtocolClientSideTranslatorPB;
2929
import org.apache.hadoop.hdds.protocolPB.SCMSecurityProtocolPB;
3030
import org.apache.hadoop.hdds.scm.XceiverClientManager.ScmClientConfig;
31-
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
31+
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
3232
import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
3333
import org.apache.hadoop.hdds.scm.protocolPB.ScmBlockLocationProtocolPB;
3434
import org.apache.hadoop.io.retry.RetryPolicies;
@@ -86,7 +86,7 @@ private HddsClientUtils() {
8686
private static final List<Class<? extends Exception>> EXCEPTION_LIST =
8787
new ArrayList<Class<? extends Exception>>() {{
8888
add(TimeoutException.class);
89-
add(ContainerNotOpenException.class);
89+
add(StorageContainerException.class);
9090
add(RaftRetryFailureException.class);
9191
add(AlreadyClosedException.class);
9292
add(GroupMismatchException.class);
@@ -301,7 +301,7 @@ public static SCMSecurityProtocol getScmSecurityClient(
301301
return scmSecurityClient;
302302
}
303303

304-
public static Throwable checkForException(Exception e) throws IOException {
304+
public static Throwable checkForException(Exception e) {
305305
Throwable t = e;
306306
while (t != null) {
307307
for (Class<? extends Exception> cls : getExceptionList()) {
@@ -311,8 +311,7 @@ public static Throwable checkForException(Exception e) throws IOException {
311311
}
312312
t = t.getCause();
313313
}
314-
315-
throw e instanceof IOException ? (IOException)e : new IOException(e);
314+
return t;
316315
}
317316

318317
public static RetryPolicy createRetryPolicy(int maxRetryCount,

hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,8 @@
2525
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType;
2626
import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
2727
import org.apache.hadoop.hdds.scm.container.ContainerID;
28-
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
2928
import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
29+
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
3030
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
3131
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
3232
import org.apache.hadoop.io.retry.RetryPolicies;
@@ -37,8 +37,6 @@
3737
import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
3838
import org.apache.hadoop.hdds.scm.XceiverClientManager;
3939
import org.apache.ratis.protocol.AlreadyClosedException;
40-
import org.apache.ratis.protocol.GroupMismatchException;
41-
import org.apache.ratis.protocol.NotReplicatedException;
4240
import org.apache.ratis.protocol.RaftRetryFailureException;
4341
import org.slf4j.Logger;
4442
import org.slf4j.LoggerFactory;
@@ -49,7 +47,6 @@
4947
import java.util.List;
5048
import java.util.Collection;
5149
import java.util.Map;
52-
import java.util.concurrent.TimeoutException;
5350
import java.util.function.Function;
5451
import java.util.stream.Collectors;
5552

@@ -256,18 +253,19 @@ private void handleWrite(byte[] b, int off, long len, boolean retry)
256253
private void handleException(BlockOutputStreamEntry streamEntry,
257254
IOException exception) throws IOException {
258255
Throwable t = HddsClientUtils.checkForException(exception);
256+
Preconditions.checkNotNull(t);
259257
boolean retryFailure = checkForRetryFailure(t);
260-
boolean closedContainerException = false;
258+
boolean containerExclusionException = false;
261259
if (!retryFailure) {
262-
closedContainerException = checkIfContainerIsClosed(t);
260+
containerExclusionException = checkIfContainerToExclude(t);
263261
}
264262
Pipeline pipeline = streamEntry.getPipeline();
265263
PipelineID pipelineId = pipeline.getId();
266264
long totalSuccessfulFlushedData = streamEntry.getTotalAckDataLength();
267265
//set the correct length for the current stream
268266
streamEntry.setCurrentPosition(totalSuccessfulFlushedData);
269267
long bufferedDataLen = blockOutputStreamEntryPool.computeBufferData();
270-
if (closedContainerException) {
268+
if (containerExclusionException) {
271269
LOG.debug(
272270
"Encountered exception {}. The last committed block length is {}, "
273271
+ "uncommitted data length is {} retry count {}", exception,
@@ -290,11 +288,12 @@ private void handleException(BlockOutputStreamEntry streamEntry,
290288
if (!failedServers.isEmpty()) {
291289
excludeList.addDatanodes(failedServers);
292290
}
293-
if (closedContainerException) {
291+
292+
// if the container needs to be excluded, add the container to the
293+
// exclusion list, otherwise add the pipeline to the exclusion list
294+
if (containerExclusionException) {
294295
excludeList.addConatinerId(ContainerID.valueof(containerId));
295-
} else if (retryFailure || t instanceof TimeoutException
296-
|| t instanceof GroupMismatchException
297-
|| t instanceof NotReplicatedException) {
296+
} else {
298297
excludeList.addPipeline(pipelineId);
299298
}
300299
// just clean up the current stream.
@@ -303,7 +302,7 @@ private void handleException(BlockOutputStreamEntry streamEntry,
303302
// discard all subsequent blocks the containers and pipelines which
304303
// are in the exclude list so that, the very next retry should never
305304
// write data on the closed container/pipeline
306-
if (closedContainerException) {
305+
if (containerExclusionException) {
307306
// discard subsequent pre allocated blocks from the streamEntries list
308307
// from the closed container
309308
blockOutputStreamEntryPool
@@ -386,8 +385,10 @@ private boolean checkForRetryFailure(Throwable t) {
386385
|| t instanceof AlreadyClosedException;
387386
}
388387

389-
private boolean checkIfContainerIsClosed(Throwable t) {
390-
return t instanceof ContainerNotOpenException;
388+
// Every container specific exception from datanode will be seen as
389+
// StorageContainerException
390+
private boolean checkIfContainerToExclude(Throwable t) {
391+
return t instanceof StorageContainerException;
391392
}
392393

393394
@Override

0 commit comments

Comments
 (0)