Skip to content

Commit 41440ec

Browse files
authored
HDDS-2210. ContainerStateMachine should not be marked unhealthy if applyTransaction fails with closed container exception (#1552).
1 parent 3df733c commit 41440ec

File tree

2 files changed

+79
-6
lines changed

2 files changed

+79
-6
lines changed

hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,7 @@ public class ContainerStateMachine extends BaseStateMachine {
150150
private final Cache<Long, ByteString> stateMachineDataCache;
151151
private final boolean isBlockTokenEnabled;
152152
private final TokenVerifier tokenVerifier;
153-
private final AtomicBoolean isStateMachineHealthy;
153+
private final AtomicBoolean stateMachineHealthy;
154154

155155
private final Semaphore applyTransactionSemaphore;
156156
/**
@@ -190,7 +190,7 @@ public ContainerStateMachine(RaftGroupId gid, ContainerDispatcher dispatcher,
190190
ScmConfigKeys.
191191
DFS_CONTAINER_RATIS_STATEMACHINE_MAX_PENDING_APPLY_TXNS_DEFAULT);
192192
applyTransactionSemaphore = new Semaphore(maxPendingApplyTransactions);
193-
isStateMachineHealthy = new AtomicBoolean(true);
193+
stateMachineHealthy = new AtomicBoolean(true);
194194
this.executors = new ExecutorService[numContainerOpExecutors];
195195
for (int i = 0; i < numContainerOpExecutors; i++) {
196196
final int index = i;
@@ -271,11 +271,15 @@ public void persistContainerSet(OutputStream out) throws IOException {
271271
IOUtils.write(builder.build().toByteArray(), out);
272272
}
273273

274+
public boolean isStateMachineHealthy() {
275+
return stateMachineHealthy.get();
276+
}
277+
274278
@Override
275279
public long takeSnapshot() throws IOException {
276280
TermIndex ti = getLastAppliedTermIndex();
277281
long startTime = Time.monotonicNow();
278-
if (!isStateMachineHealthy.get()) {
282+
if (!isStateMachineHealthy()) {
279283
String msg =
280284
"Failed to take snapshot " + " for " + gid + " as the stateMachine"
281285
+ " is unhealthy. The last applied index is at " + ti;
@@ -731,7 +735,11 @@ public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
731735
metrics.incPipelineLatency(cmdType,
732736
Time.monotonicNowNanos() - startTime);
733737
}
734-
if (r.getResult() != ContainerProtos.Result.SUCCESS) {
738+
// a closed container exception must not mark the stateMachine
739+
// unhealthy
740+
if (r.getResult() != ContainerProtos.Result.SUCCESS
741+
&& r.getResult() != ContainerProtos.Result.CONTAINER_NOT_OPEN
742+
&& r.getResult() != ContainerProtos.Result.CLOSED_CONTAINER_IO) {
735743
StorageContainerException sce =
736744
new StorageContainerException(r.getMessage(), r.getResult());
737745
LOG.error(
@@ -744,7 +752,7 @@ public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
744752
// caught in stateMachineUpdater in Ratis and ratis server will
745753
// shutdown.
746754
applyTransactionFuture.completeExceptionally(sce);
747-
isStateMachineHealthy.compareAndSet(true, false);
755+
stateMachineHealthy.compareAndSet(true, false);
748756
ratisServer.handleApplyTransactionFailure(gid, trx.getServerRole());
749757
} else {
750758
LOG.debug(
@@ -759,7 +767,7 @@ public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
759767
// add the entry to the applyTransactionCompletionMap only if the
760768
// stateMachine is healthy, i.e., there have been no applyTransaction
761769
// failures before.
762-
if (isStateMachineHealthy.get()) {
770+
if (isStateMachineHealthy()) {
763771
final Long previous = applyTransactionCompletionMap
764772
.put(index, trx.getLogEntry().getTerm());
765773
Preconditions.checkState(previous == null);

hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -353,6 +353,71 @@ public void testApplyTransactionFailure() throws Exception {
353353
Assert.assertTrue(snapshot.getPath().equals(latestSnapshot.getPath()));
354354
}
355355

356+
@Test
357+
public void testApplyTransactionIdempotencyWithClosedContainer()
358+
throws Exception {
359+
OzoneOutputStream key =
360+
objectStore.getVolume(volumeName).getBucket(bucketName)
361+
.createKey("ratis", 1024, ReplicationType.RATIS,
362+
ReplicationFactor.ONE, new HashMap<>());
363+
// First write and flush creates a container in the datanode
364+
key.write("ratis".getBytes());
365+
key.flush();
366+
key.write("ratis".getBytes());
367+
KeyOutputStream groupOutputStream = (KeyOutputStream) key.getOutputStream();
368+
List<OmKeyLocationInfo> locationInfoList =
369+
groupOutputStream.getLocationInfoList();
370+
Assert.assertEquals(1, locationInfoList.size());
371+
OmKeyLocationInfo omKeyLocationInfo = locationInfoList.get(0);
372+
ContainerData containerData =
373+
cluster.getHddsDatanodes().get(0).getDatanodeStateMachine()
374+
.getContainer().getContainerSet()
375+
.getContainer(omKeyLocationInfo.getContainerID())
376+
.getContainerData();
377+
Assert.assertTrue(containerData instanceof KeyValueContainerData);
378+
key.close();
379+
ContainerStateMachine stateMachine =
380+
(ContainerStateMachine) ContainerTestHelper.getStateMachine(cluster);
381+
SimpleStateMachineStorage storage =
382+
(SimpleStateMachineStorage) stateMachine.getStateMachineStorage();
383+
Path parentPath = storage.findLatestSnapshot().getFile().getPath();
384+
// Since the snapshot threshold is set to 1 and there have been
385+
// applyTransactions, we should see snapshots
386+
Assert.assertTrue(parentPath.getParent().toFile().listFiles().length > 0);
387+
FileInfo snapshot = storage.findLatestSnapshot().getFile();
388+
Assert.assertNotNull(snapshot);
389+
long containerID = omKeyLocationInfo.getContainerID();
390+
Pipeline pipeline = cluster.getStorageContainerLocationClient()
391+
.getContainerWithPipeline(containerID).getPipeline();
392+
XceiverClientSpi xceiverClient =
393+
xceiverClientManager.acquireClient(pipeline);
394+
ContainerProtos.ContainerCommandRequestProto.Builder request =
395+
ContainerProtos.ContainerCommandRequestProto.newBuilder();
396+
request.setDatanodeUuid(pipeline.getFirstNode().getUuidString());
397+
request.setCmdType(ContainerProtos.Type.CloseContainer);
398+
request.setContainerID(containerID);
399+
request.setCloseContainer(
400+
ContainerProtos.CloseContainerRequestProto.getDefaultInstance());
401+
try {
402+
xceiverClient.sendCommand(request.build());
403+
} catch (IOException e) {
404+
Assert.fail("Exception should not be thrown");
405+
}
406+
Assert.assertTrue(
407+
cluster.getHddsDatanodes().get(0).getDatanodeStateMachine()
408+
.getContainer().getContainerSet().getContainer(containerID)
409+
.getContainerState()
410+
== ContainerProtos.ContainerDataProto.State.CLOSED);
411+
Assert.assertTrue(stateMachine.isStateMachineHealthy());
412+
try {
413+
stateMachine.takeSnapshot();
414+
} catch (IOException ioe) {
415+
Assert.fail("Exception should not be thrown");
416+
}
417+
FileInfo latestSnapshot = storage.findLatestSnapshot().getFile();
418+
Assert.assertFalse(snapshot.getPath().equals(latestSnapshot.getPath()));
419+
}
420+
356421
@Test
357422
public void testValidateBCSIDOnDnRestart() throws Exception {
358423
OzoneOutputStream key =

0 commit comments

Comments
 (0)