Skip to content

Commit 8075f9c

Browse files
committed
HADOOP-16298 - Manage/Renew delegation tokens for externally scheduled jobs
1 parent 56d8071 commit 8075f9c

File tree

5 files changed

+307
-12
lines changed

5 files changed

+307
-12
lines changed

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java

Lines changed: 40 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
import org.apache.hadoop.security.SaslRpcServer.AuthMethod;
5151
import org.apache.hadoop.security.SecurityUtil;
5252
import org.apache.hadoop.security.UserGroupInformation;
53+
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
5354
import org.apache.hadoop.util.ProtoUtil;
5455
import org.apache.hadoop.util.StringUtils;
5556
import org.apache.hadoop.util.Time;
@@ -613,6 +614,10 @@ private synchronized boolean shouldAuthenticateOverKrb() throws IOException {
613614
return false;
614615
}
615616

617+
private synchronized boolean shouldAuthenticateUsingDelegationTokens() throws IOException {
618+
return UserGroupInformation.getCurrentUser().isFromDelegationToken();
619+
}
620+
616621
private synchronized AuthMethod setupSaslConnection(IpcStreams streams)
617622
throws IOException {
618623
// Do not use Client.conf here! We must use ConnectionId.conf, since the
@@ -691,7 +696,7 @@ private synchronized void setupConnection(
691696
InetSocketAddress bindAddr = null;
692697
if (ticket != null && ticket.hasKerberosCredentials()) {
693698
KerberosInfo krbInfo =
694-
remoteId.getProtocol().getAnnotation(KerberosInfo.class);
699+
remoteId.getProtocol().getAnnotation(KerberosInfo.class);
695700
if (krbInfo != null) {
696701
String principal = ticket.getUserName();
697702
String host = SecurityUtil.getHostFromPrincipal(principal);
@@ -755,7 +760,7 @@ public Object run() throws IOException, InterruptedException {
755760
final short MAX_BACKOFF = 5000;
756761
closeConnection();
757762
disposeSasl();
758-
if (shouldAuthenticateOverKrb()) {
763+
if (shouldAuthenticateOverKrb() || shouldAuthenticateUsingDelegationTokens()) {
759764
if (currRetries < maxRetries) {
760765
if(LOG.isDebugEnabled()) {
761766
LOG.debug("Exception encountered while connecting to "
@@ -766,6 +771,19 @@ public Object run() throws IOException, InterruptedException {
766771
UserGroupInformation.getLoginUser().reloginFromKeytab();
767772
} else if (UserGroupInformation.isLoginTicketBased()) {
768773
UserGroupInformation.getLoginUser().reloginFromTicketCache();
774+
} else if (shouldAuthenticateUsingDelegationTokens()) {
775+
UserGroupInformation currUser = UserGroupInformation.getCurrentUser();
776+
for (AbstractDelegationTokenIdentifier delegationToken:
777+
currUser.getAllDelegationTokens(currUser.getCredentials())){
778+
LOG.debug("Delegation token for current user after SASL failure " +
779+
"and before refresh ugi is {}", delegationToken.toString());
780+
}
781+
currUser.reloginFromDelegationTokens();
782+
for (AbstractDelegationTokenIdentifier delegationToken:
783+
currUser.getAllDelegationTokens(currUser.getCredentials())){
784+
LOG.debug("Delegation token for current user after SASL failure " +
785+
"and after refresh ugi is {}", delegationToken.toString());
786+
}
769787
}
770788
// have granularity of milliseconds
771789
//we are sleeping with the Connection lock held but since this
@@ -1609,6 +1627,26 @@ private Writable getRpcResponse(final Call call, final Connection connection,
16091627

16101628
if (call.error != null) {
16111629
if (call.error instanceof RemoteException) {
1630+
//We got a delegation token expired error and we want to retry to refresh it
1631+
//Since the delegation token's can be externally managed we want the fail
1632+
//call to be ignored and retried
1633+
Exception unwrapped = ((RemoteException)call.error).unwrapRemoteException(
1634+
org.apache.hadoop.security.token.SecretManager.InvalidToken.class);
1635+
UserGroupInformation currUser = UserGroupInformation.getCurrentUser();
1636+
if(unwrapped instanceof org.apache.hadoop.security.token.SecretManager.InvalidToken &&
1637+
currUser.isFromDelegationToken()) {
1638+
for (AbstractDelegationTokenIdentifier delegationToken:
1639+
currUser.getAllDelegationTokens(currUser.getCredentials())){
1640+
LOG.debug("Delegation Token before refresh is {}", delegationToken.getTrackingId());
1641+
}
1642+
currUser.reloginFromDelegationTokens();
1643+
call.error = new RetriableException(unwrapped);
1644+
for (AbstractDelegationTokenIdentifier delegationToken:
1645+
currUser.getAllDelegationTokens(currUser.getCredentials())){
1646+
LOG.debug("Delegation Token after refresh is {} {}", delegationToken.getTrackingId(),
1647+
delegationToken.toString());
1648+
}
1649+
}
16121650
call.error.fillInStackTrace();
16131651
throw call.error;
16141652
} else { // local exception

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/Credentials.java

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -213,9 +213,8 @@ public Map<Text, byte[]> getSecretKeyMap() {
213213
* @param conf
214214
* @throws IOException
215215
*/
216-
public static Credentials readTokenStorageFile(Path filename,
217-
Configuration conf)
218-
throws IOException {
216+
public static Credentials readTokenStorageFile(Path filename, Configuration conf)
217+
throws IOException {
219218
FSDataInputStream in = null;
220219
Credentials credentials = new Credentials();
221220
try {
@@ -461,4 +460,21 @@ private void addAll(Credentials other, boolean overwrite) {
461460
}
462461
}
463462
}
463+
464+
/**
465+
* Update the token map to synchronize between HA pair servers
466+
*/
467+
public void synchTokens(Token<? extends TokenIdentifier> token) {
468+
for(Map.Entry<Text, Token<?>> entry: tokenMap.entrySet()){
469+
LOG.debug("synching token.to_s");
470+
tokenMap.forEach((key, value) -> LOG.debug("Before: " + key + ":" + value));
471+
if (entry.getValue().getKind().equals(token.getKind())){
472+
LOG.debug("matched " + entry.getValue().getKind());
473+
Token<? extends TokenIdentifier> clone = new Token<>(token.getIdentifier(),
474+
token.getPassword(), token.getKind(), entry.getValue().getService());
475+
tokenMap.put(entry.getKey(), clone);
476+
}
477+
tokenMap.forEach((key, value) -> LOG.debug("After: " + key + ":" + value));
478+
}
479+
}
464480
}
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.security;
19+
20+
import org.apache.hadoop.conf.Configuration;
21+
22+
import java.io.File;
23+
import java.io.IOException;
24+
25+
import org.slf4j.Logger;
26+
import org.slf4j.LoggerFactory;
27+
28+
public class DelegationTokenUtil {
29+
public static final String HADOOP_TOKEN_FILE_LOCATION =
30+
"HADOOP_TOKEN_FILE_LOCATION";
31+
32+
static final Logger LOG = LoggerFactory.getLogger(
33+
DelegationTokenUtil.class);
34+
35+
private DelegationTokenUtil() {
36+
}
37+
38+
public static synchronized Credentials readDelegationTokens(Configuration conf)
39+
throws IOException {
40+
String fileLocation = System.getenv(HADOOP_TOKEN_FILE_LOCATION);
41+
if (fileLocation != null) {
42+
// Load the token storage file and put all of the tokens into the
43+
// user. Don't use the FileSystem API for reading since it has a lock
44+
// cycle (HADOOP-9212).
45+
File source = new File(fileLocation);
46+
Credentials creds = Credentials.readTokenStorageFile(
47+
source, conf);
48+
LOG.info("Loaded {} tokens", creds.numberOfTokens());
49+
return creds;
50+
}
51+
return null;
52+
}
53+
}

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/UserGroupInformation.java

Lines changed: 74 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@
8787
import org.apache.hadoop.security.authentication.util.KerberosUtil;
8888
import org.apache.hadoop.security.token.Token;
8989
import org.apache.hadoop.security.token.TokenIdentifier;
90+
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
9091
import org.apache.hadoop.util.Shell;
9192
import org.apache.hadoop.util.Time;
9293

@@ -186,12 +187,25 @@ private <T extends Principal> T getCanonicalUser(Class<T> cls) {
186187
return null;
187188
}
188189

190+
private void addDelegationTokensToSubject() throws LoginException {
191+
try {
192+
Credentials creds = DelegationTokenUtil.readDelegationTokens(conf);
193+
if (creds != null) {
194+
subject.getPrivateCredentials().add(creds);
195+
}
196+
} catch (IOException e) {
197+
throw new LoginException("Failed to load token file from " +
198+
HADOOP_TOKEN_FILE_LOCATION);
199+
}
200+
}
201+
189202
@Override
190203
public boolean commit() throws LoginException {
191204
LOG.debug("hadoop login commit");
192205
// if we already have a user, we are done.
193206
if (!subject.getPrincipals(User.class).isEmpty()) {
194207
LOG.debug("Using existing subject: {}", subject.getPrincipals());
208+
addDelegationTokensToSubject();
195209
return true;
196210
}
197211
Principal user = getCanonicalUser(KerberosPrincipal.class);
@@ -229,6 +243,7 @@ public boolean commit() throws LoginException {
229243
LOG.debug("User entry: \"{}\"", userEntry);
230244

231245
subject.getPrincipals().add(userEntry);
246+
addDelegationTokensToSubject();
232247
return true;
233248
}
234249
throw new LoginException("Failed to find user in name " + subject);
@@ -740,11 +755,12 @@ UserGroupInformation createLoginUser(Subject subject) throws IOException {
740755
LOG.debug("Reading credentials from location {}",
741756
tokenFile.getCanonicalPath());
742757
if (tokenFile.exists() && tokenFile.isFile()) {
743-
Credentials cred = Credentials.readTokenStorageFile(
744-
tokenFile, conf);
745-
LOG.debug("Loaded {} tokens from {}", cred.numberOfTokens(),
746-
tokenFile.getCanonicalPath());
747-
loginUser.addCredentials(cred);
758+
Credentials cred = DelegationTokenUtil.readDelegationTokens(conf);
759+
if (cred != null ) {
760+
LOG.debug("Loaded {} tokens from {}", cred.numberOfTokens(),
761+
tokenFile.getCanonicalPath());
762+
loginUser.addCredentials(cred);
763+
}
748764
} else {
749765
LOG.info("Token file {} does not exist",
750766
tokenFile.getCanonicalPath());
@@ -851,6 +867,59 @@ private long getRefreshTime(KerberosTicket tgt) {
851867
return start + (long) ((end - start) * TICKET_RENEW_WINDOW);
852868
}
853869

870+
/**
871+
* Re-Login a user in from delegation tokens
872+
* method assumes that login had happened already.
873+
* The Subject field of this UserGroupInformation object is updated to have
874+
* the new credentials.
875+
* @throws IOException
876+
* @throws IOException on a failure
877+
*/
878+
@InterfaceAudience.Public
879+
@InterfaceStability.Evolving
880+
public synchronized void reloginFromDelegationTokens() throws IOException {
881+
882+
if (!isFromDelegationToken()) {
883+
throw new IOException("User has not logged on using delegation token");
884+
}
885+
886+
synchronized(UserGroupInformation.class){
887+
Credentials cred = DelegationTokenUtil.readDelegationTokens(conf);
888+
if (cred != null ) {
889+
addCredentials(cred);
890+
}
891+
892+
for (Token<? extends TokenIdentifier> token: cred.getAllTokens()) {
893+
for ( Credentials currentCreds : subject.getPrivateCredentials(Credentials.class)) {
894+
currentCreds.synchTokens(token);
895+
}
896+
}
897+
}
898+
}
899+
900+
public boolean isFromDelegationToken () {
901+
return !isFromKeytab() && getTGT() == null && !subject.getPrivateCredentials(Credentials.class).isEmpty();
902+
}
903+
904+
public Collection<AbstractDelegationTokenIdentifier> getAllDelegationTokens(Credentials cred) {
905+
cred.getAllTokens();
906+
List<AbstractDelegationTokenIdentifier> delegToks = new ArrayList<>();
907+
908+
for(Token<? extends TokenIdentifier> t: getCredentials().getAllTokens()) {
909+
try {
910+
TokenIdentifier identifier = t.decodeIdentifier();
911+
if (identifier == null || !AbstractDelegationTokenIdentifier.class.isAssignableFrom(identifier.getClass())) {
912+
continue;
913+
}
914+
delegToks.add((AbstractDelegationTokenIdentifier) identifier);
915+
} catch (IOException e) {
916+
LOG.warn("Error decoding token identifier of kind " + t.getKind(), e);
917+
}
918+
}
919+
return delegToks;
920+
}
921+
922+
854923
@InterfaceAudience.Private
855924
@InterfaceStability.Unstable
856925
public boolean shouldRelogin() {

0 commit comments

Comments
 (0)