
Commit 1641390

Merge branch 'trunk' into YARN-11424-V2
2 parents: 00ffaa6 + 016362a

File tree: 69 files changed, +3774 -249 lines


hadoop-client-modules/hadoop-client/pom.xml

Lines changed: 16 additions & 0 deletions
@@ -69,6 +69,10 @@
         <groupId>com.github.pjfanning</groupId>
         <artifactId>jersey-json</artifactId>
       </exclusion>
+      <exclusion>
+        <groupId>org.codehaus.jettison</groupId>
+        <artifactId>jettison</artifactId>
+      </exclusion>
       <exclusion>
         <groupId>com.sun.jersey</groupId>
         <artifactId>jersey-server</artifactId>
@@ -182,6 +186,10 @@
         <groupId>com.github.pjfanning</groupId>
         <artifactId>jersey-json</artifactId>
       </exclusion>
+      <exclusion>
+        <groupId>org.codehaus.jettison</groupId>
+        <artifactId>jettison</artifactId>
+      </exclusion>
       <exclusion>
         <groupId>io.netty</groupId>
         <artifactId>netty</artifactId>
@@ -233,6 +241,10 @@
         <groupId>com.github.pjfanning</groupId>
         <artifactId>jersey-json</artifactId>
       </exclusion>
+      <exclusion>
+        <groupId>org.codehaus.jettison</groupId>
+        <artifactId>jettison</artifactId>
+      </exclusion>
       <exclusion>
         <groupId>com.sun.jersey</groupId>
         <artifactId>jersey-servlet</artifactId>
@@ -290,6 +302,10 @@
         <groupId>com.github.pjfanning</groupId>
         <artifactId>jersey-json</artifactId>
       </exclusion>
+      <exclusion>
+        <groupId>org.codehaus.jettison</groupId>
+        <artifactId>jettison</artifactId>
+      </exclusion>
       <exclusion>
         <groupId>io.netty</groupId>
         <artifactId>netty</artifactId>

hadoop-common-project/hadoop-common/pom.xml

Lines changed: 8 additions & 0 deletions
@@ -175,6 +175,14 @@
       </exclusion>
     </exclusions>
   </dependency>
+  <dependency>
+    <!--
+      Add jettison as a direct dependency (jersey-json's transitive jettison 1.1 is
+      vulnerable) so that projects depending on hadoop-common externally pick up the
+      non-vulnerable jettison.
+    -->
+    <groupId>org.codehaus.jettison</groupId>
+    <artifactId>jettison</artifactId>
+  </dependency>
   <dependency>
     <groupId>com.sun.jersey</groupId>
     <artifactId>jersey-server</artifactId>
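With the exclusions in hadoop-client and this direct dependency in hadoop-common, downstream builds should resolve a single, patched jettison. A quick way to confirm which version actually wins on a consumer's classpath is a dependency-tree query; a minimal sketch, run from the consuming project (standard Maven usage, not part of this commit):

```bash
mvn dependency:tree -Dincludes=org.codehaus.jettison:jettison
```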

hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md

Lines changed: 13 additions & 11 deletions
@@ -592,17 +592,19 @@ StateStoreMetrics
 -----------------
 StateStoreMetrics shows the statistics of the State Store component in Router-based federation.
 
-| Name | Description |
-|:---- |:---- |
-| `ReadsNumOps` | Number of GET transactions for State Store within an interval time of metric |
-| `ReadsAvgTime` | Average time of GET transactions for State Store in milliseconds |
-| `WritesNumOps` | Number of PUT transactions for State Store within an interval time of metric |
-| `WritesAvgTime` | Average time of PUT transactions for State Store in milliseconds |
-| `RemovesNumOps` | Number of REMOVE transactions for State Store within an interval time of metric |
-| `RemovesAvgTime` | Average time of REMOVE transactions for State Store in milliseconds |
-| `FailuresNumOps` | Number of failed transactions for State Store within an interval time of metric |
-| `FailuresAvgTime` | Average time of failed transactions for State Store in milliseconds |
-| `Cache`*BaseRecord*`Size` | Number of store records to cache in State Store |
+| Name | Description |
+|:---- |:---- |
+| `ReadsNumOps` | Number of GET transactions for State Store within an interval time of metric |
+| `ReadsAvgTime` | Average time of GET transactions for State Store in milliseconds |
+| `WritesNumOps` | Number of PUT transactions for State Store within an interval time of metric |
+| `WritesAvgTime` | Average time of PUT transactions for State Store in milliseconds |
+| `RemovesNumOps` | Number of REMOVE transactions for State Store within an interval time of metric |
+| `RemovesAvgTime` | Average time of REMOVE transactions for State Store in milliseconds |
+| `FailuresNumOps` | Number of failed transactions for State Store within an interval time of metric |
+| `FailuresAvgTime` | Average time of failed transactions for State Store in milliseconds |
+| `Cache`*BaseRecord*`Size` | Number of store records to cache in State Store |
+| `Cache`*BaseRecord*`LoadNumOps` | Number of times store records are loaded in the State Store Cache from State Store |
+| `Cache`*BaseRecord*`LoadAvgTime` | Average time of loading State Store Cache from State Store in milliseconds |
 
 yarn context
 ============
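These counters are published like any other Hadoop daemon metric, so once the Router is up they should be readable from its standard HTTP /jmx endpoint. A hedged example (host and port are placeholders, and the exact bean layout may vary by release):

```bash
curl -s 'http://<router-host>:<http-port>/jmx' | grep -E '"Cache.*Load(NumOps|AvgTime)"'
```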

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/StateStoreMetrics.java

Lines changed: 28 additions & 0 deletions
@@ -20,6 +20,7 @@
 import static org.apache.hadoop.metrics2.impl.MsInfo.ProcessName;
 import static org.apache.hadoop.metrics2.impl.MsInfo.SessionId;
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -54,6 +55,7 @@ public class StateStoreMetrics implements StateStoreMBean {
   private MutableRate failures;
 
   private Map<String, MutableGaugeInt> cacheSizes;
+  private final Map<String, MutableRate> cacheLoadMetrics = new HashMap<>();
 
   protected StateStoreMetrics() {}
 
@@ -150,6 +152,32 @@ public void setLocationCache(String name, long count) {
     counter.set(count);
   }
 
+  /**
+   * Set the cache loading metrics for the state store interface.
+   *
+   * @param name Name of the record of the cache.
+   * @param value The time duration interval as the cache value.
+   */
+  public void setCacheLoading(String name, long value) {
+    String cacheLoad = "Cache" + name + "Load";
+    MutableRate cacheLoadMetric = cacheLoadMetrics.get(cacheLoad);
+    if (cacheLoadMetric == null) {
+      cacheLoadMetric = registry.newRate(cacheLoad, name, false);
+      cacheLoadMetrics.put(cacheLoad, cacheLoadMetric);
+    }
+    cacheLoadMetrics.get(cacheLoad).add(value);
+  }
+
+  /**
+   * Retrieve unmodifiable map of cache loading metrics.
+   *
+   * @return unmodifiable map of cache loading metrics.
+   */
+  @VisibleForTesting
+  public Map<String, MutableRate> getCacheLoadMetrics() {
+    return Collections.unmodifiableMap(cacheLoadMetrics);
+  }
+
   @VisibleForTesting
   public void reset() {
     reads.resetMinMax();
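Each setCacheLoading() call adds one timing sample to a MutableRate, and the metrics system derives the published Cache*Record*LoadNumOps / Cache*Record*LoadAvgTime pair from those samples. A minimal standalone sketch of that aggregation (the class and registry names here are illustrative, not from the commit):

```java
import org.apache.hadoop.metrics2.lib.MetricsRegistry;
import org.apache.hadoop.metrics2.lib.MutableRate;

public class CacheLoadRateSketch {
  public static void main(String[] args) {
    MetricsRegistry registry = new MetricsRegistry("StateStoreSketch");
    // Same naming scheme as setCacheLoading(): "Cache" + record name + "Load".
    MutableRate mountTableLoad = registry.newRate("CacheMountTableLoad", "MountTable", false);

    mountTableLoad.add(120);  // one cache refresh took 120 ms
    mountTableLoad.add(80);   // the next one took 80 ms

    // lastStat() is the same aggregate the new tests assert on.
    System.out.println("NumOps  = " + mountTableLoad.lastStat().numSamples()); // 2
    System.out.println("AvgTime = " + mountTableLoad.lastStat().mean());       // 100.0
  }
}
```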

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/CachedRecordStore.java

Lines changed: 2 additions & 0 deletions
@@ -113,6 +113,7 @@ public boolean loadCache(boolean force) throws IOException {
     if (force || isUpdateTime()) {
       List<R> newRecords = null;
       long t = -1;
+      long startTime = Time.monotonicNow();
       try {
         QueryResult<R> result = getDriver().get(getRecordClass());
         newRecords = result.getRecords();
@@ -143,6 +144,7 @@ public boolean loadCache(boolean force) throws IOException {
       StateStoreMetrics metrics = getDriver().getMetrics();
       if (metrics != null) {
         String recordName = getRecordClass().getSimpleName();
+        metrics.setCacheLoading(recordName, Time.monotonicNow() - startTime);
         metrics.setCacheSize(recordName, this.records.size());
       }
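The elapsed-time measurement above uses Hadoop's monotonic clock rather than System.currentTimeMillis(), so a wall-clock jump mid-refresh cannot produce a negative or inflated sample. A tiny sketch of the same pattern in isolation (the sleep stands in for the actual State Store fetch):

```java
import org.apache.hadoop.util.Time;

public class MonotonicTimingSketch {
  public static void main(String[] args) throws InterruptedException {
    long startTime = Time.monotonicNow(); // monotonic, not wall-clock
    Thread.sleep(50);                     // stand-in for loading records from the driver
    long elapsedMs = Time.monotonicNow() - startTime;
    System.out.println("cache load took ~" + elapsedMs + " ms");
  }
}
```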

hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreDriverBase.java

Lines changed: 36 additions & 0 deletions
@@ -48,6 +48,8 @@
 import org.apache.hadoop.hdfs.server.federation.store.records.QueryResult;
 import org.apache.hadoop.hdfs.server.federation.store.records.RouterState;
 import org.apache.hadoop.hdfs.server.federation.store.records.StateStoreVersion;
+import org.apache.hadoop.metrics2.lib.MutableRate;
+
 import org.junit.After;
 import org.junit.AfterClass;
 import org.slf4j.Logger;
@@ -76,6 +78,10 @@ protected StateStoreDriver getStateStoreDriver() {
     return stateStore.getDriver();
   }
 
+  protected StateStoreService getStateStoreService() {
+    return stateStore;
+  }
+
   @After
   public void cleanMetrics() {
     if (stateStore != null) {
@@ -574,6 +580,36 @@ private static Map<String, Class<?>> getFields(BaseRecord record) {
     return getters;
   }
 
+  public long getMountTableCacheLoadSamples(StateStoreDriver driver) throws IOException {
+    final MutableRate mountTableCache = getMountTableCache(driver);
+    return mountTableCache.lastStat().numSamples();
+  }
+
+  private static MutableRate getMountTableCache(StateStoreDriver driver) throws IOException {
+    StateStoreMetrics metrics = stateStore.getMetrics();
+    final Query<MountTable> query = new Query<>(MountTable.newInstance());
+    driver.getMultiple(MountTable.class, query);
+    final Map<String, MutableRate> cacheLoadMetrics = metrics.getCacheLoadMetrics();
+    final MutableRate mountTableCache = cacheLoadMetrics.get("CacheMountTableLoad");
+    assertNotNull("CacheMountTableLoad should be present in the state store metrics",
+        mountTableCache);
+    return mountTableCache;
+  }
+
+  public void testCacheLoadMetrics(StateStoreDriver driver, long numRefresh,
+      double expectedHigherThan) throws IOException, IllegalArgumentException {
+    final MutableRate mountTableCache = getMountTableCache(driver);
+    // CacheMountTableLoadNumOps
+    final long mountTableCacheLoadNumOps = getMountTableCacheLoadSamples(driver);
+    assertEquals("Num of samples collected should match", numRefresh, mountTableCacheLoadNumOps);
+    // CacheMountTableLoadAvgTime ms
+    final double mountTableCacheLoadAvgTimeMs = mountTableCache.lastStat().mean();
+    assertTrue(
+        "Mean time duration for cache load is expected to be higher than " + expectedHigherThan
+            + " ms." + " Actual value: " + mountTableCacheLoadAvgTimeMs,
+        mountTableCacheLoadAvgTimeMs > expectedHigherThan);
+  }
+
   /**
    * Get the type of field.
    *

hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreFile.java

Lines changed: 12 additions & 0 deletions
@@ -73,4 +73,16 @@ public void testMetrics()
       throws IllegalArgumentException, IllegalAccessException, IOException {
     testMetrics(getStateStoreDriver());
   }
+
+  @Test
+  public void testCacheLoadMetrics() throws IOException {
+    // Inject an initial CacheMountTableLoad sample of -1 ms: if CacheMountTableLoadAvgTime
+    // later reads -1 ms, no sample with value >= 0 was ever received, and the assertion
+    // that the mount table avg load time is higher than -1 will fail.
+    getStateStoreService().getMetrics().setCacheLoading("MountTable", -1);
+    long curMountTableLoadNum = getMountTableCacheLoadSamples(getStateStoreDriver());
+    getStateStoreService().refreshCaches(true);
+    testCacheLoadMetrics(getStateStoreDriver(), curMountTableLoadNum + 1, -1);
+  }
+
 }

hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreFileSystem.java

Lines changed: 12 additions & 0 deletions
@@ -115,4 +115,16 @@ public void testInsertWithErrorDuringWrite()
 
     testInsertWithErrorDuringWrite(driver, MembershipState.class);
   }
+
+  @Test
+  public void testCacheLoadMetrics() throws IOException {
+    // Inject an initial CacheMountTableLoad sample of -1 ms: if CacheMountTableLoadAvgTime
+    // later reads -1 ms, no sample with value >= 0 was ever received, and the assertion
+    // that the mount table avg load time is higher than -1 will fail.
+    getStateStoreService().getMetrics().setCacheLoading("MountTable", -1);
+    long curMountTableLoadNum = getMountTableCacheLoadSamples(getStateStoreDriver());
+    getStateStoreService().refreshCaches(true);
+    getStateStoreService().refreshCaches(true);
+    testCacheLoadMetrics(getStateStoreDriver(), curMountTableLoadNum + 2, -1);
+  }
 }

hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreZK.java

Lines changed: 14 additions & 0 deletions
@@ -206,4 +206,18 @@ public void testFetchErrors()
     stateStoreDriver.setEnableConcurrent(true);
     testFetchErrors(stateStoreDriver);
   }
+
+  @Test
+  public void testCacheLoadMetrics() throws IOException {
+    // Inject an initial CacheMountTableLoad sample of -1 ms: if CacheMountTableLoadAvgTime
+    // later reads -1 ms, no sample with value >= 0 was ever received, and the assertion
+    // that the mount table avg load time is higher than -1 will fail.
+    getStateStoreService().getMetrics().setCacheLoading("MountTable", -1);
+    long curMountTableLoadNum = getMountTableCacheLoadSamples(getStateStoreDriver());
+    getStateStoreService().refreshCaches(true);
+    getStateStoreService().refreshCaches(true);
+    getStateStoreService().refreshCaches(true);
+    testCacheLoadMetrics(getStateStoreDriver(), curMountTableLoadNum + 3, -1);
+  }
+
 }

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java

Lines changed: 38 additions & 21 deletions
@@ -3987,17 +3987,11 @@ private MisReplicationResult processMisReplicatedBlock(BlockInfo block) {
     }
 
     if (shouldProcessExtraRedundancy(num, expectedRedundancy)) {
-      if (num.replicasOnStaleNodes() > 0) {
-        // If any of the replicas of this block are on nodes that are
-        // considered "stale", then these replicas may in fact have
-        // already been deleted. So, we cannot safely act on the
-        // over-replication until a later point in time, when
-        // the "stale" nodes have block reported.
+      // extra redundancy block
+      if (!processExtraRedundancyBlockWithoutPostpone(block, expectedRedundancy,
+          null, null)) {
         return MisReplicationResult.POSTPONE;
       }
-
-      // extra redundancy block
-      processExtraRedundancyBlock(block, expectedRedundancy, null, null);
       return MisReplicationResult.OVER_REPLICATED;
     }
 
@@ -4020,12 +4014,26 @@ public void setReplication(
     }
   }
 
+  /**
+   * Process blocks with redundant replicas. If there are replicas in
+   * stale storages, mark them in the postponedMisreplicatedBlocks.
+   */
+  private void processExtraRedundancyBlock(final BlockInfo block,
+      final short replication, final DatanodeDescriptor addedNode,
+      DatanodeDescriptor delNodeHint) {
+    if (!processExtraRedundancyBlockWithoutPostpone(block, replication,
+        addedNode, delNodeHint)) {
+      postponeBlock(block);
+    }
+  }
+
   /**
    * Find how many of the containing nodes are "extra", if any.
    * If there are any extras, call chooseExcessRedundancies() to
    * mark them in the excessRedundancyMap.
+   * @return true if all redundancy replicas are removed.
    */
-  private void processExtraRedundancyBlock(final BlockInfo block,
+  private boolean processExtraRedundancyBlockWithoutPostpone(final BlockInfo block,
       final short replication, final DatanodeDescriptor addedNode,
       DatanodeDescriptor delNodeHint) {
     assert namesystem.hasWriteLock();
@@ -4035,17 +4043,17 @@ private void processExtraRedundancyBlock(final BlockInfo block,
     Collection<DatanodeStorageInfo> nonExcess = new ArrayList<>();
     Collection<DatanodeDescriptor> corruptNodes = corruptReplicas
         .getNodes(block);
+    boolean hasStaleStorage = false;
+    Set<DatanodeStorageInfo> staleStorages = new HashSet<>();
     for (DatanodeStorageInfo storage : blocksMap.getStorages(block)) {
       if (storage.getState() != State.NORMAL) {
         continue;
       }
       final DatanodeDescriptor cur = storage.getDatanodeDescriptor();
       if (storage.areBlockContentsStale()) {
-        LOG.trace("BLOCK* processExtraRedundancyBlock: Postponing {}"
-            + " since storage {} does not yet have up-to-date information.",
-            block, storage);
-        postponeBlock(block);
-        return;
+        hasStaleStorage = true;
+        staleStorages.add(storage);
+        continue;
       }
       if (!isExcess(cur, block)) {
         if (cur.isInService()) {
@@ -4058,6 +4066,13 @@ private void processExtraRedundancyBlock(final BlockInfo block,
       }
     }
     chooseExcessRedundancies(nonExcess, block, replication, addedNode,
        delNodeHint);
+    if (hasStaleStorage) {
+      LOG.trace("BLOCK* processExtraRedundancyBlockWithoutPostpone: Postponing {}"
+          + " since storages {} does not yet have up-to-date information.",
+          block, staleStorages);
+      return false;
+    }
+    return true;
   }
 
   private void chooseExcessRedundancies(
@@ -4071,12 +4086,14 @@ private void chooseExcessRedundancies(
     if (storedBlock.isStriped()) {
       chooseExcessRedundancyStriped(bc, nonExcess, storedBlock, delNodeHint);
     } else {
-      final BlockStoragePolicy storagePolicy = storagePolicySuite.getPolicy(
-          bc.getStoragePolicyID());
-      final List<StorageType> excessTypes = storagePolicy.chooseExcess(
-          replication, DatanodeStorageInfo.toStorageTypes(nonExcess));
-      chooseExcessRedundancyContiguous(nonExcess, storedBlock, replication,
-          addedNode, delNodeHint, excessTypes);
+      if (nonExcess.size() > replication) {
+        final BlockStoragePolicy storagePolicy = storagePolicySuite.getPolicy(
+            bc.getStoragePolicyID());
+        final List<StorageType> excessTypes = storagePolicy.chooseExcess(
+            replication, DatanodeStorageInfo.toStorageTypes(nonExcess));
+        chooseExcessRedundancyContiguous(nonExcess, storedBlock, replication,
+            addedNode, delNodeHint, excessTypes);
+      }
     }
   }
 
