|
22 | 22 | import java.util.ArrayList; |
23 | 23 | import java.util.Arrays; |
24 | 24 | import java.util.Collection; |
| 25 | +import java.util.Collections; |
25 | 26 | import java.util.EnumSet; |
| 27 | + |
| 28 | +import java.util.HashMap; |
26 | 29 | import java.util.List; |
27 | 30 | import java.util.Map; |
28 | 31 | import java.util.Set; |
|
33 | 36 |
|
34 | 37 | import com.google.gson.Gson; |
35 | 38 | import com.google.gson.reflect.TypeToken; |
| 39 | + |
| 40 | +import org.apache.commons.lang3.builder.EqualsBuilder; |
| 41 | +import org.apache.commons.lang3.builder.HashCodeBuilder; |
| 42 | +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; |
| 43 | +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; |
36 | 44 | import org.slf4j.Logger; |
37 | 45 | import org.slf4j.LoggerFactory; |
38 | 46 | import org.apache.hadoop.classification.InterfaceAudience.Private; |
@@ -142,6 +150,7 @@ public abstract class AbstractYarnScheduler |
142 | 150 | Thread updateThread; |
143 | 151 | private final Object updateThreadMonitor = new Object(); |
144 | 152 | private Timer releaseCache; |
| 153 | + private boolean autoCorrectContainerAllocation; |
145 | 154 |
|
146 | 155 | /* |
147 | 156 | * All schedulers which are inheriting AbstractYarnScheduler should use |
@@ -196,6 +205,10 @@ public void serviceInit(Configuration conf) throws Exception { |
196 | 205 | nmHeartbeatInterval = |
197 | 206 | conf.getLong(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS, |
198 | 207 | YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MS); |
| 208 | + skipNodeInterval = YarnConfiguration.getSkipNodeInterval(conf); |
| 209 | + autoCorrectContainerAllocation = |
| 210 | + conf.getBoolean(YarnConfiguration.RM_SCHEDULER_AUTOCORRECT_CONTAINER_ALLOCATION, |
| 211 | + YarnConfiguration.DEFAULT_RM_SCHEDULER_AUTOCORRECT_CONTAINER_ALLOCATION); |
199 | 212 | long configuredMaximumAllocationWaitTime = |
200 | 213 | conf.getLong(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS, |
201 | 214 | YarnConfiguration.DEFAULT_RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS); |
@@ -589,6 +602,106 @@ public void recoverContainersOnNode(List<NMContainerStatus> containerReports, |
589 | 602 | } |
590 | 603 | } |
591 | 604 |
|
| 605 | + /** |
| 606 | + * Autocorrect container resourceRequests by decrementing the number of newly allocated containers |
| 607 | + * from the current container request. This also updates the newlyAllocatedContainers to be within |
| 608 | + * the limits of the current container resourceRequests. |
| 609 | + * ResourceRequests locality/resourceName is not considered while autocorrecting the container |
| 610 | + * request, hence when there are two types of resourceRequest which is same except for the |
| 611 | + * locality/resourceName, it is counted as same {@link ContainerObjectType} and the container |
| 612 | + * ask and number of newly allocated container is decremented accordingly. |
| 613 | + * For example when a client requests for 4 containers with locality/resourceName |
| 614 | + * as "node1", AMRMClientaugments the resourceRequest into two |
| 615 | + * where R1(numContainer=4,locality=*) and R2(numContainer=4,locality=node1), |
| 616 | + * if Yarn allocated 6 containers previously, it will release 2 containers as well as |
| 617 | + * update the container ask to 0. |
| 618 | + * |
| 619 | + * If there is a client which directly calls Yarn (without AMRMClient) with |
| 620 | + * two where R1(numContainer=4,locality=*) and R2(numContainer=4,locality=node1) |
| 621 | + * the autocorrection may not work as expected. The use case of such client is very rare. |
| 622 | + * |
| 623 | + * <p> |
| 624 | + * This method is called from {@link AbstractYarnScheduler#allocate} method. It is package private |
| 625 | + * to be used within the scheduler package only. |
| 626 | + * @param resourceRequests List of resources to be allocated |
| 627 | + * @param application ApplicationAttempt |
| 628 | + */ |
| 629 | + @VisibleForTesting |
| 630 | + protected void autoCorrectContainerAllocation(List<ResourceRequest> resourceRequests, |
| 631 | + SchedulerApplicationAttempt application) { |
| 632 | + |
| 633 | + // if there is no resourceRequests for containers or no newly allocated container from |
| 634 | + // the previous request there is nothing to do. |
| 635 | + if (!autoCorrectContainerAllocation || resourceRequests.isEmpty() || |
| 636 | + application.newlyAllocatedContainers.isEmpty()) { |
| 637 | + return; |
| 638 | + } |
| 639 | + |
| 640 | + // iterate newlyAllocatedContainers and form a mapping of container type |
| 641 | + // and number of its occurrence. |
| 642 | + Map<ContainerObjectType, List<RMContainer>> allocatedContainerMap = new HashMap<>(); |
| 643 | + for (RMContainer rmContainer : application.newlyAllocatedContainers) { |
| 644 | + Container container = rmContainer.getContainer(); |
| 645 | + ContainerObjectType containerObjectType = new ContainerObjectType( |
| 646 | + container.getAllocationRequestId(), container.getPriority(), |
| 647 | + container.getExecutionType(), container.getResource()); |
| 648 | + allocatedContainerMap.computeIfAbsent(containerObjectType, |
| 649 | + k -> new ArrayList<>()).add(rmContainer); |
| 650 | + } |
| 651 | + |
| 652 | + Map<ContainerObjectType, Integer> extraContainerAllocatedMap = new HashMap<>(); |
| 653 | + // iterate through resourceRequests and update the request by |
| 654 | + // decrementing the already allocated containers. |
| 655 | + for (ResourceRequest request : resourceRequests) { |
| 656 | + ContainerObjectType containerObjectType = |
| 657 | + new ContainerObjectType(request.getAllocationRequestId(), |
| 658 | + request.getPriority(), request.getExecutionTypeRequest().getExecutionType(), |
| 659 | + request.getCapability()); |
| 660 | + int numContainerAllocated = allocatedContainerMap.getOrDefault(containerObjectType, |
| 661 | + Collections.emptyList()).size(); |
| 662 | + if (numContainerAllocated > 0) { |
| 663 | + int numContainerAsk = request.getNumContainers(); |
| 664 | + int updatedContainerRequest = numContainerAsk - numContainerAllocated; |
| 665 | + if (updatedContainerRequest < 0) { |
| 666 | + // add an entry to extra allocated map |
| 667 | + extraContainerAllocatedMap.put(containerObjectType, Math.abs(updatedContainerRequest)); |
| 668 | + LOG.debug("{} container of the resource type: {} will be released", |
| 669 | + Math.abs(updatedContainerRequest), request); |
| 670 | + // if newlyAllocatedContainer count is more than the current container |
| 671 | + // resourceRequests, reset it to 0. |
| 672 | + updatedContainerRequest = 0; |
| 673 | + } |
| 674 | + |
| 675 | + // update the request |
| 676 | + LOG.debug("Updating container resourceRequests from {} to {} for the resource type: {}", |
| 677 | + numContainerAsk, updatedContainerRequest, request); |
| 678 | + request.setNumContainers(updatedContainerRequest); |
| 679 | + } |
| 680 | + } |
| 681 | + |
| 682 | + // Iterate over the entries in extraContainerAllocatedMap |
| 683 | + for (Map.Entry<ContainerObjectType, Integer> entry : extraContainerAllocatedMap.entrySet()) { |
| 684 | + ContainerObjectType containerObjectType = entry.getKey(); |
| 685 | + int extraContainers = entry.getValue(); |
| 686 | + |
| 687 | + // Get the list of allocated containers for the current ContainerObjectType |
| 688 | + List<RMContainer> allocatedContainers = allocatedContainerMap.get(containerObjectType); |
| 689 | + if (allocatedContainers != null) { |
| 690 | + for (RMContainer rmContainer : allocatedContainers) { |
| 691 | + if (extraContainers > 0) { |
| 692 | + // Change the state of the container from ALLOCATED to EXPIRED since it is not required. |
| 693 | + LOG.debug("Removing extra container:{}", rmContainer.getContainer()); |
| 694 | + completedContainer(rmContainer, SchedulerUtils.createAbnormalContainerStatus( |
| 695 | + rmContainer.getContainerId(), SchedulerUtils.EXPIRED_CONTAINER), |
| 696 | + RMContainerEventType.EXPIRE); |
| 697 | + application.newlyAllocatedContainers.remove(rmContainer); |
| 698 | + extraContainers--; |
| 699 | + } |
| 700 | + } |
| 701 | + } |
| 702 | + } |
| 703 | + } |
| 704 | + |
592 | 705 | private RMContainer recoverAndCreateContainer(NMContainerStatus status, |
593 | 706 | RMNode node, String queueName) { |
594 | 707 | Container container = |
@@ -623,6 +736,14 @@ private void recoverResourceRequestForContainer(RMContainer rmContainer) { |
623 | 736 | return; |
624 | 737 | } |
625 | 738 |
|
| 739 | + // when auto correct container allocation is enabled, there can be a case when extra containers |
| 740 | + // go to expired state from allocated state. When such scenario happens do not re-attempt the |
| 741 | + // container request since this is expected. |
| 742 | + if (autoCorrectContainerAllocation && |
| 743 | + RMContainerState.EXPIRED.equals(rmContainer.getState())) { |
| 744 | + return; |
| 745 | + } |
| 746 | + |
626 | 747 | // Add resource request back to Scheduler ApplicationAttempt. |
627 | 748 |
|
628 | 749 | // We lookup the application-attempt here again using |
@@ -1514,4 +1635,101 @@ public boolean attemptAllocationOnNode(SchedulerApplicationAttempt appAttempt, |
1514 | 1635 | public void resetSchedulerMetrics() { |
1515 | 1636 | // reset scheduler metrics |
1516 | 1637 | } |
| 1638 | + |
| 1639 | + /** |
| 1640 | + * Gets the apps from a given queue. |
| 1641 | + * |
| 1642 | + * Mechanics: |
| 1643 | + * 1. Get all {@link ApplicationAttemptId}s in the given queue by |
| 1644 | + * {@link #getAppsInQueue(String)} method. |
| 1645 | + * 2. Always need to check validity for the given queue by the returned |
| 1646 | + * values. |
| 1647 | + * |
| 1648 | + * @param queueName queue name |
| 1649 | + * @return a collection of app attempt ids in the given queue, it maybe empty. |
| 1650 | + * @throws YarnException if {@link #getAppsInQueue(String)} return null, will |
| 1651 | + * throw this exception. |
| 1652 | + */ |
| 1653 | + private List<ApplicationAttemptId> getAppsFromQueue(String queueName) |
| 1654 | + throws YarnException { |
| 1655 | + List<ApplicationAttemptId> apps = getAppsInQueue(queueName); |
| 1656 | + if (apps == null) { |
| 1657 | + throw new YarnException("The specified queue: " + queueName |
| 1658 | + + " doesn't exist"); |
| 1659 | + } |
| 1660 | + return apps; |
| 1661 | + } |
| 1662 | + |
| 1663 | + /** |
| 1664 | + * ContainerObjectType is a container object with the following properties. |
| 1665 | + * Namely allocationId, priority, executionType and resourceType. |
| 1666 | + */ |
| 1667 | + protected class ContainerObjectType extends Object { |
| 1668 | + private final long allocationId; |
| 1669 | + private final Priority priority; |
| 1670 | + private final ExecutionType executionType; |
| 1671 | + private final Resource resource; |
| 1672 | + |
| 1673 | + public ContainerObjectType(long allocationId, Priority priority, |
| 1674 | + ExecutionType executionType, Resource resource) { |
| 1675 | + this.allocationId = allocationId; |
| 1676 | + this.priority = priority; |
| 1677 | + this.executionType = executionType; |
| 1678 | + this.resource = resource; |
| 1679 | + } |
| 1680 | + |
| 1681 | + public long getAllocationId() { |
| 1682 | + return allocationId; |
| 1683 | + } |
| 1684 | + |
| 1685 | + public Priority getPriority() { |
| 1686 | + return priority; |
| 1687 | + } |
| 1688 | + |
| 1689 | + public ExecutionType getExecutionType() { |
| 1690 | + return executionType; |
| 1691 | + } |
| 1692 | + |
| 1693 | + public Resource getResource() { |
| 1694 | + return resource; |
| 1695 | + } |
| 1696 | + |
| 1697 | + @Override |
| 1698 | + public int hashCode() { |
| 1699 | + return new HashCodeBuilder(17, 37) |
| 1700 | + .append(allocationId) |
| 1701 | + .append(priority) |
| 1702 | + .append(executionType) |
| 1703 | + .append(resource) |
| 1704 | + .toHashCode(); |
| 1705 | + } |
| 1706 | + |
| 1707 | + @Override |
| 1708 | + public boolean equals(Object obj) { |
| 1709 | + if (obj == null) { |
| 1710 | + return false; |
| 1711 | + } |
| 1712 | + if (obj.getClass() != this.getClass()) { |
| 1713 | + return false; |
| 1714 | + } |
| 1715 | + |
| 1716 | + ContainerObjectType other = (ContainerObjectType) obj; |
| 1717 | + return new EqualsBuilder() |
| 1718 | + .append(allocationId, other.getAllocationId()) |
| 1719 | + .append(priority, other.getPriority()) |
| 1720 | + .append(executionType, other.getExecutionType()) |
| 1721 | + .append(resource, other.getResource()) |
| 1722 | + .isEquals(); |
| 1723 | + } |
| 1724 | + |
| 1725 | + @Override |
| 1726 | + public String toString() { |
| 1727 | + return "{ContainerObjectType: " |
| 1728 | + + ", Priority: " + getPriority() |
| 1729 | + + ", Allocation Id: " + getAllocationId() |
| 1730 | + + ", Execution Type: " + getExecutionType() |
| 1731 | + + ", Resource: " + getResource() |
| 1732 | + + "}"; |
| 1733 | + } |
| 1734 | + } |
1517 | 1735 | } |
0 commit comments