Skip to content

Commit 213b9ae

Browse files
authored
Merge branch 'trunk' into YARN-11218
2 parents 6ae1a9c + 4520448 commit 213b9ae

File tree

37 files changed

+826
-104
lines changed

37 files changed

+826
-104
lines changed

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Trash.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ public Trash(FileSystem fs, Configuration conf) throws IOException {
6969
* Hence we get the file system of the fully-qualified resolved-path and
7070
* then move the path p to the trashbin in that volume,
7171
* @param fs - the filesystem of path p
72-
* @param p - the path being deleted - to be moved to trasg
72+
* @param p - the path being deleted - to be moved to trash
7373
* @param conf - configuration
7474
* @return false if the item is already in the trash or trash is disabled
7575
* @throws IOException on error

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,7 @@
123123
import org.apache.hadoop.util.ProtoUtil;
124124
import org.apache.hadoop.util.StringUtils;
125125
import org.apache.hadoop.util.Time;
126+
import java.util.concurrent.atomic.AtomicBoolean;
126127
import org.apache.hadoop.tracing.Span;
127128
import org.apache.hadoop.tracing.SpanContext;
128129
import org.apache.hadoop.tracing.TraceScope;
@@ -153,6 +154,13 @@ public abstract class Server {
153154
private ExceptionsHandler exceptionsHandler = new ExceptionsHandler();
154155
private Tracer tracer;
155156
private AlignmentContext alignmentContext;
157+
158+
/**
159+
* Allow server to do force Kerberos re-login once after failure irrespective
160+
* of the last login time.
161+
*/
162+
private final AtomicBoolean canTryForceLogin = new AtomicBoolean(true);
163+
156164
/**
157165
* Logical name of the server used in metrics and monitor.
158166
*/
@@ -2206,7 +2214,23 @@ private void saslProcess(RpcSaslProto saslMessage)
22062214
AUDITLOG.warn(AUTH_FAILED_FOR + this.toString() + ":"
22072215
+ attemptingUser + " (" + e.getLocalizedMessage()
22082216
+ ") with true cause: (" + tce.getLocalizedMessage() + ")");
2209-
throw tce;
2217+
if (!UserGroupInformation.getLoginUser().isLoginSuccess()) {
2218+
doKerberosRelogin();
2219+
try {
2220+
// try processing message again
2221+
LOG.debug("Reprocessing sasl message for {}:{} after re-login",
2222+
this.toString(), attemptingUser);
2223+
saslResponse = processSaslMessage(saslMessage);
2224+
AUDITLOG.info("Retry {}{}:{} after failure", AUTH_SUCCESSFUL_FOR,
2225+
this.toString(), attemptingUser);
2226+
canTryForceLogin.set(true);
2227+
} catch (IOException exp) {
2228+
tce = (IOException) getTrueCause(e);
2229+
throw tce;
2230+
}
2231+
} else {
2232+
throw tce;
2233+
}
22102234
}
22112235

22122236
if (saslServer != null && saslServer.isComplete()) {
@@ -3322,6 +3346,26 @@ protected Server(String bindAddress, int port,
33223346
metricsUpdaterInterval, metricsUpdaterInterval, TimeUnit.MILLISECONDS);
33233347
}
33243348

3349+
private synchronized void doKerberosRelogin() throws IOException {
3350+
if(UserGroupInformation.getLoginUser().isLoginSuccess()){
3351+
return;
3352+
}
3353+
LOG.warn("Initiating re-login from IPC Server");
3354+
if (canTryForceLogin.compareAndSet(true, false)) {
3355+
if (UserGroupInformation.isLoginKeytabBased()) {
3356+
UserGroupInformation.getLoginUser().forceReloginFromKeytab();
3357+
} else if (UserGroupInformation.isLoginTicketBased()) {
3358+
UserGroupInformation.getLoginUser().forceReloginFromTicketCache();
3359+
}
3360+
} else {
3361+
if (UserGroupInformation.isLoginKeytabBased()) {
3362+
UserGroupInformation.getLoginUser().reloginFromKeytab();
3363+
} else if (UserGroupInformation.isLoginTicketBased()) {
3364+
UserGroupInformation.getLoginUser().reloginFromTicketCache();
3365+
}
3366+
}
3367+
}
3368+
33253369
public synchronized void addAuxiliaryListener(int auxiliaryPort)
33263370
throws IOException {
33273371
if (auxiliaryListenerMap == null) {

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/log/LogThrottlingHelper.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@
6565
* <p>This class can also be used to coordinate multiple logging points; see
6666
* {@link #record(String, long, double...)} for more details.
6767
*
68-
* <p>This class is not thread-safe.
68+
* <p>This class is thread-safe.
6969
*/
7070
public class LogThrottlingHelper {
7171

@@ -192,7 +192,7 @@ public LogThrottlingHelper(long minLogPeriodMs, String primaryRecorderName) {
192192
* @return A LogAction indicating whether or not the caller should write to
193193
* its log.
194194
*/
195-
public LogAction record(double... values) {
195+
public synchronized LogAction record(double... values) {
196196
return record(DEFAULT_RECORDER_NAME, timer.monotonicNow(), values);
197197
}
198198

@@ -244,7 +244,7 @@ public LogAction record(double... values) {
244244
*
245245
* @see #record(double...)
246246
*/
247-
public LogAction record(String recorderName, long currentTimeMs,
247+
public synchronized LogAction record(String recorderName, long currentTimeMs,
248248
double... values) {
249249
if (primaryRecorderName == null) {
250250
primaryRecorderName = recorderName;
@@ -287,7 +287,7 @@ public LogAction record(String recorderName, long currentTimeMs,
287287
* @param idx The index value.
288288
* @return The summary information.
289289
*/
290-
public SummaryStatistics getCurrentStats(String recorderName, int idx) {
290+
public synchronized SummaryStatistics getCurrentStats(String recorderName, int idx) {
291291
LoggingAction currentLog = currentLogs.get(recorderName);
292292
if (currentLog != null) {
293293
return currentLog.getStats(idx);
@@ -314,6 +314,13 @@ public static String getLogSupressionMessage(LogAction action) {
314314
}
315315
}
316316

317+
@VisibleForTesting
318+
public synchronized void reset() {
319+
primaryRecorderName = null;
320+
currentLogs.clear();
321+
lastLogTimestampMs = Long.MIN_VALUE;
322+
}
323+
317324
/**
318325
* A standard log action which keeps track of all of the values which have
319326
* been logged. This is also used for internal bookkeeping via its private

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/UserGroupInformation.java

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -529,6 +529,18 @@ private void setLogin(LoginContext login) {
529529
user.setLogin(login);
530530
}
531531

532+
/** This method checks for a successful Kerberos login
533+
* and returns true by default if it is not using Kerberos.
534+
*
535+
* @return true on successful login
536+
*/
537+
public boolean isLoginSuccess() {
538+
LoginContext login = user.getLogin();
539+
return (login instanceof HadoopLoginContext)
540+
? ((HadoopLoginContext) login).isLoginSuccess()
541+
: true;
542+
}
543+
532544
/**
533545
* Set the last login time for logged in user
534546
* @param loginTime the number of milliseconds since the beginning of time
@@ -1276,6 +1288,23 @@ private void reloginFromKeytab(boolean checkTGT, boolean ignoreLastLoginTime)
12761288
relogin(login, ignoreLastLoginTime);
12771289
}
12781290

1291+
/**
1292+
* Force re-Login a user in from the ticket cache irrespective of the last
1293+
* login time. This method assumes that login had happened already. The
1294+
* Subject field of this UserGroupInformation object is updated to have the
1295+
* new credentials.
1296+
*
1297+
* @throws IOException
1298+
* raised on errors performing I/O.
1299+
* @throws KerberosAuthException
1300+
* on a failure
1301+
*/
1302+
@InterfaceAudience.Public
1303+
@InterfaceStability.Evolving
1304+
public void forceReloginFromTicketCache() throws IOException {
1305+
reloginFromTicketCache(true);
1306+
}
1307+
12791308
/**
12801309
* Re-Login a user in from the ticket cache. This
12811310
* method assumes that login had happened already.
@@ -1287,14 +1316,19 @@ private void reloginFromKeytab(boolean checkTGT, boolean ignoreLastLoginTime)
12871316
@InterfaceAudience.Public
12881317
@InterfaceStability.Evolving
12891318
public void reloginFromTicketCache() throws IOException {
1319+
reloginFromTicketCache(false);
1320+
}
1321+
1322+
private void reloginFromTicketCache(boolean ignoreLastLoginTime)
1323+
throws IOException {
12901324
if (!shouldRelogin() || !isFromTicket()) {
12911325
return;
12921326
}
12931327
HadoopLoginContext login = getLogin();
12941328
if (login == null) {
12951329
throw new KerberosAuthException(MUST_FIRST_LOGIN);
12961330
}
1297-
relogin(login, false);
1331+
relogin(login, ignoreLastLoginTime);
12981332
}
12991333

13001334
private void relogin(HadoopLoginContext login, boolean ignoreLastLoginTime)
@@ -2083,6 +2117,11 @@ private static class HadoopLoginContext extends LoginContext {
20832117
this.conf = conf;
20842118
}
20852119

2120+
/** Get the login status. */
2121+
public boolean isLoginSuccess() {
2122+
return isLoggedIn.get();
2123+
}
2124+
20862125
String getAppName() {
20872126
return appName;
20882127
}

hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/conf/TestConfigurationFieldsBase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,7 @@ public abstract class TestConfigurationFieldsBase {
194194
HashMap<String,String> retVal = new HashMap<>();
195195

196196
// Setup regexp for valid properties
197-
String propRegex = "^[A-Za-z][A-Za-z0-9_-]+(\\.[A-Za-z0-9_-]+)+$";
197+
String propRegex = "^[A-Za-z][A-Za-z0-9_-]+(\\.[A-Za-z%s0-9_-]+)+$";
198198
Pattern p = Pattern.compile(propRegex);
199199

200200
// Iterate through class member variables

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -77,10 +77,6 @@ public class ConnectionManager {
7777
* Global federated namespace context for router.
7878
*/
7979
private final RouterStateIdContext routerStateIdContext;
80-
/**
81-
* Map from connection pool ID to namespace.
82-
*/
83-
private final Map<ConnectionPoolId, String> connectionPoolToNamespaceMap;
8480
/** Max size of queue for creating new connections. */
8581
private final int creatorQueueMaxSize;
8682

@@ -105,7 +101,6 @@ public ConnectionManager(Configuration config) {
105101
public ConnectionManager(Configuration config, RouterStateIdContext routerStateIdContext) {
106102
this.conf = config;
107103
this.routerStateIdContext = routerStateIdContext;
108-
this.connectionPoolToNamespaceMap = new HashMap<>();
109104
// Configure minimum, maximum and active connection pools
110105
this.maxSize = this.conf.getInt(
111106
RBFConfigKeys.DFS_ROUTER_NAMENODE_CONNECTION_POOL_SIZE,
@@ -172,10 +167,6 @@ public void close() {
172167
pool.close();
173168
}
174169
this.pools.clear();
175-
for (String nsID: connectionPoolToNamespaceMap.values()) {
176-
routerStateIdContext.removeNamespaceStateId(nsID);
177-
}
178-
connectionPoolToNamespaceMap.clear();
179170
} finally {
180171
writeLock.unlock();
181172
}
@@ -224,7 +215,6 @@ public ConnectionContext getConnection(UserGroupInformation ugi,
224215
this.minActiveRatio, protocol,
225216
new PoolAlignmentContext(this.routerStateIdContext, nsId));
226217
this.pools.put(connectionId, pool);
227-
this.connectionPoolToNamespaceMap.put(connectionId, nsId);
228218
}
229219
} finally {
230220
writeLock.unlock();
@@ -451,11 +441,6 @@ public void run() {
451441
try {
452442
for (ConnectionPoolId poolId : toRemove) {
453443
pools.remove(poolId);
454-
String nsID = connectionPoolToNamespaceMap.get(poolId);
455-
connectionPoolToNamespaceMap.remove(poolId);
456-
if (!connectionPoolToNamespaceMap.values().contains(nsID)) {
457-
routerStateIdContext.removeNamespaceStateId(nsID);
458-
}
459444
}
460445
} finally {
461446
writeLock.unlock();

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
import java.util.concurrent.Executors;
5454
import java.util.concurrent.ThreadFactory;
5555
import java.util.concurrent.TimeUnit;
56+
import java.util.stream.Collectors;
5657

5758
import org.apache.hadoop.fs.Path;
5859
import org.apache.hadoop.hdfs.HAUtil;
@@ -413,9 +414,38 @@ public RouterRpcServer(Configuration conf, Router router,
413414
.forEach(this.dnCache::refresh),
414415
0,
415416
dnCacheExpire, TimeUnit.MILLISECONDS);
417+
418+
Executors
419+
.newSingleThreadScheduledExecutor()
420+
.scheduleWithFixedDelay(this::clearStaleNamespacesInRouterStateIdContext,
421+
0,
422+
conf.getLong(RBFConfigKeys.FEDERATION_STORE_MEMBERSHIP_EXPIRATION_MS,
423+
RBFConfigKeys.FEDERATION_STORE_MEMBERSHIP_EXPIRATION_MS_DEFAULT),
424+
TimeUnit.MILLISECONDS);
425+
416426
initRouterFedRename();
417427
}
418428

429+
/**
430+
* Clear expired namespace in the shared RouterStateIdContext.
431+
*/
432+
private void clearStaleNamespacesInRouterStateIdContext() {
433+
try {
434+
final Set<String> resolvedNamespaces = namenodeResolver.getNamespaces()
435+
.stream()
436+
.map(FederationNamespaceInfo::getNameserviceId)
437+
.collect(Collectors.toSet());
438+
439+
routerStateIdContext.getNamespaces().forEach(namespace -> {
440+
if (!resolvedNamespaces.contains(namespace)) {
441+
routerStateIdContext.removeNamespaceStateId(namespace);
442+
}
443+
});
444+
} catch (IOException e) {
445+
LOG.warn("Could not fetch current list of namespaces.", e);
446+
}
447+
}
448+
419449
/**
420450
* Init the router federation rename environment. Each router has its own
421451
* journal path.

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterStateIdContext.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.util.Collections;
2323
import java.util.HashSet;
2424

25+
import java.util.List;
2526
import java.util.Map;
2627
import java.util.concurrent.ConcurrentHashMap;
2728
import java.util.concurrent.atomic.LongAccumulator;
@@ -92,6 +93,10 @@ public LongAccumulator getNamespaceStateId(String nsId) {
9293
return namespaceIdMap.computeIfAbsent(nsId, key -> new LongAccumulator(Math::max, Long.MIN_VALUE));
9394
}
9495

96+
public List<String> getNamespaces() {
97+
return Collections.list(namespaceIdMap.keys());
98+
}
99+
95100
public void removeNamespaceStateId(String nsId) {
96101
namespaceIdMap.remove(nsId);
97102
}

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreRecordOperations.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ public interface StateStoreRecordOperations {
5656
* @param clazz Class of record to fetch.
5757
* @param query Query to filter results.
5858
* @return A single record matching the query. Null if there are no matching
59-
* records or more than one matching record in the store.
59+
* records.
6060
* @throws IOException If multiple records match or if the data store cannot
6161
* be queried.
6262
*/

0 commit comments

Comments
 (0)