Skip to content

Commit 172bcd8

Browse files
committed
YARN-8995. Log events info in AsyncDispatcher when event queue size cumulatively reaches a certain number every time. Contributed by zhuqi.
1 parent f347c34 commit 172bcd8

File tree

4 files changed

+123
-0
lines changed

4 files changed

+123
-0
lines changed

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2483,6 +2483,20 @@ public static boolean isAclEnabled(Configuration conf) {
24832483

24842484
public static final long DEFAULT_DISPATCHER_DRAIN_EVENTS_TIMEOUT = 300000;
24852485

2486+
/**
2487+
* The threshold used to trigger the logging of event types and counts
2488+
* in RM's main event dispatcher. Default value is 5000,
2489+
* which means RM will print events info when the queue size cumulatively
2490+
* reaches 5000 every time. Such info can be used to reveal what
2491+
* kind of events that RM is stuck at processing mostly,
2492+
* it can help to narrow down certain performance issues.
2493+
*/
2494+
public static final String
2495+
YARN_DISPATCHER_PRINT_EVENTS_INFO_THRESHOLD =
2496+
YARN_PREFIX + "dispatcher.print-events-info.threshold";
2497+
public static final int
2498+
DEFAULT_YARN_DISPATCHER_PRINT_EVENTS_INFO_THRESHOLD = 5000;
2499+
24862500
/**
24872501
* CLASSPATH for YARN applications. A comma-separated list of CLASSPATH
24882502
* entries

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,15 @@
2424
import java.util.Map;
2525
import java.util.concurrent.BlockingQueue;
2626
import java.util.concurrent.LinkedBlockingQueue;
27+
import java.util.stream.Collectors;
2728

2829
import org.slf4j.Logger;
2930
import org.slf4j.LoggerFactory;
3031
import org.slf4j.Marker;
3132
import org.slf4j.MarkerFactory;
3233
import org.apache.hadoop.classification.InterfaceAudience.Public;
3334
import org.apache.hadoop.classification.InterfaceStability.Evolving;
35+
import org.apache.hadoop.conf.Configuration;
3436
import org.apache.hadoop.service.AbstractService;
3537
import org.apache.hadoop.util.ShutdownHookManager;
3638
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -55,8 +57,13 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
5557

5658
private final BlockingQueue<Event> eventQueue;
5759
private volatile int lastEventQueueSizeLogged = 0;
60+
private volatile int lastEventDetailsQueueSizeLogged = 0;
5861
private volatile boolean stopped = false;
5962

63+
//Configuration for control the details queue event printing.
64+
private int detailsInterval;
65+
private boolean printTrigger = false;
66+
6067
// Configuration flag for enabling/disabling draining dispatcher's events on
6168
// stop functionality.
6269
private volatile boolean drainEventsOnStop = false;
@@ -129,6 +136,12 @@ public void run() {
129136
}
130137
if (event != null) {
131138
dispatch(event);
139+
if (printTrigger) {
140+
//Log the latest dispatch event type
141+
// may cause the too many events queued
142+
LOG.info("Latest dispatch event type: " + event.getType());
143+
printTrigger = false;
144+
}
132145
}
133146
}
134147
}
@@ -140,6 +153,15 @@ public void disableExitOnDispatchException() {
140153
exitOnDispatchException = false;
141154
}
142155

156+
@Override
157+
protected void serviceInit(Configuration conf) throws Exception{
158+
super.serviceInit(conf);
159+
this.detailsInterval = getConfig().getInt(YarnConfiguration.
160+
YARN_DISPATCHER_PRINT_EVENTS_INFO_THRESHOLD,
161+
YarnConfiguration.
162+
DEFAULT_YARN_DISPATCHER_PRINT_EVENTS_INFO_THRESHOLD);
163+
}
164+
143165
@Override
144166
protected void serviceStart() throws Exception {
145167
//start all the components
@@ -246,6 +268,17 @@ public EventHandler<Event> getEventHandler() {
246268
}
247269

248270
class GenericEventHandler implements EventHandler<Event> {
271+
private void printEventQueueDetails(BlockingQueue<Event> queue) {
272+
Map<Enum, Long> counterMap = eventQueue.stream().
273+
collect(Collectors.
274+
groupingBy(e -> e.getType(), Collectors.counting())
275+
);
276+
for (Map.Entry<Enum, Long> entry : counterMap.entrySet()) {
277+
long num = entry.getValue();
278+
LOG.info("Event type: " + entry.getKey()
279+
+ ", Event record counter: " + num);
280+
}
281+
}
249282
public void handle(Event event) {
250283
if (blockNewEvents) {
251284
return;
@@ -259,6 +292,12 @@ public void handle(Event event) {
259292
lastEventQueueSizeLogged = qSize;
260293
LOG.info("Size of event-queue is " + qSize);
261294
}
295+
if (qSize != 0 && qSize % detailsInterval == 0
296+
&& lastEventDetailsQueueSizeLogged != qSize) {
297+
lastEventDetailsQueueSizeLogged = qSize;
298+
printEventQueueDetails(eventQueue);
299+
printTrigger = true;
300+
}
262301
int remCapacity = eventQueue.remainingCapacity();
263302
if (remCapacity < 1000) {
264303
LOG.warn("Very low remaining capacity in the event-queue: "

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,19 @@
107107
<value>300000</value>
108108
</property>
109109

110+
<property>
111+
<description>
112+
The threshold used to trigger the logging of event types
113+
and counts in RM's main event dispatcher. Default length is 5000,
114+
which means RM will print events info when the queue size cumulatively
115+
reaches 5000 every time. Such info can be used to reveal what kind of events
116+
that RM is stuck at processing mostly, it can help to
117+
narrow down certain performance issues.
118+
</description>
119+
<name>yarn.dispatcher.print-events-info.threshold</name>
120+
<value>5000</value>
121+
</property>
122+
110123
<property>
111124
<description>The expiry interval for application master reporting.</description>
112125
<name>yarn.am.liveness-monitor.expiry-interval-ms</name>

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/TestAsyncDispatcher.java

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,12 @@
1818

1919
package org.apache.hadoop.yarn.event;
2020

21+
import java.lang.reflect.Field;
22+
import java.lang.reflect.Modifier;
2123
import java.util.concurrent.BlockingQueue;
2224
import java.util.concurrent.LinkedBlockingQueue;
2325

26+
import org.slf4j.Logger;
2427
import org.apache.hadoop.conf.Configuration;
2528
import org.apache.hadoop.yarn.conf.YarnConfiguration;
2629
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
@@ -93,6 +96,20 @@ private enum DummyType {
9396
DUMMY
9497
}
9598

99+
private static class TestHandler implements EventHandler<Event> {
100+
@Override
101+
public void handle(Event event) {
102+
try {
103+
// As long as 10000 events queued
104+
Thread.sleep(1500);
105+
} catch (InterruptedException e) {}
106+
}
107+
}
108+
109+
private enum TestEnum {
110+
TestEventType
111+
}
112+
96113
@SuppressWarnings({ "rawtypes", "unchecked" })
97114
private void dispatchDummyEvents(Dispatcher disp, int count) {
98115
for (int i = 0; i < count; i++) {
@@ -119,5 +136,45 @@ public void testDrainDispatcherDrainEventsOnStop() throws Exception {
119136
disp.close();
120137
assertEquals(0, queue.size());
121138
}
139+
140+
//Test print dispatcher details when the blocking queue is heavy
141+
@Test(timeout = 10000)
142+
public void testPrintDispatcherEventDetails() throws Exception {
143+
YarnConfiguration conf = new YarnConfiguration();
144+
conf.setInt(YarnConfiguration.
145+
YARN_DISPATCHER_PRINT_EVENTS_INFO_THRESHOLD, 5000);
146+
Logger log = mock(Logger.class);
147+
AsyncDispatcher dispatcher = new AsyncDispatcher();
148+
dispatcher.init(conf);
149+
150+
Field logger = AsyncDispatcher.class.getDeclaredField("LOG");
151+
logger.setAccessible(true);
152+
Field modifiers = Field.class.getDeclaredField("modifiers");
153+
modifiers.setAccessible(true);
154+
modifiers.setInt(logger, logger.getModifiers() & ~Modifier.FINAL);
155+
Object oldLog = logger.get(null);
156+
157+
try {
158+
logger.set(null, log);
159+
dispatcher.register(TestEnum.class, new TestHandler());
160+
dispatcher.start();
161+
162+
for (int i = 0; i < 10000; ++i) {
163+
Event event = mock(Event.class);
164+
when(event.getType()).thenReturn(TestEnum.TestEventType);
165+
dispatcher.getEventHandler().handle(event);
166+
}
167+
verify(log, atLeastOnce()).info("Event type: TestEventType, " +
168+
"Event record counter: 5000");
169+
Thread.sleep(2000);
170+
//Make sure more than one event to take
171+
verify(log, atLeastOnce()).
172+
info("Latest dispatch event type: TestEventType");
173+
dispatcher.stop();
174+
} finally {
175+
//... restore logger object
176+
logger.set(null, oldLog);
177+
}
178+
}
122179
}
123180

0 commit comments

Comments
 (0)