Skip to content

Commit be4c638

Browse files
authored
HDFS-16748. RBF: DFSClient should uniquely identify writing files by namespace id and iNodeId via RBF (#4813). Contributed by ZanderXu.
Reviewed-by: He Xiaoqiao <[email protected]> Signed-off-by: Ayush Saxena <[email protected]>
1 parent ac42519 commit be4c638

File tree

9 files changed

+168
-28
lines changed

9 files changed

+168
-28
lines changed

hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java

Lines changed: 25 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -275,7 +275,7 @@ Configuration getConfiguration() {
275275
* that are currently being written by this client.
276276
* Note that a file can only be written by a single client.
277277
*/
278-
private final Map<Long, DFSOutputStream> filesBeingWritten = new HashMap<>();
278+
private final Map<String, DFSOutputStream> filesBeingWritten = new HashMap<>();
279279

280280
/**
281281
* Same as this(NameNode.getNNAddress(conf), conf);
@@ -502,9 +502,9 @@ public LeaseRenewer getLeaseRenewer() {
502502
}
503503

504504
/** Get a lease and start automatic renewal */
505-
private void beginFileLease(final long inodeId, final DFSOutputStream out) {
505+
private void beginFileLease(final String key, final DFSOutputStream out) {
506506
synchronized (filesBeingWritten) {
507-
putFileBeingWritten(inodeId, out);
507+
putFileBeingWritten(key, out);
508508
LeaseRenewer renewer = getLeaseRenewer();
509509
boolean result = renewer.put(this);
510510
if (!result) {
@@ -518,9 +518,9 @@ private void beginFileLease(final long inodeId, final DFSOutputStream out) {
518518
}
519519

520520
/** Stop renewal of lease for the file. */
521-
void endFileLease(final long inodeId) {
521+
void endFileLease(final String renewLeaseKey) {
522522
synchronized (filesBeingWritten) {
523-
removeFileBeingWritten(inodeId);
523+
removeFileBeingWritten(renewLeaseKey);
524524
// remove client from renewer if no files are open
525525
if (filesBeingWritten.isEmpty()) {
526526
getLeaseRenewer().closeClient(this);
@@ -532,10 +532,10 @@ void endFileLease(final long inodeId) {
532532
* enforced to consistently update its local dfsclients array and
533533
* client's filesBeingWritten map.
534534
*/
535-
public void putFileBeingWritten(final long inodeId,
535+
public void putFileBeingWritten(final String key,
536536
final DFSOutputStream out) {
537537
synchronized(filesBeingWritten) {
538-
filesBeingWritten.put(inodeId, out);
538+
filesBeingWritten.put(key, out);
539539
// update the last lease renewal time only when there was no
540540
// writes. once there is one write stream open, the lease renewer
541541
// thread keeps it updated well with in anyone's expiration time.
@@ -546,9 +546,9 @@ public void putFileBeingWritten(final long inodeId,
546546
}
547547

548548
/** Remove a file. Only called from LeaseRenewer. */
549-
public void removeFileBeingWritten(final long inodeId) {
549+
public void removeFileBeingWritten(final String key) {
550550
synchronized(filesBeingWritten) {
551-
filesBeingWritten.remove(inodeId);
551+
filesBeingWritten.remove(key);
552552
if (filesBeingWritten.isEmpty()) {
553553
lastLeaseRenewal = 0;
554554
}
@@ -580,6 +580,13 @@ void updateLastLeaseRenewal() {
580580
}
581581
}
582582

583+
@VisibleForTesting
584+
public int getNumOfFilesBeingWritten() {
585+
synchronized (filesBeingWritten) {
586+
return filesBeingWritten.size();
587+
}
588+
}
589+
583590
/**
584591
* Get all namespaces of DFSOutputStreams.
585592
*/
@@ -640,14 +647,14 @@ void closeConnectionToNamenode() {
640647
/** Close/abort all files being written. */
641648
public void closeAllFilesBeingWritten(final boolean abort) {
642649
for(;;) {
643-
final long inodeId;
650+
final String key;
644651
final DFSOutputStream out;
645652
synchronized(filesBeingWritten) {
646653
if (filesBeingWritten.isEmpty()) {
647654
return;
648655
}
649-
inodeId = filesBeingWritten.keySet().iterator().next();
650-
out = filesBeingWritten.remove(inodeId);
656+
key = filesBeingWritten.keySet().iterator().next();
657+
out = filesBeingWritten.remove(key);
651658
}
652659
if (out != null) {
653660
try {
@@ -658,7 +665,7 @@ public void closeAllFilesBeingWritten(final boolean abort) {
658665
}
659666
} catch(IOException ie) {
660667
LOG.error("Failed to " + (abort ? "abort" : "close") + " file: "
661-
+ out.getSrc() + " with inode: " + inodeId, ie);
668+
+ out.getSrc() + " with renewLeaseKey: " + key, ie);
662669
}
663670
}
664671
}
@@ -1297,7 +1304,7 @@ public DFSOutputStream create(String src, FsPermission permission,
12971304
src, masked, flag, createParent, replication, blockSize, progress,
12981305
dfsClientConf.createChecksum(checksumOpt),
12991306
getFavoredNodesStr(favoredNodes), ecPolicyName, storagePolicy);
1300-
beginFileLease(result.getFileId(), result);
1307+
beginFileLease(result.getUniqKey(), result);
13011308
return result;
13021309
}
13031310

@@ -1352,7 +1359,7 @@ public DFSOutputStream primitiveCreate(String src, FsPermission absPermission,
13521359
flag, createParent, replication, blockSize, progress, checksum,
13531360
null, null, null);
13541361
}
1355-
beginFileLease(result.getFileId(), result);
1362+
beginFileLease(result.getUniqKey(), result);
13561363
return result;
13571364
}
13581365

@@ -1497,7 +1504,7 @@ private DFSOutputStream append(String src, int buffersize,
14971504
checkOpen();
14981505
final DFSOutputStream result = callAppend(src, flag, progress,
14991506
favoredNodes);
1500-
beginFileLease(result.getFileId(), result);
1507+
beginFileLease(result.getUniqKey(), result);
15011508
return result;
15021509
}
15031510

@@ -2418,8 +2425,8 @@ long rollEdits() throws IOException {
24182425
}
24192426

24202427
@VisibleForTesting
2421-
ExtendedBlock getPreviousBlock(long fileId) {
2422-
return filesBeingWritten.get(fileId).getBlock();
2428+
ExtendedBlock getPreviousBlock(String key) {
2429+
return filesBeingWritten.get(key).getBlock();
24232430
}
24242431

24252432
/**

hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,7 @@ public class DFSOutputStream extends FSOutputSummer
114114
protected final String src;
115115
protected final long fileId;
116116
private final String namespace;
117+
private final String uniqKey;
117118
protected final long blockSize;
118119
protected final int bytesPerChecksum;
119120

@@ -197,6 +198,14 @@ private DFSOutputStream(DFSClient dfsClient, String src,
197198
this.src = src;
198199
this.fileId = stat.getFileId();
199200
this.namespace = stat.getNamespace();
201+
if (this.namespace == null) {
202+
String defaultKey = dfsClient.getConfiguration().get(
203+
HdfsClientConfigKeys.DFS_OUTPUT_STREAM_UNIQ_DEFAULT_KEY,
204+
HdfsClientConfigKeys.DFS_OUTPUT_STREAM_UNIQ_DEFAULT_KEY_DEFAULT);
205+
this.uniqKey = defaultKey + "_" + this.fileId;
206+
} else {
207+
this.uniqKey = this.namespace + "_" + this.fileId;
208+
}
200209
this.blockSize = stat.getBlockSize();
201210
this.blockReplication = stat.getReplication();
202211
this.fileEncryptionInfo = stat.getFileEncryptionInfo();
@@ -820,7 +829,7 @@ boolean isClosed() {
820829

821830
void setClosed() {
822831
closed = true;
823-
dfsClient.endFileLease(fileId);
832+
dfsClient.endFileLease(getUniqKey());
824833
getStreamer().release();
825834
}
826835

@@ -923,7 +932,7 @@ protected synchronized void closeImpl() throws IOException {
923932
protected void recoverLease(boolean recoverLeaseOnCloseException) {
924933
if (recoverLeaseOnCloseException) {
925934
try {
926-
dfsClient.endFileLease(fileId);
935+
dfsClient.endFileLease(getUniqKey());
927936
dfsClient.recoverLease(src);
928937
leaseRecovered = true;
929938
} catch (Exception e) {
@@ -1091,6 +1100,11 @@ public String getNamespace() {
10911100
return namespace;
10921101
}
10931102

1103+
@VisibleForTesting
1104+
public String getUniqKey() {
1105+
return this.uniqKey;
1106+
}
1107+
10941108
/**
10951109
* Return the source of stream.
10961110
*/

hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1055,7 +1055,7 @@ void abort() throws IOException {
10551055
}
10561056
}
10571057

1058-
dfsClient.endFileLease(fileId);
1058+
dfsClient.endFileLease(getUniqKey());
10591059
final IOException ioe = b.build();
10601060
if (ioe != null) {
10611061
throw ioe;

hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -281,6 +281,10 @@ public interface HdfsClientConfigKeys {
281281
"dfs.client.fsck.read.timeout";
282282
int DFS_CLIENT_FSCK_READ_TIMEOUT_DEFAULT = 60 * 1000;
283283

284+
String DFS_OUTPUT_STREAM_UNIQ_DEFAULT_KEY =
285+
"dfs.client.output.stream.uniq.default.key";
286+
String DFS_OUTPUT_STREAM_UNIQ_DEFAULT_KEY_DEFAULT = "DEFAULT";
287+
284288
/**
285289
* These are deprecated config keys to client code.
286290
*/
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hdfs.server.federation.router;
19+
20+
import org.apache.hadoop.conf.Configuration;
21+
import org.apache.hadoop.fs.FSDataOutputStream;
22+
import org.apache.hadoop.fs.Path;
23+
import org.apache.hadoop.hdfs.DistributedFileSystem;
24+
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
25+
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster;
26+
import org.apache.hadoop.hdfs.server.federation.MockResolver;
27+
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
28+
import org.junit.AfterClass;
29+
import org.junit.BeforeClass;
30+
import org.junit.Test;
31+
32+
import java.io.IOException;
33+
34+
import static org.junit.Assert.assertEquals;
35+
36+
/**
37+
* Testing DFSClient renewLease with same INodeId.
38+
*/
39+
public class TestRenewLeaseWithSameINodeId {
40+
41+
/** Federated HDFS cluster. */
42+
private static MiniRouterDFSCluster cluster;
43+
44+
/** The first Router Context for this federated cluster. */
45+
private static MiniRouterDFSCluster.RouterContext routerContext;
46+
47+
@BeforeClass
48+
public static void globalSetUp() throws Exception {
49+
cluster = new MiniRouterDFSCluster(false, 2);
50+
cluster.setNumDatanodesPerNameservice(3);
51+
cluster.startCluster();
52+
53+
Configuration routerConf = new RouterConfigBuilder()
54+
.metrics()
55+
.rpc()
56+
.quota()
57+
.build();
58+
cluster.addRouterOverrides(routerConf);
59+
cluster.startRouters();
60+
61+
// Register and verify all NNs with all routers
62+
cluster.registerNamenodes();
63+
cluster.waitNamenodeRegistration();
64+
65+
routerContext = cluster.getRouters().get(0);
66+
}
67+
68+
@AfterClass
69+
public static void tearDown() throws Exception {
70+
cluster.shutdown();
71+
}
72+
73+
/**
74+
* Testing case:
75+
* 1. One Router DFSClient writing multi files from different namespace with same iNodeId.
76+
* 2. DFSClient Lease Renewer should work well.
77+
*/
78+
@Test
79+
public void testRenewLeaseWithSameINodeId() throws IOException {
80+
// Add mount point "/ns0" and "/ns1"
81+
Router router = cluster.getRouters().get(0).getRouter();
82+
MockResolver resolver = (MockResolver) router.getSubclusterResolver();
83+
resolver.addLocation("/ns0", cluster.getNameservices().get(0), "/ns0");
84+
resolver.addLocation("/ns1", cluster.getNameservices().get(1), "/ns1");
85+
86+
DistributedFileSystem fs = (DistributedFileSystem) routerContext.getFileSystem();
87+
88+
Path path1 = new Path("/ns0/file");
89+
Path path2 = new Path("/ns1/file");
90+
91+
try (FSDataOutputStream ignored1 = fs.create(path1);
92+
FSDataOutputStream ignored2 = fs.create(path2)) {
93+
HdfsFileStatus fileStatus1 = fs.getClient().getFileInfo(path1.toUri().getPath());
94+
HdfsFileStatus fileStatus2 = fs.getClient().getFileInfo(path2.toUri().getPath());
95+
96+
// The fileId of the files from different new namespaces should be same.
97+
assertEquals(fileStatus2.getFileId(), fileStatus1.getFileId());
98+
99+
// The number of fileBeingWritten of this DFSClient should be two.
100+
assertEquals(2, fs.getClient().getNumOfFilesBeingWritten());
101+
}
102+
103+
// The number of fileBeingWritten of this DFSClient should be zero.
104+
assertEquals(0, fs.getClient().getNumOfFilesBeingWritten());
105+
}
106+
}

hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6454,4 +6454,12 @@
64546454
frequently than this time, the client will give up waiting.
64556455
</description>
64566456
</property>
6457+
<property>
6458+
<name>dfs.client.output.stream.uniq.default.key</name>
6459+
<value>DEFAULT</value>
6460+
<description>
6461+
The default prefix key to construct the uniqKey for one DFSOutputStream.
6462+
If the namespace is DEFAULT, it's best to change this conf to other value.
6463+
</description>
6464+
</property>
64576465
</configuration>

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSClientAdapter.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,8 +54,8 @@ public static DFSClient getClient(DistributedFileSystem dfs)
5454
return dfs.dfs;
5555
}
5656

57-
public static ExtendedBlock getPreviousBlock(DFSClient client, long fileId) {
58-
return client.getPreviousBlock(fileId);
57+
public static ExtendedBlock getPreviousBlock(DFSClient client, String renewLeaseKey) {
58+
return client.getPreviousBlock(renewLeaseKey);
5959
}
6060

6161
public static long getFileId(DFSOutputStream out) {

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,8 +68,9 @@
6868
import static org.junit.Assert.assertTrue;
6969
import static org.junit.Assert.fail;
7070
import static org.mockito.ArgumentMatchers.anyBoolean;
71-
import static org.mockito.ArgumentMatchers.anyLong;
7271
import org.mockito.Mockito;
72+
73+
import static org.mockito.ArgumentMatchers.anyString;
7374
import static org.mockito.Mockito.times;
7475
import static org.mockito.Mockito.verify;
7576

@@ -433,7 +434,7 @@ public void testEndLeaseCall() throws Exception {
433434
EnumSet.of(CreateFlag.CREATE), (short) 3, 1024, null , 1024, null);
434435
DFSOutputStream spyDFSOutputStream = Mockito.spy(dfsOutputStream);
435436
spyDFSOutputStream.closeThreads(anyBoolean());
436-
verify(spyClient, times(1)).endFileLease(anyLong());
437+
verify(spyClient, times(1)).endFileLease(anyString());
437438
}
438439

439440
@Test

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestHASafeMode.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -852,13 +852,13 @@ public void testOpenFileWhenNNAndClientCrashAfterAddBlock() throws Exception {
852852
null);
853853
create.write(testData.getBytes());
854854
create.hflush();
855-
long fileId = ((DFSOutputStream)create.
856-
getWrappedStream()).getFileId();
855+
String renewLeaseKey = ((DFSOutputStream)create.
856+
getWrappedStream()).getUniqKey();
857857
FileStatus fileStatus = dfs.getFileStatus(filePath);
858858
DFSClient client = DFSClientAdapter.getClient(dfs);
859859
// add one dummy block at NN, but not write to DataNode
860860
ExtendedBlock previousBlock =
861-
DFSClientAdapter.getPreviousBlock(client, fileId);
861+
DFSClientAdapter.getPreviousBlock(client, renewLeaseKey);
862862
DFSClientAdapter.getNamenode(client).addBlock(
863863
pathString,
864864
client.getClientName(),

0 commit comments

Comments
 (0)