Skip to content

Commit 59a955d

Browse files
Kimahrimanaajisaka
authored andcommitted
HADOOP-17804. Expose prometheus metrics only after a flush and dedupe with tag values (#3369)
Signed-off-by: Akira Ajisaka <[email protected]> (cherry picked from commit 4ced012)
1 parent 1f61944 commit 59a955d

File tree

2 files changed

+168
-40
lines changed

2 files changed

+168
-40
lines changed

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/sink/PrometheusMetricsSink.java

Lines changed: 55 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626

2727
import java.io.IOException;
2828
import java.io.Writer;
29+
import java.util.Collection;
2930
import java.util.Map;
3031
import java.util.concurrent.ConcurrentHashMap;
3132
import java.util.regex.Pattern;
@@ -42,7 +43,10 @@ public class PrometheusMetricsSink implements MetricsSink {
4243
/**
4344
* Cached output lines for each metrics.
4445
*/
45-
private final Map<String, String> metricLines = new ConcurrentHashMap<>();
46+
private Map<String, Map<Collection<MetricsTag>, AbstractMetric>> promMetrics =
47+
new ConcurrentHashMap<>();
48+
private Map<String, Map<Collection<MetricsTag>, AbstractMetric>> nextPromMetrics =
49+
new ConcurrentHashMap<>();
4650

4751
private static final Pattern SPLIT_PATTERN =
4852
Pattern.compile("(?<!(^|[A-Z_]))(?=[A-Z])|(?<!^)(?=[A-Z][a-z])");
@@ -53,42 +57,16 @@ public PrometheusMetricsSink() {
5357

5458
@Override
5559
public void putMetrics(MetricsRecord metricsRecord) {
56-
for (AbstractMetric metrics : metricsRecord.metrics()) {
57-
if (metrics.type() == MetricType.COUNTER
58-
|| metrics.type() == MetricType.GAUGE) {
60+
for (AbstractMetric metric : metricsRecord.metrics()) {
61+
if (metric.type() == MetricType.COUNTER
62+
|| metric.type() == MetricType.GAUGE) {
5963

6064
String key = prometheusName(
61-
metricsRecord.name(), metrics.name());
62-
63-
StringBuilder builder = new StringBuilder();
64-
builder.append("# TYPE ")
65-
.append(key)
66-
.append(" ")
67-
.append(metrics.type().toString().toLowerCase())
68-
.append("\n")
69-
.append(key)
70-
.append("{");
71-
String sep = "";
72-
73-
//add tags
74-
for (MetricsTag tag : metricsRecord.tags()) {
75-
String tagName = tag.name().toLowerCase();
76-
77-
//ignore specific tag which includes sub-hierarchy
78-
if (!tagName.equals("numopenconnectionsperuser")) {
79-
builder.append(sep)
80-
.append(tagName)
81-
.append("=\"")
82-
.append(tag.value())
83-
.append("\"");
84-
sep = ",";
85-
}
86-
}
87-
builder.append("} ");
88-
builder.append(metrics.value());
89-
builder.append("\n");
90-
metricLines.put(key, builder.toString());
65+
metricsRecord.name(), metric.name());
9166

67+
nextPromMetrics.computeIfAbsent(key,
68+
any -> new ConcurrentHashMap<>())
69+
.put(metricsRecord.tags(), metric);
9270
}
9371
}
9472
}
@@ -108,17 +86,55 @@ public String prometheusName(String recordName,
10886

10987
@Override
11088
public void flush() {
111-
89+
promMetrics = nextPromMetrics;
90+
nextPromMetrics = new ConcurrentHashMap<>();
11291
}
11392

11493
@Override
115-
public void init(SubsetConfiguration subsetConfiguration) {
116-
94+
public void init(SubsetConfiguration conf) {
11795
}
11896

11997
public void writeMetrics(Writer writer) throws IOException {
120-
for (String line : metricLines.values()) {
121-
writer.write(line);
98+
for (Map.Entry<String, Map<Collection<MetricsTag>, AbstractMetric>> promMetric :
99+
promMetrics.entrySet()) {
100+
AbstractMetric firstMetric = promMetric.getValue().values().iterator().next();
101+
102+
StringBuilder builder = new StringBuilder();
103+
builder.append("# HELP ")
104+
.append(promMetric.getKey())
105+
.append(" ")
106+
.append(firstMetric.description())
107+
.append("\n")
108+
.append("# TYPE ")
109+
.append(promMetric.getKey())
110+
.append(" ")
111+
.append(firstMetric.type().toString().toLowerCase())
112+
.append("\n");
113+
114+
for (Map.Entry<Collection<MetricsTag>, AbstractMetric> metric :
115+
promMetric.getValue().entrySet()) {
116+
builder.append(promMetric.getKey())
117+
.append("{");
118+
119+
String sep = "";
120+
for (MetricsTag tag : metric.getKey()) {
121+
String tagName = tag.name().toLowerCase();
122+
123+
if (!tagName.equals("numopenconnectionsperuser")) {
124+
builder.append(sep)
125+
.append(tagName)
126+
.append("=\"")
127+
.append(tag.value())
128+
.append("\"");
129+
sep = ",";
130+
}
131+
}
132+
builder.append("} ");
133+
builder.append(metric.getValue().value());
134+
builder.append("\n");
135+
}
136+
137+
writer.write(builder.toString());
122138
}
123139
}
124140
}

hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/sink/TestPrometheusMetricsSink.java

Lines changed: 113 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.hadoop.metrics2.MetricsSystem;
2525
import org.apache.hadoop.metrics2.annotation.Metric;
2626
import org.apache.hadoop.metrics2.annotation.Metrics;
27+
import org.apache.hadoop.metrics2.annotation.Metric.Type;
2728
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
2829
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
2930

@@ -48,7 +49,6 @@ public void testPublish() throws IOException {
4849
TestMetrics testMetrics = metrics
4950
.register("TestMetrics", "Testing metrics", new TestMetrics());
5051

51-
metrics.start();
5252
testMetrics.numBucketCreateFails.incr();
5353
metrics.publishMetricsNow();
5454
ByteArrayOutputStream stream = new ByteArrayOutputStream();
@@ -67,6 +67,104 @@ public void testPublish() throws IOException {
6767
"test_metrics_num_bucket_create_fails{context=\"dfs\"")
6868
);
6969

70+
metrics.unregisterSource("TestMetrics");
71+
metrics.stop();
72+
metrics.shutdown();
73+
}
74+
75+
/**
76+
* Fix for HADOOP-17804, make sure Prometheus metrics get deduped based on metric
77+
* and tags, not just the metric.
78+
*/
79+
@Test
80+
public void testPublishMultiple() throws IOException {
81+
//GIVEN
82+
MetricsSystem metrics = DefaultMetricsSystem.instance();
83+
84+
metrics.init("test");
85+
PrometheusMetricsSink sink = new PrometheusMetricsSink();
86+
metrics.register("Prometheus", "Prometheus", sink);
87+
TestMetrics testMetrics1 = metrics
88+
.register("TestMetrics1", "Testing metrics", new TestMetrics("1"));
89+
TestMetrics testMetrics2 = metrics
90+
.register("TestMetrics2", "Testing metrics", new TestMetrics("2"));
91+
92+
testMetrics1.numBucketCreateFails.incr();
93+
testMetrics2.numBucketCreateFails.incr();
94+
metrics.publishMetricsNow();
95+
ByteArrayOutputStream stream = new ByteArrayOutputStream();
96+
OutputStreamWriter writer = new OutputStreamWriter(stream, UTF_8);
97+
98+
//WHEN
99+
sink.writeMetrics(writer);
100+
writer.flush();
101+
102+
//THEN
103+
String writtenMetrics = stream.toString(UTF_8.name());
104+
System.out.println(writtenMetrics);
105+
Assert.assertTrue(
106+
"The expected first metric line is missing from prometheus metrics output",
107+
writtenMetrics.contains(
108+
"test_metrics_num_bucket_create_fails{context=\"dfs\",testtag=\"testTagValue1\"")
109+
);
110+
Assert.assertTrue(
111+
"The expected second metric line is missing from prometheus metrics output",
112+
writtenMetrics.contains(
113+
"test_metrics_num_bucket_create_fails{context=\"dfs\",testtag=\"testTagValue2\"")
114+
);
115+
116+
metrics.unregisterSource("TestMetrics1");
117+
metrics.unregisterSource("TestMetrics2");
118+
metrics.stop();
119+
metrics.shutdown();
120+
}
121+
122+
/**
123+
* Fix for HADOOP-17804, make sure Prometheus metrics start fresh after each flush.
124+
*/
125+
@Test
126+
public void testPublishFlush() throws IOException {
127+
//GIVEN
128+
MetricsSystem metrics = DefaultMetricsSystem.instance();
129+
130+
metrics.init("test");
131+
PrometheusMetricsSink sink = new PrometheusMetricsSink();
132+
metrics.register("Prometheus", "Prometheus", sink);
133+
TestMetrics testMetrics = metrics
134+
.register("TestMetrics", "Testing metrics", new TestMetrics("1"));
135+
136+
testMetrics.numBucketCreateFails.incr();
137+
metrics.publishMetricsNow();
138+
139+
metrics.unregisterSource("TestMetrics");
140+
testMetrics = metrics
141+
.register("TestMetrics", "Testing metrics", new TestMetrics("2"));
142+
143+
testMetrics.numBucketCreateFails.incr();
144+
metrics.publishMetricsNow();
145+
146+
ByteArrayOutputStream stream = new ByteArrayOutputStream();
147+
OutputStreamWriter writer = new OutputStreamWriter(stream, UTF_8);
148+
149+
//WHEN
150+
sink.writeMetrics(writer);
151+
writer.flush();
152+
153+
//THEN
154+
String writtenMetrics = stream.toString(UTF_8.name());
155+
System.out.println(writtenMetrics);
156+
Assert.assertFalse(
157+
"The first metric should not exist after flushing",
158+
writtenMetrics.contains(
159+
"test_metrics_num_bucket_create_fails{context=\"dfs\",testtag=\"testTagValue1\"")
160+
);
161+
Assert.assertTrue(
162+
"The expected metric line is missing from prometheus metrics output",
163+
writtenMetrics.contains(
164+
"test_metrics_num_bucket_create_fails{context=\"dfs\",testtag=\"testTagValue2\"")
165+
);
166+
167+
metrics.unregisterSource("TestMetrics");
70168
metrics.stop();
71169
metrics.shutdown();
72170
}
@@ -126,6 +224,20 @@ public void testNamingWhitespaces() {
126224
*/
127225
@Metrics(about = "Test Metrics", context = "dfs")
128226
private static class TestMetrics {
227+
private String id;
228+
229+
TestMetrics() {
230+
this("1");
231+
}
232+
233+
TestMetrics(String id) {
234+
this.id = id;
235+
}
236+
237+
@Metric(value={"testTag", ""}, type=Type.TAG)
238+
String testTag1() {
239+
return "testTagValue" + id;
240+
}
129241

130242
@Metric
131243
private MutableCounterLong numBucketCreateFails;

0 commit comments

Comments
 (0)