Commit 8ef2365

HDDS-1779. TestWatchForCommit tests are flaky. Contributed by Shashikant Banerjee. (#1071)
1 parent 79f6118 commit 8ef2365

2 files changed: 187 additions, 69 deletions
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/Test2WayCommitInRatis.java (new file)

Lines changed: 156 additions & 0 deletions
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with this
 * work for additional information regarding copyright ownership. The ASF
 * licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */

package org.apache.hadoop.ozone.client.rpc;

import org.apache.hadoop.conf.StorageUnit;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.XceiverClientManager;
import org.apache.hadoop.hdds.scm.XceiverClientRatis;
import org.apache.hadoop.hdds.scm.XceiverClientReply;
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.client.ObjectStore;
import org.apache.hadoop.ozone.client.OzoneClient;
import org.apache.hadoop.ozone.client.OzoneClientFactory;
import org.apache.hadoop.ozone.container.ContainerTestHelper;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Assert;
import org.junit.Test;

import java.io.IOException;
import java.util.concurrent.TimeUnit;

import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT;

/**
 * This class tests the 2 way commit in Ratis.
 */
public class Test2WayCommitInRatis {

  private MiniOzoneCluster cluster;
  private OzoneClient client;
  private ObjectStore objectStore;
  private String volumeName;
  private String bucketName;
  private int chunkSize;
  private int flushSize;
  private int maxFlushSize;
  private int blockSize;
  private StorageContainerLocationProtocolClientSideTranslatorPB
      storageContainerLocationClient;
  private static String containerOwner = "OZONE";

  /**
   * Create a MiniDFSCluster for testing.
   * <p>
   * Ozone is made active by setting OZONE_ENABLED = true
   *
   * @throws IOException
   */
  private void startCluster(OzoneConfiguration conf) throws Exception {
    chunkSize = 100;
    flushSize = 2 * chunkSize;
    maxFlushSize = 2 * flushSize;
    blockSize = 2 * maxFlushSize;

    conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS);
    conf.setTimeDuration(
        OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_RETRY_INTERVAL_KEY,
        1, TimeUnit.SECONDS);

    conf.setQuietMode(false);
    cluster = MiniOzoneCluster.newBuilder(conf)
        .setNumDatanodes(7)
        .setBlockSize(blockSize)
        .setChunkSize(chunkSize)
        .setStreamBufferFlushSize(flushSize)
        .setStreamBufferMaxSize(maxFlushSize)
        .setStreamBufferSizeUnit(StorageUnit.BYTES)
        .build();
    cluster.waitForClusterToBeReady();
    //the easiest way to create an open container is creating a key
    client = OzoneClientFactory.getClient(conf);
    objectStore = client.getObjectStore();
    volumeName = "watchforcommithandlingtest";
    bucketName = volumeName;
    objectStore.createVolume(volumeName);
    objectStore.getVolume(volumeName).createBucket(bucketName);
    storageContainerLocationClient = cluster
        .getStorageContainerLocationClient();
  }

  /**
   * Shutdown MiniDFSCluster.
   */
  private void shutdown() {
    if (cluster != null) {
      cluster.shutdown();
    }
  }

  @Test
  public void test2WayCommitForRetryfailure() throws Exception {
    OzoneConfiguration conf = new OzoneConfiguration();
    conf.setTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, 20,
        TimeUnit.SECONDS);
    conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 20);
    startCluster(conf);
    GenericTestUtils.LogCapturer logCapturer =
        GenericTestUtils.LogCapturer.captureLogs(XceiverClientRatis.LOG);
    XceiverClientManager clientManager = new XceiverClientManager(conf);

    ContainerWithPipeline container1 = storageContainerLocationClient
        .allocateContainer(HddsProtos.ReplicationType.RATIS,
            HddsProtos.ReplicationFactor.THREE, containerOwner);
    XceiverClientSpi xceiverClient = clientManager
        .acquireClient(container1.getPipeline());
    Assert.assertEquals(1, xceiverClient.getRefcount());
    Assert.assertEquals(container1.getPipeline(),
        xceiverClient.getPipeline());
    Pipeline pipeline = xceiverClient.getPipeline();
    XceiverClientRatis ratisClient = (XceiverClientRatis) xceiverClient;
    XceiverClientReply reply = xceiverClient.sendCommandAsync(
        ContainerTestHelper.getCreateContainerRequest(
            container1.getContainerInfo().getContainerID(),
            xceiverClient.getPipeline()));
    reply.getResponse().get();
    Assert.assertEquals(3, ratisClient.getCommitInfoMap().size());
    cluster.shutdownHddsDatanode(pipeline.getNodes().get(0));
    reply = xceiverClient.sendCommandAsync(ContainerTestHelper
        .getCloseContainer(pipeline,
            container1.getContainerInfo().getContainerID()));
    reply.getResponse().get();
    xceiverClient.watchForCommit(reply.getLogIndex(), 20000);

    // commitInfo Map will be reduced to 2 here
    Assert.assertEquals(2, ratisClient.getCommitInfoMap().size());
    clientManager.releaseClient(xceiverClient, false);
    Assert.assertTrue(logCapturer.getOutput().contains("3 way commit failed"));
    Assert
        .assertTrue(logCapturer.getOutput().contains("Committed by majority"));
    logCapturer.stopCapturing();
    shutdown();
  }
}

hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java

Lines changed: 31 additions & 69 deletions
@@ -46,7 +46,9 @@
 import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Random;
 import java.util.UUID;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;

@@ -135,7 +137,10 @@ public void testWatchForCommitWithKeyWrite() throws Exception {
     OzoneConfiguration conf = new OzoneConfiguration();
     conf.setTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, 20,
         TimeUnit.SECONDS);
-    conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 5);
+    conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 20);
+    conf.setTimeDuration(
+        OzoneConfigKeys.DFS_RATIS_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_KEY,
+        1, TimeUnit.SECONDS);
     startCluster(conf);
     XceiverClientMetrics metrics =
         XceiverClientManager.getXceiverClientMetrics();
@@ -178,31 +183,24 @@ public void testWatchForCommitWithKeyWrite() throws Exception {
         .getOutputStream();
     Assert.assertTrue(stream instanceof BlockOutputStream);
     BlockOutputStream blockOutputStream = (BlockOutputStream) stream;
-
     // we have just written data more than flush Size(2 chunks), at this time
     // buffer pool will have 3 buffers allocated worth of chunk size
-
     Assert.assertEquals(4, blockOutputStream.getBufferPool().getSize());
     // writtenDataLength as well flushedDataLength will be updated here
     Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
-
     Assert.assertEquals(maxFlushSize,
         blockOutputStream.getTotalDataFlushedLength());
-
     // since data equals to maxBufferSize is written, this will be a blocking
     // call and hence will wait for atleast flushSize worth of data to get
     // acked by all servers right here
     Assert.assertTrue(blockOutputStream.getTotalAckDataLength() >= flushSize);
-
     // watchForCommit will clean up atleast one entry from the map where each
     // entry corresponds to flushSize worth of data
     Assert.assertTrue(
         blockOutputStream.getCommitIndex2flushedDataMap().size() <= 1);
-
     // Now do a flush. This will flush the data and update the flush length and
     // the map.
     key.flush();
-
     Assert.assertEquals(pendingWriteChunkCount,
         metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
     Assert.assertEquals(pendingPutBlockCount,
@@ -213,19 +211,15 @@ public void testWatchForCommitWithKeyWrite() throws Exception {
         metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
     Assert.assertEquals(totalOpCount + 8,
         metrics.getTotalOpCount());
-
     // Since the data in the buffer is already flushed, flush here will have
     // no impact on the counters and data structures
-
     Assert.assertEquals(4, blockOutputStream.getBufferPool().getSize());
     Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
-
     Assert.assertEquals(dataLength,
         blockOutputStream.getTotalDataFlushedLength());
     // flush will make sure one more entry gets updated in the map
     Assert.assertTrue(
         blockOutputStream.getCommitIndex2flushedDataMap().size() <= 2);
-
     XceiverClientRatis raftClient =
         (XceiverClientRatis) blockOutputStream.getXceiverClient();
     Assert.assertEquals(3, raftClient.getCommitInfoMap().size());
@@ -235,11 +229,9 @@ public void testWatchForCommitWithKeyWrite() throws Exception {
     // again write data with more than max buffer limit. This will call
     // watchForCommit again. Since the commit will happen 2 way, the
     // commitInfoMap will get updated for servers which are alive
-
     // 4 writeChunks = maxFlushSize + 2 putBlocks will be discarded here
     // once exception is hit
     key.write(data1);
-
     // As a part of handling the exception, 4 failed writeChunks will be
     // rewritten plus one partial chunk plus two putBlocks for flushSize
     // and one flush for partial chunk
@@ -282,7 +274,7 @@ public void testWatchForCommitWithSmallerTimeoutValue() throws Exception {
     OzoneConfiguration conf = new OzoneConfiguration();
     conf.setTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, 3,
         TimeUnit.SECONDS);
-    conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 10);
+    conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 20);
     startCluster(conf);
     XceiverClientManager clientManager = new XceiverClientManager(conf);
     ContainerWithPipeline container1 = storageContainerLocationClient
@@ -303,8 +295,11 @@ public void testWatchForCommitWithSmallerTimeoutValue() throws Exception {
     cluster.shutdownHddsDatanode(pipeline.getNodes().get(0));
     cluster.shutdownHddsDatanode(pipeline.getNodes().get(1));
     try {
-      // just watch for a lo index which in not updated in the commitInfo Map
-      xceiverClient.watchForCommit(index + 1, 3000);
+      // just watch for a log index which in not updated in the commitInfo Map
+      // as well as there is no logIndex generate in Ratis.
+      // The basic idea here is just to test if its throws an exception.
+      xceiverClient
+          .watchForCommit(index + new Random().nextInt(100) + 10, 3000);
       Assert.fail("expected exception not thrown");
     } catch (Exception e) {
       Assert.assertTrue(
@@ -321,7 +316,7 @@ public void testWatchForCommitForRetryfailure() throws Exception {
     OzoneConfiguration conf = new OzoneConfiguration();
     conf.setTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT,
         100, TimeUnit.SECONDS);
-    conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 10);
+    conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 20);
     startCluster(conf);
     XceiverClientManager clientManager = new XceiverClientManager(conf);
     ContainerWithPipeline container1 = storageContainerLocationClient
@@ -343,67 +338,30 @@ public void testWatchForCommitForRetryfailure() throws Exception {
     cluster.shutdownHddsDatanode(pipeline.getNodes().get(1));
     // again write data with more than max buffer limit. This wi
     try {
-      // just watch for a lo index which in not updated in the commitInfo Map
-      xceiverClient.watchForCommit(index + 1, 20000);
+      // just watch for a log index which in not updated in the commitInfo Map
+      // as well as there is no logIndex generate in Ratis.
+      // The basic idea here is just to test if its throws an exception.
+      xceiverClient
+          .watchForCommit(index + new Random().nextInt(100) + 10, 20000);
       Assert.fail("expected exception not thrown");
     } catch (Exception e) {
-      Assert.assertTrue(HddsClientUtils
-          .checkForException(e) instanceof RaftRetryFailureException);
+      Assert.assertTrue(e instanceof ExecutionException);
+      // since the timeout value is quite long, the watch request will either
+      // fail with NotReplicated exceptio, RetryFailureException or
+      // RuntimeException
+      Assert.assertFalse(HddsClientUtils
+          .checkForException(e) instanceof TimeoutException);
     }
     clientManager.releaseClient(xceiverClient, false);
     shutdown();
   }
 
-  @Test
-  public void test2WayCommitForRetryfailure() throws Exception {
-    OzoneConfiguration conf = new OzoneConfiguration();
-    conf.setTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, 20,
-        TimeUnit.SECONDS);
-    conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 8);
-    startCluster(conf);
-    GenericTestUtils.LogCapturer logCapturer =
-        GenericTestUtils.LogCapturer.captureLogs(XceiverClientRatis.LOG);
-    XceiverClientManager clientManager = new XceiverClientManager(conf);
-
-    ContainerWithPipeline container1 = storageContainerLocationClient
-        .allocateContainer(HddsProtos.ReplicationType.RATIS,
-            HddsProtos.ReplicationFactor.THREE, containerOwner);
-    XceiverClientSpi xceiverClient = clientManager
-        .acquireClient(container1.getPipeline());
-    Assert.assertEquals(1, xceiverClient.getRefcount());
-    Assert.assertEquals(container1.getPipeline(),
-        xceiverClient.getPipeline());
-    Pipeline pipeline = xceiverClient.getPipeline();
-    XceiverClientRatis ratisClient = (XceiverClientRatis) xceiverClient;
-    XceiverClientReply reply = xceiverClient.sendCommandAsync(
-        ContainerTestHelper.getCreateContainerRequest(
-            container1.getContainerInfo().getContainerID(),
-            xceiverClient.getPipeline()));
-    reply.getResponse().get();
-    Assert.assertEquals(3, ratisClient.getCommitInfoMap().size());
-    cluster.shutdownHddsDatanode(pipeline.getNodes().get(0));
-    reply = xceiverClient.sendCommandAsync(ContainerTestHelper
-        .getCloseContainer(pipeline,
-            container1.getContainerInfo().getContainerID()));
-    reply.getResponse().get();
-    xceiverClient.watchForCommit(reply.getLogIndex(), 20000);
-
-    // commitInfo Map will be reduced to 2 here
-    Assert.assertEquals(2, ratisClient.getCommitInfoMap().size());
-    clientManager.releaseClient(xceiverClient, false);
-    Assert.assertTrue(logCapturer.getOutput().contains("3 way commit failed"));
-    Assert
-        .assertTrue(logCapturer.getOutput().contains("Committed by majority"));
-    logCapturer.stopCapturing();
-    shutdown();
-  }
-
   @Test
   public void test2WayCommitForTimeoutException() throws Exception {
     OzoneConfiguration conf = new OzoneConfiguration();
     conf.setTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, 3,
         TimeUnit.SECONDS);
-    conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 10);
+    conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 20);
     startCluster(conf);
     GenericTestUtils.LogCapturer logCapturer =
         GenericTestUtils.LogCapturer.captureLogs(XceiverClientRatis.LOG);
@@ -477,8 +435,12 @@ public void testWatchForCommitForGroupMismatchException() throws Exception {
     pipelineList.add(pipeline);
     ContainerTestHelper.waitForPipelineClose(pipelineList, cluster);
     try {
-      // just watch for a lo index which in not updated in the commitInfo Map
-      xceiverClient.watchForCommit(reply.getLogIndex() + 1, 20000);
+      // just watch for a log index which in not updated in the commitInfo Map
+      // as well as there is no logIndex generate in Ratis.
+      // The basic idea here is just to test if its throws an exception.
+      xceiverClient
+          .watchForCommit(reply.getLogIndex() + new Random().nextInt(100) + 10,
+              20000);
       Assert.fail("Expected exception not thrown");
     } catch(Exception e) {
       Assert.assertTrue(HddsClientUtils