Skip to content

Commit 3a0afcf

Browse files
committed
YARN-9873. Mutation API Config Change updates Version Number. Contributed by Prabhu Joseph
(cherry picked from commit 4510970)
1 parent 7025724 commit 3a0afcf

File tree

10 files changed

+124
-2
lines changed

10 files changed

+124
-2
lines changed

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfigurationProvider.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,12 @@ void logAndApplyMutation(UserGroupInformation user, SchedConfUpdateInfo
6565
*/
6666
Configuration getConfiguration();
6767

68+
/**
69+
* Get the last updated scheduler config version.
70+
* @return Last updated scheduler config version.
71+
*/
72+
long getConfigVersion() throws Exception;
73+
6874
void formatConfigurationInStore(Configuration conf) throws Exception;
6975

7076
/**

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/FSSchedulerConfigurationStore.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,13 @@ public void confirmMutation(boolean isValid) throws Exception {
148148
tempConfigPath = null;
149149
}
150150

151+
@Override
152+
public long getConfigVersion() throws Exception {
153+
String version = getLatestConfigPath().getName().
154+
substring(YarnConfiguration.CS_CONFIGURATION_FILE.length() + 1);
155+
return Long.parseLong(version);
156+
}
157+
151158
private void finalizeFileSystemFile() throws IOException {
152159
// call confirmMutation() make sure tempConfigPath is not null
153160
Path finalConfigPath = getFinalConfigPath(tempConfigPath);

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/InMemoryConfigurationStore.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,11 +33,13 @@ public class InMemoryConfigurationStore extends YarnConfigurationStore {
3333

3434
private Configuration schedConf;
3535
private LogMutation pendingMutation;
36+
private long configVersion;
3637

3738
@Override
3839
public void initialize(Configuration conf, Configuration schedConf,
3940
RMContext rmContext) {
4041
this.schedConf = schedConf;
42+
this.configVersion = System.currentTimeMillis();
4143
}
4244

4345
@Override
@@ -57,6 +59,7 @@ public void confirmMutation(boolean isValid) {
5759
}
5860
}
5961
}
62+
this.configVersion = System.currentTimeMillis();
6063
pendingMutation = null;
6164
}
6265

@@ -70,6 +73,11 @@ public synchronized Configuration retrieve() {
7073
return schedConf;
7174
}
7275

76+
@Override
77+
public long getConfigVersion() throws Exception {
78+
return configVersion;
79+
}
80+
7381
@Override
7482
public List<LogMutation> getConfirmedConfHistory(long fromId) {
7583
// Unimplemented.

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore {
6868
private static final String DB_NAME = "yarn-conf-store";
6969
private static final String LOG_KEY = "log";
7070
private static final String VERSION_KEY = "version";
71+
private static final String CONF_VERSION_KEY = "conf-version";
7172

7273
private DB db;
7374
private long maxLogs;
@@ -124,6 +125,10 @@ public int compare(byte[] key1, byte[] key2) {
124125
return 1;
125126
} else if (key2Str.equals(LOG_KEY)) {
126127
return -1;
128+
} else if (key1Str.equals(CONF_VERSION_KEY)) {
129+
return 1;
130+
} else if (key2Str.equals(CONF_VERSION_KEY)) {
131+
return -1;
127132
}
128133
return key1Str.compareTo(key2Str);
129134
}
@@ -146,6 +151,10 @@ public byte[] findShortSuccessor(byte[] key) {
146151
File dbfile = new File(storeRoot.toString());
147152
try {
148153
db = JniDBFactory.factory.open(dbfile, options);
154+
if (db.get(bytes(CONF_VERSION_KEY)) == null) {
155+
db.put(bytes(CONF_VERSION_KEY),
156+
bytes(String.valueOf(System.currentTimeMillis())));
157+
}
149158
} catch (NativeDB.DBException e) {
150159
if (e.isNotFound() || e.getMessage().contains(" does not exist ")) {
151160
LOG.info("Creating conf database at " + dbfile);
@@ -158,6 +167,8 @@ public byte[] findShortSuccessor(byte[] key) {
158167
initBatch.put(bytes(kv.getKey()), bytes(kv.getValue()));
159168
}
160169
db.write(initBatch);
170+
db.put(bytes(CONF_VERSION_KEY),
171+
bytes(String.valueOf(System.currentTimeMillis())));
161172
} catch (DBException dbErr) {
162173
throw new IOException(dbErr.getMessage(), dbErr);
163174
}
@@ -215,6 +226,8 @@ public void confirmMutation(boolean isValid) throws IOException {
215226
}
216227
}
217228
db.write(updateBatch);
229+
db.put(bytes(CONF_VERSION_KEY),
230+
bytes(String.valueOf(System.currentTimeMillis())));
218231
pendingMutation = null;
219232
}
220233

@@ -250,14 +263,22 @@ public synchronized Configuration retrieve() {
250263
Map.Entry<byte[], byte[]> entry = itr.next();
251264
String key = new String(entry.getKey(), StandardCharsets.UTF_8);
252265
String value = new String(entry.getValue(), StandardCharsets.UTF_8);
253-
if (key.equals(LOG_KEY) || key.equals(VERSION_KEY)) {
266+
if (key.equals(LOG_KEY) || key.equals(VERSION_KEY) ||
267+
key.equals(CONF_VERSION_KEY)) {
254268
break;
255269
}
256270
config.set(key, value);
257271
}
258272
return config;
259273
}
260274

275+
@Override
276+
public long getConfigVersion() throws Exception {
277+
String version = new String(db.get(bytes(CONF_VERSION_KEY)),
278+
StandardCharsets.UTF_8);
279+
return Long.parseLong(version);
280+
}
281+
261282
@Override
262283
public List<LogMutation> getConfirmedConfHistory(long fromId) {
263284
return null; // unimplemented

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,11 @@ public Configuration getConfiguration() {
134134
return new Configuration(schedConf);
135135
}
136136

137+
@Override
138+
public long getConfigVersion() throws Exception {
139+
return confStore.getConfigVersion();
140+
}
141+
137142
@Override
138143
public ConfigurationMutationACLPolicy getAclMutationPolicy() {
139144
return aclMutationPolicy;

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,12 @@ public void close() throws IOException {}
132132
*/
133133
public abstract void format() throws Exception;
134134

135+
/**
136+
* Get the last updated config version.
137+
* @return Last updated config version.
138+
*/
139+
public abstract long getConfigVersion() throws Exception;
140+
135141
/**
136142
* Get a list of confirmed configuration mutations starting from a given id.
137143
* @param fromId id from which to start getting mutations, inclusive

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ZKConfigurationStore.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,11 +62,13 @@ public class ZKConfigurationStore extends YarnConfigurationStore {
6262
private static final String LOGS_PATH = "LOGS";
6363
private static final String CONF_STORE_PATH = "CONF_STORE";
6464
private static final String FENCING_PATH = "FENCING";
65+
private static final String CONF_VERSION_PATH = "CONF_VERSION";
6566

6667
private String zkVersionPath;
6768
private String logsPath;
6869
private String confStorePath;
6970
private String fencingNodePath;
71+
private String confVersionPath;
7072

7173
@VisibleForTesting
7274
protected ZKCuratorManager zkManager;
@@ -89,6 +91,7 @@ public void initialize(Configuration config, Configuration schedConf,
8991
this.logsPath = getNodePath(znodeParentPath, LOGS_PATH);
9092
this.confStorePath = getNodePath(znodeParentPath, CONF_STORE_PATH);
9193
this.fencingNodePath = getNodePath(znodeParentPath, FENCING_PATH);
94+
this.confVersionPath = getNodePath(znodeParentPath, CONF_VERSION_PATH);
9295

9396
zkManager.createRootDirRecursively(znodeParentPath, zkAcl);
9497
zkManager.delete(fencingNodePath);
@@ -99,13 +102,21 @@ public void initialize(Configuration config, Configuration schedConf,
99102
serializeObject(new LinkedList<LogMutation>()), -1);
100103
}
101104

105+
if (!zkManager.exists(confVersionPath)) {
106+
zkManager.create(confVersionPath);
107+
zkManager.setData(confVersionPath,
108+
String.valueOf(System.currentTimeMillis()), -1);
109+
}
110+
102111
if (!zkManager.exists(confStorePath)) {
103112
zkManager.create(confStorePath);
104113
HashMap<String, String> mapSchedConf = new HashMap<>();
105114
for (Map.Entry<String, String> entry : schedConf) {
106115
mapSchedConf.put(entry.getKey(), entry.getValue());
107116
}
108117
zkManager.setData(confStorePath, serializeObject(mapSchedConf), -1);
118+
zkManager.setData(confVersionPath,
119+
String.valueOf(System.currentTimeMillis()), -1);
109120
}
110121
}
111122

@@ -185,6 +196,9 @@ public void confirmMutation(boolean isValid)
185196
}
186197
zkManager.safeSetData(confStorePath, serializeObject(mapConf), -1,
187198
zkAcl, fencingNodePath);
199+
zkManager.setData(confVersionPath,
200+
String.valueOf(System.currentTimeMillis()), -1);
201+
188202
}
189203
pendingMutation = null;
190204
}
@@ -213,6 +227,11 @@ public synchronized Configuration retrieve() {
213227
return null;
214228
}
215229

230+
@Override
231+
public long getConfigVersion() throws Exception {
232+
return Long.parseLong(zkManager.getStringData(confVersionPath));
233+
}
234+
216235
@Override
217236
public List<LogMutation> getConfirmedConfHistory(long fromId) {
218237
return null; // unimplemented

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWSConsts.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,9 @@ public final class RMWSConsts {
5151
/** Path for {@code RMWebServices#formatSchedulerConfiguration}. */
5252
public static final String FORMAT_SCHEDULER_CONF = "/scheduler-conf/format";
5353

54+
/** Path for {@code RMWebServices#getSchedulerConfigurationVersion}. */
55+
public static final String SCHEDULER_CONF_VERSION = "/scheduler-conf/version";
56+
5457
/** Path for {@code RMWebServiceProtocol#dumpSchedulerLogs}. */
5558
public static final String SCHEDULER_LOGS = "/scheduler/logs";
5659

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2345,7 +2345,7 @@ public Response formatSchedulerConfiguration(@Context HttpServletRequest hsr)
23452345
}
23462346
} else {
23472347
return Response.status(Status.BAD_REQUEST)
2348-
.entity("Configuration change only supported by " +
2348+
.entity("Scheduler Configuration format only supported by " +
23492349
"MutableConfScheduler.").build();
23502350
}
23512351
}
@@ -2435,6 +2435,38 @@ public Response getSchedulerConfiguration(@Context HttpServletRequest hsr)
24352435
}
24362436
}
24372437

2438+
@GET
2439+
@Path(RMWSConsts.SCHEDULER_CONF_VERSION)
2440+
@Produces({ MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8,
2441+
MediaType.APPLICATION_XML + "; " + JettyUtils.UTF_8 })
2442+
public Response getSchedulerConfigurationVersion(@Context
2443+
HttpServletRequest hsr) throws AuthorizationException {
2444+
// Only admin user is allowed to get scheduler conf version
2445+
UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr, true);
2446+
initForWritableEndpoints(callerUGI, true);
2447+
2448+
ResourceScheduler scheduler = rm.getResourceScheduler();
2449+
if (scheduler instanceof MutableConfScheduler
2450+
&& ((MutableConfScheduler) scheduler).isConfigurationMutable()) {
2451+
MutableConfigurationProvider mutableConfigurationProvider =
2452+
((MutableConfScheduler) scheduler).getMutableConfProvider();
2453+
2454+
try {
2455+
long configVersion = mutableConfigurationProvider
2456+
.getConfigVersion();
2457+
return Response.status(Status.OK).entity(configVersion).build();
2458+
} catch (Exception e) {
2459+
LOG.error("Exception thrown when fetching configuration version.", e);
2460+
return Response.status(Status.BAD_REQUEST).entity(e.getMessage())
2461+
.build();
2462+
}
2463+
} else {
2464+
return Response.status(Status.BAD_REQUEST)
2465+
.entity("Configuration Version only supported by "
2466+
+ "MutableConfScheduler.").build();
2467+
}
2468+
}
2469+
24382470
@GET
24392471
@Path(RMWSConsts.CHECK_USER_ACCESS_TO_QUEUE)
24402472
@Produces({ MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8,

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestZKConfigurationStore.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,21 @@ public void testFormatConfiguration() throws Exception {
137137
assertNull(confStore.retrieve());
138138
}
139139

140+
@Test
141+
public void testGetConfigurationVersion() throws Exception {
142+
confStore.initialize(conf, schedConf, rmContext);
143+
long v1 = confStore.getConfigVersion();
144+
Thread.sleep(2000);
145+
Map<String, String> update = new HashMap<>();
146+
update.put("keyver", "valver");
147+
YarnConfigurationStore.LogMutation mutation =
148+
new YarnConfigurationStore.LogMutation(update, TEST_USER);
149+
confStore.logMutation(mutation);
150+
confStore.confirmMutation(true);
151+
long v2 = confStore.getConfigVersion();
152+
assertTrue(v2 > v1);
153+
}
154+
140155
@Test
141156
public void testPersistUpdatedConfiguration() throws Exception {
142157
confStore.initialize(conf, schedConf, rmContext);

0 commit comments

Comments
 (0)