Skip to content

Commit e05ba2f

Browse files
feat: emit system resource metrics for EDOT subprocess
1 parent a77d4cd commit e05ba2f

File tree

7 files changed

+190
-21
lines changed

7 files changed

+190
-21
lines changed

internal/pkg/agent/application/application.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -122,13 +122,16 @@ func New(
122122
override(cfg)
123123
}
124124

125+
otelExecMode := otelmanager.GetExecutionModeFromConfig(rawConfig)
126+
isOtelSubprocessExecution := otelExecMode == otelmanager.SubprocessExecutionMode
127+
125128
// monitoring is not supported in bootstrap mode https://github.com/elastic/elastic-agent/issues/1761
126129
isMonitoringSupported := !disableMonitoring && cfg.Settings.V1MonitoringEnabled
127130
upgrader, err := upgrade.NewUpgrader(log, cfg.Settings.DownloadConfig, agentInfo)
128131
if err != nil {
129132
return nil, nil, nil, fmt.Errorf("failed to create upgrader: %w", err)
130133
}
131-
monitor := monitoring.New(isMonitoringSupported, cfg.Settings.DownloadConfig.OS(), cfg.Settings.MonitoringConfig, agentInfo)
134+
monitor := monitoring.New(isMonitoringSupported, cfg.Settings.DownloadConfig.OS(), cfg.Settings.MonitoringConfig, agentInfo, isOtelSubprocessExecution)
132135

133136
runtime, err := runtime.NewManager(
134137
log,
@@ -240,7 +243,7 @@ func New(
240243
return nil, nil, nil, errors.New(err, "failed to initialize composable controller")
241244
}
242245

243-
otelManager, err := otelmanager.NewOTelManager(log.Named("otel_manager"), logLevel, baseLogger, otelmanager.EmbeddedExecutionMode, agentInfo, monitor.ComponentMonitoringConfig, cfg.Settings.ProcessConfig.StopTimeout)
246+
otelManager, err := otelmanager.NewOTelManager(log.Named("otel_manager"), logLevel, baseLogger, otelExecMode, agentInfo, monitor.ComponentMonitoringConfig, cfg.Settings.ProcessConfig.StopTimeout)
244247
if err != nil {
245248
return nil, nil, nil, fmt.Errorf("failed to create otel manager: %w", err)
246249
}

internal/pkg/agent/application/monitoring/v1_monitor.go

Lines changed: 53 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ const (
5858
monitoringOutput = "monitoring"
5959
defaultMonitoringNamespace = "default"
6060
agentName = "elastic-agent"
61+
edotCollectorName = "elastic-agent/collector"
6162
metricBeatName = "metricbeat"
6263
fileBeatName = "filebeat"
6364

@@ -73,6 +74,9 @@ const (
7374
// metricset stream failure threshold before the stream is marked as DEGRADED
7475
// to avoid marking the agent degraded for transient errors, we set the default threshold to 5
7576
defaultMetricsStreamFailureThreshold = uint(5)
77+
78+
beatsMonitoringComponentInfoID = "beat/" + monitoringMetricsUnitID
79+
httpMonitoringComponentInfoID = "http/" + monitoringMetricsUnitID
7680
)
7781

7882
var (
@@ -84,10 +88,11 @@ var (
8488
// BeatsMonitor provides config values for monitoring of agent clients (beats, endpoint, etc)
8589
// by injecting the monitoring config into an existing fleet config
8690
type BeatsMonitor struct {
87-
enabled bool // feature flag disabling whole v1 monitoring story
88-
config *monitoringConfig
89-
operatingSystem string
90-
agentInfo info.Agent
91+
enabled bool // feature flag disabling whole v1 monitoring story
92+
config *monitoringConfig
93+
operatingSystem string
94+
agentInfo info.Agent
95+
isOtelRuntimeSubprocess bool
9196
}
9297

9398
// componentInfo is the information necessary to generate monitoring configuration for a component. We don't just use
@@ -106,14 +111,15 @@ type monitoringConfig struct {
106111
}
107112

108113
// New creates a new BeatsMonitor instance.
109-
func New(enabled bool, operatingSystem string, cfg *monitoringCfg.MonitoringConfig, agentInfo info.Agent) *BeatsMonitor {
114+
func New(enabled bool, operatingSystem string, cfg *monitoringCfg.MonitoringConfig, agentInfo info.Agent, isOtelRuntimeSubprocess bool) *BeatsMonitor {
110115
return &BeatsMonitor{
111116
enabled: enabled,
112117
config: &monitoringConfig{
113118
C: cfg,
114119
},
115-
operatingSystem: operatingSystem,
116-
agentInfo: agentInfo,
120+
operatingSystem: operatingSystem,
121+
agentInfo: agentInfo,
122+
isOtelRuntimeSubprocess: isOtelRuntimeSubprocess,
117123
}
118124
}
119125

@@ -418,12 +424,12 @@ func (b *BeatsMonitor) getComponentInfos(components []component.Component, compo
418424
if b.config.C.MonitorMetrics {
419425
componentInfos = append(componentInfos,
420426
componentInfo{
421-
ID: fmt.Sprintf("beat/%s", monitoringMetricsUnitID),
427+
ID: beatsMonitoringComponentInfoID,
422428
BinaryName: metricBeatName,
423429
RuntimeManager: component.RuntimeManager(b.config.C.RuntimeManager),
424430
},
425431
componentInfo{
426-
ID: fmt.Sprintf("http/%s", monitoringMetricsUnitID),
432+
ID: httpMonitoringComponentInfoID,
427433
BinaryName: metricBeatName,
428434
RuntimeManager: component.RuntimeManager(b.config.C.RuntimeManager),
429435
})
@@ -661,20 +667,28 @@ func (b *BeatsMonitor) getHttpStreams(
661667
"namespace": "agent",
662668
"period": metricsCollectionIntervalString,
663669
"index": indexName,
664-
"processors": processorsForAgentHttpStream(monitoringNamespace, dataset, b.agentInfo),
670+
"processors": processorsForAgentHttpStream(agentName, agentName, agentName, monitoringNamespace, dataset, b.agentInfo),
665671
}
666672
if failureThreshold != nil {
667673
agentStream[failureThresholdKey] = *failureThreshold
668674
}
669675
httpStreams = append(httpStreams, agentStream)
670676

677+
var edotSubprocessEndpoints []interface{}
678+
671679
for _, compInfo := range componentInfos {
672680
binaryName := compInfo.BinaryName
673681
if !isSupportedMetricsBinary(binaryName) {
674682
continue
675683
}
676684

677685
endpoints := []interface{}{PrefixedEndpoint(BeatsMonitoringEndpoint(compInfo.ID))}
686+
if compInfo.RuntimeManager == component.OtelRuntimeManager && compInfo.ID == httpMonitoringComponentInfoID && b.isOtelRuntimeSubprocess {
687+
// when Otel runtime is running as subprocess, we need to monitor it as a separate stream, thus we utilise the already
688+
// exposed endpoint of httpMonitoringComponentInfoID.
689+
edotSubprocessEndpoints = endpoints
690+
}
691+
678692
name := sanitizeName(binaryName)
679693

680694
// Do not create http streams if runtime-manager is otel and binary is of beat type
@@ -727,6 +741,30 @@ func (b *BeatsMonitor) getHttpStreams(
727741
}
728742
}
729743

744+
if edotSubprocessEndpoints != nil {
745+
// Otel runtime subprocess metrics are collected using the same processors as the elastic-agent since we want only the
746+
// system resources.
747+
edotSubprocessStream := map[string]any{
748+
idKey: fmt.Sprintf("%s-edot-collector", monitoringMetricsUnitID),
749+
"data_stream": map[string]interface{}{
750+
"type": "metrics",
751+
"dataset": dataset,
752+
"namespace": monitoringNamespace,
753+
},
754+
"metricsets": []interface{}{"json"},
755+
"path": "/stats",
756+
"hosts": edotSubprocessEndpoints,
757+
"namespace": "agent",
758+
"period": metricsCollectionIntervalString,
759+
"index": indexName,
760+
"processors": processorsForAgentHttpStream(agentName, edotCollectorName, edotCollectorName, monitoringNamespace, dataset, b.agentInfo),
761+
}
762+
if failureThreshold != nil {
763+
edotSubprocessStream[failureThresholdKey] = *failureThreshold
764+
}
765+
httpStreams = append(httpStreams, edotSubprocessStream)
766+
}
767+
730768
return httpStreams
731769
}
732770

@@ -933,28 +971,28 @@ func processorsForHttpStream(binaryName, unitID, dataset string, agentInfo info.
933971
}
934972

935973
// processorsForAgentHttpStream returns the processors used for the agent metric stream in the beats input.
936-
func processorsForAgentHttpStream(namespace, dataset string, agentInfo info.Agent) []any {
974+
func processorsForAgentHttpStream(binaryName, processName, unitID, namespace, dataset string, agentInfo info.Agent) []any {
937975
return []interface{}{
938976
addDataStreamFieldsProcessor(dataset, namespace),
939977
addEventFieldsProcessor(dataset),
940-
addElasticAgentFieldsProcessor(agentName, agentInfo),
978+
addElasticAgentFieldsProcessor(processName, agentInfo),
941979
addAgentFieldsProcessor(agentInfo.AgentID()),
942980
addCopyFieldsProcessor(httpCopyRules(), true, false),
943981
dropFieldsProcessor([]any{"http"}, true),
944-
addComponentFieldsProcessor(agentName, agentName),
982+
addComponentFieldsProcessor(binaryName, unitID),
945983
}
946984
}
947985

948986
// addElasticAgentFieldsProcessor returns a processor definition that adds agent information in an `elastic_agent` field.
949-
func addElasticAgentFieldsProcessor(binaryName string, agentInfo info.Agent) map[string]any {
987+
func addElasticAgentFieldsProcessor(processName string, agentInfo info.Agent) map[string]any {
950988
return map[string]any{
951989
"add_fields": map[string]any{
952990
"target": "elastic_agent",
953991
"fields": map[string]any{
954992
"id": agentInfo.AgentID(),
955993
"version": agentInfo.Version(),
956994
"snapshot": agentInfo.Snapshot(),
957-
"process": binaryName,
995+
"process": processName,
958996
},
959997
},
960998
}

internal/pkg/agent/application/monitoring/v1_monitor_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1066,7 +1066,7 @@ func TestMonitorReload(t *testing.T) {
10661066
monitorcfg.MonitorLogs = false
10671067
monitorcfg.MonitorMetrics = false
10681068

1069-
beatsMonitor := New(true, "", monitorcfg, nil)
1069+
beatsMonitor := New(true, "", monitorcfg, nil, false)
10701070
assert.Equal(t, beatsMonitor.config.C.MonitorLogs, false)
10711071
assert.Equal(t, beatsMonitor.config.C.MonitorLogs, false)
10721072

internal/pkg/agent/cmd/inspect.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import (
2929
"github.com/elastic/elastic-agent/internal/pkg/config"
3030
"github.com/elastic/elastic-agent/internal/pkg/config/operations"
3131
"github.com/elastic/elastic-agent/internal/pkg/diagnostics"
32+
otelmanager "github.com/elastic/elastic-agent/internal/pkg/otel/manager"
3233
"github.com/elastic/elastic-agent/pkg/component"
3334
"github.com/elastic/elastic-agent/pkg/core/logger"
3435
"github.com/elastic/elastic-agent/pkg/utils"
@@ -409,7 +410,8 @@ func getMonitoringFn(ctx context.Context, cfg map[string]interface{}) (component
409410
return nil, fmt.Errorf("could not load agent info: %w", err)
410411
}
411412

412-
monitor := monitoring.New(agentCfg.Settings.V1MonitoringEnabled, agentCfg.Settings.DownloadConfig.OS(), agentCfg.Settings.MonitoringConfig, agentInfo)
413+
isOtelSubprocessExecution := otelmanager.IsSubprocessExecution(config)
414+
monitor := monitoring.New(agentCfg.Settings.V1MonitoringEnabled, agentCfg.Settings.DownloadConfig.OS(), agentCfg.Settings.MonitoringConfig, agentInfo, isOtelSubprocessExecution)
413415
return monitor.MonitoringConfig, nil
414416
}
415417

internal/pkg/otel/manager/manager.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,11 @@ import (
1717

1818
"github.com/elastic/elastic-agent-client/v7/pkg/client"
1919
"github.com/elastic/elastic-agent/internal/pkg/agent/application/info"
20+
"github.com/elastic/elastic-agent/internal/pkg/config"
2021
"github.com/elastic/elastic-agent/internal/pkg/otel/translate"
2122
"github.com/elastic/elastic-agent/pkg/component"
2223
"github.com/elastic/elastic-agent/pkg/component/runtime"
24+
"github.com/elastic/elastic-agent/pkg/features"
2325

2426
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/status"
2527
"go.opentelemetry.io/collector/confmap"
@@ -109,6 +111,25 @@ type OTelManager struct {
109111
stopTimeout time.Duration
110112
}
111113

114+
// IsSubprocessExecution returns true if the OTel runtime is running in subprocess mode.
115+
func IsSubprocessExecution(cfg *config.Config) bool {
116+
return GetExecutionModeFromConfig(cfg) == SubprocessExecutionMode
117+
}
118+
119+
// GetExecutionModeFromConfig sets the execution mode of the OTel runtime based on the config.
120+
func GetExecutionModeFromConfig(cfg *config.Config) ExecutionMode {
121+
flags, err := features.Parse(cfg)
122+
if err != nil {
123+
return EmbeddedExecutionMode
124+
}
125+
126+
if flags.OtelSubprocessExecution() {
127+
return SubprocessExecutionMode
128+
} else {
129+
return EmbeddedExecutionMode
130+
}
131+
}
132+
112133
// NewOTelManager returns a OTelManager.
113134
func NewOTelManager(
114135
logger *logger.Logger,

pkg/features/features.go

Lines changed: 37 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,13 @@ import (
2121
// 8.11+ - default is enabled
2222
const defaultTamperProtection = true
2323

24+
// The default value of otel runtime subprocess execution mode if the flag is missing
25+
const defaultOtelSubprocessMode = false
26+
2427
var (
2528
current = Flags{
26-
tamperProtection: defaultTamperProtection,
29+
tamperProtection: defaultTamperProtection,
30+
otelSubprocessExecution: defaultOtelSubprocessMode,
2731
}
2832
)
2933

@@ -36,7 +40,8 @@ type Flags struct {
3640
fqdn bool
3741
fqdnCallbacks map[string]BoolValueOnChangeCallback
3842

39-
tamperProtection bool
43+
tamperProtection bool
44+
otelSubprocessExecution bool
4045
}
4146

4247
type cfg struct {
@@ -48,6 +53,9 @@ type cfg struct {
4853
TamperProtection *struct {
4954
Enabled bool `json:"enabled" yaml:"enabled" config:"enabled"`
5055
} `json:"tamper_protection,omitempty" yaml:"tamper_protection,omitempty" config:"tamper_protection,omitempty"`
56+
Otel *struct {
57+
SubprocessExecution bool `json:"subprocess_execution" yaml:"subprocess_execution" config:"subprocess_execution"`
58+
} `json:"otel,omitempty" yaml:"otel,omitempty" config:"otel,omitempty"`
5159
} `json:"features" yaml:"features" config:"features"`
5260
} `json:"agent" yaml:"agent" config:"agent"`
5361
}
@@ -66,6 +74,13 @@ func (f *Flags) TamperProtection() bool {
6674
return f.tamperProtection
6775
}
6876

77+
func (f *Flags) OtelSubprocessExecution() bool {
78+
f.mu.RLock()
79+
defer f.mu.RUnlock()
80+
81+
return f.otelSubprocessExecution
82+
}
83+
6984
func (f *Flags) AsProto() *proto.Features {
7085
return &proto.Features{
7186
Fqdn: &proto.FQDNFeature{
@@ -121,6 +136,14 @@ func (f *Flags) setTamperProtection(newValue bool) {
121136
f.tamperProtection = newValue
122137
}
123138

139+
// setOtelSubprocessExecution sets the value of the OtelSubprocessExecution flag in Flags.
140+
func (f *Flags) setOtelSubprocessExecution(newValue bool) {
141+
f.mu.Lock()
142+
defer f.mu.Unlock()
143+
144+
f.otelSubprocessExecution = newValue
145+
}
146+
124147
// setSource sets the source from he given cfg.
125148
func (f *Flags) setSource(c cfg) error {
126149
// Use JSON marshalling-unmarshalling to convert cfg to mapstr
@@ -186,6 +209,12 @@ func Parse(policy any) (*Flags, error) {
186209
flags.setTamperProtection(defaultTamperProtection)
187210
}
188211

212+
if parsedFlags.Agent.Features.Otel != nil {
213+
flags.setOtelSubprocessExecution(parsedFlags.Agent.Features.Otel.SubprocessExecution)
214+
} else {
215+
flags.setOtelSubprocessExecution(defaultOtelSubprocessMode)
216+
}
217+
189218
if err := flags.setSource(parsedFlags); err != nil {
190219
return nil, fmt.Errorf("error creating feature flags source: %w", err)
191220
}
@@ -208,6 +237,7 @@ func Apply(c *config.Config) error {
208237

209238
current.setFQDN(parsed.FQDN())
210239
current.setTamperProtection(parsed.TamperProtection())
240+
current.setOtelSubprocessExecution(parsed.OtelSubprocessExecution())
211241
return err
212242
}
213243

@@ -220,3 +250,8 @@ func FQDN() bool {
220250
func TamperProtection() bool {
221251
return current.TamperProtection()
222252
}
253+
254+
// OtelSubprocessExecution reports if otel subprocess execution feature is enabled
255+
func OtelSubprocessExecution() bool {
256+
return current.OtelSubprocessExecution()
257+
}

0 commit comments

Comments
 (0)