Skip to content

Commit 567600f

Browse files
committed
YARN-10425. Replace the legacy placement engine in CS with the new one. Contributed by Gergely Pollak.
1 parent cd0490e commit 567600f

File tree

18 files changed

+227
-913
lines changed

18 files changed

+227
-913
lines changed

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -864,9 +864,9 @@ ApplicationPlacementContext placeApplication(
864864
if (placementManager != null) {
865865
try {
866866
String usernameUsedForPlacement =
867-
getUserNameForPlacement(user, context, placementManager);
867+
getUserNameForPlacement(user, context, placementManager);
868868
placementContext = placementManager
869-
.placeApplication(context, usernameUsedForPlacement);
869+
.placeApplication(context, usernameUsedForPlacement, isRecovery);
870870
} catch (YarnException e) {
871871
// Placement could also fail if the user doesn't exist in system
872872
// skip if the user is not found during recovery.

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/AppNameMappingPlacementRule.java

Lines changed: 0 additions & 204 deletions
This file was deleted.

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/CSMappingPlacementRule.java

Lines changed: 70 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,11 @@ public boolean initialize(ResourceScheduler scheduler) throws IOException {
125125
overrideWithQueueMappings = conf.getOverrideWithQueueMappings();
126126

127127
if (groups == null) {
128-
groups = Groups.getUserToGroupsMappingService(conf);
128+
//We cannot use Groups#getUserToGroupsMappingService here, because when
129+
//tests change the HADOOP_SECURITY_GROUP_MAPPING, Groups won't refresh its
130+
//cached instance of groups, so we might get a Group instance which
131+
//ignores the HADOOP_SECURITY_GROUP_MAPPING settings.
132+
groups = new Groups(conf);
129133
}
130134

131135
MappingRuleValidationContext validationContext = buildValidationContext();
@@ -145,8 +149,8 @@ public boolean initialize(ResourceScheduler scheduler) throws IOException {
145149
}
146150

147151
LOG.info("Initialized queue mappings, can override user specified " +
148-
"queues: {} number of rules: {}", overrideWithQueueMappings,
149-
mappingRules.size());
152+
"queues: {} number of rules: {} mapping rules: {}",
153+
overrideWithQueueMappings, mappingRules.size(), mappingRules);
150154

151155
if (LOG.isDebugEnabled()) {
152156
LOG.debug("Initialized with the following mapping rules:");
@@ -170,6 +174,12 @@ public boolean initialize(ResourceScheduler scheduler) throws IOException {
170174
*/
171175
private void setupGroupsForVariableContext(VariableContext vctx, String user)
172176
throws IOException {
177+
if (groups == null) {
178+
LOG.warn(
179+
"Group provider hasn't been set, cannot query groups for user {}",
180+
user);
181+
return;
182+
}
173183
Set<String> groupsSet = groups.getGroupsSet(user);
174184
String secondaryGroup = null;
175185
Iterator<String> it = groupsSet.iterator();
@@ -193,14 +203,18 @@ private void setupGroupsForVariableContext(VariableContext vctx, String user)
193203
}
194204

195205
private VariableContext createVariableContext(
196-
ApplicationSubmissionContext asc, String user) throws IOException {
206+
ApplicationSubmissionContext asc, String user) {
197207
VariableContext vctx = new VariableContext();
198208

199209
vctx.put("%user", user);
200210
vctx.put("%specified", asc.getQueue());
201211
vctx.put("%application", asc.getApplicationName());
202212
vctx.put("%default", "root.default");
203-
setupGroupsForVariableContext(vctx, user);
213+
try {
214+
setupGroupsForVariableContext(vctx, user);
215+
} catch (IOException e) {
216+
LOG.warn("Unable to setup groups: {}", e.getMessage());
217+
}
204218

205219
vctx.setImmutables(immutableVariables);
206220
return vctx;
@@ -338,34 +352,43 @@ private ApplicationPlacementContext createPlacementContext(String queueName) {
338352
@Override
339353
public ApplicationPlacementContext getPlacementForApp(
340354
ApplicationSubmissionContext asc, String user) throws YarnException {
355+
return getPlacementForApp(asc, user, false);
356+
}
357+
358+
@Override
359+
public ApplicationPlacementContext getPlacementForApp(
360+
ApplicationSubmissionContext asc, String user, boolean recovery)
361+
throws YarnException {
341362
//We only use the mapping rules if overrideWithQueueMappings enabled
342363
//or the application is submitted to the default queue, which effectively
343364
//means the application doesn't have any specific queue.
344365
String appQueue = asc.getQueue();
366+
LOG.debug("Looking placement for app '{}' originally submitted to queue " +
367+
"'{}', with override enabled '{}'",
368+
asc.getApplicationName(), appQueue, overrideWithQueueMappings);
345369
if (appQueue != null &&
346370
!appQueue.equals(YarnConfiguration.DEFAULT_QUEUE_NAME) &&
347371
!appQueue.equals(YarnConfiguration.DEFAULT_QUEUE_FULL_NAME) &&
348-
!overrideWithQueueMappings) {
372+
!overrideWithQueueMappings &&
373+
!recovery) {
349374
LOG.info("Have no jurisdiction over application submission '{}', " +
350375
"moving to next PlacementRule engine", asc.getApplicationName());
351376
return null;
352377
}
353378

354379
VariableContext variables;
355-
try {
356-
variables = createVariableContext(asc, user);
357-
} catch (IOException e) {
358-
LOG.error("Unable to setup variable context", e);
359-
throw new YarnException(e);
360-
}
380+
variables = createVariableContext(asc, user);
361381

382+
ApplicationPlacementContext ret = null;
362383
for (MappingRule rule : mappingRules) {
363384
MappingRuleResult result = evaluateRule(rule, variables);
364385
switch (result.getResult()) {
365386
case PLACE_TO_DEFAULT:
366-
return placeToDefault(asc, variables, rule);
387+
ret = placeToDefault(asc, variables, rule);
388+
break;
367389
case PLACE:
368-
return placeToQueue(asc, rule, result);
390+
ret = placeToQueue(asc, rule, result);
391+
break;
369392
case REJECT:
370393
LOG.info("Rejecting application '{}', reason: Mapping rule '{}' " +
371394
" fallback action is set to REJECT.",
@@ -377,17 +400,42 @@ public ApplicationPlacementContext getPlacementForApp(
377400
case SKIP:
378401
//SKIP means skip to the next rule, which is the default behaviour of
379402
//the for loop, so we don't need to take any extra actions
380-
break;
403+
break;
381404
default:
382405
LOG.error("Invalid result '{}'", result);
383406
}
407+
408+
//If we already have a return value, we can return it!
409+
if (ret != null) {
410+
break;
411+
}
412+
}
413+
414+
if (ret == null) {
415+
//If no rule was applied we return null, to let the engine move onto the
416+
//next placementRule class
417+
LOG.info("No matching rule found for application '{}', moving to next " +
418+
"PlacementRule engine", asc.getApplicationName());
419+
}
420+
421+
if (recovery) {
422+
//we need this part for backwards compatibility with recovery
423+
//the legacy code checked if the placement matches the queue of the
424+
//application to be recovered, and if it did, it created an
425+
//ApplicationPlacementContext.
426+
//However at a later point this is going to be changed, there are two
427+
//major issues with this approach:
428+
// 1) The recovery only uses LEAF queue names, which must be updated
429+
// 2) The ORIGINAL queue which the application was submitted is NOT
430+
// stored this might result in different placement evaluation since
431+
// now we can have rules which give different result based on what
432+
// the user submitted.
433+
if (ret == null || !ret.getQueue().equals(asc.getQueue())) {
434+
return null;
435+
}
384436
}
385437

386-
//If no rule was applied we return null, to let the engine move onto the
387-
//next placementRule class
388-
LOG.info("No matching rule found for application '{}', moving to next " +
389-
"PlacementRule engine", asc.getApplicationName());
390-
return null;
438+
return ret;
391439
}
392440

393441
private ApplicationPlacementContext placeToQueue(
@@ -410,13 +458,13 @@ private ApplicationPlacementContext placeToDefault(
410458
String queueName = validateAndNormalizeQueue(
411459
variables.replacePathVariables("%default"), false);
412460
LOG.debug("Application '{}' have been placed to queue '{}' by " +
413-
"the fallback option of rule {}",
461+
"the fallback option of rule {}",
414462
asc.getApplicationName(), queueName, rule);
415463
return createPlacementContext(queueName);
416464
} catch (YarnException e) {
417465
LOG.error("Rejecting application due to a failed fallback" +
418466
" action '{}'" + ", reason: {}", asc.getApplicationName(),
419-
e.getMessage());
467+
e);
420468
//We intentionally omit the details, we don't want any server side
421469
//config information to leak to the client side
422470
throw new YarnException("Application submission have been rejected by a" +

0 commit comments

Comments
 (0)