
Commit 6f83110

Merge branch 'apache:trunk' into YARN-11320

2 parents: a6e5f3d + f7b1bb4

File tree

37 files changed: +1769 -88 lines


hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/audit/AuditConstants.java

Lines changed: 5 additions & 0 deletions

@@ -90,6 +90,11 @@ private AuditConstants() {
    */
   public static final String PARAM_PROCESS = "ps";
 
+  /**
+   * Header: Range for GET request data: {@value}.
+   */
+  public static final String PARAM_RANGE = "rg";
+
   /**
    * Task Attempt ID query header: {@value}.
    */

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/WeakRefMetricsSource.java

Lines changed: 97 additions & 0 deletions

@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.impl;
+
+import java.lang.ref.WeakReference;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.metrics2.MetricsCollector;
+import org.apache.hadoop.metrics2.MetricsSource;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * A weak referenced metrics source which avoids hanging on to large objects
+ * if somehow they don't get fully closed/cleaned up.
+ * The JVM may clean up all objects which are only weakly referenced whenever
+ * it does a GC, <i>even if there is no memory pressure</i>.
+ * To avoid these refs being removed, always keep a strong reference around
+ * somewhere.
+ */
+@InterfaceAudience.Private
+public class WeakRefMetricsSource implements MetricsSource {
+
+  /**
+   * Name to know when unregistering.
+   */
+  private final String name;
+
+  /**
+   * Underlying metrics source.
+   */
+  private final WeakReference<MetricsSource> sourceWeakReference;
+
+  /**
+   * Constructor.
+   * @param name Name to know when unregistering.
+   * @param source metrics source
+   */
+  public WeakRefMetricsSource(final String name, final MetricsSource source) {
+    this.name = name;
+    this.sourceWeakReference = new WeakReference<>(requireNonNull(source));
+  }
+
+  /**
+   * If the weak reference is non null, update the metrics.
+   * @param collector to contain the resulting metrics snapshot
+   * @param all if true, return all metrics even if unchanged.
+   */
+  @Override
+  public void getMetrics(final MetricsCollector collector, final boolean all) {
+    MetricsSource metricsSource = sourceWeakReference.get();
+    if (metricsSource != null) {
+      metricsSource.getMetrics(collector, all);
+    }
+  }
+
+  /**
+   * Name to know when unregistering.
+   * @return the name passed in during construction.
+   */
+  public String getName() {
+    return name;
+  }
+
+  /**
+   * Get the source; will be null if the reference has been GC'd.
+   * @return the source reference
+   */
+  public MetricsSource getSource() {
+    return sourceWeakReference.get();
+  }
+
+  @Override
+  public String toString() {
+    return "WeakRefMetricsSource{" +
+        "name='" + name + '\'' +
+        ", sourceWeakReference is " +
+        (sourceWeakReference.get() == null ? "unset" : "set") +
+        '}';
+  }
+}
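
Editorial note on usage (a minimal sketch, not part of this commit): because the metrics system only holds the weak wrapper, the owning object must keep a strong reference to the real source, or a GC can silently drop the metrics. Assuming registration through Hadoop's DefaultMetricsSystem, with MetricsOwner as a hypothetical caller, it looks roughly like this:

    import org.apache.hadoop.metrics2.MetricsSource;
    import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
    import org.apache.hadoop.fs.impl.WeakRefMetricsSource;

    public class MetricsOwner {
      // strong reference: keeps the real source alive while registered
      private MetricsSource source;

      void register(String name, MetricsSource realSource) {
        this.source = realSource;
        // the metrics system only ever sees the weak wrapper
        DefaultMetricsSystem.instance().register(name,
            "weakly referenced source",
            new WeakRefMetricsSource(name, realSource));
      }

      void unregister(String name) {
        DefaultMetricsSystem.instance().unregisterSource(name);
        source = null;  // the real source may now be collected
      }
    }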

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/IOStatisticsStoreImpl.java

Lines changed: 1 addition & 1 deletion

@@ -190,7 +190,7 @@ public long incrementCounter(final String key, final long value) {
       return counter.get();
     } else {
       long l = incAtomicLong(counter, value);
-      LOG.debug("Incrementing counter {} by {} with final value {}",
+      LOG.trace("Incrementing counter {} by {} with final value {}",
           key, value, l);
       return l;
     }

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/ShellBasedIdMapping.java

Lines changed: 7 additions & 4 deletions

@@ -38,6 +38,8 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.hadoop.util.Shell.bashQuote;
+
 /**
  * A simple shell-based implementation of {@link IdMappingServiceProvider}
  * Map id to user name or group name. It does update every 15 minutes. Only a
@@ -472,26 +474,27 @@ synchronized private void updateMapIncr(final String name,
 
     boolean updated = false;
     updateStaticMapping();
+    String name2 = bashQuote(name);
 
     if (OS.startsWith("Linux") || OS.equals("SunOS") || OS.contains("BSD")) {
       if (isGrp) {
         updated = updateMapInternal(gidNameMap, "group",
-            getName2IdCmdNIX(name, true), ":",
+            getName2IdCmdNIX(name2, true), ":",
             staticMapping.gidMapping);
       } else {
         updated = updateMapInternal(uidNameMap, "user",
-            getName2IdCmdNIX(name, false), ":",
+            getName2IdCmdNIX(name2, false), ":",
             staticMapping.uidMapping);
       }
     } else {
       // Mac
       if (isGrp) {
         updated = updateMapInternal(gidNameMap, "group",
-            getName2IdCmdMac(name, true), "\\s+",
+            getName2IdCmdMac(name2, true), "\\s+",
             staticMapping.gidMapping);
       } else {
         updated = updateMapInternal(uidNameMap, "user",
-            getName2IdCmdMac(name, false), "\\s+",
+            getName2IdCmdMac(name2, false), "\\s+",
             staticMapping.uidMapping);
       }
     }

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/Shell.java

Lines changed: 2 additions & 1 deletion

@@ -146,7 +146,8 @@ public static void checkWindowsCommandLineLength(String...commands)
    * @param arg the argument to quote
    * @return the quoted string
    */
-  static String bashQuote(String arg) {
+  @InterfaceAudience.Private
+  public static String bashQuote(String arg) {
     StringBuilder buffer = new StringBuilder(arg.length() + 2);
     buffer.append('\'')
         .append(arg.replace("'", "'\\''"))
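
For context (editorial, not part of the diff): bashQuote wraps its argument in single quotes and rewrites each embedded single quote as '\'' , so a hostile user or group name passed to the id-mapping shell commands above cannot escape the quoted argument. An illustrative fragment:

    // illustrative only: expected behaviour of Shell.bashQuote
    String quoted = Shell.bashQuote("evil'; rm -rf /; '");
    // quoted == "'evil'\\''; rm -rf /; '\\'''"
    // bash parses that back to the literal string: evil'; rm -rf /; '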

hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcProgram.java

Lines changed: 11 additions & 2 deletions

@@ -26,6 +26,7 @@
 import io.netty.buffer.Unpooled;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.util.ReferenceCountUtil;
 import org.apache.hadoop.classification.VisibleForTesting;
 import org.apache.hadoop.oncrpc.RpcAcceptedReply.AcceptState;
 import org.apache.hadoop.oncrpc.security.VerifierNone;
@@ -163,8 +164,16 @@ public void stopDaemons() {}
   public void channelRead(ChannelHandlerContext ctx, Object msg)
       throws Exception {
     RpcInfo info = (RpcInfo) msg;
+    try {
+      channelRead(ctx, info);
+    } finally {
+      ReferenceCountUtil.release(info.data());
+    }
+  }
+
+  private void channelRead(ChannelHandlerContext ctx, RpcInfo info)
+      throws Exception {
     RpcCall call = (RpcCall) info.header();
-
     SocketAddress remoteAddress = info.remoteAddress();
     if (LOG.isTraceEnabled()) {
       LOG.trace(program + " procedure #" + call.getProcedure());
@@ -256,4 +265,4 @@ public int getPort() {
   public int getPortmapUdpTimeoutMillis() {
     return portmapUdpTimeoutMillis;
   }
-}
+}
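
Editorial background for the release() change: Netty ByteBufs are reference counted, and a handler that consumes a message without forwarding it downstream is responsible for releasing it; ReferenceCountUtil.release() does this safely (it is a no-op for objects that are not reference counted). A minimal sketch of the same pattern, with process() standing in for application logic:

    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.util.ReferenceCountUtil;

    public class ConsumingHandler extends ChannelInboundHandlerAdapter {
      @Override
      public void channelRead(ChannelHandlerContext ctx, Object msg)
          throws Exception {
        try {
          process(msg);  // hypothetical application logic
        } finally {
          // release even if process() throws, to avoid a buffer leak
          ReferenceCountUtil.release(msg);
        }
      }

      private void process(Object msg) { /* ... */ }
    }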

hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcUtil.java

Lines changed: 5 additions & 3 deletions

@@ -129,15 +129,17 @@ public void channelRead(ChannelHandlerContext ctx, Object msg)
     RpcInfo info = null;
     try {
       RpcCall callHeader = RpcCall.read(in);
-      ByteBuf dataBuffer = Unpooled.wrappedBuffer(in.buffer()
-          .slice());
+      ByteBuf dataBuffer = buf.slice(b.position(), b.remaining());
 
       info = new RpcInfo(callHeader, dataBuffer, ctx, ctx.channel(),
           remoteAddress);
     } catch (Exception exc) {
       LOG.info("Malformed RPC request from " + remoteAddress);
     } finally {
-      buf.release();
+      // only release buffer if it is not passed to downstream handler
+      if (info == null) {
+        buf.release();
+      }
     }
 
     if (info != null) {
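
Why the conditional release (editorial note): ByteBuf.slice() returns a view that shares the parent's reference count rather than retaining it, so once the slice has been wrapped in an RpcInfo and handed downstream, releasing the parent here would pull the memory out from under the next handler. A sketch of the underlying behaviour:

    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;

    public class SliceLifetimeDemo {
      public static void main(String[] args) {
        ByteBuf parent = Unpooled.copiedBuffer(new byte[]{1, 2, 3, 4});
        ByteBuf view = parent.slice(1, 2);  // shares parent's refCnt, no retain
        parent.release();                   // refCnt -> 0, memory reclaimed
        System.out.println(view.refCnt());  // 0: the slice died with its parent
      }
    }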

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/pom.xml

Lines changed: 20 additions & 0 deletions

@@ -124,6 +124,26 @@
       <artifactId>assertj-core</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.junit.platform</groupId>
+      <artifactId>junit-platform-launcher</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.junit.jupiter</groupId>
+      <artifactId>junit-jupiter-api</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.junit.jupiter</groupId>
+      <artifactId>junit-jupiter-engine</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.junit.platform</groupId>
+      <artifactId>junit-platform-launcher</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java

Lines changed: 15 additions & 13 deletions

@@ -138,7 +138,6 @@
 import org.apache.hadoop.fs.statistics.DurationTracker;
 import org.apache.hadoop.fs.statistics.DurationTrackerFactory;
 import org.apache.hadoop.fs.statistics.IOStatistics;
-import org.apache.hadoop.fs.statistics.IOStatisticsLogging;
 import org.apache.hadoop.fs.statistics.IOStatisticsSource;
 import org.apache.hadoop.fs.statistics.IOStatisticsContext;
 import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;
@@ -459,6 +458,13 @@ public void initialize(URI name, Configuration originalConf)
     AuditSpan span = null;
     try {
       LOG.debug("Initializing S3AFileSystem for {}", bucket);
+      if (LOG.isTraceEnabled()) {
+        // log a full trace for deep diagnostics of where an object is created,
+        // for tracking down memory leak issues.
+        LOG.trace("Filesystem for {} created; fs.s3a.impl.disable.cache = {}",
+            name, originalConf.getBoolean("fs.s3a.impl.disable.cache", false),
+            new RuntimeException(super.toString()));
+      }
       // clone the configuration into one with propagated bucket options
       Configuration conf = propagateBucketOptions(originalConf, bucket);
       // HADOOP-17894. remove references to s3a stores in JCEKS credentials.
@@ -3999,22 +4005,18 @@ public void close() throws IOException {
     }
     isClosed = true;
     LOG.debug("Filesystem {} is closed", uri);
-    if (getConf() != null) {
-      String iostatisticsLoggingLevel =
-          getConf().getTrimmed(IOSTATISTICS_LOGGING_LEVEL,
-              IOSTATISTICS_LOGGING_LEVEL_DEFAULT);
-      logIOStatisticsAtLevel(LOG, iostatisticsLoggingLevel, getIOStatistics());
-    }
     try {
       super.close();
     } finally {
       stopAllServices();
-    }
-    // Log IOStatistics at debug.
-    if (LOG.isDebugEnabled()) {
-      // robust extract and convert to string
-      LOG.debug("Statistics for {}: {}", uri,
-          IOStatisticsLogging.ioStatisticsToPrettyString(getIOStatistics()));
+      // log IO statistics, including of any file deletion during
+      // superclass close
+      if (getConf() != null) {
+        String iostatisticsLoggingLevel =
+            getConf().getTrimmed(IOSTATISTICS_LOGGING_LEVEL,
+                IOSTATISTICS_LOGGING_LEVEL_DEFAULT);
+        logIOStatisticsAtLevel(LOG, iostatisticsLoggingLevel, getIOStatistics());
+      }
     }
   }
 
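A note on the new trace statement in initialize() (editorial): SLF4J treats a trailing Throwable argument specially and prints its stack trace, so passing an unthrown RuntimeException records exactly where the filesystem instance was created without affecting control flow. The same trick in isolation:

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class CreationSiteLogging {
      private static final Logger LOG =
          LoggerFactory.getLogger(CreationSiteLogging.class);

      public CreationSiteLogging() {
        if (LOG.isTraceEnabled()) {
          // the exception is never thrown; it only captures the caller's stack,
          // which SLF4J prints because a Throwable is the last argument
          LOG.trace("instance created", new RuntimeException("creation site"));
        }
      }
    }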
