Skip to content

Commit 1c51f36

Browse files
YARN-9788. Queue Management API does not support parallel updates. Contributed by Prabhu Joseph
1 parent 13cea04 commit 1c51f36

File tree

14 files changed

+116
-71
lines changed

14 files changed

+116
-71
lines changed

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestSchedConfCLI.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
4646
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
4747
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
48+
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.YarnConfigurationStore.LogMutation;
4849
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.MutableConfScheduler;
4950
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.MutableConfigurationProvider;
5051
import org.apache.hadoop.yarn.server.resourcemanager.webapp.JAXBContextResolver;
@@ -247,10 +248,10 @@ public void testFormatSchedulerConf() throws Exception {
247248
globalUpdates.put("schedKey1", "schedVal1");
248249
schedUpdateInfo.setGlobalParams(globalUpdates);
249250

250-
provider.logAndApplyMutation(UserGroupInformation.getCurrentUser(),
251-
schedUpdateInfo);
251+
LogMutation log = provider.logAndApplyMutation(
252+
UserGroupInformation.getCurrentUser(), schedUpdateInfo);
252253
rm.getRMContext().getRMAdminService().refreshQueues();
253-
provider.confirmPendingMutation(true);
254+
provider.confirmPendingMutation(log, true);
254255

255256
Configuration schedulerConf = provider.getConfiguration();
256257
assertEquals("schedVal1", schedulerConf.get("schedKey1"));

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfigurationProvider.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import org.apache.hadoop.conf.Configuration;
2222
import org.apache.hadoop.security.UserGroupInformation;
23+
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.YarnConfigurationStore.LogMutation;
2324
import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo;
2425

2526
import java.io.IOException;
@@ -46,18 +47,21 @@ public interface MutableConfigurationProvider {
4647
* Log user's requested configuration mutation, and applies it in-memory.
4748
* @param user User who requested the change
4849
* @param confUpdate User's requested configuration change
50+
* @return LogMutation with update info from given SchedConfUpdateInfo
4951
* @throws Exception if logging the mutation fails
5052
*/
51-
void logAndApplyMutation(UserGroupInformation user, SchedConfUpdateInfo
52-
confUpdate) throws Exception;
53+
LogMutation logAndApplyMutation(UserGroupInformation user,
54+
SchedConfUpdateInfo confUpdate) throws Exception;
5355

5456
/**
5557
* Confirm last logged mutation.
58+
* @param pendingMutation the log mutation to apply
5659
* @param isValid if the last logged mutation is applied to scheduler
5760
* properly.
5861
* @throws Exception if confirming mutation fails
5962
*/
60-
void confirmPendingMutation(boolean isValid) throws Exception;
63+
void confirmPendingMutation(LogMutation pendingMutation,
64+
boolean isValid) throws Exception;
6165

6266
/**
6367
* Returns scheduler configuration cached in this provider.

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/FSSchedulerConfigurationStore.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,6 @@ public class FSSchedulerConfigurationStore extends YarnConfigurationStore {
5858
private int maxVersion;
5959
private Path schedulerConfDir;
6060
private FileSystem fileSystem;
61-
private LogMutation pendingMutation;
6261
private PathFilter configFilePathFilter;
6362
private volatile Configuration schedConf;
6463
private volatile Configuration oldConf;
@@ -134,10 +133,9 @@ public boolean accept(Path path) {
134133
*/
135134
@Override
136135
public void logMutation(LogMutation logMutation) throws IOException {
137-
pendingMutation = logMutation;
138136
LOG.info(new GsonBuilder().serializeNulls().create().toJson(logMutation));
139137
oldConf = new Configuration(schedConf);
140-
Map<String, String> mutations = pendingMutation.getUpdates();
138+
Map<String, String> mutations = logMutation.getUpdates();
141139
for (Map.Entry<String, String> kv : mutations.entrySet()) {
142140
if (kv.getValue() == null) {
143141
this.schedConf.unset(kv.getKey());
@@ -149,12 +147,14 @@ public void logMutation(LogMutation logMutation) throws IOException {
149147
}
150148

151149
/**
150+
* @param pendingMutation the log mutation to apply
152151
* @param isValid if true, finalize temp configuration file
153152
* if false, remove temp configuration file and rollback
154153
* @throws Exception throw IOE when write temp configuration file fail
155154
*/
156155
@Override
157-
public void confirmMutation(boolean isValid) throws Exception {
156+
public void confirmMutation(LogMutation pendingMutation,
157+
boolean isValid) throws Exception {
158158
if (pendingMutation == null || tempConfigPath == null) {
159159
LOG.warn("pendingMutation or tempConfigPath is null, do nothing");
160160
return;

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/InMemoryConfigurationStore.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@
3232
public class InMemoryConfigurationStore extends YarnConfigurationStore {
3333

3434
private Configuration schedConf;
35-
private LogMutation pendingMutation;
3635
private long configVersion;
3736

3837
@Override
@@ -42,13 +41,17 @@ public void initialize(Configuration conf, Configuration schedConf,
4241
this.configVersion = 1L;
4342
}
4443

44+
/**
45+
* This method does not log as it does not support backing store.
46+
* The mutation to be applied on top of schedConf will be directly passed
47+
* in confirmMutation.
48+
*/
4549
@Override
4650
public void logMutation(LogMutation logMutation) {
47-
pendingMutation = logMutation;
4851
}
4952

5053
@Override
51-
public void confirmMutation(boolean isValid) {
54+
public void confirmMutation(LogMutation pendingMutation, boolean isValid) {
5255
if (isValid) {
5356
for (Map.Entry<String, String> kv : pendingMutation.getUpdates()
5457
.entrySet()) {
@@ -60,7 +63,6 @@ public void confirmMutation(boolean isValid) {
6063
}
6164
this.configVersion = this.configVersion + 1L;
6265
}
63-
pendingMutation = null;
6466
}
6567

6668
@Override

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,6 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore {
7575
private DB versiondb;
7676
private long maxLogs;
7777
private Configuration conf;
78-
private LogMutation pendingMutation;
7978
@VisibleForTesting
8079
protected static final Version CURRENT_VERSION_INFO = Version
8180
.newInstance(0, 1);
@@ -232,11 +231,11 @@ public void logMutation(LogMutation logMutation) throws IOException {
232231
}
233232
db.put(bytes(LOG_KEY), serLogMutations(logs));
234233
}
235-
pendingMutation = logMutation;
236234
}
237235

238236
@Override
239-
public void confirmMutation(boolean isValid) throws IOException {
237+
public void confirmMutation(LogMutation pendingMutation,
238+
boolean isValid) throws IOException {
240239
WriteBatch updateBatch = db.createWriteBatch();
241240
if (isValid) {
242241
for (Map.Entry<String, String> changes :
@@ -252,7 +251,6 @@ public void confirmMutation(boolean isValid) throws IOException {
252251
bytes(String.valueOf(configVersion)));
253252
}
254253
db.write(updateBatch);
255-
pendingMutation = null;
256254
}
257255

258256
private byte[] serLogMutations(LinkedList<LogMutation> mutations) throws

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ public ConfigurationMutationACLPolicy getAclMutationPolicy() {
128128
}
129129

130130
@Override
131-
public void logAndApplyMutation(UserGroupInformation user,
131+
public LogMutation logAndApplyMutation(UserGroupInformation user,
132132
SchedConfUpdateInfo confUpdate) throws Exception {
133133
oldConf = new Configuration(schedConf);
134134
Map<String, String> kvUpdate = constructKeyValueConfUpdate(confUpdate);
@@ -141,6 +141,7 @@ public void logAndApplyMutation(UserGroupInformation user,
141141
schedConf.set(kv.getKey(), kv.getValue());
142142
}
143143
}
144+
return log;
144145
}
145146

146147
@Override
@@ -184,10 +185,11 @@ public void revertToOldConfig(Configuration config) throws Exception {
184185
}
185186

186187
@Override
187-
public void confirmPendingMutation(boolean isValid) throws Exception {
188+
public void confirmPendingMutation(LogMutation pendingMutation,
189+
boolean isValid) throws Exception {
188190
formatLock.readLock().lock();
189191
try {
190-
confStore.confirmMutation(isValid);
192+
confStore.confirmMutation(pendingMutation, isValid);
191193
if (!isValid) {
192194
schedConf = oldConf;
193195
}

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ public abstract class YarnConfigurationStore {
5252
* LogMutation encapsulates the fields needed for configuration mutation
5353
* audit logging and recovery.
5454
*/
55-
static class LogMutation implements Serializable {
55+
public static class LogMutation implements Serializable {
5656
private Map<String, String> updates;
5757
private String user;
5858

@@ -113,11 +113,13 @@ public void close() throws IOException {}
113113
* last logged by {@code logMutation} and marks the mutation as persisted (no
114114
* longer pending). If isValid is true, merge the mutation with the persisted
115115
* configuration.
116+
* @param pendingMutation the log mutation to apply
116117
* @param isValid if true, update persisted configuration with pending
117118
* mutation.
118119
* @throws Exception if mutation confirmation fails
119120
*/
120-
public abstract void confirmMutation(boolean isValid) throws Exception;
121+
public abstract void confirmMutation(LogMutation pendingMutation,
122+
boolean isValid) throws Exception;
121123

122124
/**
123125
* Retrieve the persisted configuration.

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ZKConfigurationStore.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,6 @@ public class ZKConfigurationStore extends YarnConfigurationStore {
5454
protected static final Version CURRENT_VERSION_INFO = Version
5555
.newInstance(0, 1);
5656
private Configuration conf;
57-
private LogMutation pendingMutation;
5857

5958
private String znodeParentPath;
6059

@@ -175,12 +174,11 @@ public void logMutation(LogMutation logMutation) throws Exception {
175174
zkManager.safeSetData(logsPath, serializeObject(logs), -1, zkAcl,
176175
fencingNodePath);
177176
}
178-
pendingMutation = logMutation;
179177
}
180178

181179
@Override
182-
public void confirmMutation(boolean isValid)
183-
throws Exception {
180+
public void confirmMutation(LogMutation pendingMutation,
181+
boolean isValid) throws Exception {
184182
if (isValid) {
185183
Configuration storedConfigs = retrieve();
186184
Map<String, String> mapConf = new HashMap<>();
@@ -201,7 +199,6 @@ public void confirmMutation(boolean isValid)
201199
zkManager.setData(confVersionPath, String.valueOf(configVersion), -1);
202200

203201
}
204-
pendingMutation = null;
205202
}
206203

207204
@Override

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,7 @@
148148
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
149149
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
150150
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
151+
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.YarnConfigurationStore.LogMutation;
151152
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
152153
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
153154
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
@@ -2643,14 +2644,15 @@ public Void run() throws Exception {
26432644
throw new org.apache.hadoop.security.AccessControlException("User"
26442645
+ " is not admin of all modified queues.");
26452646
}
2646-
provider.logAndApplyMutation(callerUGI, mutationInfo);
2647+
LogMutation logMutation = provider.logAndApplyMutation(callerUGI,
2648+
mutationInfo);
26472649
try {
26482650
rm.getRMContext().getRMAdminService().refreshQueues();
26492651
} catch (IOException | YarnException e) {
2650-
provider.confirmPendingMutation(false);
2652+
provider.confirmPendingMutation(logMutation, false);
26512653
throw e;
26522654
}
2653-
provider.confirmPendingMutation(true);
2655+
provider.confirmPendingMutation(logMutation, true);
26542656
return null;
26552657
}
26562658
});

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ConfigurationStoreBaseTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -64,15 +64,15 @@ public void testConfigurationUpdate() throws Exception {
6464
YarnConfigurationStore.LogMutation mutation1 =
6565
new YarnConfigurationStore.LogMutation(update1, TEST_USER);
6666
confStore.logMutation(mutation1);
67-
confStore.confirmMutation(true);
67+
confStore.confirmMutation(mutation1, true);
6868
assertEquals("valUpdate1", confStore.retrieve().get("keyUpdate1"));
6969

7070
Map<String, String> update2 = new HashMap<>();
7171
update2.put("keyUpdate2", "valUpdate2");
7272
YarnConfigurationStore.LogMutation mutation2 =
7373
new YarnConfigurationStore.LogMutation(update2, TEST_USER);
7474
confStore.logMutation(mutation2);
75-
confStore.confirmMutation(false);
75+
confStore.confirmMutation(mutation2, false);
7676
assertNull("Configuration should not be updated",
7777
confStore.retrieve().get("keyUpdate2"));
7878
confStore.close();
@@ -89,7 +89,7 @@ public void testNullConfigurationUpdate() throws Exception {
8989
YarnConfigurationStore.LogMutation mutation =
9090
new YarnConfigurationStore.LogMutation(update, TEST_USER);
9191
confStore.logMutation(mutation);
92-
confStore.confirmMutation(true);
92+
confStore.confirmMutation(mutation, true);
9393
assertNull(confStore.retrieve().get("key"));
9494
confStore.close();
9595
}

0 commit comments

Comments
 (0)