
Commit a441ebe

Ingest internal telemetry from the OTel Collector when it is running (#9928)
Add a monitoring component to ingest telemetry values from the collector's Prometheus endpoint into the `elastic_agent.collector` dataset. This includes a remapping script processor to convert the collector's metrics to their closest ECS equivalents when possible. This is a temporary approach that will not scale to full collector metrics in a general configuration, but it gives us a backwards-compatible stopgap during the initial monitoring-only rollout.
1 parent 47112bd commit a441ebe
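
The change wires a new monitoring input that scrapes the collector's internal telemetry endpoint and reshapes the result with the script processor added below. For orientation, a minimal Metricbeat-style sketch of that wiring follows; the real configuration is generated programmatically by the monitoring component, and the endpoint address, scrape period, and script filename here are assumptions for illustration only.

# Sketch only: approximate equivalent of the generated monitoring input.
metricbeat.modules:
  - module: prometheus
    metricsets: ["collector"]
    # The collector's internal telemetry endpoint; localhost:8888/metrics is
    # the conventional default and is an assumption here.
    hosts: ["localhost:8888"]
    metrics_path: /metrics
    period: 60s

processors:
  # Remap otelcol_* metrics to the legacy beat.stats.* fields used by the
  # Agent dashboards; events land in the elastic_agent.collector dataset.
  - script:
      lang: javascript
      file: otel_metrics_remap.js   # hypothetical filename for the script added in this commit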

File tree

13 files changed (+975, -447 lines)


NOTICE-fips.txt

Lines changed: 212 additions & 212 deletions
Large diffs are not rendered by default.

NOTICE.txt

Lines changed: 212 additions & 212 deletions
Large diffs are not rendered by default.

Lines changed: 32 additions & 0 deletions
@@ -0,0 +1,32 @@
+# Kind can be one of:
+# - breaking-change: a change to previously-documented behavior
+# - deprecation: functionality that is being removed in a later release
+# - bug-fix: fixes a problem in a previous version
+# - enhancement: extends functionality but does not break or fix existing behavior
+# - feature: new functionality
+# - known-issue: problems that we are aware of in a given version
+# - security: impacts on the security of a product or a user’s deployment.
+# - upgrade: important information for someone upgrading from a prior version
+# - other: does not fit into any of the other categories
+kind: enhancement
+
+# Change summary; a 80ish characters long description of the change.
+summary: Include OTel Collector internal telemetry in Agent monitoring
+
+# Long description; in case the summary is not enough to describe the change
+# this field accommodate a description without length limits.
+# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment.
+#description:
+
+# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc.
+component: elastic-agent
+
+# PR URL; optional; the PR number that added the changeset.
+# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added.
+# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
+# Please provide it if you are adding a fragment for a different PR.
+#pr: https://github.com/owner/repo/1234
+
+# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
+# If not present is automatically filled by the tooling with the issue linked to the PR number.
+#issue: https://github.com/owner/repo/1234

go.mod

Lines changed: 1 addition & 1 deletion
@@ -124,6 +124,7 @@ require (
 	go.opentelemetry.io/collector/processor/memorylimiterprocessor v0.135.0
 	go.opentelemetry.io/collector/receiver/nopreceiver v0.135.0
 	go.opentelemetry.io/collector/receiver/otlpreceiver v0.135.0
+	go.opentelemetry.io/collector/service v0.135.0
 	go.opentelemetry.io/ebpf-profiler v0.0.202536
 	go.uber.org/zap v1.27.0
 	go.yaml.in/yaml/v3 v3.0.4
@@ -721,7 +722,6 @@ require (
 	go.opentelemetry.io/collector/scraper v0.135.0 // indirect
 	go.opentelemetry.io/collector/scraper/scraperhelper v0.135.0 // indirect
 	go.opentelemetry.io/collector/semconv v0.128.1-0.20250610090210-188191247685 // indirect
-	go.opentelemetry.io/collector/service v0.135.0 // indirect
 	go.opentelemetry.io/collector/service/hostcapabilities v0.135.0 // indirect
 	go.opentelemetry.io/contrib/bridges/otelzap v0.12.0 // indirect
 	go.opentelemetry.io/contrib/detectors/gcp v1.36.0 // indirect

internal/pkg/agent/application/application.go

Lines changed: 1 addition & 1 deletion
@@ -133,7 +133,7 @@ func New(
 	if err != nil {
 		return nil, nil, nil, fmt.Errorf("failed to create upgrader: %w", err)
 	}
-	monitor := componentmonitoring.New(isMonitoringSupported, cfg.Settings.DownloadConfig.OS(), cfg.Settings.MonitoringConfig, agentInfo, isOtelExecModeSubprocess)
+	monitor := componentmonitoring.New(isMonitoringSupported, cfg.Settings.DownloadConfig.OS(), cfg.Settings.MonitoringConfig, rawConfig.OTel, agentInfo, isOtelExecModeSubprocess)

 	runtime, err := runtime.NewManager(
 		log,

internal/pkg/agent/application/coordinator/coordinator.go

Lines changed: 1 addition & 1 deletion
@@ -105,7 +105,7 @@ type MonitorManager interface {
 	// Enabled when configured to collect metrics/logs.
 	Enabled() bool

-	// Reload reloads the configuration for the upgrade manager.
+	// Reload reloads the configuration for the monitoring manager.
 	Reload(rawConfig *config.Config) error

 	// MonitoringConfig injects monitoring configuration into resolved ast tree.

Lines changed: 139 additions & 0 deletions
@@ -0,0 +1,139 @@
+// A script for use in the Beats script processor, to remap raw OTel telemetry
+// from its prometheus endpoint to backwards-compatible Beats metrics fields
+// that can be viewed in Agent dashboards.
+
+function process(event) {
+  // This hard-coded exporter name will not work for the general
+  // (non-monitoring) use case.
+  var elastic_exporter = event.Get("prometheus.labels.exporter") == "elasticsearch/_agent-component/monitoring";
+  var elastic_scope = event.Get("prometheus.labels.otel_scope_name") == "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter";
+
+  // We accept general collector fields labeled with the elasticsearch exporter
+  // (queue metrics, sent / error stats), or fields emitted from the elasticsearch
+  // exporter's own instrumentation scope (custom elastic metrics).
+  if (!elastic_exporter && !elastic_scope) {
+    event.Cancel();
+    return;
+  }
+
+  // Hack: if the scope is elastic-custom fields, deterministically mangle the
+  // agent.id. Since the label set is different, these are passed through in
+  // different events, and if we don't do this one of the events will be
+  // rejected as a duplicate since they have the same component id, agent id,
+  // and metricset.
+  var id = event.Get("agent.id");
+  if (id != null && id.length > 0) {
+    // Increment / wrap the last hex character of the uuid
+    var prefix = id.substring(0, id.length - 1);
+    var last = id.substring(id.length - 1);
+    var rotated = "0";
+    if (last < "f") {
+      rotated = String.fromCharCode(last.charCodeAt(0) + 1);
+    }
+    id = prefix + rotated;
+    event.Put("agent.id", id);
+  }
+
+  // The event will be discarded unless we find some valid metric to convert.
+  var keep_event = false;
+
+  var queue_size = event.Get("prometheus.metrics.otelcol_exporter_queue_size");
+  var queue_capacity = event.Get("prometheus.metrics.otelcol_exporter_queue_capacity");
+  if (queue_size != null) {
+    keep_event = true;
+    event.Put("beat.stats.libbeat.pipeline.queue.filled.events", queue_size);
+  }
+  if (queue_capacity != null) {
+    keep_event = true;
+    event.Put("beat.stats.libbeat.pipeline.queue.max_events", queue_capacity);
+  }
+  if (queue_size != null && queue_capacity != null) {
+    var queue_pct = queue_size / queue_capacity;
+    if (!isNaN(queue_pct)) {
+      event.Put("beat.stats.libbeat.pipeline.queue.filled.pct", queue_pct);
+    }
+  }
+
+  var total_sent = 0;
+  var total_sent_valid = false;
+  // Add send statistics from all source types
+  var sent_logs = event.Get("prometheus.metrics.otelcol_exporter_sent_log_records_total");
+  if (sent_logs != null) {
+    total_sent += sent_logs;
+    total_sent_valid = true;
+  }
+  var sent_spans = event.Get("prometheus.metrics.otelcol_exporter_sent_spans_total");
+  if (sent_spans != null) {
+    total_sent += sent_spans;
+    total_sent_valid = true;
+  }
+  var sent_metrics = event.Get("prometheus.metrics.otelcol_exporter_sent_metric_points_total");
+  if (sent_metrics != null) {
+    total_sent += sent_metrics;
+    total_sent_valid = true;
+  }
+  if (total_sent_valid) {
+    event.Put("beat.stats.libbeat.output.events.acked", total_sent);
+    keep_event = true;
+  }
+
+  var total_failed = 0;
+  var total_failed_valid = false;
+  // Add failed statistics from all source types
+  var failed_logs = event.Get("prometheus.metrics.otelcol_exporter_send_failed_log_records_total");
+  if (failed_logs != null) {
+    total_failed += failed_logs;
+    total_failed_valid = true;
+  }
+  var failed_spans = event.Get("prometheus.metrics.otelcol_exporter_send_failed_spans_total");
+  if (failed_spans != null) {
+    total_failed += failed_spans;
+    total_failed_valid = true;
+  }
+  var failed_metrics = event.Get("prometheus.metrics.otelcol_exporter_send_failed_metric_points_total");
+  if (failed_metrics != null) {
+    total_failed += failed_metrics;
+    total_failed_valid = true;
+  }
+  if (total_failed_valid) {
+    event.Put("beat.stats.libbeat.output.events.dropped", total_failed);
+    keep_event = true;
+  }
+
+  var flushed_bytes = event.Get("prometheus.metrics.otelcol_elasticsearch_flushed_bytes_total");
+  if (flushed_bytes != null) {
+    event.Put("beat.stats.libbeat.output.write.bytes", flushed_bytes);
+    keep_event = true;
+  }
+
+  var retried_docs = event.Get("prometheus.metrics.otelcol_elasticsearch_docs_retried_ratio_total");
+  if (retried_docs != null) {
+    // "failed" in the beats metric means an event failed to ingest but was
+    // not dropped, and will be retried.
+    event.Put("beat.stats.libbeat.output.events.failed", retried_docs);
+    keep_event = true;
+  }
+
+  var request_count = event.Get("prometheus.metrics.otelcol_elasticsearch_bulk_requests_count_ratio_total");
+  if (request_count != null) {
+    // This is not an exact semantic match for how Beats measures batch count,
+    // but it's close.
+    event.Put("beat.stats.libbeat.output.events.batches", request_count);
+    keep_event = true;
+  }
+
+  var processed_docs_count = event.Get("prometheus.metrics.otelcol_elasticsearch_docs_processed_ratio_total");
+  if (processed_docs_count != null) {
+    // Approximate semantic match: the otel metric counts all document
+    // ingestion attempts, including success, failure, and retries,
+    // which is a better match for the Beats definition of total events
+    // than otelcol_elasticsearch_docs_received_ratio_total which
+    // includes only unique events seen (regardless of retries etc).
+    event.Put("beat.stats.libbeat.output.events.total", processed_docs_count);
+    keep_event = true;
+  }
+
+  if (!keep_event) {
+    event.Cancel();
+  }
+}
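
To make the remapping concrete, here is a hypothetical event scraped by the prometheus collector metricset and the fields the script would put on it before indexing (all values are invented for the example):

# Incoming fields on the scraped event:
prometheus:
  labels:
    exporter: elasticsearch/_agent-component/monitoring
  metrics:
    otelcol_exporter_queue_size: 250
    otelcol_exporter_queue_capacity: 1000
    otelcol_exporter_sent_log_records_total: 12000
    otelcol_exporter_send_failed_log_records_total: 3

# Fields added by the script (the event is kept because metrics matched):
beat:
  stats:
    libbeat:
      pipeline:
        queue:
          max_events: 1000
          filled:
            events: 250
            pct: 0.25
      output:
        events:
          acked: 12000
          dropped: 3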
