Skip to content

Commit bb754c5

Browse files
hotcodemachaHarshitGupta11
authored andcommitted
YARN-9425. Make initialDelay configurable for FederationStateStoreService#scheduledExecutorService (apache#4731). Contributed by groot and Shen Yinjie.
Signed-off-by: Ayush Saxena <[email protected]>
1 parent d3ebf3a commit bb754c5

File tree

4 files changed

+69
-3
lines changed

4 files changed

+69
-3
lines changed

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3920,6 +3920,13 @@ public static boolean isAclEnabled(Configuration conf) {
39203920
public static final String DEFAULT_FEDERATION_REGISTRY_BASE_KEY =
39213921
"yarnfederation/";
39223922

3923+
public static final String FEDERATION_STATESTORE_HEARTBEAT_INITIAL_DELAY =
3924+
FEDERATION_PREFIX + "state-store.heartbeat.initial-delay";
3925+
3926+
// 30 secs
3927+
public static final int
3928+
DEFAULT_FEDERATION_STATESTORE_HEARTBEAT_INITIAL_DELAY = 30;
3929+
39233930
public static final String FEDERATION_STATESTORE_HEARTBEAT_INTERVAL_SECS =
39243931
FEDERATION_PREFIX + "state-store.heartbeat-interval-secs";
39253932

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3624,6 +3624,16 @@
36243624
<name>yarn.federation.enabled</name>
36253625
<value>false</value>
36263626
</property>
3627+
<property>
3628+
<description>
3629+
Initial delay for federation state-store heartbeat service. Value is followed by a unit
3630+
specifier: ns, us, ms, s, m, h, d for nanoseconds, microseconds, milliseconds, seconds,
3631+
minutes, hours, days respectively. Values should provide units,
3632+
but seconds are assumed
3633+
</description>
3634+
<name>yarn.federation.state-store.heartbeat.initial-delay</name>
3635+
<value>30s</value>
3636+
</property>
36273637
<property>
36283638
<description>
36293639
Machine list file to be loaded by the FederationSubCluster Resolver

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@ public class FederationStateStoreService extends AbstractService
9696
private FederationStateStore stateStoreClient = null;
9797
private SubClusterId subClusterId;
9898
private long heartbeatInterval;
99+
private long heartbeatInitialDelay;
99100
private RMContext rmContext;
100101

101102
public FederationStateStoreService(RMContext rmContext) {
@@ -126,10 +127,24 @@ protected void serviceInit(Configuration conf) throws Exception {
126127
heartbeatInterval = conf.getLong(
127128
YarnConfiguration.FEDERATION_STATESTORE_HEARTBEAT_INTERVAL_SECS,
128129
YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_HEARTBEAT_INTERVAL_SECS);
130+
129131
if (heartbeatInterval <= 0) {
130132
heartbeatInterval =
131133
YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_HEARTBEAT_INTERVAL_SECS;
132134
}
135+
136+
heartbeatInitialDelay = conf.getTimeDuration(
137+
YarnConfiguration.FEDERATION_STATESTORE_HEARTBEAT_INITIAL_DELAY,
138+
YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_HEARTBEAT_INITIAL_DELAY,
139+
TimeUnit.SECONDS);
140+
141+
if (heartbeatInitialDelay <= 0) {
142+
LOG.warn("{} configured value is wrong, must be > 0; using default value of {}",
143+
YarnConfiguration.FEDERATION_STATESTORE_HEARTBEAT_INITIAL_DELAY,
144+
YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_HEARTBEAT_INITIAL_DELAY);
145+
heartbeatInitialDelay =
146+
YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_HEARTBEAT_INITIAL_DELAY;
147+
}
133148
LOG.info("Initialized federation membership service.");
134149

135150
super.serviceInit(conf);
@@ -206,9 +221,9 @@ private void registerAndInitializeHeartbeat() {
206221
scheduledExecutorService =
207222
HadoopExecutors.newSingleThreadScheduledExecutor();
208223
scheduledExecutorService.scheduleWithFixedDelay(stateStoreHeartbeat,
209-
heartbeatInterval, heartbeatInterval, TimeUnit.SECONDS);
210-
LOG.info("Started federation membership heartbeat with interval: {}",
211-
heartbeatInterval);
224+
heartbeatInitialDelay, heartbeatInterval, TimeUnit.SECONDS);
225+
LOG.info("Started federation membership heartbeat with interval: {} and initial delay: {}",
226+
heartbeatInterval, heartbeatInitialDelay);
212227
}
213228

214229
@VisibleForTesting

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/federation/TestFederationRMStateStoreService.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525

2626
import org.apache.hadoop.conf.Configuration;
2727
import org.apache.hadoop.ha.HAServiceProtocol;
28+
import org.apache.hadoop.test.GenericTestUtils;
2829
import org.apache.hadoop.yarn.conf.YarnConfiguration;
2930
import org.apache.hadoop.yarn.exceptions.YarnException;
3031
import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
@@ -173,4 +174,37 @@ private String checkSubClusterInfo(SubClusterState state)
173174
return response.getCapability();
174175
}
175176

177+
@Test
178+
public void testFederationStateStoreServiceInitialHeartbeatDelay() throws Exception {
179+
conf.setBoolean(YarnConfiguration.FEDERATION_ENABLED, true);
180+
conf.setInt(YarnConfiguration.FEDERATION_STATESTORE_HEARTBEAT_INITIAL_DELAY, 10);
181+
conf.set(YarnConfiguration.RM_CLUSTER_ID, subClusterId.getId());
182+
183+
GenericTestUtils.LogCapturer logCapture =
184+
GenericTestUtils.LogCapturer.captureLogs(FederationStateStoreService.LOG);
185+
186+
final MockRM rm = new MockRM(conf);
187+
188+
// Initially there should be no entry for the sub-cluster
189+
rm.init(conf);
190+
stateStore = rm.getFederationStateStoreService().getStateStoreClient();
191+
GetSubClusterInfoResponse response = stateStore.getSubCluster(request);
192+
Assert.assertNull(response);
193+
194+
// Validate if sub-cluster is registered
195+
rm.start();
196+
String capability = checkSubClusterInfo(SubClusterState.SC_NEW);
197+
Assert.assertTrue(capability.isEmpty());
198+
199+
// Heartbeat to see if sub-cluster transitions to running
200+
FederationStateStoreHeartbeat storeHeartbeat =
201+
rm.getFederationStateStoreService().getStateStoreHeartbeatThread();
202+
storeHeartbeat.run();
203+
capability = checkSubClusterInfo(SubClusterState.SC_RUNNING);
204+
checkClusterMetricsInfo(capability, 0);
205+
206+
Assert.assertTrue(logCapture.getOutput().contains(
207+
"Started federation membership heartbeat with interval: 300 and initial delay: 10"));
208+
rm.stop();
209+
}
176210
}

0 commit comments

Comments
 (0)