Skip to content

Commit 138b7fc

Browse files
author
Minni Mittal
committed
YARN-11028. Add metrics for container allocation latency
1 parent a35f7de commit 138b7fc

File tree

4 files changed

+478
-1
lines changed

4 files changed

+478
-1
lines changed

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import java.util.concurrent.ConcurrentHashMap;
2828
import java.util.concurrent.ConcurrentMap;
2929

30+
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics;
3031
import org.slf4j.Logger;
3132
import org.slf4j.LoggerFactory;
3233
import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -395,7 +396,19 @@ public AllocateResponse allocate(AllocateRequest request)
395396

396397
ApplicationAttemptId appAttemptId =
397398
amrmTokenIdentifier.getApplicationAttemptId();
399+
RMAppAttemptMetrics rmMetrics = getAppAttemptMetrics(appAttemptId);
400+
// we do this here to prevent the internal lock in allocate()
401+
rmMetrics.setAllocateLatenciesTimestamps(request.getAskList());
402+
AllocateResponse response = allocate(request, amrmTokenIdentifier);
403+
rmMetrics.updateAllocateLatencies(response.getAllocatedContainers());
404+
return response;
405+
}
398406

407+
protected AllocateResponse allocate(AllocateRequest request,
408+
AMRMTokenIdentifier amrmTokenIdentifier)
409+
throws YarnException, IOException {
410+
ApplicationAttemptId appAttemptId =
411+
amrmTokenIdentifier.getApplicationAttemptId();
399412
this.amLivelinessMonitor.receivedPing(appAttemptId);
400413

401414
/* check if its in cache */
@@ -472,6 +485,23 @@ public AllocateResponse allocate(AllocateRequest request)
472485
}
473486
}
474487

488+
protected RMAppAttemptMetrics getAppAttemptMetrics(
489+
ApplicationAttemptId appAttemptId) {
490+
if (appAttemptId == null) {
491+
return null;
492+
}
493+
RMApp app = this.rmContext.getRMApps().get(appAttemptId.getApplicationId());
494+
if (app == null) {
495+
return null;
496+
}
497+
RMAppAttempt attempt = app.getAppAttempts().get(appAttemptId);
498+
if (attempt == null) {
499+
return null;
500+
}
501+
502+
return attempt.getRMAppAttemptMetrics();
503+
}
504+
475505
public void registerAppAttempt(ApplicationAttemptId attemptId) {
476506
AllocateResponse response =
477507
recordFactory.newRecordInstance(AllocateResponse.class);

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClusterMetrics.java

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import java.util.concurrent.atomic.AtomicBoolean;
2727
import java.util.concurrent.atomic.AtomicInteger;
2828

29+
import org.apache.hadoop.metrics2.lib.MutableQuantiles;
2930
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
3031
import org.apache.hadoop.classification.InterfaceAudience;
3132
import org.apache.hadoop.metrics2.MetricsInfo;
@@ -76,7 +77,10 @@ public class ClusterMetrics {
7677
MutableGaugeInt rmDispatcherEventQueueSize;
7778
@Metric("# of scheduler dispatcher event queue size")
7879
MutableGaugeInt schedulerDispatcherEventQueueSize;
79-
80+
@Metric("Allocation Latencies for Guarantee containers")
81+
MutableQuantiles allocateLatencyGuarQuantiles;
82+
@Metric("Allocation Latencies for Opportunistic containers")
83+
MutableQuantiles allocateLatencyOppQuantiles;
8084
private boolean rmEventProcMonitorEnable = false;
8185

8286
private static final MetricsInfo RECORD_INFO = info("ClusterMetrics",
@@ -117,13 +121,25 @@ public static ClusterMetrics getMetrics() {
117121
if(INSTANCE == null){
118122
INSTANCE = new ClusterMetrics();
119123
registerMetrics();
124+
INSTANCE.initialize();
120125
isInitialized.set(true);
121126
}
122127
}
123128
}
124129
return INSTANCE;
125130
}
126131

132+
private void initialize() {
133+
allocateLatencyGuarQuantiles = registry
134+
.newQuantiles("AllocateLatencyGuaranteed",
135+
"Latency to fulfill an Allocate(Guaranteed) requests", "ops",
136+
"latency", 5);
137+
allocateLatencyOppQuantiles = registry
138+
.newQuantiles("AllocateLatencyOpportunistic",
139+
"Latency to fulfill an Allocate(Opportunistic) requests", "ops",
140+
"latency", 5);
141+
}
142+
127143
private static void registerMetrics() {
128144
registry = new MetricsRegistry(RECORD_INFO);
129145
registry.tag(RECORD_INFO, "ResourceManager");
@@ -357,6 +373,14 @@ public void incrNumContainerAssigned() {
357373
numContainersAssigned.incrementAndGet();
358374
}
359375

376+
public void addAllocateGuarLatencyEntry(long processingTime) {
377+
allocateLatencyGuarQuantiles.add(processingTime);
378+
}
379+
380+
public void addAllocateOppLatencyEntry(long processingTime) {
381+
allocateLatencyOppQuantiles.add(processingTime);
382+
}
383+
360384
private ScheduledThreadPoolExecutor getAssignCounterExecutor(){
361385
return assignCounterExecutor;
362386
}

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptMetrics.java

Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt;
2020

2121
import java.util.HashMap;
22+
import java.util.List;
2223
import java.util.Map;
2324
import java.util.concurrent.ConcurrentHashMap;
2425
import java.util.concurrent.atomic.AtomicBoolean;
@@ -29,6 +30,11 @@
2930
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
3031

3132
import org.apache.commons.lang3.time.DateUtils;
33+
import org.apache.hadoop.util.Time;
34+
import org.apache.hadoop.yarn.api.records.Container;
35+
import org.apache.hadoop.yarn.api.records.ExecutionType;
36+
import org.apache.hadoop.yarn.api.records.ResourceRequest;
37+
import org.apache.hadoop.yarn.server.resourcemanager.ClusterMetrics;
3238
import org.slf4j.Logger;
3339
import org.slf4j.LoggerFactory;
3440
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -63,13 +69,18 @@ public class RMAppAttemptMetrics {
6369
new int[NodeType.values().length][NodeType.values().length];
6470
private volatile int totalAllocatedContainers;
6571

72+
private ConcurrentHashMap<Long, Long> allocationGuaranteedLatencies = null;
73+
private ConcurrentHashMap<Long, Long> allocationOpportunisticLatencies = null;
74+
6675
public RMAppAttemptMetrics(ApplicationAttemptId attemptId,
6776
RMContext rmContext) {
6877
this.attemptId = attemptId;
6978
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
7079
this.readLock = lock.readLock();
7180
this.writeLock = lock.writeLock();
7281
this.rmContext = rmContext;
82+
this.allocationGuaranteedLatencies = new ConcurrentHashMap<Long, Long>();
83+
this.allocationOpportunisticLatencies = new ConcurrentHashMap<Long, Long>();
7384
}
7485

7586
public void updatePreemptionInfo(Resource resource, RMContainer container) {
@@ -242,4 +253,149 @@ public Resource getApplicationAttemptHeadroom() {
242253
public void setApplicationAttemptHeadRoom(Resource headRoom) {
243254
this.applicationHeadroom = headRoom;
244255
}
256+
257+
/**
258+
* Add allocationID latency to the application ID with a timestap =
259+
* CurrentTime (guaranteed)
260+
*
261+
* @param allocId the allocation Id to add If the allocation ID is already
262+
* present (which shouldn't happen) it ignores the entry
263+
*/
264+
public void addAllocationGuarLatencyIfNotExists(long allocId) {
265+
addAllocationGuarLatencyIfNotExists(allocId, System.currentTimeMillis());
266+
}
267+
268+
/**
269+
* Add allocationID latency to the application ID with a specific timestamp
270+
* (guaranteed)
271+
*
272+
* @param allocId allocationId
273+
* @param timestamp the timestamp to associate
274+
*/
275+
public void addAllocationGuarLatencyIfNotExists(long allocId,
276+
long timestamp) {
277+
allocationGuaranteedLatencies.putIfAbsent(allocId, timestamp);
278+
}
279+
280+
/**
281+
* Add allocationID latency to the application ID with a timestap =
282+
* CurrentTime (opportunistic)
283+
*
284+
* @param allocId the allocation Id to add If the allocation ID is already
285+
* present (which shouldn't happen) it ignores the entry
286+
*/
287+
public void addAllocationOppLatencyIfNotExists(long allocId) {
288+
this.addAllocationOppLatencyIfNotExists(allocId,
289+
System.currentTimeMillis());
290+
}
291+
292+
/**
293+
* Add allocationID latency to the application ID with a specific timestamp
294+
* (opportunistic)
295+
*
296+
* @param allocId allocationId
297+
* @param timestamp the timestamp to associate
298+
*/
299+
public void addAllocationOppLatencyIfNotExists(long allocId, long timestamp) {
300+
allocationOpportunisticLatencies.putIfAbsent(allocId, timestamp);
301+
}
302+
303+
/**
304+
* Returns the time associated when the allocation Id was added This method
305+
* removes the allocation Id from the class (guaranteed)
306+
*
307+
* @param allocId the allocation ID to get the associated time
308+
* @return the timestamp associated with that allocation id as well as stop
309+
* tracking it
310+
*/
311+
public long getAndRemoveGuaAllocationLatencies(long allocId) {
312+
Long ret = allocationGuaranteedLatencies.remove(new Long(allocId));
313+
return ret != null ? ret : 0l;
314+
}
315+
316+
/**
317+
* Returns the time associated when the allocation Id was added This method
318+
* removes the allocation Id from the class (opportunistic)
319+
*
320+
* @param allocId the allocation ID to get the associated time
321+
* @return the timestamp associated with that allocation id as well as stop
322+
* tracking it
323+
*/
324+
public long getAndRemoveOppAllocationLatencies(long allocId) {
325+
Long ret = allocationOpportunisticLatencies.remove(new Long(allocId));
326+
return ret != null ? ret : 0l;
327+
}
328+
329+
/**
330+
* Set timestamp for the provided ResourceRequest. It will correctly identify
331+
* their ExecutionType, provided they have they have allocateId != 0 (DEFAULT)
332+
* This is used in conjunction with This is used in conjunction with
333+
* updatePromoteLatencies method method
334+
*
335+
* @param requests the ResourceRequests to add.
336+
*/
337+
public void setAllocateLatenciesTimestamps(List<ResourceRequest> requests) {
338+
long now = Time.now();
339+
for (ResourceRequest req : requests) {
340+
if (req.getNumContainers() > 0) {
341+
// we dont support tracking with negative or zero allocationIds
342+
long allocationRequestId = req.getAllocationRequestId();
343+
if (allocationRequestId > 0) {
344+
if (req.getExecutionTypeRequest() != null) {
345+
if (ExecutionType.GUARANTEED
346+
.equals(req.getExecutionTypeRequest().getExecutionType())) {
347+
addAllocationGuarLatencyIfNotExists(allocationRequestId, now);
348+
} else {
349+
addAllocationOppLatencyIfNotExists(allocationRequestId, now);
350+
}
351+
}
352+
} else {
353+
LOG.warn(String.format(
354+
"Can't register allocate latency for %s container "
355+
+ "with negative or zero allocation IDs",
356+
req.getExecutionTypeRequest().getExecutionType()));
357+
}
358+
}
359+
}
360+
}
361+
362+
/**
363+
* Updated the JMX metrics class (ClusterMetrics) with the delta time when
364+
* these containers where added. It will correctly identify their
365+
* ExecutionType, provided they have they have allocateId != 0 (DEFAULT)
366+
*
367+
* @param response the list of the containers to allocate.
368+
*/
369+
public void updateAllocateLatencies(List<Container> response) {
370+
371+
for (Container container : response) {
372+
long allocationRequestId = container.getAllocationRequestId();
373+
// we dont support tracking with negative or zero allocationIds
374+
if (allocationRequestId > 0) {
375+
long now = System.currentTimeMillis();
376+
long allocIdTime =
377+
(container.getExecutionType() == ExecutionType.GUARANTEED) ?
378+
getAndRemoveGuaAllocationLatencies(allocationRequestId) :
379+
getAndRemoveOppAllocationLatencies(allocationRequestId);
380+
if (allocIdTime != 0) {
381+
if (container.getExecutionType() == ExecutionType.GUARANTEED) {
382+
ClusterMetrics.getMetrics()
383+
.addAllocateGuarLatencyEntry(now - allocIdTime);
384+
} else {
385+
ClusterMetrics.getMetrics()
386+
.addAllocateOppLatencyEntry(now - allocIdTime);
387+
}
388+
} else {
389+
LOG.error(String.format(
390+
"Can't register allocate latency for %s container %s; allotTime=%d ",
391+
container.getExecutionType(), container.getId(), allocIdTime));
392+
}
393+
} else {
394+
LOG.warn(String.format("Cant register promotion latency "
395+
+ "for container %s. Either allocationID is less than equal to 0 or "
396+
+ "lost the container ID", container.getExecutionType().name(),
397+
container.getId()));
398+
}
399+
}
400+
}
245401
}

0 commit comments

Comments
 (0)