
Commit 2271931

Merge branch 'apache:trunk' into YARN-11223

2 parents: d930629 + d383cc4

76 files changed: +4167 -245 lines


hadoop-common-project/hadoop-common/pom.xml

Lines changed: 2 additions & 2 deletions
@@ -1151,7 +1151,7 @@
             <id>src-test-compile-protoc-legacy</id>
             <phase>generate-test-sources</phase>
             <goals>
-              <goal>compile</goal>
+              <goal>test-compile</goal>
             </goals>
             <configuration>
               <skip>false</skip>
@@ -1160,7 +1160,7 @@
                 com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier}
               </protocArtifact>
               <includeDependenciesInDescriptorSet>false</includeDependenciesInDescriptorSet>
-              <protoSourceRoot>${basedir}/src/test/proto</protoSourceRoot>
+              <protoTestSourceRoot>${basedir}/src/test/proto</protoTestSourceRoot>
               <outputDirectory>${project.build.directory}/generated-test-sources/java</outputDirectory>
               <clearOutputDirectory>false</clearOutputDirectory>
               <includes>
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/AlreadyClosedException.java

Lines changed: 33 additions & 0 deletions

@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io.compress;
+
+import java.io.IOException;
+
+/**
+ * An exception class for when a closed compressor/decompressor is being used.
+ * {@link org.apache.hadoop.io.compress.Compressor}
+ * {@link org.apache.hadoop.io.compress.Decompressor}
+ */
+public class AlreadyClosedException extends IOException {
+
+  public AlreadyClosedException(String message) {
+    super(message);
+  }
+}

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CodecPool.java

Lines changed: 2 additions & 0 deletions
@@ -205,6 +205,7 @@ public static void returnCompressor(Compressor compressor) {
     }
     // if the compressor can't be reused, don't pool it.
     if (compressor.getClass().isAnnotationPresent(DoNotPool.class)) {
+      compressor.end();
       return;
     }
     compressor.reset();
@@ -225,6 +226,7 @@ public static void returnDecompressor(Decompressor decompressor) {
     }
     // if the decompressor can't be reused, don't pool it.
     if (decompressor.getClass().isAnnotationPresent(DoNotPool.class)) {
+      decompressor.end();
       return;
     }
     decompressor.reset();
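
Together with the new AlreadyClosedException and the gzip changes below, these two end() calls change the contract for @DoNotPool codecs: returning such a compressor or decompressor to the pool now releases its resources, and any further use fails fast. A minimal caller-side sketch, assuming the pure-Java BuiltInGzipCompressor (the @DoNotPool case); the class and main method are illustrative only, not part of the commit:

// Caller-side sketch of the new @DoNotPool contract; illustrative only.
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.AlreadyClosedException;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.io.compress.zlib.BuiltInGzipCompressor;

public class DoNotPoolContractSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    GzipCodec codec = new GzipCodec();
    codec.setConf(conf);

    // BuiltInGzipCompressor carries @DoNotPool, so CodecPool never caches it.
    Compressor compressor = new BuiltInGzipCompressor(conf);
    try (CompressionOutputStream out =
             codec.createOutputStream(new ByteArrayOutputStream(), compressor)) {
      out.write("payload".getBytes(StandardCharsets.UTF_8));
    } finally {
      // New behaviour: for @DoNotPool compressors this calls end() and frees the
      // underlying Deflater instead of silently dropping the instance.
      CodecPool.returnCompressor(compressor);
    }

    // Reuse after return now fails fast instead of operating on a freed deflater.
    CompressionOutputStream reused =
        codec.createOutputStream(new ByteArrayOutputStream(), compressor);
    try {
      reused.write(1);
    } catch (AlreadyClosedException expected) {
      System.out.println("caught: " + expected.getMessage());
    }
  }
}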

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/BuiltInGzipCompressor.java

Lines changed: 7 additions & 0 deletions
@@ -24,6 +24,7 @@
 import java.util.zip.GZIPOutputStream;

 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.compress.AlreadyClosedException;
 import org.apache.hadoop.io.compress.Compressor;
 import org.apache.hadoop.io.compress.DoNotPool;
 import org.apache.hadoop.util.DataChecksum;
@@ -83,6 +84,10 @@ public int compress(byte[] b, int off, int len) throws IOException {
       throw new IOException("compress called on finished compressor");
     }

+    if (state == BuiltInGzipDecompressor.GzipStateLabel.ENDED) {
+      throw new AlreadyClosedException("compress called on closed compressor");
+    }
+
     int compressedBytesWritten = 0;

     // If we are not within uncompressed data yet, output the header.
@@ -139,6 +144,8 @@ public long getBytesWritten() {
   @Override
   public void end() {
     deflater.end();
+
+    state = BuiltInGzipDecompressor.GzipStateLabel.ENDED;
   }

   @Override

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/BuiltInGzipDecompressor.java

Lines changed: 12 additions & 1 deletion
@@ -23,6 +23,7 @@
 import java.util.zip.DataFormatException;
 import java.util.zip.Inflater;

+import org.apache.hadoop.io.compress.AlreadyClosedException;
 import org.apache.hadoop.io.compress.Decompressor;
 import org.apache.hadoop.io.compress.DoNotPool;
 import org.apache.hadoop.util.DataChecksum;
@@ -109,7 +110,11 @@ public enum GzipStateLabel {
    * Immediately after the trailer (and potentially prior to the next gzip
    * member/substream header), without reset() having been called.
    */
-    FINISHED;
+    FINISHED,
+    /**
+     * Immediately after end() has been called.
+     */
+    ENDED;
   }

   /**
@@ -186,6 +191,10 @@ public synchronized int decompress(byte[] b, int off, int len)
       throws IOException {
     int numAvailBytes = 0;

+    if (state == GzipStateLabel.ENDED) {
+      throw new AlreadyClosedException("decompress called on closed decompressor");
+    }
+
     if (state != GzipStateLabel.DEFLATE_STREAM) {
       executeHeaderState();

@@ -476,6 +485,8 @@ public synchronized void reset() {
   @Override
   public synchronized void end() {
     inflater.end();
+
+    state = GzipStateLabel.ENDED;
   }

   /**
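
The compressor reuses the decompressor's GzipStateLabel enum, so the single new ENDED member serves both classes. A standalone sketch of the lifecycle rule the two guards enforce; the names below are illustrative, not the Hadoop classes:

// Illustrative lifecycle sketch of the ENDED guard; not Hadoop code.
import java.io.IOException;

class GzipLifecycleSketch {
  private enum State { READY, FINISHED, ENDED }

  private State state = State.READY;

  int decompress(byte[] b, int off, int len) throws IOException {
    if (state == State.ENDED) {
      // Mirrors the new guard: fail fast rather than touch a freed Inflater/Deflater.
      throw new IOException("decompress called on closed decompressor");
    }
    return 0; // real inflation elided
  }

  void reset() {
    // reset() re-arms an instance that stays in the pool.
    state = State.READY;
  }

  void end() {
    // end() frees native resources for good; afterwards the instance must be
    // discarded, which is exactly what CodecPool now does for @DoNotPool codecs.
    state = State.ENDED;
  }
}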

hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java

Lines changed: 9 additions & 14 deletions
@@ -24,11 +24,10 @@
 import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 import java.util.function.IntFunction;

 import org.assertj.core.api.Assertions;
-import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -43,13 +42,14 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.impl.FutureIOSupport;
 import org.apache.hadoop.io.WeakReferencedElasticByteBufferPool;
-import org.apache.hadoop.test.LambdaTestUtils;

 import static org.apache.hadoop.fs.contract.ContractTestUtils.assertCapabilities;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.assertDatasetEquals;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.returnBuffersToPoolPostRead;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.validateVectoredReadResult;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+import static org.apache.hadoop.test.LambdaTestUtils.interceptFuture;

 @RunWith(Parameterized.class)
 public abstract class AbstractContractVectoredReadTest extends AbstractFSContractTestBase {
@@ -281,16 +281,11 @@ public void testEOFRanges() throws Exception {
     in.readVectored(fileRanges, allocate);
     for (FileRange res : fileRanges) {
       CompletableFuture<ByteBuffer> data = res.getData();
-      try {
-        ByteBuffer buffer = data.get();
-        // Shouldn't reach here.
-        Assert.fail("EOFException must be thrown while reading EOF");
-      } catch (ExecutionException ex) {
-        // ignore as expected.
-      } catch (Exception ex) {
-        LOG.error("Exception while running vectored read ", ex);
-        Assert.fail("Exception while running vectored read " + ex);
-      }
+      interceptFuture(EOFException.class,
+          "",
+          ContractTestUtils.VECTORED_READ_OPERATION_TEST_TIMEOUT_SECONDS,
+          TimeUnit.SECONDS,
+          data);
     }
   }
 }
@@ -410,7 +405,7 @@ protected <T extends Throwable> void verifyExceptionalVectoredRead(
     fs.openFile(path(VECTORED_READ_FILE_NAME))
         .build();
     try (FSDataInputStream in = builder.get()) {
-      LambdaTestUtils.intercept(clazz,
+      intercept(clazz,
         () -> in.readVectored(fileRanges, allocate));
     }
   }
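
The rewritten testEOFRanges shows the pattern adopted here: instead of calling Future.get() and hand-sorting the resulting ExecutionException, LambdaTestUtils.interceptFuture awaits the future with a timeout, unwraps the failure, and asserts its type (and optionally a message substring). A minimal sketch of the same call outside the contract test; the failing future is fabricated for illustration and assumes Hadoop's test utilities are on the classpath:

// Sketch of the interceptFuture pattern; the fabricated future stands in for a vectored read.
import java.io.EOFException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import org.junit.Test;

import static org.apache.hadoop.test.LambdaTestUtils.interceptFuture;

public class InterceptFutureSketch {

  @Test
  public void readPastEofMustFail() throws Exception {
    CompletableFuture<byte[]> data = new CompletableFuture<>();
    data.completeExceptionally(new EOFException("read past end of file"));

    // Replaces the old try { data.get(); fail(); } catch (ExecutionException) block:
    // waits up to the timeout, unwraps the cause and checks it is an EOFException
    // ("" accepts any message text).
    interceptFuture(EOFException.class, "", 30, TimeUnit.SECONDS, data);
  }
}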

hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodecPool.java

Lines changed: 59 additions & 0 deletions
@@ -19,13 +19,20 @@

 import static org.junit.Assert.assertEquals;

+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.OutputStream;
+import java.util.Random;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.TimeUnit;

 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.compress.zlib.BuiltInGzipCompressor;
+import org.apache.hadoop.io.compress.zlib.BuiltInGzipDecompressor;
+import org.apache.hadoop.test.LambdaTestUtils;
 import org.junit.Before;
 import org.junit.Test;

@@ -189,4 +196,56 @@ public void testDecompressorNotReturnSameInstance() {
       CodecPool.returnDecompressor(decompressor);
     }
   }
+
+  @Test(timeout = 10000)
+  public void testDoNotPoolCompressorNotUseableAfterReturn() throws Exception {
+
+    final GzipCodec gzipCodec = new GzipCodec();
+    gzipCodec.setConf(new Configuration());
+
+    // BuiltInGzipCompressor is an explicit example of a Compressor with the @DoNotPool annotation
+    final Compressor compressor = new BuiltInGzipCompressor(new Configuration());
+    CodecPool.returnCompressor(compressor);
+
+    final CompressionOutputStream outputStream =
+        gzipCodec.createOutputStream(new ByteArrayOutputStream(), compressor);
+    LambdaTestUtils.intercept(
+        AlreadyClosedException.class,
+        "compress called on closed compressor",
+        "Compressor from Codec with @DoNotPool should not be " +
+            "useable after returning to CodecPool",
+        () -> outputStream.write(1));
+  }
+
+  @Test(timeout = 10000)
+  public void testDoNotPoolDecompressorNotUseableAfterReturn() throws Exception {
+
+    final GzipCodec gzipCodec = new GzipCodec();
+    gzipCodec.setConf(new Configuration());
+
+    final Random random = new Random();
+    final byte[] bytes = new byte[1024];
+    random.nextBytes(bytes);
+
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    try (OutputStream outputStream = gzipCodec.createOutputStream(baos)) {
+      outputStream.write(bytes);
+    }
+
+    final byte[] gzipBytes = baos.toByteArray();
+    final ByteArrayInputStream bais = new ByteArrayInputStream(gzipBytes);
+
+    // BuiltInGzipDecompressor is an explicit example of a Decompressor
+    // with the @DoNotPool annotation
+    final Decompressor decompressor = new BuiltInGzipDecompressor();
+    CodecPool.returnDecompressor(decompressor);
+
+    final CompressionInputStream inputStream = gzipCodec.createInputStream(bais, decompressor);
+    LambdaTestUtils.intercept(
+        AlreadyClosedException.class,
+        "decompress called on closed decompressor",
+        "Decompressor from Codec with @DoNotPool should not be " +
+            "useable after returning to CodecPool",
+        () -> inputStream.read());
+  }
 }

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/RBFMetrics.java

Lines changed: 25 additions & 16 deletions
@@ -50,6 +50,7 @@
 import javax.management.ObjectName;
 import javax.management.StandardMBean;

+import org.apache.commons.math3.stat.descriptive.moment.StandardDeviation;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
@@ -113,6 +114,8 @@ public class RBFMetrics implements RouterMBean, FederationMBean {
   /** Prevent holding the page from load too long. */
   private final long timeOut;

+  /** Enable/Disable getNodeUsage. **/
+  private boolean enableGetDNUsage;

   /** Router interface. */
   private final Router router;
@@ -175,6 +178,8 @@ public RBFMetrics(Router router) throws IOException {
     Configuration conf = router.getConfig();
     this.timeOut = conf.getTimeDuration(RBFConfigKeys.DN_REPORT_TIME_OUT,
         RBFConfigKeys.DN_REPORT_TIME_OUT_MS_DEFAULT, TimeUnit.MILLISECONDS);
+    this.enableGetDNUsage = conf.getBoolean(RBFConfigKeys.DFS_ROUTER_ENABLE_GET_DN_USAGE_KEY,
+        RBFConfigKeys.DFS_ROUTER_ENABLE_GET_DN_USAGE_DEFAULT);
     this.topTokenRealOwners = conf.getInt(
         RBFConfigKeys.DFS_ROUTER_METRICS_TOP_NUM_TOKEN_OWNERS_KEY,
         RBFConfigKeys.DFS_ROUTER_METRICS_TOP_NUM_TOKEN_OWNERS_KEY_DEFAULT);
@@ -184,6 +189,11 @@ public RBFMetrics(Router router) throws IOException {
     ms.register(RBFMetrics.class.getName(), "RBFActivity Metrics", this);
   }

+  @VisibleForTesting
+  public void setEnableGetDNUsage(boolean enableGetDNUsage) {
+    this.enableGetDNUsage = enableGetDNUsage;
+  }
+
   /**
    * Unregister the JMX beans.
    */
@@ -537,35 +547,34 @@ public int getNumEnteringMaintenanceDataNodes() {

   @Override // NameNodeMXBean
   public String getNodeUsage() {
-    float median = 0;
-    float max = 0;
-    float min = 0;
-    float dev = 0;
+    double median = 0;
+    double max = 0;
+    double min = 0;
+    double dev = 0;

     final Map<String, Map<String, Object>> info = new HashMap<>();
     try {
-      RouterRpcServer rpcServer = this.router.getRpcServer();
-      DatanodeInfo[] live = rpcServer.getDatanodeReport(
-          DatanodeReportType.LIVE, false, timeOut);
+      DatanodeInfo[] live = null;
+      if (this.enableGetDNUsage) {
+        RouterRpcServer rpcServer = this.router.getRpcServer();
+        live = rpcServer.getDatanodeReport(DatanodeReportType.LIVE, false, timeOut);
+      } else {
+        LOG.debug("Getting node usage is disabled.");
+      }

-      if (live.length > 0) {
-        float totalDfsUsed = 0;
-        float[] usages = new float[live.length];
+      if (live != null && live.length > 0) {
+        double[] usages = new double[live.length];
         int i = 0;
         for (DatanodeInfo dn : live) {
           usages[i++] = dn.getDfsUsedPercent();
-          totalDfsUsed += dn.getDfsUsedPercent();
         }
-        totalDfsUsed /= live.length;
         Arrays.sort(usages);
         median = usages[usages.length / 2];
         max = usages[usages.length - 1];
         min = usages[0];

-        for (i = 0; i < usages.length; i++) {
-          dev += (usages[i] - totalDfsUsed) * (usages[i] - totalDfsUsed);
-        }
-        dev = (float) Math.sqrt(dev / usages.length);
+        StandardDeviation deviation = new StandardDeviation();
+        dev = deviation.evaluate(usages);
       }
     } catch (IOException e) {
       LOG.error("Cannot get the live nodes: {}", e.getMessage());