Skip to content

Commit a2242df

Browse files
committed
HADOOP-17894. CredentialProviderFactory.getProviders() recursion loading JCEKS file from S3A (#3393)
* CredentialProviderFactory to detect and report on recursion. * S3AFS to remove incompatible providers. * Integration Test for this. Contributed by Steve Loughran. Change-Id: Ia247b3c9fe8488ffdb7f57b40eb6e37c57e522ef
1 parent 76393e1 commit a2242df

File tree

4 files changed

+225
-8
lines changed

4 files changed

+225
-8
lines changed

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/alias/CredentialProviderFactory.java

Lines changed: 28 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,13 @@
2525
import java.util.Iterator;
2626
import java.util.List;
2727
import java.util.ServiceLoader;
28+
import java.util.concurrent.atomic.AtomicBoolean;
2829

2930
import org.apache.hadoop.classification.InterfaceAudience;
3031
import org.apache.hadoop.classification.InterfaceStability;
3132
import org.apache.hadoop.conf.Configuration;
3233
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
34+
import org.apache.hadoop.fs.PathIOException;
3335

3436
/**
3537
* A factory to create a list of CredentialProvider based on the path given in a
@@ -59,23 +61,42 @@ public abstract CredentialProvider createProvider(URI providerName,
5961
}
6062
}
6163

64+
/**
65+
* Fail fast on any recursive load of credential providers, which can
66+
* happen if the FS itself triggers the load.
67+
* A simple boolean could be used here, as the synchronized block ensures
68+
* that only one thread can be active at a time. An atomic is used
69+
* for rigorousness.
70+
*/
71+
private static final AtomicBoolean SERVICE_LOADER_LOCKED = new AtomicBoolean(false);
72+
6273
public static List<CredentialProvider> getProviders(Configuration conf
6374
) throws IOException {
64-
List<CredentialProvider> result = new ArrayList<CredentialProvider>();
75+
List<CredentialProvider> result = new ArrayList<>();
6576
for(String path: conf.getStringCollection(CREDENTIAL_PROVIDER_PATH)) {
6677
try {
6778
URI uri = new URI(path);
6879
boolean found = false;
6980
// Iterate serviceLoader in a synchronized block since
7081
// serviceLoader iterator is not thread-safe.
7182
synchronized (serviceLoader) {
72-
for (CredentialProviderFactory factory : serviceLoader) {
73-
CredentialProvider kp = factory.createProvider(uri, conf);
74-
if (kp != null) {
75-
result.add(kp);
76-
found = true;
77-
break;
83+
try {
84+
if (SERVICE_LOADER_LOCKED.getAndSet(true)) {
85+
throw new PathIOException(path,
86+
"Recursive load of credential provider; " +
87+
"if loading a JCEKS file, this means that the filesystem connector is " +
88+
"trying to load the same file");
89+
}
90+
for (CredentialProviderFactory factory : serviceLoader) {
91+
CredentialProvider kp = factory.createProvider(uri, conf);
92+
if (kp != null) {
93+
result.add(kp);
94+
found = true;
95+
break;
96+
}
7897
}
98+
} finally {
99+
SERVICE_LOADER_LOCKED.set(false);
79100
}
80101
}
81102
if (!found) {

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,7 @@
186186
import org.apache.hadoop.fs.store.EtagChecksum;
187187
import org.apache.hadoop.security.UserGroupInformation;
188188
import org.apache.hadoop.util.BlockingThreadPoolExecutorService;
189+
import org.apache.hadoop.security.ProviderUtils;
189190
import org.apache.hadoop.security.token.Token;
190191
import org.apache.hadoop.util.Progressable;
191192
import org.apache.hadoop.util.ReflectionUtils;
@@ -393,6 +394,11 @@ public void initialize(URI name, Configuration originalConf)
393394
LOG.debug("Initializing S3AFileSystem for {}", bucket);
394395
// clone the configuration into one with propagated bucket options
395396
Configuration conf = propagateBucketOptions(originalConf, bucket);
397+
398+
// HADOOP-17894. remove references to s3a stores in JCEKS credentials.
399+
conf = ProviderUtils.excludeIncompatibleCredentialProviders(
400+
conf, S3AFileSystem.class);
401+
396402
// fix up the classloader of the configuration to be whatever
397403
// classloader loaded this filesystem.
398404
// See: HADOOP-17372

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1153,7 +1153,7 @@ private static Method getFactoryMethod(Class<?> cl, Class<?> returnType,
11531153
public static Configuration propagateBucketOptions(Configuration source,
11541154
String bucket) {
11551155

1156-
Preconditions.checkArgument(StringUtils.isNotEmpty(bucket), "bucket");
1156+
Preconditions.checkArgument(StringUtils.isNotEmpty(bucket), "bucket is null/empty");
11571157
final String bucketPrefix = FS_S3A_BUCKET_PREFIX + bucket +'.';
11581158
LOG.debug("Propagating entries under {}", bucketPrefix);
11591159
final Configuration dest = new Configuration(source);
Lines changed: 190 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,190 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hadoop.fs.s3a.auth;
20+
21+
import java.io.ByteArrayOutputStream;
22+
import java.io.IOException;
23+
import java.io.PrintStream;
24+
import java.net.URI;
25+
import java.nio.charset.StandardCharsets;
26+
27+
import org.assertj.core.api.Assertions;
28+
import org.junit.AfterClass;
29+
import org.junit.Test;
30+
import org.slf4j.Logger;
31+
import org.slf4j.LoggerFactory;
32+
33+
import org.apache.hadoop.conf.Configuration;
34+
import org.apache.hadoop.fs.FileSystem;
35+
import org.apache.hadoop.fs.Path;
36+
import org.apache.hadoop.fs.s3a.AbstractS3ATestBase;
37+
import org.apache.hadoop.io.IOUtils;
38+
import org.apache.hadoop.security.UserGroupInformation;
39+
import org.apache.hadoop.security.alias.CredentialShell;
40+
41+
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_CREDENTIAL_PROVIDER_PATH;
42+
import static org.apache.hadoop.fs.s3a.Constants.S3A_SECURITY_CREDENTIAL_PROVIDER_PATH;
43+
import static org.apache.hadoop.fs.s3a.S3ATestUtils.disableFilesystemCaching;
44+
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
45+
46+
/**
47+
* Test JCEKS file load/save on S3a.
48+
* Uses CredentialShell to better replicate the CLI.
49+
*
50+
* See HADOOP-17894.
51+
* This test is at risk of leaking FS instances in the JCEKS providers;
52+
* this is handled in an AfterClass operation.
53+
*
54+
*/
55+
public class ITestJceksIO extends AbstractS3ATestBase {
56+
private static final Logger LOG = LoggerFactory.getLogger(
57+
ITestJceksIO.class);
58+
private static final String UTF8 = StandardCharsets.UTF_8.name();
59+
private PrintStream oldStdout, oldStderr;
60+
private ByteArrayOutputStream stdout, stderr;
61+
private PrintStream printStdout, printStderr;
62+
63+
@Override
64+
public void setup() throws Exception {
65+
super.setup();
66+
oldStdout = System.out;
67+
oldStderr = System.err;
68+
stdout = new ByteArrayOutputStream();
69+
printStdout = new PrintStream(stdout);
70+
System.setOut(printStdout);
71+
72+
stderr = new ByteArrayOutputStream();
73+
printStderr = new PrintStream(stderr);
74+
System.setErr(printStderr);
75+
}
76+
77+
@Override
78+
public void teardown() throws Exception {
79+
System.setOut(oldStdout);
80+
System.setErr(oldStderr);
81+
IOUtils.cleanupWithLogger(LOG, printStdout, printStderr);
82+
super.teardown();
83+
}
84+
85+
/**
86+
* Shut down all filesystems for this user to avoid
87+
* leaking those used by credential providers.
88+
*/
89+
@AfterClass
90+
public static void closeAllFilesystems() {
91+
try {
92+
LOG.info("Closing down all filesystems for current user");
93+
FileSystem.closeAllForUGI(UserGroupInformation.getCurrentUser());
94+
} catch (IOException e) {
95+
LOG.warn("UGI.getCurrentUser()", e);
96+
}
97+
}
98+
99+
/**
100+
* FS config with no providers. FS caching is disabled.
101+
* @return a new configuration.
102+
*/
103+
private Configuration createNewConfiguration() {
104+
final Configuration conf = new Configuration(getConfiguration());
105+
removeBaseAndBucketOverrides(conf,
106+
HADOOP_SECURITY_CREDENTIAL_PROVIDER_PATH,
107+
S3A_SECURITY_CREDENTIAL_PROVIDER_PATH);
108+
disableFilesystemCaching(conf);
109+
return conf;
110+
111+
}
112+
113+
/*
114+
* List credentials; expect the file to be missing.
115+
* hadoop credential list -provider jceks://s3a@bucket/s3.jceks
116+
*/
117+
@Test
118+
public void testListMissingJceksFile() throws Throwable {
119+
final Path dir = path("jceks");
120+
Path keystore = new Path(dir, "keystore.jceks");
121+
String jceksProvider = toJceksProvider(keystore);
122+
123+
CredentialShell cs = new CredentialShell();
124+
125+
cs.setConf(createNewConfiguration());
126+
run(cs, null,
127+
"list", "-provider", jceksProvider);
128+
}
129+
130+
@Test
131+
public void testCredentialSuccessfulLifecycle() throws Exception {
132+
final Path dir = path("jceks");
133+
Path keystore = new Path(dir, "keystore.jceks");
134+
String jceksProvider = toJceksProvider(keystore);
135+
CredentialShell cs = new CredentialShell();
136+
cs.setConf(createNewConfiguration());
137+
run(cs, "credential1 has been successfully created.", "create", "credential1", "-value",
138+
"p@ssw0rd", "-provider",
139+
jceksProvider);
140+
141+
assertIsFile(keystore);
142+
run(cs, "credential1",
143+
"list", "-provider", jceksProvider);
144+
145+
run(cs, "credential1 has been successfully deleted.",
146+
"delete", "credential1", "-f", "-provider",
147+
jceksProvider);
148+
149+
String[] args5 = {
150+
"list", "-provider",
151+
jceksProvider
152+
};
153+
String out = run(cs, null, args5);
154+
Assertions.assertThat(out)
155+
.describedAs("Command result of list")
156+
.doesNotContain("credential1");
157+
}
158+
159+
private String run(CredentialShell cs, String expected, String... args)
160+
throws Exception {
161+
stdout.reset();
162+
int rc = cs.run(args);
163+
final String out = stdout.toString(UTF8);
164+
LOG.error("{}", stderr.toString(UTF8));
165+
LOG.info("{}", out);
166+
167+
Assertions.assertThat(rc)
168+
.describedAs("Command result of %s with output %s",
169+
args[0], out)
170+
.isEqualTo(0);
171+
if (expected != null) {
172+
Assertions.assertThat(out)
173+
.describedAs("Command result of %s", args[0])
174+
.contains(expected);
175+
}
176+
return out;
177+
}
178+
179+
/**
180+
* Convert a path to a jceks URI.
181+
* @param keystore store
182+
* @return string for the command line
183+
*/
184+
private String toJceksProvider(Path keystore) {
185+
final URI uri = keystore.toUri();
186+
return String.format("jceks://%s@%s%s",
187+
uri.getScheme(), uri.getHost(), uri.getPath());
188+
}
189+
190+
}

0 commit comments

Comments (0)