Skip to content

Commit 1930e47

Browse files
steveloughran and deepakdamri
authored and committed
HADOOP-16665. Filesystems to be closed if they failed during initialize().
Contributed by Steve Loughran. This patch changes FileSystem instantiation so that if an IOException or RuntimeException is raised in the invocation of FileSystem.initialize() then a best-effort attempt is made to close the FS instance; exceptions raised there are swallowed. The S3AFileSystem is also modified to do its own cleanup if an IOException is raised during its initialize() process, it being the FS we know has the "potential" to leak threads, especially in extension points (e.g. AWS Authenticators) which spawn threads. Change-Id: Ib84073a606c9d53bf53cbfca4629876a03894f04
1 parent 20ddd0d commit 1930e47

File tree

7 files changed

+264
-12
lines changed

7 files changed

+264
-12
lines changed

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@
6464
import org.apache.hadoop.fs.permission.FsAction;
6565
import org.apache.hadoop.fs.permission.FsCreateModes;
6666
import org.apache.hadoop.fs.permission.FsPermission;
67+
import org.apache.hadoop.io.IOUtils;
6768
import org.apache.hadoop.io.MultipleIOException;
6869
import org.apache.hadoop.net.NetUtils;
6970
import org.apache.hadoop.security.AccessControlException;
@@ -3390,9 +3391,22 @@ private static FileSystem createFileSystem(URI uri, Configuration conf)
33903391
Tracer tracer = FsTracer.get(conf);
33913392
try(TraceScope scope = tracer.newScope("FileSystem#createFileSystem")) {
33923393
scope.addKVAnnotation("scheme", uri.getScheme());
3393-
Class<?> clazz = getFileSystemClass(uri.getScheme(), conf);
3394-
FileSystem fs = (FileSystem)ReflectionUtils.newInstance(clazz, conf);
3395-
fs.initialize(uri, conf);
3394+
Class<? extends FileSystem> clazz =
3395+
getFileSystemClass(uri.getScheme(), conf);
3396+
FileSystem fs = ReflectionUtils.newInstance(clazz, conf);
3397+
try {
3398+
fs.initialize(uri, conf);
3399+
} catch (IOException | RuntimeException e) {
3400+
// exception raised during initialization.
3401+
// log summary at warn and full stack at debug
3402+
LOGGER.warn("Failed to initialize fileystem {}: {}",
3403+
uri, e.toString());
3404+
LOGGER.debug("Failed to initialize fileystem", e);
3405+
// then (robustly) close the FS, so as to invoke any
3406+
// cleanup code.
3407+
IOUtils.cleanupWithLogger(LOGGER, fs);
3408+
throw e;
3409+
}
33963410
return fs;
33973411
}
33983412
}

hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFileSystemInitialization.java

Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,24 @@
1818
package org.apache.hadoop.fs;
1919

2020
import org.apache.hadoop.conf.Configuration;
21+
import org.apache.hadoop.fs.permission.FsPermission;
22+
import org.apache.hadoop.util.Progressable;
2123

24+
import java.io.FileNotFoundException;
2225
import java.io.IOException;
26+
import java.net.URI;
2327
import java.net.URL;
2428
import java.util.ServiceConfigurationError;
2529

2630
import org.junit.Test;
31+
32+
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
33+
import static org.assertj.core.api.Assertions.assertThat;
2734
import static org.junit.Assert.*;
2835

36+
/**
37+
* Tests related to filesystem creation and lifecycle.
38+
*/
2939
public class TestFileSystemInitialization {
3040

3141
/**
@@ -55,4 +65,119 @@ public void testMissingLibraries() {
5565
} catch (Exception | ServiceConfigurationError expected) {
5666
}
5767
}
68+
69+
@Test
70+
public void testNewInstanceFailure() throws Throwable {
71+
intercept(IOException.class, FailingFileSystem.INITIALIZE, () ->
72+
FileSystem.newInstance(new URI("failing://localhost"), FailingFileSystem
73+
.failingConf()));
74+
assertThat(FailingFileSystem.initCount).describedAs("init count")
75+
.isEqualTo(1);
76+
assertThat(FailingFileSystem.closeCount).describedAs("close count")
77+
.isEqualTo(1);
78+
}
79+
80+
/**
81+
* An FS which will fail on both init and close, and update
82+
* counters of invocations as it does so.
83+
*/
84+
public static class FailingFileSystem extends FileSystem {
85+
86+
public static final String INITIALIZE = "initialize()";
87+
88+
public static final String CLOSE = "close()";
89+
90+
private static int initCount;
91+
92+
private static int closeCount;
93+
94+
private static Configuration failingConf() {
95+
final Configuration conf = new Configuration(false);
96+
conf.setClass("fs.failing.impl", FailingFileSystem.class,
97+
FileSystem.class);
98+
return conf;
99+
}
100+
101+
@Override
102+
public void initialize(final URI name, final Configuration conf)
103+
throws IOException {
104+
super.initialize(name, conf);
105+
initCount++;
106+
throw new IOException(INITIALIZE);
107+
}
108+
109+
@Override
110+
public void close() throws IOException {
111+
closeCount++;
112+
throw new IOException(CLOSE);
113+
}
114+
115+
@Override
116+
public URI getUri() {
117+
return null;
118+
}
119+
120+
@Override
121+
public FSDataInputStream open(final Path f, final int bufferSize)
122+
throws IOException {
123+
return null;
124+
}
125+
126+
@Override
127+
public FSDataOutputStream create(final Path f,
128+
final FsPermission permission,
129+
final boolean overwrite,
130+
final int bufferSize,
131+
final short replication,
132+
final long blockSize,
133+
final Progressable progress) throws IOException {
134+
return null;
135+
}
136+
137+
@Override
138+
public FSDataOutputStream append(final Path f,
139+
final int bufferSize,
140+
final Progressable progress) throws IOException {
141+
return null;
142+
}
143+
144+
@Override
145+
public boolean rename(final Path src, final Path dst) throws IOException {
146+
return false;
147+
}
148+
149+
@Override
150+
public boolean delete(final Path f, final boolean recursive)
151+
throws IOException {
152+
return false;
153+
}
154+
155+
@Override
156+
public FileStatus[] listStatus(final Path f)
157+
throws FileNotFoundException, IOException {
158+
return new FileStatus[0];
159+
}
160+
161+
@Override
162+
public void setWorkingDirectory(final Path new_dir) {
163+
164+
}
165+
166+
@Override
167+
public Path getWorkingDirectory() {
168+
return null;
169+
}
170+
171+
@Override
172+
public boolean mkdirs(final Path f, final FsPermission permission)
173+
throws IOException {
174+
return false;
175+
}
176+
177+
@Override
178+
public FileStatus getFileStatus(final Path f) throws IOException {
179+
return null;
180+
}
181+
}
182+
58183
}
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.test;

import java.util.concurrent.TimeUnit;

import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.rules.TestName;
import org.junit.rules.Timeout;

/**
 * A base class for JUnit4 tests that sets a default timeout for all tests
 * that subclass this test.
 * (Note: the rules used here — {@code @Rule}, {@code Timeout},
 * {@code TestName}, {@code @BeforeClass} — are JUnit 4 constructs and do
 * not exist in JUnit 5.)
 *
 * Threads are named to the method being executed, for ease of diagnostics
 * in logs and thread dumps.
 *
 * Unlike {@link HadoopTestBase} this class does not extend JUnit Assert
 * so is easier to use with AssertJ.
 */
public abstract class AbstractHadoopTestBase {

  /**
   * System property name to set the test timeout: {@value}.
   */
  public static final String PROPERTY_TEST_DEFAULT_TIMEOUT =
      "test.default.timeout";

  /**
   * The default timeout (in milliseconds) if the system property
   * {@link #PROPERTY_TEST_DEFAULT_TIMEOUT}
   * is not set: {@value}.
   */
  public static final int TEST_DEFAULT_TIMEOUT_VALUE = 100000;

  /**
   * The JUnit rule that sets the default timeout for tests.
   */
  @Rule
  public Timeout defaultTimeout = retrieveTestTimeout();

  /**
   * Retrieve the test timeout from the system property
   * {@link #PROPERTY_TEST_DEFAULT_TIMEOUT}, falling back to
   * the value in {@link #TEST_DEFAULT_TIMEOUT_VALUE} if the
   * property is not defined.
   * @return the recommended timeout for tests
   */
  public static Timeout retrieveTestTimeout() {
    String propval = System.getProperty(PROPERTY_TEST_DEFAULT_TIMEOUT,
        Integer.toString(
            TEST_DEFAULT_TIMEOUT_VALUE));
    int millis;
    try {
      millis = Integer.parseInt(propval);
    } catch (NumberFormatException e) {
      //fall back to the default value, as the property cannot be parsed
      millis = TEST_DEFAULT_TIMEOUT_VALUE;
    }
    return new Timeout(millis, TimeUnit.MILLISECONDS);
  }

  /**
   * The method name.
   */
  @Rule
  public TestName methodName = new TestName();

  /**
   * Get the method name; defaults to the value of {@link #methodName}.
   * Subclasses may wish to override it, which will tune the thread naming.
   * @return the name of the method.
   */
  protected String getMethodName() {
    return methodName.getMethodName();
  }

  /**
   * Static initializer names this thread "JUnit".
   */
  @BeforeClass
  public static void nameTestThread() {
    Thread.currentThread().setName("JUnit");
  }

  /**
   * Before each method, the thread is renamed to match the method name.
   */
  @Before
  public void nameThreadToMethod() {
    Thread.currentThread().setName("JUnit-" + getMethodName());
  }
}

hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/HadoopTestBase.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
*/
1818
package org.apache.hadoop.test;
1919

20+
import java.util.concurrent.TimeUnit;
21+
2022
import org.junit.Assert;
2123
import org.junit.Before;
2224
import org.junit.BeforeClass;
@@ -70,7 +72,7 @@ public static Timeout retrieveTestTimeout() {
7072
//fall back to the default value, as the property cannot be parsed
7173
millis = TEST_DEFAULT_TIMEOUT_VALUE;
7274
}
73-
return new Timeout(millis);
75+
return new Timeout(millis, TimeUnit.MILLISECONDS);
7476
}
7577

7678
/**

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353

5454
import static org.apache.hadoop.fs.s3a.S3AUtils.*;
5555
import static org.apache.hadoop.fs.s3a.Statistic.*;
56+
import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
5657

5758
/**
5859
* Upload files/parts directly via different buffering mechanisms:
@@ -396,9 +397,9 @@ public void close() throws IOException {
396397
writeOperationHelper.writeFailed(ioe);
397398
throw ioe;
398399
} finally {
399-
closeAll(LOG, block, blockFactory);
400+
cleanupWithLogger(LOG, block, blockFactory);
400401
LOG.debug("Statistics: {}", statistics);
401-
closeAll(LOG, statistics);
402+
cleanupWithLogger(LOG, statistics);
402403
clearActiveBlock();
403404
}
404405
// Note end of write. This does not change the state of the remote FS.
@@ -437,7 +438,7 @@ private int putObject() throws IOException {
437438
// stream afterwards.
438439
return writeOperationHelper.putObject(putObjectRequest);
439440
} finally {
440-
closeAll(LOG, uploadData, block);
441+
cleanupWithLogger(LOG, uploadData, block);
441442
}
442443
});
443444
clearActiveBlock();
@@ -614,7 +615,7 @@ private void uploadBlockAsync(final S3ADataBlocks.DataBlock block)
614615
return partETag;
615616
} finally {
616617
// close the stream and block
617-
closeAll(LOG, uploadData, block);
618+
cleanupWithLogger(LOG, uploadData, block);
618619
}
619620
});
620621
partETagsFutures.add(partETagFuture);

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ADataBlocks.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@
4040
import org.apache.hadoop.util.DirectBufferPool;
4141

4242
import static org.apache.hadoop.fs.s3a.S3ADataBlocks.DataBlock.DestState.*;
43-
import static org.apache.hadoop.fs.s3a.S3AUtils.closeAll;
43+
import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
4444

4545
/**
4646
* Set of classes to support output streaming into blocks which are then
@@ -155,7 +155,7 @@ InputStream getUploadStream() {
155155
*/
156156
@Override
157157
public void close() throws IOException {
158-
closeAll(LOG, uploadStream);
158+
cleanupWithLogger(LOG, uploadStream);
159159
}
160160
}
161161

hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ITestAssumedRoleCommitOperations.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,14 +26,14 @@
2626
import org.apache.hadoop.conf.Configuration;
2727
import org.apache.hadoop.fs.Path;
2828
import org.apache.hadoop.fs.s3a.S3AFileSystem;
29-
import org.apache.hadoop.fs.s3a.S3AUtils;
3029
import org.apache.hadoop.fs.s3a.commit.ITestCommitOperations;
3130

3231
import static org.apache.hadoop.fs.s3a.Constants.ASSUMED_ROLE_ARN;
3332
import static org.apache.hadoop.fs.s3a.S3ATestUtils.assume;
3433
import static org.apache.hadoop.fs.s3a.auth.RoleModel.*;
3534
import static org.apache.hadoop.fs.s3a.auth.RolePolicies.*;
3635
import static org.apache.hadoop.fs.s3a.auth.RoleTestUtils.*;
36+
import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
3737

3838
/**
3939
* Verify that the commit operations work with a restricted set of operations.
@@ -84,7 +84,7 @@ public void setup() throws Exception {
8484

8585
@Override
8686
public void teardown() throws Exception {
87-
S3AUtils.closeAll(LOG, roleFS);
87+
cleanupWithLogger(LOG, roleFS);
8888
// switches getFileSystem() back to the full FS.
8989
roleFS = null;
9090
super.teardown();

0 commit comments

Comments
 (0)