1717
1818package org .apache .hadoop .ozone .client .rpc ;
1919
20+ import org .apache .hadoop .hdds .client .BlockID ;
2021import org .apache .hadoop .hdds .client .ReplicationType ;
2122import org .apache .hadoop .hdds .conf .OzoneConfiguration ;
2223import org .apache .hadoop .hdds .protocol .DatanodeDetails ;
3031import org .apache .hadoop .ozone .client .ObjectStore ;
3132import org .apache .hadoop .ozone .client .OzoneClient ;
3233import org .apache .hadoop .ozone .client .OzoneClientFactory ;
34+ import org .apache .hadoop .ozone .client .io .BlockOutputStreamEntry ;
3335import org .apache .hadoop .ozone .client .io .KeyOutputStream ;
3436import org .apache .hadoop .ozone .client .io .OzoneOutputStream ;
3537import org .apache .hadoop .ozone .container .ContainerTestHelper ;
3638import org .apache .hadoop .ozone .om .helpers .OmKeyArgs ;
3739import org .apache .hadoop .ozone .om .helpers .OmKeyInfo ;
3840import org .apache .hadoop .ozone .om .helpers .OmKeyLocationInfo ;
41+ import org .junit .After ;
3942import org .junit .Assert ;
4043import org .junit .Test ;
4144
@@ -61,7 +64,6 @@ public class TestMultiBlockWritesWithDnFailures {
6164 private String volumeName ;
6265 private String bucketName ;
6366 private String keyString ;
64- private int maxRetries ;
6567
6668 /**
6769 * Create a MiniDFSCluster for testing.
@@ -70,23 +72,25 @@ public class TestMultiBlockWritesWithDnFailures {
7072 *
7173 * @throws IOException
7274 */
73- private void init ( ) throws Exception {
75+ private void startCluster ( int datanodes ) throws Exception {
7476 conf = new OzoneConfiguration ();
75- maxRetries = 100 ;
7677 chunkSize = (int ) OzoneConsts .MB ;
7778 blockSize = 4 * chunkSize ;
7879 conf .setTimeDuration (OzoneConfigKeys .OZONE_CLIENT_WATCH_REQUEST_TIMEOUT , 5 ,
7980 TimeUnit .SECONDS );
8081 conf .setTimeDuration (HDDS_SCM_WATCHER_TIMEOUT , 1000 , TimeUnit .MILLISECONDS );
8182 conf .setTimeDuration (OZONE_SCM_STALENODE_INTERVAL , 100 , TimeUnit .SECONDS );
82- conf .setInt (OzoneConfigKeys .DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY , 5 );
83+ conf .setInt (OzoneConfigKeys .DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY , 10 );
8384 conf .setTimeDuration (
8485 OzoneConfigKeys .DFS_RATIS_CLIENT_REQUEST_RETRY_INTERVAL_KEY ,
8586 1 , TimeUnit .SECONDS );
87+ conf .setTimeDuration (
88+ OzoneConfigKeys .DFS_RATIS_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_KEY ,
89+ 1 , TimeUnit .SECONDS );
8690
8791 conf .setQuietMode (false );
8892 cluster = MiniOzoneCluster .newBuilder (conf )
89- .setNumDatanodes (6 ).build ();
93+ .setNumDatanodes (datanodes ).build ();
9094 cluster .waitForClusterToBeReady ();
9195 //the easiest way to create an open container is creating a key
9296 client = OzoneClientFactory .getClient (conf );
@@ -98,22 +102,19 @@ private void init() throws Exception {
98102 objectStore .getVolume (volumeName ).createBucket (bucketName );
99103 }
100104
101- private void startCluster () throws Exception {
102- init ();
103- }
104-
105105 /**
106106 * Shutdown MiniDFSCluster.
107107 */
108- private void shutdown () {
108+ @ After
109+ public void shutdown () {
109110 if (cluster != null ) {
110111 cluster .shutdown ();
111112 }
112113 }
113114
114115 @ Test
115116 public void testMultiBlockWritesWithDnFailures () throws Exception {
116- startCluster ();
117+ startCluster (6 );
117118 String keyName = "ratis3" ;
118119 OzoneOutputStream key = createKey (keyName , ReplicationType .RATIS , 0 );
119120 String data =
@@ -151,7 +152,58 @@ public void testMultiBlockWritesWithDnFailures() throws Exception {
151152 OmKeyInfo keyInfo = cluster .getOzoneManager ().lookupKey (keyArgs );
152153 Assert .assertEquals (2 * data .getBytes ().length , keyInfo .getDataSize ());
153154 validateData (keyName , data .concat (data ).getBytes ());
154- shutdown ();
155+ }
156+
157+ @ Test
158+ public void testMultiBlockWritesWithIntermittentDnFailures ()
159+ throws Exception {
160+ startCluster (10 );
161+ String keyName = UUID .randomUUID ().toString ();
162+ OzoneOutputStream key =
163+ createKey (keyName , ReplicationType .RATIS , 6 * blockSize );
164+ String data = ContainerTestHelper
165+ .getFixedLengthString (keyString , blockSize + chunkSize );
166+ key .write (data .getBytes ());
167+
168+ // get the name of a valid container
169+ Assert .assertTrue (key .getOutputStream () instanceof KeyOutputStream );
170+ KeyOutputStream keyOutputStream =
171+ (KeyOutputStream ) key .getOutputStream ();
172+ List <BlockOutputStreamEntry > streamEntryList =
173+ keyOutputStream .getStreamEntries ();
174+
175+ // Assert that 6 block will be preallocated
176+ Assert .assertEquals (6 , streamEntryList .size ());
177+ key .write (data .getBytes ());
178+ key .flush ();
179+ long containerId = streamEntryList .get (0 ).getBlockID ().getContainerID ();
180+ BlockID blockId = streamEntryList .get (0 ).getBlockID ();
181+ ContainerInfo container =
182+ cluster .getStorageContainerManager ().getContainerManager ()
183+ .getContainer (ContainerID .valueof (containerId ));
184+ Pipeline pipeline =
185+ cluster .getStorageContainerManager ().getPipelineManager ()
186+ .getPipeline (container .getPipelineID ());
187+ List <DatanodeDetails > datanodes = pipeline .getNodes ();
188+ cluster .shutdownHddsDatanode (datanodes .get (0 ));
189+
190+ // The write will fail but exception will be handled and length will be
191+ // updated correctly in OzoneManager once the steam is closed
192+ key .write (data .getBytes ());
193+
194+ // shutdown the second datanode
195+ cluster .shutdownHddsDatanode (datanodes .get (1 ));
196+ key .write (data .getBytes ());
197+ key .close ();
198+ OmKeyArgs keyArgs = new OmKeyArgs .Builder ().setVolumeName (volumeName )
199+ .setBucketName (bucketName ).setType (HddsProtos .ReplicationType .RATIS )
200+ .setFactor (HddsProtos .ReplicationFactor .THREE ).setKeyName (keyName )
201+ .setRefreshPipeline (true )
202+ .build ();
203+ OmKeyInfo keyInfo = cluster .getOzoneManager ().lookupKey (keyArgs );
204+ Assert .assertEquals (4 * data .getBytes ().length , keyInfo .getDataSize ());
205+ validateData (keyName ,
206+ data .concat (data ).concat (data ).concat (data ).getBytes ());
155207 }
156208
157209 private OzoneOutputStream createKey (String keyName , ReplicationType type ,
0 commit comments