Skip to content

Commit e570e75

Browse files
slfan1989slfan1989
authored andcommitted
YARN-11158. Support (Create/Renew/Cancel) DelegationToken API's for Federation. (apache#5104)
1 parent f531c6b commit e570e75

File tree

6 files changed

+468
-14
lines changed

6 files changed

+468
-14
lines changed

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java

Lines changed: 95 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,12 @@ public final class RouterMetrics {
127127
private MutableGaugeInt numGetRMNodeLabelsFailedRetrieved;
128128
@Metric("# of checkUserAccessToQueue failed to be retrieved")
129129
private MutableGaugeInt numCheckUserAccessToQueueFailedRetrieved;
130+
@Metric("# of getDelegationToken failed to be retrieved")
131+
private MutableGaugeInt numGetDelegationTokenFailedRetrieved;
132+
@Metric("# of renewDelegationToken failed to be retrieved")
133+
private MutableGaugeInt numRenewDelegationTokenFailedRetrieved;
134+
@Metric("# of renewDelegationToken failed to be retrieved")
135+
private MutableGaugeInt numCancelDelegationTokenFailedRetrieved;
130136

131137
// Aggregate metrics are shared, and don't have to be looked up per call
132138
@Metric("Total number of successful Submitted apps and latency(ms)")
@@ -215,6 +221,12 @@ public final class RouterMetrics {
215221
private MutableRate totalSucceededGetRMNodeLabelsRetrieved;
216222
@Metric("Total number of successful Retrieved CheckUserAccessToQueue and latency(ms)")
217223
private MutableRate totalSucceededCheckUserAccessToQueueRetrieved;
224+
@Metric("Total number of successful Retrieved GetDelegationToken and latency(ms)")
225+
private MutableRate totalSucceededGetDelegationTokenRetrieved;
226+
@Metric("Total number of successful Retrieved RenewDelegationToken and latency(ms)")
227+
private MutableRate totalSucceededRenewDelegationTokenRetrieved;
228+
@Metric("Total number of successful Retrieved CancelDelegationToken and latency(ms)")
229+
private MutableRate totalSucceededCancelDelegationTokenRetrieved;
218230

219231
/**
220232
* Provide quantile counters for all latencies.
@@ -262,6 +274,9 @@ public final class RouterMetrics {
262274
private MutableQuantiles getRefreshQueuesLatency;
263275
private MutableQuantiles getRMNodeLabelsLatency;
264276
private MutableQuantiles checkUserAccessToQueueLatency;
277+
private MutableQuantiles getDelegationTokenLatency;
278+
private MutableQuantiles renewDelegationTokenLatency;
279+
private MutableQuantiles cancelDelegationTokenLatency;
265280

266281
private static volatile RouterMetrics instance = null;
267282
private static MetricsRegistry registry;
@@ -423,6 +438,15 @@ private RouterMetrics() {
423438

424439
checkUserAccessToQueueLatency = registry.newQuantiles("checkUserAccessToQueueLatency",
425440
"latency of get apptimeouts timeouts", "ops", "latency", 10);
441+
442+
getDelegationTokenLatency = registry.newQuantiles("getDelegationTokenLatency",
443+
"latency of get delegation token timeouts", "ops", "latency", 10);
444+
445+
renewDelegationTokenLatency = registry.newQuantiles("renewDelegationTokenLatency",
446+
"latency of renew delegation token timeouts", "ops", "latency", 10);
447+
448+
cancelDelegationTokenLatency = registry.newQuantiles("cancelDelegationTokenLatency",
449+
"latency of cancel delegation token timeouts", "ops", "latency", 10);
426450
}
427451

428452
public static RouterMetrics getMetrics() {
@@ -655,10 +679,25 @@ public long getNumSucceededGetRMNodeLabelsRetrieved() {
655679
}
656680

657681
@VisibleForTesting
658-
public long getNumSucceededCheckUserAccessToQueueRetrievedRetrieved() {
682+
public long getNumSucceededCheckUserAccessToQueueRetrieved() {
659683
return totalSucceededCheckUserAccessToQueueRetrieved.lastStat().numSamples();
660684
}
661685

686+
@VisibleForTesting
687+
public long getNumSucceededGetDelegationTokenRetrieved() {
688+
return totalSucceededGetDelegationTokenRetrieved.lastStat().numSamples();
689+
}
690+
691+
@VisibleForTesting
692+
public long getNumSucceededRenewDelegationTokenRetrieved() {
693+
return totalSucceededRenewDelegationTokenRetrieved.lastStat().numSamples();
694+
}
695+
696+
@VisibleForTesting
697+
public long getNumSucceededCancelDelegationTokenRetrieved() {
698+
return totalSucceededCancelDelegationTokenRetrieved.lastStat().numSamples();
699+
}
700+
662701
@VisibleForTesting
663702
public double getLatencySucceededAppsCreated() {
664703
return totalSucceededAppsCreated.lastStat().mean();
@@ -874,6 +913,21 @@ public double getLatencySucceededCheckUserAccessToQueueRetrieved() {
874913
return totalSucceededCheckUserAccessToQueueRetrieved.lastStat().mean();
875914
}
876915

916+
@VisibleForTesting
917+
public double getLatencySucceededGetDelegationTokenRetrieved() {
918+
return totalSucceededGetDelegationTokenRetrieved.lastStat().mean();
919+
}
920+
921+
@VisibleForTesting
922+
public double getLatencySucceededRenewDelegationTokenRetrieved() {
923+
return totalSucceededRenewDelegationTokenRetrieved.lastStat().mean();
924+
}
925+
926+
@VisibleForTesting
927+
public double getLatencySucceededCancelDelegationTokenRetrieved() {
928+
return totalSucceededCancelDelegationTokenRetrieved.lastStat().mean();
929+
}
930+
877931
@VisibleForTesting
878932
public int getAppsFailedCreated() {
879933
return numAppsFailedCreated.value();
@@ -1068,6 +1122,18 @@ public int getCheckUserAccessToQueueFailedRetrieved() {
10681122
return numCheckUserAccessToQueueFailedRetrieved.value();
10691123
}
10701124

1125+
public int getDelegationTokenFailedRetrieved() {
1126+
return numGetDelegationTokenFailedRetrieved.value();
1127+
}
1128+
1129+
public int getRenewDelegationTokenFailedRetrieved() {
1130+
return numRenewDelegationTokenFailedRetrieved.value();
1131+
}
1132+
1133+
public int getCancelDelegationTokenFailedRetrieved() {
1134+
return numCancelDelegationTokenFailedRetrieved.value();
1135+
}
1136+
10711137
public void succeededAppsCreated(long duration) {
10721138
totalSucceededAppsCreated.add(duration);
10731139
getNewApplicationLatency.add(duration);
@@ -1283,6 +1349,21 @@ public void succeededCheckUserAccessToQueueRetrieved(long duration) {
12831349
checkUserAccessToQueueLatency.add(duration);
12841350
}
12851351

1352+
public void succeededGetDelegationTokenRetrieved(long duration) {
1353+
totalSucceededGetDelegationTokenRetrieved.add(duration);
1354+
getDelegationTokenLatency.add(duration);
1355+
}
1356+
1357+
public void succeededRenewDelegationTokenRetrieved(long duration) {
1358+
totalSucceededRenewDelegationTokenRetrieved.add(duration);
1359+
renewDelegationTokenLatency.add(duration);
1360+
}
1361+
1362+
public void succeededCancelDelegationTokenRetrieved(long duration) {
1363+
totalSucceededCancelDelegationTokenRetrieved.add(duration);
1364+
cancelDelegationTokenLatency.add(duration);
1365+
}
1366+
12861367
public void incrAppsFailedCreated() {
12871368
numAppsFailedCreated.incr();
12881369
}
@@ -1454,4 +1535,16 @@ public void incrGetRMNodeLabelsFailedRetrieved() {
14541535
public void incrCheckUserAccessToQueueFailedRetrieved() {
14551536
numCheckUserAccessToQueueFailedRetrieved.incr();
14561537
}
1457-
}
1538+
1539+
public void incrGetDelegationTokenFailedRetrieved() {
1540+
numGetDelegationTokenFailedRetrieved.incr();
1541+
}
1542+
1543+
public void incrRenewDelegationTokenFailedRetrieved() {
1544+
numRenewDelegationTokenFailedRetrieved.incr();
1545+
}
1546+
1547+
public void incrCancelDelegationTokenFailedRetrieved() {
1548+
numCancelDelegationTokenFailedRetrieved.incr();
1549+
}
1550+
}

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterServerUtil.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,12 @@
2424
import org.apache.hadoop.classification.InterfaceStability.Unstable;
2525
import org.apache.hadoop.conf.Configuration;
2626
import org.apache.hadoop.security.UserGroupInformation;
27+
import org.apache.hadoop.security.token.Token;
2728
import org.apache.hadoop.util.ReflectionUtils;
2829
import org.apache.hadoop.util.StringUtils;
2930
import org.apache.hadoop.yarn.exceptions.YarnException;
3031
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
32+
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
3133
import org.slf4j.Logger;
3234
import org.slf4j.LoggerFactory;
3335

@@ -36,6 +38,7 @@
3638
import java.util.ArrayList;
3739
import java.util.Collection;
3840
import java.util.List;
41+
import java.util.EnumSet;
3942
import java.io.IOException;
4043

4144
/**
@@ -470,6 +473,27 @@ public static void validateContainerId(String containerId)
470473
}
471474
}
472475

476+
public static boolean isAllowedDelegationTokenOp() throws IOException {
477+
if (UserGroupInformation.isSecurityEnabled()) {
478+
return EnumSet.of(UserGroupInformation.AuthenticationMethod.KERBEROS,
479+
UserGroupInformation.AuthenticationMethod.KERBEROS_SSL,
480+
UserGroupInformation.AuthenticationMethod.CERTIFICATE)
481+
.contains(UserGroupInformation.getCurrentUser()
482+
.getRealAuthenticationMethod());
483+
} else {
484+
return true;
485+
}
486+
}
487+
488+
public static String getRenewerForToken(Token<RMDelegationTokenIdentifier> token)
489+
throws IOException {
490+
UserGroupInformation user = UserGroupInformation.getCurrentUser();
491+
UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
492+
// we can always renew our own tokens
493+
return loginUser.getUserName().equals(user.getUserName())
494+
? token.decodeIdentifier().getRenewer().toString() : user.getShortUserName();
495+
}
496+
473497
public static UserGroupInformation setupUser(final String userName) {
474498
UserGroupInformation user = null;
475499
try {

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java

Lines changed: 93 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import org.apache.commons.lang3.StringUtils;
2222
import org.apache.commons.lang3.tuple.Pair;
23+
import org.apache.hadoop.io.Text;
2324
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
2425
import java.io.IOException;
2526
import java.lang.reflect.Method;
@@ -40,7 +41,6 @@
4041
import java.util.concurrent.ThreadFactory;
4142
import java.util.concurrent.ThreadPoolExecutor;
4243
import java.util.concurrent.TimeUnit;
43-
import org.apache.commons.lang3.NotImplementedException;
4444
import org.apache.hadoop.conf.Configuration;
4545
import org.apache.hadoop.fs.CommonConfigurationKeys;
4646
import org.apache.hadoop.security.UserGroupInformation;
@@ -118,9 +118,13 @@
118118
import org.apache.hadoop.yarn.api.records.ApplicationId;
119119
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
120120
import org.apache.hadoop.yarn.api.records.ReservationId;
121+
import org.apache.hadoop.security.token.Token;
122+
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
123+
121124
import org.apache.hadoop.yarn.conf.YarnConfiguration;
122125
import org.apache.hadoop.yarn.exceptions.YarnException;
123126
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
127+
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
124128
import org.apache.hadoop.yarn.server.federation.failover.FederationProxyProviderUtil;
125129
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils;
126130
import org.apache.hadoop.yarn.server.federation.policies.RouterPolicyFacade;
@@ -136,6 +140,7 @@
136140
import org.apache.hadoop.yarn.server.router.RouterServerUtil;
137141
import org.apache.hadoop.yarn.util.Clock;
138142
import org.apache.hadoop.yarn.util.MonotonicClock;
143+
import org.apache.hadoop.yarn.util.Records;
139144
import org.slf4j.Logger;
140145
import org.slf4j.LoggerFactory;
141146

@@ -1392,19 +1397,103 @@ public GetContainersResponse getContainers(GetContainersRequest request)
13921397
@Override
13931398
public GetDelegationTokenResponse getDelegationToken(
13941399
GetDelegationTokenRequest request) throws YarnException, IOException {
1395-
throw new NotImplementedException("Code is not implemented");
1400+
1401+
if (request == null || request.getRenewer() == null) {
1402+
routerMetrics.incrGetDelegationTokenFailedRetrieved();
1403+
RouterServerUtil.logAndThrowException(
1404+
"Missing getDelegationToken request or Renewer.", null);
1405+
}
1406+
1407+
try {
1408+
// Verify that the connection is kerberos authenticated
1409+
if (!RouterServerUtil.isAllowedDelegationTokenOp()) {
1410+
routerMetrics.incrGetDelegationTokenFailedRetrieved();
1411+
throw new IOException(
1412+
"Delegation Token can be issued only with kerberos authentication.");
1413+
}
1414+
1415+
long startTime = clock.getTime();
1416+
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
1417+
Text owner = new Text(ugi.getUserName());
1418+
Text realUser = null;
1419+
if (ugi.getRealUser() != null) {
1420+
realUser = new Text(ugi.getRealUser().getUserName());
1421+
}
1422+
1423+
RMDelegationTokenIdentifier tokenIdentifier =
1424+
new RMDelegationTokenIdentifier(owner, new Text(request.getRenewer()), realUser);
1425+
Token<RMDelegationTokenIdentifier> realRMDToken =
1426+
new Token<>(tokenIdentifier, this.getTokenSecretManager());
1427+
1428+
org.apache.hadoop.yarn.api.records.Token routerRMDTToken =
1429+
BuilderUtils.newDelegationToken(realRMDToken.getIdentifier(),
1430+
realRMDToken.getKind().toString(),
1431+
realRMDToken.getPassword(), realRMDToken.getService().toString());
1432+
1433+
long stopTime = clock.getTime();
1434+
routerMetrics.succeededGetDelegationTokenRetrieved((stopTime - startTime));
1435+
return GetDelegationTokenResponse.newInstance(routerRMDTToken);
1436+
} catch(IOException e) {
1437+
routerMetrics.incrGetDelegationTokenFailedRetrieved();
1438+
throw new YarnException(e);
1439+
}
13961440
}
13971441

13981442
@Override
13991443
public RenewDelegationTokenResponse renewDelegationToken(
14001444
RenewDelegationTokenRequest request) throws YarnException, IOException {
1401-
throw new NotImplementedException("Code is not implemented");
1445+
try {
1446+
1447+
if (!RouterServerUtil.isAllowedDelegationTokenOp()) {
1448+
routerMetrics.incrRenewDelegationTokenFailedRetrieved();
1449+
throw new IOException(
1450+
"Delegation Token can be renewed only with kerberos authentication");
1451+
}
1452+
1453+
long startTime = clock.getTime();
1454+
org.apache.hadoop.yarn.api.records.Token protoToken = request.getDelegationToken();
1455+
Token<RMDelegationTokenIdentifier> token = new Token<>(
1456+
protoToken.getIdentifier().array(), protoToken.getPassword().array(),
1457+
new Text(protoToken.getKind()), new Text(protoToken.getService()));
1458+
String user = RouterServerUtil.getRenewerForToken(token);
1459+
long nextExpTime = this.getTokenSecretManager().renewToken(token, user);
1460+
RenewDelegationTokenResponse renewResponse =
1461+
Records.newRecord(RenewDelegationTokenResponse.class);
1462+
renewResponse.setNextExpirationTime(nextExpTime);
1463+
long stopTime = clock.getTime();
1464+
routerMetrics.succeededRenewDelegationTokenRetrieved((stopTime - startTime));
1465+
return renewResponse;
1466+
1467+
} catch (IOException e) {
1468+
routerMetrics.incrRenewDelegationTokenFailedRetrieved();
1469+
throw new YarnException(e);
1470+
}
14021471
}
14031472

14041473
@Override
14051474
public CancelDelegationTokenResponse cancelDelegationToken(
14061475
CancelDelegationTokenRequest request) throws YarnException, IOException {
1407-
throw new NotImplementedException("Code is not implemented");
1476+
try {
1477+
if (!RouterServerUtil.isAllowedDelegationTokenOp()) {
1478+
routerMetrics.incrCancelDelegationTokenFailedRetrieved();
1479+
throw new IOException(
1480+
"Delegation Token can be cancelled only with kerberos authentication");
1481+
}
1482+
1483+
long startTime = clock.getTime();
1484+
org.apache.hadoop.yarn.api.records.Token protoToken = request.getDelegationToken();
1485+
Token<RMDelegationTokenIdentifier> token = new Token<>(
1486+
protoToken.getIdentifier().array(), protoToken.getPassword().array(),
1487+
new Text(protoToken.getKind()), new Text(protoToken.getService()));
1488+
String user = UserGroupInformation.getCurrentUser().getUserName();
1489+
this.getTokenSecretManager().cancelToken(token, user);
1490+
long stopTime = clock.getTime();
1491+
routerMetrics.succeededCancelDelegationTokenRetrieved((stopTime - startTime));
1492+
return Records.newRecord(CancelDelegationTokenResponse.class);
1493+
} catch (IOException e) {
1494+
routerMetrics.incrCancelDelegationTokenFailedRetrieved();
1495+
throw new YarnException(e);
1496+
}
14081497
}
14091498

14101499
@Override

0 commit comments

Comments
 (0)