Skip to content

Commit 4456ea6

Browse files
YARN-8586. Extract log aggregation related fields and methods from RMAppImpl. Contributed by Peter Bacsko
1 parent 2216ec5 commit 4456ea6

File tree

2 files changed

+406
-292
lines changed

2 files changed

+406
-292
lines changed

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java

Lines changed: 23 additions & 292 deletions
Original file line numberDiff line numberDiff line change
@@ -19,24 +19,19 @@
1919
package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
2020

2121
import java.net.InetAddress;
22-
import java.util.ArrayList;
2322
import java.util.Collections;
2423
import java.util.EnumSet;
2524
import java.util.HashMap;
26-
import java.util.Iterator;
2725
import java.util.LinkedHashMap;
2826
import java.util.List;
2927
import java.util.Map;
30-
import java.util.Map.Entry;
3128
import java.util.Set;
3229
import java.util.TreeSet;
33-
import java.util.concurrent.ConcurrentHashMap;
3430
import java.util.concurrent.ConcurrentSkipListSet;
3531
import java.util.concurrent.locks.ReentrantReadWriteLock;
3632
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
3733
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
3834

39-
import org.apache.commons.lang3.StringUtils;
4035
import org.slf4j.Logger;
4136
import org.slf4j.LoggerFactory;
4237
import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -182,19 +177,7 @@ public class RMAppImpl implements RMApp, Recoverable {
182177
new AppFinishedTransition();
183178
private Set<NodeId> ranNodes = new ConcurrentSkipListSet<NodeId>();
184179

185-
private final boolean logAggregationEnabled;
186-
private long logAggregationStartTime = 0;
187-
private final long logAggregationStatusTimeout;
188-
private final Map<NodeId, LogAggregationReport> logAggregationStatus =
189-
new ConcurrentHashMap<NodeId, LogAggregationReport>();
190-
private volatile LogAggregationStatus logAggregationStatusForAppReport;
191-
private int logAggregationSucceed = 0;
192-
private int logAggregationFailed = 0;
193-
private Map<NodeId, List<String>> logAggregationDiagnosticsForNMs =
194-
new HashMap<NodeId, List<String>>();
195-
private Map<NodeId, List<String>> logAggregationFailureMessagesForNMs =
196-
new HashMap<NodeId, List<String>>();
197-
private final int maxLogAggregationDiagnosticsInMemory;
180+
private final RMAppLogAggregation logAggregation;
198181
private Map<ApplicationTimeoutType, Long> applicationTimeouts =
199182
new HashMap<ApplicationTimeoutType, Long>();
200183

@@ -511,26 +494,7 @@ public RMAppImpl(ApplicationId applicationId, RMContext rmContext,
511494
applicationSchedulingEnvs
512495
.putAll(submissionContext.getApplicationSchedulingPropertiesMap());
513496

514-
long localLogAggregationStatusTimeout =
515-
conf.getLong(YarnConfiguration.LOG_AGGREGATION_STATUS_TIME_OUT_MS,
516-
YarnConfiguration.DEFAULT_LOG_AGGREGATION_STATUS_TIME_OUT_MS);
517-
if (localLogAggregationStatusTimeout <= 0) {
518-
this.logAggregationStatusTimeout =
519-
YarnConfiguration.DEFAULT_LOG_AGGREGATION_STATUS_TIME_OUT_MS;
520-
} else {
521-
this.logAggregationStatusTimeout = localLogAggregationStatusTimeout;
522-
}
523-
this.logAggregationEnabled =
524-
conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED,
525-
YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED);
526-
if (this.logAggregationEnabled) {
527-
this.logAggregationStatusForAppReport = LogAggregationStatus.NOT_START;
528-
} else {
529-
this.logAggregationStatusForAppReport = LogAggregationStatus.DISABLED;
530-
}
531-
maxLogAggregationDiagnosticsInMemory = conf.getInt(
532-
YarnConfiguration.RM_MAX_LOG_AGGREGATION_DIAGNOSTICS_IN_MEMORY,
533-
YarnConfiguration.DEFAULT_RM_MAX_LOG_AGGREGATION_DIAGNOSTICS_IN_MEMORY);
497+
this.logAggregation = new RMAppLogAggregation(conf, readLock, writeLock);
534498

535499
// amBlacklistingEnabled can be configured globally
536500
// Just use the global values
@@ -1090,13 +1054,9 @@ public void transition(RMAppImpl app, RMAppEvent event) {
10901054
// otherwise, add it to ranNodes for further process
10911055
app.ranNodes.add(nodeAddedEvent.getNodeId());
10921056

1093-
if (!app.logAggregationStatus.containsKey(nodeAddedEvent.getNodeId())) {
1094-
app.logAggregationStatus.put(nodeAddedEvent.getNodeId(),
1095-
LogAggregationReport.newInstance(app.applicationId,
1096-
app.logAggregationEnabled ? LogAggregationStatus.NOT_START
1097-
: LogAggregationStatus.DISABLED, ""));
1098-
}
1099-
};
1057+
app.logAggregation.addReportIfNecessary(
1058+
nodeAddedEvent.getNodeId(), app.getApplicationId());
1059+
}
11001060
}
11011061

11021062
// synchronously recover attempt to ensure any incoming external events
@@ -1530,13 +1490,13 @@ private void completeAndCleanupApp(RMAppImpl app) {
15301490
finalState));
15311491
}
15321492

1533-
// Send app completed event to AppManager
15341493
app.handler.handle(new RMAppManagerEvent(app.applicationId,
15351494
RMAppManagerEventType.APP_COMPLETED));
15361495
}
15371496

15381497
private void handleAppFinished(RMAppImpl app) {
1539-
app.logAggregationStartTime = app.systemClock.getTime();
1498+
app.logAggregation
1499+
.recordLogAggregationStartTime(app.systemClock.getTime());
15401500
// record finish time
15411501
app.finishTime = app.storedFinishTime;
15421502
if (app.finishTime == 0) {
@@ -1778,263 +1738,31 @@ public List<ResourceRequest> getAMResourceRequests() {
17781738

17791739
@Override
17801740
public Map<NodeId, LogAggregationReport> getLogAggregationReportsForApp() {
1781-
this.readLock.lock();
1782-
try {
1783-
if (!isLogAggregationFinished() && isAppInFinalState(this) &&
1784-
systemClock.getTime() > this.logAggregationStartTime
1785-
+ this.logAggregationStatusTimeout) {
1786-
for (Entry<NodeId, LogAggregationReport> output :
1787-
logAggregationStatus.entrySet()) {
1788-
if (!output.getValue().getLogAggregationStatus()
1789-
.equals(LogAggregationStatus.TIME_OUT)
1790-
&& !output.getValue().getLogAggregationStatus()
1791-
.equals(LogAggregationStatus.SUCCEEDED)
1792-
&& !output.getValue().getLogAggregationStatus()
1793-
.equals(LogAggregationStatus.FAILED)) {
1794-
output.getValue().setLogAggregationStatus(
1795-
LogAggregationStatus.TIME_OUT);
1796-
}
1797-
}
1798-
}
1799-
return Collections.unmodifiableMap(logAggregationStatus);
1800-
} finally {
1801-
this.readLock.unlock();
1802-
}
1741+
return logAggregation.getLogAggregationReportsForApp(this);
18031742
}
18041743

18051744
public void aggregateLogReport(NodeId nodeId, LogAggregationReport report) {
1806-
this.writeLock.lock();
1807-
try {
1808-
if (this.logAggregationEnabled && !isLogAggregationFinished()) {
1809-
LogAggregationReport curReport = this.logAggregationStatus.get(nodeId);
1810-
boolean stateChangedToFinal = false;
1811-
if (curReport == null) {
1812-
this.logAggregationStatus.put(nodeId, report);
1813-
if (isLogAggregationFinishedForNM(report)) {
1814-
stateChangedToFinal = true;
1815-
}
1816-
} else {
1817-
if (isLogAggregationFinishedForNM(report)) {
1818-
if (!isLogAggregationFinishedForNM(curReport)) {
1819-
stateChangedToFinal = true;
1820-
}
1821-
}
1822-
if (report.getLogAggregationStatus() != LogAggregationStatus.RUNNING
1823-
|| curReport.getLogAggregationStatus() !=
1824-
LogAggregationStatus.RUNNING_WITH_FAILURE) {
1825-
if (curReport.getLogAggregationStatus()
1826-
== LogAggregationStatus.TIME_OUT
1827-
&& report.getLogAggregationStatus()
1828-
== LogAggregationStatus.RUNNING) {
1829-
// If the log aggregation status got from latest NM heartbeat
1830-
// is RUNNING, and current log aggregation status is TIME_OUT,
1831-
// based on whether there are any failure messages for this NM,
1832-
// we will reset the log aggregation status as RUNNING or
1833-
// RUNNING_WITH_FAILURE
1834-
if (logAggregationFailureMessagesForNMs.get(nodeId) != null &&
1835-
!logAggregationFailureMessagesForNMs.get(nodeId).isEmpty()) {
1836-
report.setLogAggregationStatus(
1837-
LogAggregationStatus.RUNNING_WITH_FAILURE);
1838-
}
1839-
}
1840-
curReport.setLogAggregationStatus(report
1841-
.getLogAggregationStatus());
1842-
}
1843-
}
1844-
updateLogAggregationDiagnosticMessages(nodeId, report);
1845-
if (isAppInFinalState(this) && stateChangedToFinal) {
1846-
updateLogAggregationStatus(nodeId);
1847-
}
1848-
}
1849-
} finally {
1850-
this.writeLock.unlock();
1851-
}
1745+
logAggregation.aggregateLogReport(nodeId, report, this);
18521746
}
18531747

18541748
@Override
1855-
public LogAggregationStatus getLogAggregationStatusForAppReport() {
1856-
this.readLock.lock();
1857-
try {
1858-
if (! logAggregationEnabled) {
1859-
return LogAggregationStatus.DISABLED;
1860-
}
1861-
if (isLogAggregationFinished()) {
1862-
return this.logAggregationStatusForAppReport;
1863-
}
1864-
Map<NodeId, LogAggregationReport> reports =
1865-
getLogAggregationReportsForApp();
1866-
if (reports.size() == 0) {
1867-
return this.logAggregationStatusForAppReport;
1868-
}
1869-
int logNotStartCount = 0;
1870-
int logCompletedCount = 0;
1871-
int logTimeOutCount = 0;
1872-
int logFailedCount = 0;
1873-
int logRunningWithFailure = 0;
1874-
for (Entry<NodeId, LogAggregationReport> report : reports.entrySet()) {
1875-
switch (report.getValue().getLogAggregationStatus()) {
1876-
case NOT_START:
1877-
logNotStartCount++;
1878-
break;
1879-
case RUNNING_WITH_FAILURE:
1880-
logRunningWithFailure ++;
1881-
break;
1882-
case SUCCEEDED:
1883-
logCompletedCount++;
1884-
break;
1885-
case FAILED:
1886-
logFailedCount++;
1887-
logCompletedCount++;
1888-
break;
1889-
case TIME_OUT:
1890-
logTimeOutCount++;
1891-
logCompletedCount++;
1892-
break;
1893-
default:
1894-
break;
1895-
}
1896-
}
1897-
if (logNotStartCount == reports.size()) {
1898-
return LogAggregationStatus.NOT_START;
1899-
} else if (logCompletedCount == reports.size()) {
1900-
// We should satisfy two condition in order to return SUCCEEDED or FAILED
1901-
// 1) make sure the application is in final state
1902-
// 2) logs status from all NMs are SUCCEEDED/FAILED/TIMEOUT
1903-
// The SUCCEEDED/FAILED status is the final status which means
1904-
// the log aggregation is finished. And the log aggregation status will
1905-
// not be updated anymore.
1906-
if (logFailedCount > 0 && isAppInFinalState(this)) {
1907-
this.logAggregationStatusForAppReport =
1908-
LogAggregationStatus.FAILED;
1909-
return LogAggregationStatus.FAILED;
1910-
} else if (logTimeOutCount > 0) {
1911-
this.logAggregationStatusForAppReport =
1912-
LogAggregationStatus.TIME_OUT;
1913-
return LogAggregationStatus.TIME_OUT;
1914-
}
1915-
if (isAppInFinalState(this)) {
1916-
this.logAggregationStatusForAppReport =
1917-
LogAggregationStatus.SUCCEEDED;
1918-
return LogAggregationStatus.SUCCEEDED;
1919-
}
1920-
} else if (logRunningWithFailure > 0) {
1921-
return LogAggregationStatus.RUNNING_WITH_FAILURE;
1922-
}
1923-
return LogAggregationStatus.RUNNING;
1924-
} finally {
1925-
this.readLock.unlock();
1926-
}
1749+
public boolean isLogAggregationFinished() {
1750+
return logAggregation.isFinished();
19271751
}
19281752

19291753
@Override
19301754
public boolean isLogAggregationEnabled() {
1931-
return logAggregationEnabled;
1755+
return logAggregation.isEnabled();
19321756
}
19331757

1934-
@Override
1935-
public boolean isLogAggregationFinished() {
1936-
return this.logAggregationStatusForAppReport
1937-
.equals(LogAggregationStatus.SUCCEEDED)
1938-
|| this.logAggregationStatusForAppReport
1939-
.equals(LogAggregationStatus.FAILED)
1940-
|| this.logAggregationStatusForAppReport
1941-
.equals(LogAggregationStatus.TIME_OUT);
1942-
1943-
}
1944-
1945-
private boolean isLogAggregationFinishedForNM(LogAggregationReport report) {
1946-
return report.getLogAggregationStatus() == LogAggregationStatus.SUCCEEDED
1947-
|| report.getLogAggregationStatus() == LogAggregationStatus.FAILED;
1948-
}
1949-
1950-
private void updateLogAggregationDiagnosticMessages(NodeId nodeId,
1951-
LogAggregationReport report) {
1952-
if (report.getDiagnosticMessage() != null
1953-
&& !report.getDiagnosticMessage().isEmpty()) {
1954-
if (report.getLogAggregationStatus()
1955-
== LogAggregationStatus.RUNNING ) {
1956-
List<String> diagnostics = logAggregationDiagnosticsForNMs.get(nodeId);
1957-
if (diagnostics == null) {
1958-
diagnostics = new ArrayList<String>();
1959-
logAggregationDiagnosticsForNMs.put(nodeId, diagnostics);
1960-
} else {
1961-
if (diagnostics.size()
1962-
== maxLogAggregationDiagnosticsInMemory) {
1963-
diagnostics.remove(0);
1964-
}
1965-
}
1966-
diagnostics.add(report.getDiagnosticMessage());
1967-
this.logAggregationStatus.get(nodeId).setDiagnosticMessage(
1968-
StringUtils.join(diagnostics, "\n"));
1969-
} else if (report.getLogAggregationStatus()
1970-
== LogAggregationStatus.RUNNING_WITH_FAILURE) {
1971-
List<String> failureMessages =
1972-
logAggregationFailureMessagesForNMs.get(nodeId);
1973-
if (failureMessages == null) {
1974-
failureMessages = new ArrayList<String>();
1975-
logAggregationFailureMessagesForNMs.put(nodeId, failureMessages);
1976-
} else {
1977-
if (failureMessages.size()
1978-
== maxLogAggregationDiagnosticsInMemory) {
1979-
failureMessages.remove(0);
1980-
}
1981-
}
1982-
failureMessages.add(report.getDiagnosticMessage());
1983-
}
1984-
}
1985-
}
1986-
1987-
private void updateLogAggregationStatus(NodeId nodeId) {
1988-
LogAggregationStatus status =
1989-
this.logAggregationStatus.get(nodeId).getLogAggregationStatus();
1990-
if (status.equals(LogAggregationStatus.SUCCEEDED)) {
1991-
this.logAggregationSucceed++;
1992-
} else if (status.equals(LogAggregationStatus.FAILED)) {
1993-
this.logAggregationFailed++;
1994-
}
1995-
if (this.logAggregationSucceed == this.logAggregationStatus.size()) {
1996-
this.logAggregationStatusForAppReport =
1997-
LogAggregationStatus.SUCCEEDED;
1998-
// Since the log aggregation status for this application for all NMs
1999-
// is SUCCEEDED, it means all logs are aggregated successfully.
2000-
// We could remove all the cached log aggregation reports
2001-
this.logAggregationStatus.clear();
2002-
this.logAggregationDiagnosticsForNMs.clear();
2003-
this.logAggregationFailureMessagesForNMs.clear();
2004-
} else if (this.logAggregationSucceed + this.logAggregationFailed
2005-
== this.logAggregationStatus.size()) {
2006-
this.logAggregationStatusForAppReport = LogAggregationStatus.FAILED;
2007-
// We have collected the log aggregation status for all NMs.
2008-
// The log aggregation status is FAILED which means the log
2009-
// aggregation fails in some NMs. We are only interested in the
2010-
// nodes where the log aggregation is failed. So we could remove
2011-
// the log aggregation details for those succeeded NMs
2012-
for (Iterator<Map.Entry<NodeId, LogAggregationReport>> it =
2013-
this.logAggregationStatus.entrySet().iterator(); it.hasNext();) {
2014-
Map.Entry<NodeId, LogAggregationReport> entry = it.next();
2015-
if (entry.getValue().getLogAggregationStatus()
2016-
.equals(LogAggregationStatus.SUCCEEDED)) {
2017-
it.remove();
2018-
}
2019-
}
2020-
// the log aggregation has finished/failed.
2021-
// and the status will not be updated anymore.
2022-
this.logAggregationDiagnosticsForNMs.clear();
2023-
}
1758+
public String getLogAggregationFailureMessagesForNM(NodeId nodeId) {
1759+
return logAggregation.getLogAggregationFailureMessagesForNM(nodeId);
20241760
}
20251761

2026-
public String getLogAggregationFailureMessagesForNM(NodeId nodeId) {
2027-
this.readLock.lock();
2028-
try {
2029-
List<String> failureMessages =
2030-
this.logAggregationFailureMessagesForNMs.get(nodeId);
2031-
if (failureMessages == null || failureMessages.isEmpty()) {
2032-
return StringUtils.EMPTY;
2033-
}
2034-
return StringUtils.join(failureMessages, "\n");
2035-
} finally {
2036-
this.readLock.unlock();
2037-
}
1762+
@Override
1763+
public LogAggregationStatus getLogAggregationStatusForAppReport() {
1764+
return logAggregation
1765+
.getLogAggregationStatusForAppReport(this);
20381766
}
20391767

20401768
@Override
@@ -2153,8 +1881,11 @@ protected void onInvalidStateTransition(RMAppEventType rmAppEventType,
21531881
}
21541882

21551883
@VisibleForTesting
2156-
public long getLogAggregationStartTime() {
2157-
return logAggregationStartTime;
1884+
long getLogAggregationStartTime() {
1885+
return logAggregation.getLogAggregationStartTime();
21581886
}
21591887

1888+
Clock getSystemClock() {
1889+
return systemClock;
1890+
}
21601891
}

0 commit comments

Comments
 (0)