Skip to content

Commit 4734c77

Browse files
HDFS-15322. Make NflyFS to work when ViewFsOverloadScheme's scheme and target uris schemes are same. Contributed by Uma Maheswara Rao G.
1 parent 52b21de commit 4734c77

File tree

8 files changed

+230
-38
lines changed

8 files changed

+230
-38
lines changed

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ConfigUtil.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,17 @@ public static void addLinkMerge(Configuration conf, final URI[] targets) {
135135
addLinkMerge(conf, Constants.CONFIG_VIEWFS_DEFAULT_MOUNT_TABLE, targets);
136136
}
137137

138+
/**
139+
* Add nfly link to configuration for the given mount table.
140+
*/
141+
public static void addLinkNfly(Configuration conf, String mountTableName,
142+
String src, String settings, final String targets) {
143+
conf.set(
144+
getConfigViewFsPrefix(mountTableName) + "."
145+
+ Constants.CONFIG_VIEWFS_LINK_NFLY + "." + settings + "." + src,
146+
targets);
147+
}
148+
138149
/**
139150
*
140151
* @param conf
@@ -149,9 +160,7 @@ public static void addLinkNfly(Configuration conf, String mountTableName,
149160
settings = settings == null
150161
? "minReplication=2,repairOnRead=true"
151162
: settings;
152-
153-
conf.set(getConfigViewFsPrefix(mountTableName) + "." +
154-
Constants.CONFIG_VIEWFS_LINK_NFLY + "." + settings + "." + src,
163+
addLinkNfly(conf, mountTableName, src, settings,
155164
StringUtils.uriToString(targets));
156165
}
157166

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.fs.viewfs;
19+
20+
import java.io.IOException;
21+
import java.net.URI;
22+
23+
import org.apache.hadoop.classification.InterfaceAudience.Private;
24+
import org.apache.hadoop.conf.Configuration;
25+
import org.apache.hadoop.fs.FileSystem;
26+
27+
/**
28+
* File system instance getter.
29+
*/
30+
@Private
31+
class FsGetter {
32+
33+
/**
34+
* Gets new file system instance of given uri.
35+
*/
36+
public FileSystem getNewInstance(URI uri, Configuration conf)
37+
throws IOException {
38+
return FileSystem.newInstance(uri, conf);
39+
}
40+
41+
/**
42+
* Gets file system instance of given uri.
43+
*/
44+
public FileSystem get(URI uri, Configuration conf) throws IOException {
45+
return FileSystem.get(uri, conf);
46+
}
47+
}

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/HCFSMountTableConfigLoader.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,8 +59,7 @@ public void load(String mountTableConfigPath, Configuration conf)
5959
throws IOException {
6060
this.mountTable = new Path(mountTableConfigPath);
6161
String scheme = mountTable.toUri().getScheme();
62-
ViewFileSystem.FsGetter fsGetter =
63-
new ViewFileSystemOverloadScheme.ChildFsGetter(scheme);
62+
FsGetter fsGetter = new ViewFileSystemOverloadScheme.ChildFsGetter(scheme);
6463
try (FileSystem fs = fsGetter.getNewInstance(mountTable.toUri(), conf)) {
6564
RemoteIterator<LocatedFileStatus> listFiles =
6665
fs.listFiles(mountTable, false);

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/NflyFSystem.java

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,21 @@ private static String getRack(String rackString) {
212212
*/
213213
private NflyFSystem(URI[] uris, Configuration conf, int minReplication,
214214
EnumSet<NflyKey> nflyFlags) throws IOException {
215+
this(uris, conf, minReplication, nflyFlags, null);
216+
}
217+
218+
/**
219+
* Creates a new Nfly instance.
220+
*
221+
* @param uris the list of uris in the mount point
222+
* @param conf configuration object
223+
* @param minReplication minimum copies to commit a write op
224+
* @param nflyFlags modes such readMostRecent
225+
* @param fsGetter to get the file system instance with the given uri
226+
* @throws IOException
227+
*/
228+
private NflyFSystem(URI[] uris, Configuration conf, int minReplication,
229+
EnumSet<NflyKey> nflyFlags, FsGetter fsGetter) throws IOException {
215230
if (uris.length < minReplication) {
216231
throw new IOException(minReplication + " < " + uris.length
217232
+ ": Minimum replication < #destinations");
@@ -238,8 +253,14 @@ private NflyFSystem(URI[] uris, Configuration conf, int minReplication,
238253
nodes = new NflyNode[uris.length];
239254
final Iterator<String> rackIter = rackStrings.iterator();
240255
for (int i = 0; i < nodes.length; i++) {
241-
nodes[i] = new NflyNode(hostStrings.get(i), rackIter.next(), uris[i],
242-
conf);
256+
if (fsGetter != null) {
257+
nodes[i] = new NflyNode(hostStrings.get(i), rackIter.next(),
258+
new ChRootedFileSystem(fsGetter.getNewInstance(uris[i], conf),
259+
uris[i]));
260+
} else {
261+
nodes[i] =
262+
new NflyNode(hostStrings.get(i), rackIter.next(), uris[i], conf);
263+
}
243264
}
244265
// sort all the uri's by distance from myNode, the local file system will
245266
// automatically be the the first one.
@@ -921,7 +942,7 @@ private static void processThrowable(NflyNode nflyNode, String op,
921942
* @throws IOException
922943
*/
923944
static FileSystem createFileSystem(URI[] uris, Configuration conf,
924-
String settings) throws IOException {
945+
String settings, FsGetter fsGetter) throws IOException {
925946
// assert settings != null
926947
int minRepl = DEFAULT_MIN_REPLICATION;
927948
EnumSet<NflyKey> nflyFlags = EnumSet.noneOf(NflyKey.class);
@@ -946,6 +967,6 @@ static FileSystem createFileSystem(URI[] uris, Configuration conf,
946967
throw new IllegalArgumentException(nflyKey + ": Infeasible");
947968
}
948969
}
949-
return new NflyFSystem(uris, conf, minRepl, nflyFlags);
970+
return new NflyFSystem(uris, conf, minRepl, nflyFlags, fsGetter);
950971
}
951972
}

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFileSystem.java

Lines changed: 2 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -96,27 +96,6 @@ static AccessControlException readOnlyMountTable(final String operation,
9696
return readOnlyMountTable(operation, p.toString());
9797
}
9898

99-
/**
100-
* File system instance getter.
101-
*/
102-
static class FsGetter {
103-
104-
/**
105-
* Gets new file system instance of given uri.
106-
*/
107-
public FileSystem getNewInstance(URI uri, Configuration conf)
108-
throws IOException {
109-
return FileSystem.newInstance(uri, conf);
110-
}
111-
112-
/**
113-
* Gets file system instance of given uri.
114-
*/
115-
public FileSystem get(URI uri, Configuration conf) throws IOException {
116-
return FileSystem.get(uri, conf);
117-
}
118-
}
119-
12099
/**
121100
* Gets file system creator instance.
122101
*/
@@ -316,7 +295,8 @@ protected FileSystem getTargetFileSystem(final INodeDir<FileSystem> dir)
316295
@Override
317296
protected FileSystem getTargetFileSystem(final String settings,
318297
final URI[] uris) throws URISyntaxException, IOException {
319-
return NflyFSystem.createFileSystem(uris, config, settings);
298+
return NflyFSystem.createFileSystem(uris, config, settings,
299+
fsGetter);
320300
}
321301
};
322302
workingDir = this.getHomeDirectory();

hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/ViewFileSystemBaseTest.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,6 @@
5050
import org.apache.hadoop.fs.permission.AclUtil;
5151
import org.apache.hadoop.fs.permission.FsAction;
5252
import org.apache.hadoop.fs.permission.FsPermission;
53-
import org.apache.hadoop.fs.viewfs.ViewFileSystem.FsGetter;
5453
import org.apache.hadoop.fs.viewfs.ViewFileSystem.MountPoint;
5554
import org.apache.hadoop.security.AccessControlException;
5655
import org.apache.hadoop.security.Credentials;

hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/ViewFsTestSetup.java

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.apache.hadoop.fs.viewfs.ViewFileSystemOverloadScheme.ChildFsGetter;
3232
import org.apache.hadoop.util.Shell;
3333
import org.eclipse.jetty.util.log.Log;
34+
import org.junit.Assert;
3435

3536

3637
/**
@@ -146,7 +147,8 @@ static void addMountLinksToFile(String mountTable, String[] sources,
146147
throws IOException, URISyntaxException {
147148
ChildFsGetter cfs = new ViewFileSystemOverloadScheme.ChildFsGetter(
148149
mountTableConfPath.toUri().getScheme());
149-
try (FileSystem fs = cfs.getNewInstance(mountTableConfPath.toUri(), conf)) {
150+
try (FileSystem fs = cfs.getNewInstance(mountTableConfPath.toUri(),
151+
conf)) {
150152
try (FSDataOutputStream out = fs.create(mountTableConfPath)) {
151153
String prefix =
152154
new StringBuilder(Constants.CONFIG_VIEWFS_PREFIX).append(".")
@@ -158,17 +160,23 @@ static void addMountLinksToFile(String mountTable, String[] sources,
158160
for (int i = 0; i < sources.length; i++) {
159161
String src = sources[i];
160162
String target = targets[i];
163+
boolean isNfly = src.startsWith(Constants.CONFIG_VIEWFS_LINK_NFLY);
161164
out.writeBytes("<property><name>");
162-
if (Constants.CONFIG_VIEWFS_LINK_FALLBACK.equals(src)) {
165+
if (isNfly) {
166+
String[] srcParts = src.split("[.]");
167+
Assert.assertEquals("Invalid NFlyLink format", 3, srcParts.length);
168+
String actualSrc = srcParts[srcParts.length - 1];
169+
String params = srcParts[srcParts.length - 2];
170+
out.writeBytes(prefix + Constants.CONFIG_VIEWFS_LINK_NFLY + "."
171+
+ params + "." + actualSrc);
172+
} else if (Constants.CONFIG_VIEWFS_LINK_FALLBACK.equals(src)) {
163173
out.writeBytes(prefix + Constants.CONFIG_VIEWFS_LINK_FALLBACK);
164-
out.writeBytes("</name>");
165174
} else if (Constants.CONFIG_VIEWFS_LINK_MERGE_SLASH.equals(src)) {
166175
out.writeBytes(prefix + Constants.CONFIG_VIEWFS_LINK_MERGE_SLASH);
167-
out.writeBytes("</name>");
168176
} else {
169177
out.writeBytes(prefix + Constants.CONFIG_VIEWFS_LINK + "." + src);
170-
out.writeBytes("</name>");
171178
}
179+
out.writeBytes("</name>");
172180
out.writeBytes("<value>");
173181
out.writeBytes(target);
174182
out.writeBytes("</value></property>");
@@ -191,7 +199,15 @@ static void addMountLinksToConf(String mountTable, String[] sources,
191199
String target = targets[i];
192200
String mountTableName = mountTable == null ?
193201
Constants.CONFIG_VIEWFS_DEFAULT_MOUNT_TABLE : mountTable;
194-
if (src.equals(Constants.CONFIG_VIEWFS_LINK_FALLBACK)) {
202+
boolean isNfly = src.startsWith(Constants.CONFIG_VIEWFS_LINK_NFLY);
203+
if (isNfly) {
204+
String[] srcParts = src.split("[.]");
205+
Assert.assertEquals("Invalid NFlyLink format", 3, srcParts.length);
206+
String actualSrc = srcParts[srcParts.length - 1];
207+
String params = srcParts[srcParts.length - 2];
208+
ConfigUtil.addLinkNfly(config, mountTableName, actualSrc, params,
209+
target);
210+
} else if (src.equals(Constants.CONFIG_VIEWFS_LINK_FALLBACK)) {
195211
ConfigUtil.addLinkFallback(config, mountTableName, new URI(target));
196212
} else if (src.equals(Constants.CONFIG_VIEWFS_LINK_MERGE_SLASH)) {
197213
ConfigUtil.addLinkMergeSlash(config, mountTableName, new URI(target));

0 commit comments

Comments
 (0)