|
19 | 19 | package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
|
20 | 20 |
|
21 | 21 | import org.apache.hadoop.test.GenericTestUtils;
|
| 22 | +import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableList; |
| 23 | +import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest; |
| 24 | +import org.apache.hadoop.yarn.api.records.Priority; |
| 25 | +import org.apache.hadoop.yarn.api.records.Resource; |
| 26 | +import org.apache.hadoop.yarn.api.records.ResourceSizing; |
| 27 | +import org.apache.hadoop.yarn.api.records.SchedulingRequest; |
| 28 | +import org.apache.hadoop.yarn.api.resource.PlacementConstraint; |
| 29 | +import org.apache.hadoop.yarn.conf.YarnConfiguration; |
22 | 30 | import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
|
23 | 31 | import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
|
24 | 32 | import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
|
27 | 35 | import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
28 | 36 | import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
|
29 | 37 | import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
|
| 38 | +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.ResourceUsageMultiNodeLookupPolicy; |
30 | 39 | import org.junit.jupiter.api.Test;
|
31 | 40 |
|
32 | 41 | import java.util.ArrayList;
|
33 | 42 |
|
| 43 | +import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.PlacementTargets.allocationTag; |
| 44 | +import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.targetIn; |
34 | 45 | import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.APP_BACKOFF_MISSED_THRESHOLD;
|
35 | 46 | import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.BACKOFF_ENABLED;
|
36 | 47 | import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.APP_BACKOFF_INTERVAL_MS;
|
37 | 48 | import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.DEFAULT_APP_BACKOFF_MISSED_THRESHOLD;
|
38 | 49 | import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.DEFAULT_APP_BACKOFF_INTERVAL_MS;
|
| 50 | +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.MULTI_NODE_PLACEMENT_ENABLED; |
39 | 51 | import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.PREFIX;
|
40 | 52 | import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.A1;
|
41 | 53 | import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.A2;
|
@@ -109,86 +121,100 @@ public void testSchedulingWithAppBackoffEnabled() throws Exception {
|
109 | 121 | long appBackoffIntervalMs = 100L;
|
110 | 122 | CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
|
111 | 123 | setupQueueConfiguration(conf);
|
| 124 | + enabledMultiNodesPlacement(conf); |
112 | 125 | conf.setAppBackoffEnabled(A1, true);
|
113 | 126 | conf.setAppBackoffIntervalMs(A1, appBackoffIntervalMs);
|
114 | 127 | conf.setAppBackoffMissedThreshold(A1, 3L);
|
115 | 128 |
|
116 |
| - // Register a node with 10GB memory |
| 129 | + // Register a node |
117 | 130 | MockRM rm = new MockRM(conf);
|
118 | 131 | rm.start();
|
119 | 132 | MockNM nm1 = rm.registerNode("127.0.0.1:1234", 10 * GB);
|
120 | 133 |
|
121 |
| - // Submit an application in queue B1 that requests 5GB memory |
122 |
| - MockRMAppSubmissionData data1 = |
123 |
| - MockRMAppSubmissionData.Builder.createWithMemory(5 * GB, rm) |
124 |
| - .withAppName("app1") |
125 |
| - .withUser("user") |
126 |
| - .withAcls(null) |
127 |
| - .withQueue(B1.getLeafName()) |
128 |
| - .withUnmanagedAM(false) |
129 |
| - .build(); |
130 |
| - RMApp app1 = MockRMAppSubmitter.submit(rm, data1); |
131 |
| - MockRM.launchAndRegisterAM(app1, rm, nm1); |
132 |
| - |
133 |
| - // Submit an application in queue A1 that requests 2GB memory, |
134 |
| - // available memory will be 3GB for the whole cluster. |
| 134 | + // Submit an application in queue A1 |
135 | 135 | MockRMAppSubmissionData data =
|
136 | 136 | MockRMAppSubmissionData.Builder.createWithMemory(2 * GB, rm)
|
137 |
| - .withAppName("app2") |
| 137 | + .withAppName("app1") |
138 | 138 | .withUser("user")
|
139 | 139 | .withAcls(null)
|
140 | 140 | .withQueue(A1.getLeafName())
|
141 | 141 | .withUnmanagedAM(false)
|
142 | 142 | .build();
|
143 |
| - RMApp app2 = MockRMAppSubmitter.submit(rm, data); |
144 |
| - MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm1); |
145 |
| - |
146 |
| - // Request 5GB memory for a container that cannot be satisfied |
147 |
| - am2.allocate("*", 5 * GB, 1, new ArrayList<>()); |
| 143 | + RMApp app = MockRMAppSubmitter.submit(rm, data); |
| 144 | + MockAM am = MockRM.launchAndRegisterAM(app, rm, nm1); |
| 145 | + |
| 146 | + // Submit a request that cannot be satisfied due to the |
| 147 | + // placement-constraint condition |
| 148 | + PlacementConstraint pc = targetIn("node", |
| 149 | + allocationTag("hbase-master")).build(); |
| 150 | + SchedulingRequest schedulingRequest = SchedulingRequest.newInstance( |
| 151 | + 1, Priority.newInstance(1), ExecutionTypeRequest.newInstance(), null, |
| 152 | + ResourceSizing.newInstance(1, Resource.newInstance(2 * GB, 1)), pc); |
| 153 | + am.addSchedulingRequest(ImmutableList.of(schedulingRequest)); |
| 154 | + am.doHeartbeat(); |
148 | 155 |
|
149 | 156 | CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
|
150 | 157 | AbstractLeafQueue queueA1 =
|
151 | 158 | (AbstractLeafQueue) cs.getQueue(A1.getLeafName());
|
152 |
| - FiCaSchedulerApp schedulerApp2 = |
153 |
| - cs.getApplicationAttempt(am2.getApplicationAttemptId()); |
| 159 | + FiCaSchedulerApp schedulerApp = |
| 160 | + cs.getApplicationAttempt(am.getApplicationAttemptId()); |
154 | 161 |
|
155 | 162 | // Simulate missed scheduling opportunities
|
156 | 163 | for (int i = 0; i < 3; i++) {
|
157 | 164 | cs.handle(new NodeUpdateSchedulerEvent(
|
158 | 165 | rm.getRMContext().getRMNodes().get(nm1.getNodeId())));
|
159 | 166 | }
|
160 |
| - assertFalse(queueA1.isAppInBackoffState(app2.getApplicationId())); |
161 |
| - assertEquals(3L, schedulerApp2.getAppMissedSchedulingOpportunities()); |
| 167 | + assertFalse(queueA1.isAppInBackoffState(app.getApplicationId())); |
| 168 | + assertEquals(3L, schedulerApp.getAppMissedSchedulingOpportunities()); |
162 | 169 |
|
163 | 170 | // Make the app enter backoff state when it reaches the missed threshold
|
164 | 171 | cs.handle(new NodeUpdateSchedulerEvent(
|
165 | 172 | rm.getRMContext().getRMNodes().get(nm1.getNodeId())));
|
166 | 173 |
|
167 |
| - // Verify the app is in backoff state |
168 |
| - assertTrue(queueA1.isAppInBackoffState(app2.getApplicationId())); |
169 |
| - assertEquals(0L, schedulerApp2.getAppMissedSchedulingOpportunities()); |
| 174 | + // Verify app is in backoff state |
| 175 | + assertTrue(queueA1.isAppInBackoffState(app.getApplicationId())); |
| 176 | + assertEquals(0L, schedulerApp.getAppMissedSchedulingOpportunities()); |
170 | 177 |
|
171 | 178 | // Wait for the backoff interval to expire
|
172 | 179 | GenericTestUtils.waitFor(
|
173 |
| - () -> !queueA1.isAppInBackoffState(app2.getApplicationId()), |
| 180 | + () -> !queueA1.isAppInBackoffState(app.getApplicationId()), |
174 | 181 | appBackoffIntervalMs, appBackoffIntervalMs * 2);
|
175 | 182 |
|
176 | 183 | // Verify app is no longer in backoff state after the backoff interval
|
177 |
| - assertFalse(queueA1.isAppInBackoffState(app2.getApplicationId())); |
| 184 | + assertFalse(queueA1.isAppInBackoffState(app.getApplicationId())); |
178 | 185 |
|
179 | 186 | // Simulate another missed scheduling opportunity
|
180 |
| - cs.handle(new NodeUpdateSchedulerEvent(rm.getRMContext().getRMNodes().get(nm1.getNodeId()))); |
181 |
| - assertFalse(queueA1.isAppInBackoffState(app2.getApplicationId())); |
182 |
| - assertEquals(1L, schedulerApp2.getAppMissedSchedulingOpportunities()); |
| 187 | + cs.handle(new NodeUpdateSchedulerEvent( |
| 188 | + rm.getRMContext().getRMNodes().get(nm1.getNodeId()))); |
| 189 | + assertFalse(queueA1.isAppInBackoffState(app.getApplicationId())); |
| 190 | + assertEquals(1L, schedulerApp.getAppMissedSchedulingOpportunities()); |
183 | 191 |
|
184 |
| - // Kill app1 to release resource on nm1 |
185 |
| - rm.killApp(app1.getApplicationId()); |
| 192 | + // Submit another request that can be allocated right away |
| 193 | + am.allocate("*", 2 * GB, 1, new ArrayList<>()); |
| 194 | + cs.handle(new NodeUpdateSchedulerEvent( |
| 195 | + rm.getRMContext().getRMNodes().get(nm1.getNodeId()))); |
186 | 196 |
|
187 |
| - // Now pending request should be allocated and not in backoff state |
188 |
| - cs.handle(new NodeUpdateSchedulerEvent(rm.getRMContext().getRMNodes().get(nm1.getNodeId()))); |
189 |
| - assertFalse(queueA1.isAppInBackoffState(schedulerApp2.getApplicationId())); |
190 |
| - assertEquals(0L, schedulerApp2.getAppMissedSchedulingOpportunities()); |
| 197 | + // new request should be allocated and app is not in backoff state |
| 198 | + assertFalse(queueA1.isAppInBackoffState(schedulerApp.getApplicationId())); |
| 199 | + assertEquals(0L, schedulerApp.getAppMissedSchedulingOpportunities()); |
191 | 200 |
|
192 | 201 | rm.stop();
|
193 | 202 | }
|
| 203 | + |
| 204 | + private void enabledMultiNodesPlacement(CapacitySchedulerConfiguration conf) { |
| 205 | + conf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER, |
| 206 | + YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER); |
| 207 | + conf.setBoolean(MULTI_NODE_PLACEMENT_ENABLED, true); |
| 208 | + conf.setBoolean(PREFIX + MULTI_NODE_PLACEMENT_ENABLED, true); |
| 209 | + conf.set(CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICIES, |
| 210 | + "resource-based"); |
| 211 | + conf.set(CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICY_NAME, |
| 212 | + "resource-based"); |
| 213 | + String policyName = |
| 214 | + CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICY_NAME + |
| 215 | + ".resource-based.class"; |
| 216 | + conf.set(policyName, ResourceUsageMultiNodeLookupPolicy.class.getName()); |
| 217 | + conf.setBoolean(CapacitySchedulerConfiguration.MULTI_NODE_PLACEMENT_ENABLED, |
| 218 | + true); |
| 219 | + } |
194 | 220 | }
|
0 commit comments