Skip to content

Commit 48f9011

Browse files
committed
YARN-10348. Allow RM to always cancel tokens after app completes. Contributed by
Jim Brennan
1 parent e62d8f8 commit 48f9011

File tree

4 files changed

+93
-3
lines changed

4 files changed

+93
-3
lines changed

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -743,6 +743,9 @@ public static boolean isAclEnabled(Configuration conf) {
743743
RM_PREFIX + "delegation-token.max-conf-size-bytes";
744744
public static final int DEFAULT_RM_DELEGATION_TOKEN_MAX_CONF_SIZE_BYTES =
745745
12800;
746+
public static final String RM_DELEGATION_TOKEN_ALWAYS_CANCEL =
747+
RM_PREFIX + "delegation-token.always-cancel";
748+
public static final boolean DEFAULT_RM_DELEGATION_TOKEN_ALWAYS_CANCEL = false;
746749

747750
public static final String RM_DT_RENEWER_THREAD_TIMEOUT =
748751
RM_PREFIX + "delegation-token-renewer.thread-timeout";

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -804,6 +804,16 @@
804804
<value>12800</value>
805805
</property>
806806

807+
<property>
808+
<description>If true, ResourceManager will always try to cancel delegation
809+
tokens after the application completes, even if the client sets
810+
shouldCancelAtEnd false. References to delegation tokens are tracked,
811+
so they will not be canceled until all sub-tasks are done using them.
812+
</description>
813+
<name>yarn.resourcemanager.delegation-token.always-cancel</name>
814+
<value>false</value>
815+
</property>
816+
807817
<property>
808818
<description>If true, ResourceManager will have proxy-user privileges.
809819
Use case: In a secure cluster, YARN requires the user hdfs delegation-tokens to

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,7 @@ public class DelegationTokenRenewer extends AbstractService {
115115
private volatile boolean isServiceStarted;
116116
private LinkedBlockingQueue<DelegationTokenRenewerEvent> pendingEventQueue;
117117

118+
private boolean alwaysCancelDelegationTokens;
118119
private boolean tokenKeepAliveEnabled;
119120
private boolean hasProxyUserPrivileges;
120121
private long credentialsValidTimeRemaining;
@@ -137,6 +138,9 @@ public DelegationTokenRenewer() {
137138

138139
@Override
139140
protected void serviceInit(Configuration conf) throws Exception {
141+
this.alwaysCancelDelegationTokens =
142+
conf.getBoolean(YarnConfiguration.RM_DELEGATION_TOKEN_ALWAYS_CANCEL,
143+
YarnConfiguration.DEFAULT_RM_DELEGATION_TOKEN_ALWAYS_CANCEL);
140144
this.hasProxyUserPrivileges =
141145
conf.getBoolean(YarnConfiguration.RM_PROXY_USER_PRIVILEGES_ENABLED,
142146
YarnConfiguration.DEFAULT_RM_PROXY_USER_PRIVILEGES_ENABLED);
@@ -268,7 +272,7 @@ protected void serviceStop() {
268272
*
269273
*/
270274
@VisibleForTesting
271-
protected static class DelegationTokenToRenew {
275+
protected class DelegationTokenToRenew {
272276
public final Token<?> token;
273277
public final Collection<ApplicationId> referringAppIds;
274278
public final Configuration conf;
@@ -298,7 +302,7 @@ public DelegationTokenToRenew(Collection<ApplicationId> applicationIds,
298302
this.conf = conf;
299303
this.expirationDate = expirationDate;
300304
this.timerTask = null;
301-
this.shouldCancelAtEnd = shouldCancelAtEnd;
305+
this.shouldCancelAtEnd = shouldCancelAtEnd | alwaysCancelDelegationTokens;
302306
}
303307

304308
public void setTimerTask(RenewalTimerTask tTask) {

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java

Lines changed: 74 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,8 @@ public void setUp() throws Exception {
217217
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
218218
"kerberos");
219219
conf.set("override_token_expire_time", "3000");
220+
conf.setBoolean(YarnConfiguration.RM_DELEGATION_TOKEN_ALWAYS_CANCEL,
221+
false);
220222
UserGroupInformation.setConfiguration(conf);
221223
eventQueue = new LinkedBlockingQueue<Event>();
222224
dispatcher = new AsyncDispatcher(eventQueue);
@@ -608,6 +610,77 @@ public void testDTRenewalWithNoCancel () throws Exception {
608610
token1.renew(conf);
609611
}
610612

613+
/**
614+
* Basic idea of the test:
615+
* 1. Verify that YarnConfiguration.RM_DELEGATION_TOKEN_ALWAYS_CANCEL = true
616+
* overrides shouldCancelAtEnd
617+
* 2. register a token for 2 seconds with shouldCancelAtEnd = false
618+
* 3. cancel it immediately
619+
* 4. check that token was canceled
620+
* @throws IOException
621+
* @throws URISyntaxException
622+
*/
623+
@Test(timeout=60000)
624+
public void testDTRenewalWithNoCancelAlwaysCancel() throws Exception {
625+
Configuration lconf = new Configuration(conf);
626+
lconf.setBoolean(YarnConfiguration.RM_DELEGATION_TOKEN_ALWAYS_CANCEL,
627+
true);
628+
629+
DelegationTokenRenewer localDtr =
630+
createNewDelegationTokenRenewer(lconf, counter);
631+
RMContext mockContext = mock(RMContext.class);
632+
when(mockContext.getSystemCredentialsForApps()).thenReturn(
633+
new ConcurrentHashMap<ApplicationId, SystemCredentialsForAppsProto>());
634+
ClientRMService mockClientRMService = mock(ClientRMService.class);
635+
when(mockContext.getClientRMService()).thenReturn(mockClientRMService);
636+
when(mockContext.getDelegationTokenRenewer()).thenReturn(
637+
localDtr);
638+
when(mockContext.getDispatcher()).thenReturn(dispatcher);
639+
InetSocketAddress sockAddr =
640+
InetSocketAddress.createUnresolved("localhost", 1234);
641+
when(mockClientRMService.getBindAddress()).thenReturn(sockAddr);
642+
localDtr.setDelegationTokenRenewerPoolTracker(false);
643+
localDtr.setRMContext(mockContext);
644+
localDtr.init(lconf);
645+
localDtr.start();
646+
647+
MyFS dfs = (MyFS)FileSystem.get(lconf);
648+
LOG.info("dfs="+(Object)dfs.hashCode() + ";conf="+lconf.hashCode());
649+
650+
Credentials ts = new Credentials();
651+
MyToken token1 = dfs.getDelegationToken("user1");
652+
653+
//to cause this one to be set for renew in 2 secs
654+
Renewer.tokenToRenewIn2Sec = token1;
655+
LOG.info("token="+token1+" should be renewed for 2 secs");
656+
657+
String nn1 = DelegationTokenRenewer.SCHEME + "://host1:0";
658+
ts.addToken(new Text(nn1), token1);
659+
660+
ApplicationId applicationId = BuilderUtils.newApplicationId(0, 1);
661+
localDtr.addApplicationAsync(applicationId, ts, false, "user",
662+
new Configuration());
663+
waitForEventsToGetProcessed(localDtr);
664+
localDtr.applicationFinished(applicationId);
665+
waitForEventsToGetProcessed(localDtr);
666+
667+
int numberOfExpectedRenewals = Renewer.counter; // number of renewals so far
668+
try {
669+
Thread.sleep(6*1000); // sleep 6 seconds, so it has time to renew
670+
} catch (InterruptedException e) {}
671+
LOG.info("Counter = " + Renewer.counter + ";t="+ Renewer.lastRenewed);
672+
673+
// counter and the token should still be the old ones
674+
assertEquals("renew wasn't called as many times as expected",
675+
numberOfExpectedRenewals, Renewer.counter);
676+
677+
// The token should have been cancelled at this point. Renewal will fail.
678+
try {
679+
token1.renew(lconf);
680+
fail("Renewal of cancelled token should have failed");
681+
} catch (InvalidToken ite) {}
682+
}
683+
611684
/**
612685
* Basic idea of the test:
613686
* 0. Setup token KEEP_ALIVE
@@ -1616,7 +1689,7 @@ protected Token<?>[] obtainSystemTokensForUser(String user,
16161689
// Ensure incrTokenSequenceNo has been called for new token request
16171690
Mockito.verify(mockContext, Mockito.times(1)).incrTokenSequenceNo();
16181691

1619-
DelegationTokenToRenew dttr = new DelegationTokenToRenew(appIds,
1692+
DelegationTokenToRenew dttr = dtr.new DelegationTokenToRenew(appIds,
16201693
expectedToken, conf, 1000, false, "user1");
16211694

16221695
dtr.requestNewHdfsDelegationTokenIfNeeded(dttr);

0 commit comments

Comments
 (0)