Skip to content

Commit 1d6d0d8

Browse files
committed
HDFS-14694. Call recoverLease on DFSOutputStream close exception. Contributed by Lisheng Sun.
Co-authored-by: Chen Zhang <[email protected]> Signed-off-by: He Xiaoqiao <[email protected]> Reviewed-by: Ayush Saxena <[email protected]>
1 parent 43572fc commit 1d6d0d8

File tree

3 files changed

+107
-1
lines changed

3 files changed

+107
-1
lines changed

hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,8 @@
7575
import com.google.common.annotations.VisibleForTesting;
7676
import com.google.common.base.Preconditions;
7777

78+
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.Write.RECOVER_LEASE_ON_CLOSE_EXCEPTION_DEFAULT;
79+
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.Write.RECOVER_LEASE_ON_CLOSE_EXCEPTION_KEY;
7880

7981
/****************************************************************
8082
* DFSOutputStream creates files from a stream of bytes.
@@ -126,6 +128,7 @@ public class DFSOutputStream extends FSOutputSummer
126128
protected final AtomicReference<CachingStrategy> cachingStrategy;
127129
private FileEncryptionInfo fileEncryptionInfo;
128130
private int writePacketSize;
131+
private boolean leaseRecovered = false;
129132

130133
/** Use {@link ByteArrayManager} to create buffer for non-heartbeat packets.*/
131134
protected DFSPacket createPacket(int packetSize, int chunksPerPkt,
@@ -861,7 +864,14 @@ public void close() throws IOException {
861864
}
862865

863866
protected synchronized void closeImpl() throws IOException {
867+
boolean recoverLeaseOnCloseException = dfsClient.getConfiguration()
868+
.getBoolean(RECOVER_LEASE_ON_CLOSE_EXCEPTION_KEY,
869+
RECOVER_LEASE_ON_CLOSE_EXCEPTION_DEFAULT);
864870
if (isClosed()) {
871+
if (!leaseRecovered) {
872+
recoverLease(recoverLeaseOnCloseException);
873+
}
874+
865875
LOG.debug("Closing an already closed stream. [Stream:{}, streamer:{}]",
866876
closed, getStreamer().streamerClosed());
867877
try {
@@ -896,6 +906,9 @@ protected synchronized void closeImpl() throws IOException {
896906
}
897907
completeFile();
898908
} catch (ClosedChannelException ignored) {
909+
} catch (IOException ioe) {
910+
recoverLease(recoverLeaseOnCloseException);
911+
throw ioe;
899912
} finally {
900913
// Failures may happen when flushing data.
901914
// Streamers may keep waiting for the new block information.
@@ -906,7 +919,23 @@ protected synchronized void closeImpl() throws IOException {
906919
}
907920
}
908921

909-
private void completeFile() throws IOException {
922+
/**
923+
* If recoverLeaseOnCloseException is true and an exception occurs when
924+
* closing a file, recover lease.
925+
*/
926+
private void recoverLease(boolean recoverLeaseOnCloseException) {
927+
if (recoverLeaseOnCloseException) {
928+
try {
929+
dfsClient.endFileLease(fileId);
930+
dfsClient.recoverLease(src);
931+
leaseRecovered = true;
932+
} catch (Exception e) {
933+
LOG.warn("Fail to recover lease for {}", src, e);
934+
}
935+
}
936+
}
937+
938+
void completeFile() throws IOException {
910939
// get last block before destroying the streamer
911940
ExtendedBlock lastBlock = getStreamer().getBlock();
912941
try (TraceScope ignored =
@@ -1076,6 +1105,11 @@ public String toString() {
10761105
return getClass().getSimpleName() + ":" + streamer;
10771106
}
10781107

1108+
@VisibleForTesting
1109+
boolean isLeaseRecovered() {
1110+
return leaseRecovered;
1111+
}
1112+
10791113
static LocatedBlock addBlock(DatanodeInfo[] excludedNodes,
10801114
DFSClient dfsClient, String src, ExtendedBlock prevBlock, long fileId,
10811115
String[] favoredNodes, EnumSet<AddBlockFlag> allocFlags)

hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -362,6 +362,9 @@ interface Write {
362362
String EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL_KEY =
363363
PREFIX + "exclude.nodes.cache.expiry.interval.millis";
364364
long EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL_DEFAULT = 10*MINUTE;
365+
String RECOVER_LEASE_ON_CLOSE_EXCEPTION_KEY =
366+
PREFIX + "recover.lease.on.close.exception";
367+
boolean RECOVER_LEASE_ON_CLOSE_EXCEPTION_DEFAULT = false;
365368

366369
interface ByteArrayManager {
367370
String PREFIX = Write.PREFIX + "byte-array-manager.";

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import java.util.LinkedList;
3131
import java.util.Map;
3232
import java.util.Random;
33+
import java.util.concurrent.TimeoutException;
3334

3435
import org.apache.hadoop.conf.Configuration;
3536
import org.apache.hadoop.fs.CreateFlag;
@@ -62,7 +63,10 @@
6263
import org.junit.BeforeClass;
6364
import org.junit.Test;
6465

66+
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.Write.RECOVER_LEASE_ON_CLOSE_EXCEPTION_KEY;
67+
import static org.junit.Assert.assertFalse;
6568
import static org.junit.Assert.assertTrue;
69+
import static org.junit.Assert.fail;
6670
import static org.mockito.ArgumentMatchers.anyBoolean;
6771
import static org.mockito.ArgumentMatchers.anyLong;
6872
import org.mockito.Mockito;
@@ -371,10 +375,75 @@ public void testStreamFlush() throws Exception {
371375
os.close();
372376
}
373377

378+
@Test
379+
public void testExceptionInCloseWithRecoverLease() throws Exception {
380+
Configuration conf = new Configuration();
381+
conf.setBoolean(RECOVER_LEASE_ON_CLOSE_EXCEPTION_KEY, true);
382+
DFSClient client =
383+
new DFSClient(cluster.getNameNode(0).getNameNodeAddress(), conf);
384+
DFSClient spyClient = Mockito.spy(client);
385+
DFSOutputStream dfsOutputStream = spyClient.create(
386+
"/testExceptionInCloseWithRecoverLease", FsPermission.getFileDefault(),
387+
EnumSet.of(CreateFlag.CREATE), (short) 3, 1024, null, 1024, null);
388+
DFSOutputStream spyDFSOutputStream = Mockito.spy(dfsOutputStream);
389+
doThrow(new IOException("Emulated IOException in close"))
390+
.when(spyDFSOutputStream).completeFile();
391+
try {
392+
spyDFSOutputStream.close();
393+
fail();
394+
} catch (IOException ioe) {
395+
assertTrue(spyDFSOutputStream.isLeaseRecovered());
396+
waitForFileClosed("/testExceptionInCloseWithRecoverLease");
397+
assertTrue(isFileClosed("/testExceptionInCloseWithRecoverLease"));
398+
}
399+
}
400+
401+
@Test
402+
public void testExceptionInCloseWithoutRecoverLease() throws Exception {
403+
Configuration conf = new Configuration();
404+
DFSClient client =
405+
new DFSClient(cluster.getNameNode(0).getNameNodeAddress(), conf);
406+
DFSClient spyClient = Mockito.spy(client);
407+
DFSOutputStream dfsOutputStream =
408+
spyClient.create("/testExceptionInCloseWithoutRecoverLease",
409+
FsPermission.getFileDefault(), EnumSet.of(CreateFlag.CREATE),
410+
(short) 3, 1024, null, 1024, null);
411+
DFSOutputStream spyDFSOutputStream = Mockito.spy(dfsOutputStream);
412+
doThrow(new IOException("Emulated IOException in close"))
413+
.when(spyDFSOutputStream).completeFile();
414+
try {
415+
spyDFSOutputStream.close();
416+
fail();
417+
} catch (IOException ioe) {
418+
assertFalse(spyDFSOutputStream.isLeaseRecovered());
419+
try {
420+
waitForFileClosed("/testExceptionInCloseWithoutRecoverLease");
421+
} catch (TimeoutException e) {
422+
assertFalse(isFileClosed("/testExceptionInCloseWithoutRecoverLease"));
423+
}
424+
}
425+
}
426+
374427
@AfterClass
375428
public static void tearDown() {
376429
if (cluster != null) {
377430
cluster.shutdown();
378431
}
379432
}
433+
434+
private boolean isFileClosed(String path) throws IOException {
435+
return cluster.getFileSystem().isFileClosed(new Path(path));
436+
}
437+
438+
private void waitForFileClosed(String path) throws Exception {
439+
GenericTestUtils.waitFor(() -> {
440+
boolean closed;
441+
try {
442+
closed = isFileClosed(path);
443+
} catch (IOException e) {
444+
return false;
445+
}
446+
return closed;
447+
}, 1000, 5000);
448+
}
380449
}

0 commit comments

Comments
 (0)