Skip to content

Commit 2d00a0c

Browse files
committed
YARN-8130 Race condition when container events are published for KILLED applications. (Rohith Sharma K S via Haibo Chen)
1 parent 6beb25a commit 2d00a0c

File tree

4 files changed

+113
-27
lines changed

4 files changed

+113
-27
lines changed

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelineEvent.java

Lines changed: 8 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -18,18 +18,22 @@
1818

1919
package org.apache.hadoop.yarn.server.nodemanager.timelineservice;
2020

21+
import org.apache.hadoop.yarn.api.records.ApplicationId;
2122
import org.apache.hadoop.yarn.event.AbstractEvent;
2223

2324
/**
2425
* Event posted to NMTimelinePublisher which in turn publishes it to
2526
* timelineservice v2.
2627
*/
2728
public class NMTimelineEvent extends AbstractEvent<NMTimelineEventType> {
28-
public NMTimelineEvent(NMTimelineEventType type) {
29-
super(type);
29+
private ApplicationId appId;
30+
31+
public NMTimelineEvent(NMTimelineEventType type, ApplicationId appId) {
32+
super(type, System.currentTimeMillis());
33+
this.appId=appId;
3034
}
3135

32-
public NMTimelineEvent(NMTimelineEventType type, long timestamp) {
33-
super(type, timestamp);
36+
public ApplicationId getApplicationId() {
37+
return appId;
3438
}
3539
}

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelineEventType.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,4 +24,7 @@
2424
public enum NMTimelineEventType {
2525
// Publish the NM Timeline entity
2626
TIMELINE_ENTITY_PUBLISH,
27+
28+
// Stop and remove timeline client
29+
STOP_TIMELINE_CLIENT
2730
}

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java

Lines changed: 14 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -96,7 +96,7 @@ public NMTimelinePublisher(Context context) {
9696

9797
@Override
9898
protected void serviceInit(Configuration conf) throws Exception {
99-
dispatcher = new AsyncDispatcher("NM Timeline dispatcher");
99+
dispatcher = createDispatcher();
100100
dispatcher.register(NMTimelineEventType.class,
101101
new ForwardingEventHandler());
102102
addIfService(dispatcher);
@@ -113,6 +113,10 @@ protected void serviceInit(Configuration conf) throws Exception {
113113
super.serviceInit(conf);
114114
}
115115

116+
protected AsyncDispatcher createDispatcher() {
117+
return new AsyncDispatcher("NM Timeline dispatcher");
118+
}
119+
116120
@Override
117121
protected void serviceStart() throws Exception {
118122
super.serviceStart();
@@ -141,6 +145,9 @@ protected void handleNMTimelineEvent(NMTimelineEvent event) {
141145
putEntity(((TimelinePublishEvent) event).getTimelineEntityToPublish(),
142146
((TimelinePublishEvent) event).getApplicationId());
143147
break;
148+
case STOP_TIMELINE_CLIENT:
149+
removeAndStopTimelineClient(event.getApplicationId());
150+
break;
144151
default:
145152
LOG.error("Unknown NMTimelineEvent type: " + event.getType());
146153
}
@@ -392,20 +399,13 @@ public void handle(NMTimelineEvent event) {
392399
}
393400

394401
private static class TimelinePublishEvent extends NMTimelineEvent {
395-
private ApplicationId appId;
396402
private TimelineEntity entityToPublish;
397403

398404
public TimelinePublishEvent(TimelineEntity entity, ApplicationId appId) {
399-
super(NMTimelineEventType.TIMELINE_ENTITY_PUBLISH, System
400-
.currentTimeMillis());
401-
this.appId = appId;
405+
super(NMTimelineEventType.TIMELINE_ENTITY_PUBLISH, appId);
402406
this.entityToPublish = entity;
403407
}
404408

405-
public ApplicationId getApplicationId() {
406-
return appId;
407-
}
408-
409409
public TimelineEntity getTimelineEntityToPublish() {
410410
return entityToPublish;
411411
}
@@ -434,6 +434,11 @@ public TimelineV2Client run() throws Exception {
434434
}
435435

436436
public void stopTimelineClient(ApplicationId appId) {
437+
dispatcher.getEventHandler().handle(
438+
new NMTimelineEvent(NMTimelineEventType.STOP_TIMELINE_CLIENT, appId));
439+
}
440+
441+
private void removeAndStopTimelineClient(ApplicationId appId) {
437442
TimelineV2Client client = appToClientMap.remove(appId);
438443
if (client != null) {
439444
client.stop();

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/TestNMTimelinePublisher.java

Lines changed: 88 additions & 14 deletions
Original file line number | Diff line number | Diff line change
@@ -31,50 +31,121 @@
3131
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
3232
import org.apache.hadoop.yarn.api.records.ApplicationId;
3333
import org.apache.hadoop.yarn.api.records.ContainerId;
34+
import org.apache.hadoop.yarn.api.records.ContainerStatus;
3435
import org.apache.hadoop.yarn.api.records.NodeId;
36+
import org.apache.hadoop.yarn.api.records.timelineservice.ContainerEntity;
3537
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
3638
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
3739
import org.apache.hadoop.yarn.client.api.impl.TimelineV2ClientImpl;
3840
import org.apache.hadoop.yarn.conf.YarnConfiguration;
41+
import org.apache.hadoop.yarn.event.AsyncDispatcher;
42+
import org.apache.hadoop.yarn.event.DrainDispatcher;
3943
import org.apache.hadoop.yarn.exceptions.YarnException;
44+
import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants;
4045
import org.apache.hadoop.yarn.server.nodemanager.Context;
46+
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationContainerFinishedEvent;
4147
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
4248
import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree;
4349
import org.junit.Assert;
4450
import org.junit.Test;
51+
import org.junit.After;
52+
import org.junit.Before;
4553

4654
public class TestNMTimelinePublisher {
4755
private static final String MEMORY_ID = "MEMORY";
4856
private static final String CPU_ID = "CPU";
4957

50-
@Test
51-
public void testContainerResourceUsage() {
52-
Context context = mock(Context.class);
53-
@SuppressWarnings("unchecked")
54-
final DummyTimelineClient timelineClient = new DummyTimelineClient(null);
55-
when(context.getNodeId()).thenReturn(NodeId.newInstance("localhost", 0));
58+
private NMTimelinePublisher publisher;
59+
private DummyTimelineClient timelineClient;
60+
private Configuration conf;
61+
private DrainDispatcher dispatcher;
5662

57-
Configuration conf = new Configuration();
63+
64+
@Before public void setup() throws Exception {
65+
conf = new Configuration();
5866
conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
5967
conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f);
68+
conf.setLong(YarnConfiguration.ATS_APP_COLLECTOR_LINGER_PERIOD_IN_MS,
69+
3000L);
70+
timelineClient = new DummyTimelineClient(null);
71+
Context context = createMockContext();
72+
dispatcher = new DrainDispatcher();
6073

61-
NMTimelinePublisher publisher = new NMTimelinePublisher(context) {
74+
publisher = new NMTimelinePublisher(context) {
6275
public void createTimelineClient(ApplicationId appId) {
6376
if (!getAppToClientMap().containsKey(appId)) {
6477
timelineClient.init(getConfig());
6578
timelineClient.start();
6679
getAppToClientMap().put(appId, timelineClient);
6780
}
6881
}
82+
83+
@Override protected AsyncDispatcher createDispatcher() {
84+
return dispatcher;
85+
}
6986
};
7087
publisher.init(conf);
7188
publisher.start();
89+
}
90+
91+
private Context createMockContext() {
92+
Context context = mock(Context.class);
93+
when(context.getNodeId()).thenReturn(NodeId.newInstance("localhost", 0));
94+
return context;
95+
}
96+
97+
@After public void tearDown() throws Exception {
98+
if (publisher != null) {
99+
publisher.stop();
100+
}
101+
if (timelineClient != null) {
102+
timelineClient.stop();
103+
}
104+
}
105+
106+
@Test public void testPublishContainerFinish() throws Exception {
107+
ApplicationId appId = ApplicationId.newInstance(0, 2);
108+
ApplicationAttemptId appAttemptId =
109+
ApplicationAttemptId.newInstance(appId, 1);
110+
ContainerId cId = ContainerId.newContainerId(appAttemptId, 1);
111+
112+
String diag = "test-diagnostics";
113+
int exitStatus = 0;
114+
ContainerStatus cStatus = mock(ContainerStatus.class);
115+
when(cStatus.getContainerId()).thenReturn(cId);
116+
when(cStatus.getDiagnostics()).thenReturn(diag);
117+
when(cStatus.getExitStatus()).thenReturn(exitStatus);
118+
long timeStamp = System.currentTimeMillis();
119+
120+
ApplicationContainerFinishedEvent finishedEvent =
121+
new ApplicationContainerFinishedEvent(cStatus, timeStamp);
122+
123+
publisher.createTimelineClient(appId);
124+
publisher.publishApplicationEvent(finishedEvent);
125+
publisher.stopTimelineClient(appId);
126+
dispatcher.await();
127+
128+
ContainerEntity cEntity = new ContainerEntity();
129+
cEntity.setId(cId.toString());
130+
TimelineEntity[] lastPublishedEntities =
131+
timelineClient.getLastPublishedEntities();
132+
133+
Assert.assertNotNull(lastPublishedEntities);
134+
Assert.assertEquals(1, lastPublishedEntities.length);
135+
TimelineEntity entity = lastPublishedEntities[0];
136+
Assert.assertTrue(cEntity.equals(entity));
137+
Assert.assertEquals(diag,
138+
entity.getInfo().get(ContainerMetricsConstants.DIAGNOSTICS_INFO));
139+
Assert.assertEquals(exitStatus,
140+
entity.getInfo().get(ContainerMetricsConstants.EXIT_STATUS_INFO));
141+
}
142+
143+
@Test public void testContainerResourceUsage() {
72144
ApplicationId appId = ApplicationId.newInstance(0, 1);
73145
publisher.createTimelineClient(appId);
74146
Container aContainer = mock(Container.class);
75-
when(aContainer.getContainerId()).thenReturn(ContainerId.newContainerId(
76-
ApplicationAttemptId.newInstance(appId, 1),
77-
0L));
147+
when(aContainer.getContainerId()).thenReturn(ContainerId
148+
.newContainerId(ApplicationAttemptId.newInstance(appId, 1), 0L));
78149
publisher.reportContainerResourceUsage(aContainer, 1024L, 8F);
79150
verifyPublishedResourceUsageMetrics(timelineClient, 1024L, 8);
80151
timelineClient.reset();
@@ -91,7 +162,6 @@ public void createTimelineClient(ApplicationId appId) {
91162
(float) ResourceCalculatorProcessTree.UNAVAILABLE);
92163
verifyPublishedResourceUsageMetrics(timelineClient, 1024L,
93164
ResourceCalculatorProcessTree.UNAVAILABLE);
94-
publisher.stop();
95165
}
96166

97167
private void verifyPublishedResourceUsageMetrics(
@@ -151,8 +221,12 @@ public DummyTimelineClient(ApplicationId appId) {
151221

152222
private TimelineEntity[] lastPublishedEntities;
153223

154-
@Override
155-
public void putEntitiesAsync(TimelineEntity... entities)
224+
@Override public void putEntitiesAsync(TimelineEntity... entities)
225+
throws IOException, YarnException {
226+
this.lastPublishedEntities = entities;
227+
}
228+
229+
@Override public void putEntities(TimelineEntity... entities)
156230
throws IOException, YarnException {
157231
this.lastPublishedEntities = entities;
158232
}

0 commit comments

Comments
 (0)