Skip to content

Commit 82c1dd5

Browse files
committed
Upgrade metric collection
1 parent 70b0d89 commit 82c1dd5

File tree

2 files changed

+92
-59
lines changed

2 files changed

+92
-59
lines changed

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -289,6 +289,11 @@ private static ClientOverrideConfiguration createClientOverrideConfiguration(
289289
}
290290
}
291291

292+
if (parameters.getMetrics() != null) {
293+
clientOverrideConfigBuilder.addMetricPublisher(
294+
new AwsStatisticsCollector(parameters.getMetrics()));
295+
}
296+
292297
final RetryPolicy.Builder retryPolicyBuilder = AWSClientConfig.createRetryPolicyBuilder(conf);
293298
clientOverrideConfigBuilder.retryPolicy(retryPolicyBuilder.build());
294299

@@ -390,10 +395,6 @@ private void configureBasicParams(AmazonS3Builder builder,
390395
builder.withClientConfiguration(awsConf);
391396
builder.withPathStyleAccessEnabled(parameters.isPathStyleAccess());
392397

393-
if (parameters.getMetrics() != null) {
394-
builder.withMetricsCollector(
395-
new AwsStatisticsCollector(parameters.getMetrics()));
396-
}
397398
if (parameters.getMonitoringListener() != null) {
398399
builder.withMonitoringListener(parameters.getMonitoringListener());
399400
}

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/AwsStatisticsCollector.java

Lines changed: 87 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -21,33 +21,28 @@
2121
import java.time.Duration;
2222
import java.util.function.Consumer;
2323
import java.util.function.LongConsumer;
24+
import java.util.stream.Collectors;
25+
import java.util.stream.Stream;
2426

25-
import com.amazonaws.Request;
26-
import com.amazonaws.Response;
27-
import com.amazonaws.metrics.RequestMetricCollector;
28-
import com.amazonaws.util.TimingInfo;
27+
import software.amazon.awssdk.core.metrics.CoreMetric;
28+
import software.amazon.awssdk.http.HttpMetric;
29+
import software.amazon.awssdk.http.HttpStatusCode;
30+
import software.amazon.awssdk.metrics.MetricCollection;
31+
import software.amazon.awssdk.metrics.MetricPublisher;
32+
import software.amazon.awssdk.metrics.SdkMetric;
2933

3034
import org.apache.hadoop.fs.s3a.statistics.StatisticsFromAwsSdk;
3135

32-
import static com.amazonaws.util.AWSRequestMetrics.Field.ClientExecuteTime;
33-
import static com.amazonaws.util.AWSRequestMetrics.Field.HttpClientRetryCount;
34-
import static com.amazonaws.util.AWSRequestMetrics.Field.HttpRequestTime;
35-
import static com.amazonaws.util.AWSRequestMetrics.Field.RequestCount;
36-
import static com.amazonaws.util.AWSRequestMetrics.Field.RequestMarshallTime;
37-
import static com.amazonaws.util.AWSRequestMetrics.Field.RequestSigningTime;
38-
import static com.amazonaws.util.AWSRequestMetrics.Field.ResponseProcessingTime;
39-
import static com.amazonaws.util.AWSRequestMetrics.Field.ThrottleException;
40-
4136
/**
4237
* Collect statistics from the AWS SDK and forward to an instance of
4338
* {@link StatisticsFromAwsSdk} and thence into the S3A statistics.
4439
* <p>
4540
* See {@code com.facebook.presto.hive.s3.PrestoS3FileSystemMetricCollector}
4641
* for the inspiration for this.
4742
* <p>
48-
* See {@code com.amazonaws.util.AWSRequestMetrics} for metric names.
43+
* See {@code software.amazon.awssdk.core.metrics.CoreMetric} for metric names.
4944
*/
50-
public class AwsStatisticsCollector extends RequestMetricCollector {
45+
public class AwsStatisticsCollector implements MetricPublisher {
5146

5247
/**
5348
* final destination of updates.
@@ -69,61 +64,98 @@ public AwsStatisticsCollector(final StatisticsFromAwsSdk collector) {
6964
* @param response AWS response
7065
*/
7166
@Override
72-
public void collectMetrics(
73-
final Request<?> request,
74-
final Response<?> response) {
75-
76-
TimingInfo timingInfo = request.getAWSRequestMetrics().getTimingInfo();
77-
78-
counter(timingInfo, HttpClientRetryCount.name(),
79-
collector::updateAwsRetryCount);
80-
counter(timingInfo, RequestCount.name(),
81-
collector::updateAwsRequestCount);
82-
counter(timingInfo, ThrottleException.name(),
83-
collector::updateAwsThrottleExceptionsCount);
84-
85-
timing(timingInfo, ClientExecuteTime.name(),
86-
collector::noteAwsClientExecuteTime);
87-
timing(timingInfo, HttpRequestTime.name(),
88-
collector::noteAwsRequestTime);
89-
timing(timingInfo, RequestMarshallTime.name(),
90-
collector::noteRequestMarshallTime);
91-
timing(timingInfo, RequestSigningTime.name(),
92-
collector::noteRequestSigningTime);
93-
timing(timingInfo, ResponseProcessingTime.name(),
94-
collector::noteResponseProcessingTime);
67+
public void publish(MetricCollection metricCollection) {
68+
final long[] throttling = {0};
69+
recurseThroughChildren(metricCollection)
70+
.collect(Collectors.toList())
71+
.forEach(m -> {
72+
counter(m, CoreMetric.RETRY_COUNT, retries -> {
73+
// Replaces com.amazonaws.util.AWSRequestMetrics.Field.HttpClientRetryCount
74+
collector.updateAwsRetryCount(retries);
75+
76+
// Replaces com.amazonaws.util.AWSRequestMetrics.Field.RequestCount (always HttpClientRetryCount+1)
77+
collector.updateAwsRequestCount(retries + 1);
78+
});
79+
80+
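// TODO: confirm replacement: this only counts responses whose status equals HttpStatusCode.THROTTLING (429); verify that S3 throttling, commonly reported as 503 SlowDown, is still counted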
81+
// Replaces com.amazonaws.util.AWSRequestMetrics.Field.ThrottleException
82+
counter(m, HttpMetric.HTTP_STATUS_CODE, statusCode -> {
83+
if (statusCode == HttpStatusCode.THROTTLING) {
84+
throttling[0] += 1;
85+
}
86+
});
87+
88+
// Replaces com.amazonaws.util.AWSRequestMetrics.Field.ClientExecuteTime
89+
timing(m, CoreMetric.API_CALL_DURATION,
90+
collector::noteAwsClientExecuteTime);
91+
92+
// Replaces com.amazonaws.util.AWSRequestMetrics.Field.HttpRequestTime
93+
timing(m, CoreMetric.SERVICE_CALL_DURATION,
94+
collector::noteAwsRequestTime);
95+
96+
// Replaces com.amazonaws.util.AWSRequestMetrics.Field.RequestMarshallTime
97+
timing(m, CoreMetric.MARSHALLING_DURATION,
98+
collector::noteRequestMarshallTime);
99+
100+
// Replaces com.amazonaws.util.AWSRequestMetrics.Field.RequestSigningTime
101+
timing(m, CoreMetric.SIGNING_DURATION,
102+
collector::noteRequestSigningTime);
103+
104+
// TODO: confirm replacement: verify that UNMARSHALLING_DURATION measures the same work as the v1 ResponseProcessingTime field
105+
// Replaces com.amazonaws.util.AWSRequestMetrics.Field.ResponseProcessingTime
106+
timing(m, CoreMetric.UNMARSHALLING_DURATION,
107+
collector::noteResponseProcessingTime);
108+
});
109+
110+
collector.updateAwsThrottleExceptionsCount(throttling[0]);
111+
}
112+
113+
@Override
114+
public void close() {
115+
95116
}
96117

97118
/**
98119
* Process a timing.
99-
* @param timingInfo timing info
100-
* @param subMeasurementName sub measurement
120+
* @param collection metric collection
121+
* @param metric metric
101122
* @param durationConsumer consumer
102123
*/
103124
private void timing(
104-
TimingInfo timingInfo,
105-
String subMeasurementName,
125+
MetricCollection collection,
126+
SdkMetric<Duration> metric,
106127
Consumer<Duration> durationConsumer) {
107-
TimingInfo t1 = timingInfo.getSubMeasurement(subMeasurementName);
108-
if (t1 != null && t1.getTimeTakenMillisIfKnown() != null) {
109-
durationConsumer.accept(Duration.ofMillis(
110-
t1.getTimeTakenMillisIfKnown().longValue()));
111-
}
128+
collection
129+
.metricValues(metric)
130+
.forEach(v -> durationConsumer.accept(v));
112131
}
113132

114133
/**
115134
* Process a counter.
116-
* @param timingInfo timing info
117-
* @param subMeasurementName sub measurement
135+
* @param collection metric collection
136+
* @param metric metric
118137
* @param consumer consumer
119138
*/
120139
private void counter(
121-
TimingInfo timingInfo,
122-
String subMeasurementName,
140+
MetricCollection collection,
141+
SdkMetric<Integer> metric,
123142
LongConsumer consumer) {
124-
Number n = timingInfo.getCounter(subMeasurementName);
125-
if (n != null) {
126-
consumer.accept(n.longValue());
127-
}
143+
collection
144+
.metricValues(metric)
145+
.forEach(v -> consumer.accept(v.longValue()));
146+
}
147+
148+
/**
149+
* Metric collections can be nested. Exposes a stream of the given
150+
* collection and its nested children.
151+
* @param metrics initial collection
152+
* @return a stream of all nested metric collections
153+
*/
154+
private static Stream<MetricCollection> recurseThroughChildren(
155+
MetricCollection metrics) {
156+
return Stream.concat(
157+
Stream.of(metrics),
158+
metrics.children().stream()
159+
.flatMap(c -> recurseThroughChildren(c)));
128160
}
129161
}

0 commit comments

Comments
 (0)