Skip to content

Commit 0ced163

Browse files
committed
HADOOP-16830. HADOOP-17271. Checkstyle, findbugs and mock failures.
Test failure was just a mock brittleness issue, just took a long time with the debugger to work out what. An NPE was being raised in S3AFileSystem.getDurationFactory() as the instrumentation was null; this was happening in commit abort and so swallowed. Fix: * getDurationFactory() to return null if instrumentation is null; trackDuration code is all robust to this * Mock overrides the new method (This is why I hate mocks BTW. They can simulate failures, but are so, so brittle). Also: * Tests on the trackDuration calls now make sure they are all invoked. * Some copy and paste of the trackDuration logic which somewhat reduces the depth of lambda-expressions. * Serialization of IOStatisticsSnapshot: method to list required classes and a test to verify this holds. To aid secure use. * IOStatisticsLogging.logToStortedString is now ioStatisticsToPrettyString and filters out all empty/unset values (e.g counters == 0, min/max unset, means with samples == 0. Makes for leaner logs Findbugs. Once S3AInstrumentation declares itself an IOStatisticsSource, all nested classes need to do this.getIOStatistics to stop findbugs warning. Change-Id: I7d1da267c442b1e22dd025be97457cd4a6ae5306
1 parent 231819c commit 0ced163

File tree

20 files changed

+279
-69
lines changed

20 files changed

+279
-69
lines changed

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/IOStatisticsLogging.java

Lines changed: 24 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import javax.annotation.Nullable;
2222
import java.util.Map;
2323
import java.util.TreeMap;
24+
import java.util.function.Predicate;
2425

2526
import org.slf4j.Logger;
2627
import org.slf4j.LoggerFactory;
@@ -86,22 +87,28 @@ public static String ioStatisticsToString(
8687
}
8788

8889
/**
89-
* Convert IOStatistics to a string form, with all the metrics sorted.
90+
* Convert IOStatistics to a string form, with all the metrics sorted
91+
* and empty value stripped.
9092
* This is more expensive than the simple conversion, so should only
9193
* be used for logging/output where it's known/highly likely that the
9294
* caller wants to see the values. Not for debug logging.
9395
* @param statistics A statistics instance.
9496
* @return string value or the empty string if null
9597
*/
96-
public static String ioStatisticsToSortedString(
98+
public static String ioStatisticsToPrettyString(
9799
@Nullable final IOStatistics statistics) {
98100
if (statistics != null) {
99101
StringBuilder sb = new StringBuilder();
100-
mapToSortedString(sb, "counters", statistics.counters());
101-
mapToSortedString(sb, "\ngauges", statistics.gauges());
102-
mapToSortedString(sb, "\nminimums", statistics.minimums());
103-
mapToSortedString(sb, "\nmaximums", statistics.maximums());
104-
mapToSortedString(sb, "\nmeans", statistics.meanStatistics());
102+
mapToSortedString(sb, "counters", statistics.counters(),
103+
p -> p == 0);
104+
mapToSortedString(sb, "\ngauges", statistics.gauges(),
105+
p -> p == 0);
106+
mapToSortedString(sb, "\nminimums", statistics.minimums(),
107+
p -> p < 0);
108+
mapToSortedString(sb, "\nmaximums", statistics.maximums(),
109+
p -> p < 0);
110+
mapToSortedString(sb, "\nmeans", statistics.meanStatistics(),
111+
MeanStatistic::isEmpty);
105112

106113
return sb.toString();
107114
} else {
@@ -147,8 +154,9 @@ private static <E> void mapToString(StringBuilder sb,
147154
*/
148155
private static <E> void mapToSortedString(StringBuilder sb,
149156
final String type,
150-
final Map<String, E> map) {
151-
mapToString(sb, type, sortedMap(map), "\n");
157+
final Map<String, E> map,
158+
final Predicate<E> isEmpty) {
159+
mapToString(sb, type, sortedMap(map, isEmpty), "\n");
152160
}
153161

154162
/**
@@ -160,9 +168,14 @@ private static <E> void mapToSortedString(StringBuilder sb,
160168
* @return a treemap with all the entries.
161169
*/
162170
private static <E> Map<String, E> sortedMap(
163-
final Map<String, E> source) {
171+
final Map<String, E> source,
172+
final Predicate<E> isEmpty) {
164173
Map<String, E> tm = new TreeMap<>();
165-
tm.putAll(source);
174+
for (Map.Entry<String, E> entry : source.entrySet()) {
175+
if (!isEmpty.test(entry.getValue())) {
176+
tm.put(entry.getKey(), entry.getValue());
177+
}
178+
}
166179
return tm;
167180
}
168181

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/IOStatisticsSnapshot.java

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,12 @@
2323
import java.io.ObjectInputStream;
2424
import java.io.ObjectOutputStream;
2525
import java.io.Serializable;
26+
import java.util.Arrays;
27+
import java.util.List;
2628
import java.util.Map;
2729
import java.util.TreeMap;
2830
import java.util.concurrent.ConcurrentHashMap;
31+
import java.util.stream.Collectors;
2932

3033
import com.fasterxml.jackson.annotation.JsonProperty;
3134

@@ -41,13 +44,19 @@
4144

4245
/**
4346
* Snapshot of statistics from a different source.
44-
* <p></p>
47+
* <p>
4548
* It is serializable so that frameworks which can use java serialization
4649
* to propagate data (Spark, Flink...) can send the statistics
4750
* back. For this reason, TreeMaps are explicitly used as field types,
4851
* even though IDEs can recommend use of Map instead.
49-
* <p></p>
52+
* For security reasons, untrusted java object streams should never be
53+
* deserialized. If for some reason this is required, use
54+
* {@link #requiredSerializationClasses()} to get the list of classes
55+
* used when deserializing instances of this object.
56+
* <p>
57+
* <p>
5058
* It is annotated for correct serializations with jackson2.
59+
* </p>
5160
*/
5261
@SuppressWarnings("CollectionDeclaredAsConcreteClass")
5362
@InterfaceAudience.Public
@@ -57,6 +66,16 @@ public final class IOStatisticsSnapshot
5766

5867
private static final long serialVersionUID = -1762522703841538084L;
5968

69+
/**
70+
* List of chasses needed to deserialize.
71+
*/
72+
private static final Class[] DESERIALIZATION_CLASSES = {
73+
IOStatisticsSnapshot.class,
74+
TreeMap.class,
75+
Long.class,
76+
MeanStatistic.class,
77+
};
78+
6079
/**
6180
* Counters.
6281
*/
@@ -253,4 +272,14 @@ private void readObject(final ObjectInputStream s)
253272
(TreeMap<String, MeanStatistic>) s.readObject());
254273
}
255274

275+
/**
276+
* What classes are needed to deserialize this class?
277+
* Needed to securely unmarshall this from untrusted sources.
278+
* @return a list of required classes to deserialize the data.
279+
*/
280+
public static List<Class> requiredSerializationClasses() {
281+
return Arrays.stream(DESERIALIZATION_CLASSES)
282+
.collect(Collectors.toList());
283+
}
284+
256285
}

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StoreStatisticNames.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,7 @@ public final class StoreStatisticNames {
179179
= "object_delete_request";
180180

181181
/**
182-
* The count of objects deleted in delete requests
182+
* The count of objects deleted in delete requests.
183183
*/
184184
public static final String OBJECT_DELETE_OBJECTS
185185
= "object_delete_objects";

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/IOStatisticsBinding.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -445,8 +445,23 @@ public static void trackDurationOfInvocation(
445445
DurationTrackerFactory factory,
446446
String statistic,
447447
InvocationRaisingIOE input) throws IOException {
448-
trackDurationOfOperation(factory, statistic,
449-
input.asCallable()).apply();
448+
// create the tracker outside try-with-resources so
449+
// that failures can be set in the catcher.
450+
DurationTracker tracker = createTracker(factory, statistic);
451+
try {
452+
// exec the input function and return its value
453+
input.apply();
454+
} catch (IOException | RuntimeException e) {
455+
// input function failed: note it
456+
tracker.failed();
457+
// and rethrow
458+
throw e;
459+
} finally {
460+
// update the tracker.
461+
// this is called after the catch() call would have
462+
// set the failed flag.
463+
tracker.close();
464+
}
450465
}
451466

452467
/**

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/InvocationRaisingIOE.java

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -39,14 +39,4 @@ public interface InvocationRaisingIOE {
3939
*/
4040
void apply() throws IOException;
4141

42-
/**
43-
* Convert to a callable for ease of passing around.
44-
* @return the callable.
45-
*/
46-
default CallableRaisingIOE<Void> asCallable() {
47-
return () -> {
48-
apply();
49-
return null;
50-
};
51-
}
5242
}

hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractStreamIOStatisticsTest.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.util.List;
2323

2424
import org.assertj.core.api.Assertions;
25+
import org.junit.AfterClass;
2526
import org.junit.Test;
2627
import org.slf4j.Logger;
2728
import org.slf4j.LoggerFactory;
@@ -31,11 +32,15 @@
3132
import org.apache.hadoop.fs.FileSystem;
3233
import org.apache.hadoop.fs.Path;
3334
import org.apache.hadoop.fs.statistics.IOStatistics;
35+
import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot;
36+
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
3437

3538
import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
3639
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.extractStatistics;
3740
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
3841
import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.demandStringifyIOStatisticsSource;
42+
import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToPrettyString;
43+
import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.snapshotIOStatistics;
3944
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_BYTES;
4045
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_WRITE_BYTES;
4146

@@ -54,6 +59,33 @@ public abstract class AbstractContractStreamIOStatisticsTest
5459
private static final Logger LOG =
5560
LoggerFactory.getLogger(AbstractContractStreamIOStatisticsTest.class);
5661

62+
/**
63+
* FileSystem statistics are collected across every test case.
64+
*/
65+
protected static final IOStatisticsSnapshot FILESYSTEM_IOSTATS =
66+
snapshotIOStatistics();
67+
68+
@Override
69+
public void teardown() throws Exception {
70+
final FileSystem fs = getFileSystem();
71+
if (fs instanceof IOStatisticsSource) {
72+
FILESYSTEM_IOSTATS.aggregate(((IOStatisticsSource)fs).getIOStatistics());
73+
}
74+
super.teardown();
75+
}
76+
77+
/**
78+
* Dump the filesystem statistics after the class if contains any values
79+
*/
80+
@AfterClass
81+
public static void dumpFileSystemIOStatistics() {
82+
if (!FILESYSTEM_IOSTATS.counters().isEmpty()) {
83+
// if there is at least one counter
84+
LOG.info("Aggregate FileSystem Statistics {}",
85+
ioStatisticsToPrettyString(FILESYSTEM_IOSTATS));
86+
}
87+
}
88+
5789
@Test
5890
public void testOutputStreamStatisticKeys() throws Throwable {
5991
describe("Look at the statistic keys of an output stream");

hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/statistics/IOStatisticAssertions.java

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,14 @@
2121
import java.io.ByteArrayInputStream;
2222
import java.io.ByteArrayOutputStream;
2323
import java.io.IOException;
24+
import java.io.InputStream;
2425
import java.io.ObjectInputStream;
2526
import java.io.ObjectOutputStream;
27+
import java.io.ObjectStreamClass;
2628
import java.io.Serializable;
29+
import java.util.List;
2730
import java.util.Map;
31+
import java.util.stream.Collectors;
2832

2933
import org.assertj.core.api.AbstractLongAssert;
3034
import org.assertj.core.api.ObjectAssert;
@@ -489,10 +493,37 @@ public static IOStatistics statisticsJavaRoundTrip(final IOStatistics stat)
489493
}
490494
ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
491495
IOStatistics deser;
492-
try (ObjectInputStream ois = new ObjectInputStream(bais)) {
496+
try (ObjectInputStream ois = new RestrictedInput(bais,
497+
IOStatisticsSnapshot.requiredSerializationClasses())) {
493498
deser = (IOStatistics) ois.readObject();
494499
}
495500
return deser;
496501
}
497502

503+
private static final class RestrictedInput extends ObjectInputStream {
504+
505+
private final List<String> allowedClasses;
506+
507+
private RestrictedInput(final InputStream in,
508+
final List<Class> allowedClasses) throws IOException {
509+
510+
super(in);
511+
this.allowedClasses = allowedClasses.stream()
512+
.map(Class::getName)
513+
.collect(Collectors.toList());
514+
}
515+
516+
@Override
517+
protected Class<?> resolveClass(final ObjectStreamClass desc)
518+
throws IOException, ClassNotFoundException {
519+
final String classname = desc.getName();
520+
if (!allowedClasses.contains(classname)) {
521+
throw new ClassNotFoundException("Class " + classname
522+
+ " Not in list of allowed classes");
523+
}
524+
525+
return super.resolveClass(desc);
526+
}
527+
}
528+
498529
}

hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/statistics/TestDurationTracking.java

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.io.FileNotFoundException;
2222
import java.io.IOException;
2323
import java.util.concurrent.CompletableFuture;
24+
import java.util.concurrent.atomic.AtomicInteger;
2425
import java.util.function.Function;
2526

2627
import org.junit.After;
@@ -57,6 +58,8 @@ public class TestDurationTracking extends AbstractHadoopTestBase {
5758

5859
private IOStatisticsStore stats;
5960

61+
private final AtomicInteger invocationCounter = new AtomicInteger(0);
62+
6063
@Before
6164
public void setup() {
6265
stats = iostatisticsStore()
@@ -91,31 +94,47 @@ public void testDurationTryWithResources() throws Throwable {
9194

9295
/**
9396
* A little sleep method; exceptions are swallowed.
97+
* Increments {@link #invocationCounter}.
98+
* Increments {@inheritDoc #atomicCounter}.
9499
*/
95100
public void sleep() {
96101
sleepf(10);
97102
}
98103

99104
/**
100105
* A little sleep function; exceptions are swallowed.
106+
* Increments {@link #invocationCounter}.
101107
*/
102108
protected int sleepf(final int millis) {
109+
invocationCounter.incrementAndGet();
103110
try {
104111
Thread.sleep(millis);
105112
} catch (InterruptedException ignored) {
106113
}
107114
return millis;
108115
}
109116

117+
/**
118+
* Assert that the sleep counter has been invoked
119+
* the expected number of times.
120+
* @param expected expected value
121+
*/
122+
private void assertCounterValue(final int expected) {
123+
assertThat(invocationCounter.get())
124+
.describedAs("Sleep invocation Counter")
125+
.isEqualTo(expected);
126+
}
127+
110128
/**
111129
* Test that a function raising an IOE can be wrapped.
112130
*/
113131
@Test
114132
public void testDurationFunctionIOE() throws Throwable {
115133
FunctionRaisingIOE<Integer, Integer> fn =
116134
trackFunctionDuration(stats, REQUESTS,
117-
(Integer x) -> x);
118-
assertThat(fn.apply(1)).isEqualTo(1);
135+
(Integer x) -> invocationCounter.getAndSet(x));
136+
assertThat(fn.apply(1)).isEqualTo(0);
137+
assertCounterValue(1);
119138
assertSummaryValues(
120139
fetchSuccessSummary(stats, REQUESTS),
121140
1, 0, 0);
@@ -194,6 +213,7 @@ public void testCallableFailureDuration() throws Throwable {
194213
sleepf(100);
195214
throw new RuntimeException("oops");
196215
}));
216+
assertCounterValue(1);
197217
assertSummaryValues(
198218
fetchSuccessSummary(stats, REQUESTS),
199219
1, -1, -1);
@@ -208,7 +228,10 @@ public void testCallableFailureDuration() throws Throwable {
208228
@Test
209229
public void testInvocationDuration() throws Throwable {
210230
// call the operation
211-
trackDurationOfInvocation(stats, REQUESTS, () -> sleepf(100));
231+
trackDurationOfInvocation(stats, REQUESTS, () -> {
232+
sleepf(100);
233+
});
234+
assertCounterValue(1);
212235
DurationStatisticSummary summary = fetchSuccessSummary(stats, REQUESTS);
213236
assertSummaryValues(summary, 1, 0, 0);
214237
assertSummaryMean(summary, 1, 0);

0 commit comments

Comments
 (0)