Skip to content

Commit 77955e7

Browse files
[8.19] (backport #10003) feat: emit system resource metrics for EDOT subprocess (#10142)
* feat: emit system resource metrics for EDOT subprocess (#10003) * feat: emit system resource metrics for EDOT subprocess * ci: extend unit-tests to cover for the edot subprocess resource metrics stream * feat: add standalone monitoring server in supervised EDOT * feat: move otel execution mode feature flag to a separate package * feat: rework otel config package to avoid globals (cherry picked from commit 9f15088) # Conflicts: # internal/pkg/agent/application/monitoring/v1_monitor.go * fix: resolve conflicts --------- Co-authored-by: Panos Koutsovasilis <[email protected]>
1 parent 219ea59 commit 77955e7

File tree

11 files changed

+385
-95
lines changed

11 files changed

+385
-95
lines changed

internal/pkg/agent/application/application.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import (
3333
"github.com/elastic/elastic-agent/internal/pkg/fleetapi/acker/lazy"
3434
"github.com/elastic/elastic-agent/internal/pkg/fleetapi/acker/retrier"
3535
fleetclient "github.com/elastic/elastic-agent/internal/pkg/fleetapi/client"
36+
otelconfig "github.com/elastic/elastic-agent/internal/pkg/otel/config"
3637
otelmanager "github.com/elastic/elastic-agent/internal/pkg/otel/manager"
3738
"github.com/elastic/elastic-agent/internal/pkg/queue"
3839
"github.com/elastic/elastic-agent/internal/pkg/release"
@@ -120,13 +121,16 @@ func New(
120121
override(cfg)
121122
}
122123

124+
otelExecMode := otelconfig.GetExecutionModeFromConfig(log, rawConfig)
125+
isOtelExecModeSubprocess := otelExecMode == otelmanager.SubprocessExecutionMode
126+
123127
// monitoring is not supported in bootstrap mode https://github.com/elastic/elastic-agent/issues/1761
124128
isMonitoringSupported := !disableMonitoring && cfg.Settings.V1MonitoringEnabled
125129
upgrader, err := upgrade.NewUpgrader(log, cfg.Settings.DownloadConfig, agentInfo)
126130
if err != nil {
127131
return nil, nil, nil, fmt.Errorf("failed to create upgrader: %w", err)
128132
}
129-
monitor := monitoring.New(isMonitoringSupported, cfg.Settings.DownloadConfig.OS(), cfg.Settings.MonitoringConfig, agentInfo)
133+
monitor := monitoring.New(isMonitoringSupported, cfg.Settings.DownloadConfig.OS(), cfg.Settings.MonitoringConfig, agentInfo, isOtelExecModeSubprocess)
130134

131135
runtime, err := runtime.NewManager(
132136
log,
@@ -238,7 +242,7 @@ func New(
238242
return nil, nil, nil, errors.New(err, "failed to initialize composable controller")
239243
}
240244

241-
otelManager, err := otelmanager.NewOTelManager(log.Named("otel_manager"), logLevel, baseLogger, otelmanager.EmbeddedExecutionMode, agentInfo, monitor.ComponentMonitoringConfig, cfg.Settings.ProcessConfig.StopTimeout)
245+
otelManager, err := otelmanager.NewOTelManager(log.Named("otel_manager"), logLevel, baseLogger, otelExecMode, agentInfo, monitor.ComponentMonitoringConfig, cfg.Settings.ProcessConfig.StopTimeout)
242246
if err != nil {
243247
return nil, nil, nil, fmt.Errorf("failed to create otel manager: %w", err)
244248
}

internal/pkg/agent/application/monitoring/v1_monitor.go

Lines changed: 49 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import (
3030
"github.com/elastic/elastic-agent/internal/pkg/agent/errors"
3131
"github.com/elastic/elastic-agent/internal/pkg/config"
3232
monitoringCfg "github.com/elastic/elastic-agent/internal/pkg/core/monitoring/config"
33+
otelMonitoring "github.com/elastic/elastic-agent/internal/pkg/otel/monitoring"
3334
)
3435

3536
const (
@@ -84,10 +85,11 @@ var (
8485
// BeatsMonitor provides config values for monitoring of agent clients (beats, endpoint, etc)
8586
// by injecting the monitoring config into an existing fleet config
8687
type BeatsMonitor struct {
87-
enabled bool // feature flag disabling whole v1 monitoring story
88-
config *monitoringConfig
89-
operatingSystem string
90-
agentInfo info.Agent
88+
enabled bool // feature flag disabling whole v1 monitoring story
89+
config *monitoringConfig
90+
operatingSystem string
91+
agentInfo info.Agent
92+
isOtelRuntimeSubprocess bool
9193
}
9294

9395
// componentInfo is the information necessary to generate monitoring configuration for a component. We don't just use
@@ -106,14 +108,15 @@ type monitoringConfig struct {
106108
}
107109

108110
// New creates a new BeatsMonitor instance.
109-
func New(enabled bool, operatingSystem string, cfg *monitoringCfg.MonitoringConfig, agentInfo info.Agent) *BeatsMonitor {
111+
func New(enabled bool, operatingSystem string, cfg *monitoringCfg.MonitoringConfig, agentInfo info.Agent, isOtelRuntimeSubprocess bool) *BeatsMonitor {
110112
return &BeatsMonitor{
111113
enabled: enabled,
112114
config: &monitoringConfig{
113115
C: cfg,
114116
},
115-
operatingSystem: operatingSystem,
116-
agentInfo: agentInfo,
117+
operatingSystem: operatingSystem,
118+
agentInfo: agentInfo,
119+
isOtelRuntimeSubprocess: isOtelRuntimeSubprocess,
117120
}
118121
}
119122

@@ -349,7 +352,17 @@ func (b *BeatsMonitor) Prepare(unit string) error {
349352
return nil
350353
}
351354

352-
// Cleanup removes
355+
// Returns true if any component in the list uses the otel runtime.
356+
func usingOtelRuntime(componentInfos []componentInfo) bool {
357+
for _, ci := range componentInfos {
358+
if ci.RuntimeManager == component.OtelRuntimeManager {
359+
return true
360+
}
361+
}
362+
return false
363+
}
364+
365+
// Cleanup removes files that were created for monitoring.
353366
func (b *BeatsMonitor) Cleanup(unit string) error {
354367
if !b.Enabled() {
355368
return nil
@@ -661,13 +674,35 @@ func (b *BeatsMonitor) getHttpStreams(
661674
"namespace": "agent",
662675
"period": metricsCollectionIntervalString,
663676
"index": indexName,
664-
"processors": processorsForAgentHttpStream(monitoringNamespace, dataset, b.agentInfo),
677+
"processors": processorsForAgentHttpStream(agentName, agentName, agentName, monitoringNamespace, dataset, b.agentInfo),
665678
}
666679
if failureThreshold != nil {
667680
agentStream[failureThresholdKey] = *failureThreshold
668681
}
669682
httpStreams = append(httpStreams, agentStream)
670683

684+
if usingOtelRuntime(componentInfos) && b.isOtelRuntimeSubprocess {
685+
edotSubprocessStream := map[string]any{
686+
idKey: fmt.Sprintf("%s-edot-collector", monitoringMetricsUnitID),
687+
"data_stream": map[string]interface{}{
688+
"type": "metrics",
689+
"dataset": dataset,
690+
"namespace": monitoringNamespace,
691+
},
692+
"metricsets": []interface{}{"json"},
693+
"path": "/stats",
694+
"hosts": []interface{}{PrefixedEndpoint(otelMonitoring.EDOTMonitoringEndpoint())},
695+
"namespace": "agent",
696+
"period": metricsCollectionIntervalString,
697+
"index": indexName,
698+
"processors": processorsForAgentHttpStream(agentName, otelMonitoring.EDOTComponentID, otelMonitoring.EDOTComponentID, monitoringNamespace, dataset, b.agentInfo),
699+
}
700+
if failureThreshold != nil {
701+
edotSubprocessStream[failureThresholdKey] = *failureThreshold
702+
}
703+
httpStreams = append(httpStreams, edotSubprocessStream)
704+
}
705+
671706
for _, compInfo := range componentInfos {
672707
binaryName := compInfo.BinaryName
673708
if !isSupportedMetricsBinary(binaryName) {
@@ -933,28 +968,28 @@ func processorsForHttpStream(binaryName, unitID, dataset string, agentInfo info.
933968
}
934969

935970
// processorsForAgentHttpStream returns the processors used for the agent metric stream in the beats input.
936-
func processorsForAgentHttpStream(namespace, dataset string, agentInfo info.Agent) []any {
971+
func processorsForAgentHttpStream(binaryName, processName, unitID, namespace, dataset string, agentInfo info.Agent) []any {
937972
return []interface{}{
938973
addDataStreamFieldsProcessor(dataset, namespace),
939974
addEventFieldsProcessor(dataset),
940-
addElasticAgentFieldsProcessor(agentName, agentInfo),
975+
addElasticAgentFieldsProcessor(processName, agentInfo),
941976
addAgentFieldsProcessor(agentInfo.AgentID()),
942977
addCopyFieldsProcessor(httpCopyRules(), true, false),
943978
dropFieldsProcessor([]any{"http"}, true),
944-
addComponentFieldsProcessor(agentName, agentName),
979+
addComponentFieldsProcessor(binaryName, unitID),
945980
}
946981
}
947982

948983
// addElasticAgentFieldsProcessor returns a processor definition that adds agent information in an `elastic_agent` field.
949-
func addElasticAgentFieldsProcessor(binaryName string, agentInfo info.Agent) map[string]any {
984+
func addElasticAgentFieldsProcessor(processName string, agentInfo info.Agent) map[string]any {
950985
return map[string]any{
951986
"add_fields": map[string]any{
952987
"target": "elastic_agent",
953988
"fields": map[string]any{
954989
"id": agentInfo.AgentID(),
955990
"version": agentInfo.Version(),
956991
"snapshot": agentInfo.Snapshot(),
957-
"process": binaryName,
992+
"process": processName,
958993
},
959994
},
960995
}

internal/pkg/agent/application/monitoring/v1_monitor_test.go

Lines changed: 91 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ package monitoring
77
import (
88
"context"
99
"encoding/json"
10+
"fmt"
1011
"os"
1112
"path/filepath"
1213
"runtime"
@@ -932,67 +933,102 @@ func TestMonitoringConfigForBeatsReceivers(t *testing.T) {
932933
}
933934

934935
func TestMonitoringWithOtelRuntime(t *testing.T) {
935-
agentInfo, err := info.NewAgentInfo(context.Background(), false)
936-
require.NoError(t, err, "Error creating agent info")
937-
938-
cfg := &monitoringConfig{
939-
C: &monitoringcfg.MonitoringConfig{
940-
Enabled: true,
941-
MonitorLogs: true,
942-
MonitorMetrics: true,
943-
Namespace: "test",
944-
HTTP: &monitoringcfg.MonitoringHTTPConfig{
945-
Enabled: false,
946-
},
947-
RuntimeManager: monitoringcfg.OtelRuntimeManager,
948-
},
949-
}
950-
951-
policy := map[string]any{
952-
"agent": map[string]any{
953-
"monitoring": map[string]any{
954-
"metrics": true,
955-
"logs": false,
956-
},
936+
for _, tc := range []struct {
937+
name string
938+
edotSubprocess bool
939+
}{
940+
{
941+
name: "otel runtime inprocess",
942+
edotSubprocess: false,
957943
},
958-
"outputs": map[string]any{
959-
"default": map[string]any{},
944+
{
945+
name: "otel runtime subprocess",
946+
edotSubprocess: true,
960947
},
961-
}
948+
} {
949+
t.Run(tc.name, func(t *testing.T) {
950+
agentInfo, err := info.NewAgentInfo(context.Background(), false)
951+
require.NoError(t, err, "Error creating agent info")
962952

963-
b := &BeatsMonitor{
964-
enabled: true,
965-
config: cfg,
966-
agentInfo: agentInfo,
967-
}
953+
cfg := &monitoringConfig{
954+
C: &monitoringcfg.MonitoringConfig{
955+
Enabled: true,
956+
MonitorLogs: true,
957+
MonitorMetrics: true,
958+
Namespace: "test",
959+
HTTP: &monitoringcfg.MonitoringHTTPConfig{
960+
Enabled: false,
961+
},
962+
RuntimeManager: monitoringcfg.OtelRuntimeManager,
963+
},
964+
}
968965

969-
components := []component.Component{
970-
{
971-
ID: "filestream-receiver",
972-
InputSpec: &component.InputRuntimeSpec{
973-
Spec: component.InputSpec{
974-
Command: &component.CommandSpec{
975-
Name: "filebeat",
966+
policy := map[string]any{
967+
"agent": map[string]any{
968+
"monitoring": map[string]any{
969+
"metrics": true,
970+
"logs": false,
976971
},
977972
},
978-
},
979-
RuntimeManager: component.OtelRuntimeManager,
980-
},
981-
}
982-
monitoringCfgMap, err := b.MonitoringConfig(policy, components, map[string]uint64{})
983-
require.NoError(t, err)
973+
"outputs": map[string]any{
974+
"default": map[string]any{},
975+
},
976+
}
984977

985-
// Verify that if we're using filebeat receiver, there's no filebeat input
986-
var monitoringCfg struct {
987-
Inputs []struct {
988-
ID string
989-
RuntimeManager string `mapstructure:"_runtime_experimental"`
990-
}
991-
}
992-
err = mapstructure.Decode(monitoringCfgMap, &monitoringCfg)
993-
require.NoError(t, err)
994-
for _, input := range monitoringCfg.Inputs {
995-
assert.Equal(t, monitoringcfg.OtelRuntimeManager, input.RuntimeManager)
978+
b := &BeatsMonitor{
979+
enabled: true,
980+
config: cfg,
981+
agentInfo: agentInfo,
982+
isOtelRuntimeSubprocess: tc.edotSubprocess,
983+
}
984+
985+
components := []component.Component{
986+
{
987+
ID: "filestream-receiver",
988+
InputSpec: &component.InputRuntimeSpec{
989+
Spec: component.InputSpec{
990+
Command: &component.CommandSpec{
991+
Name: "filebeat",
992+
},
993+
},
994+
},
995+
RuntimeManager: component.OtelRuntimeManager,
996+
},
997+
}
998+
monitoringCfgMap, err := b.MonitoringConfig(policy, components, map[string]uint64{})
999+
require.NoError(t, err)
1000+
1001+
// Verify that if we're using filebeat receiver, there's no filebeat input
1002+
var monitoringCfg struct {
1003+
Inputs []struct {
1004+
ID string
1005+
RuntimeManager string `mapstructure:"_runtime_experimental"`
1006+
Streams []struct {
1007+
ID string `mapstructure:"id"`
1008+
} `mapstructure:"streams"`
1009+
}
1010+
}
1011+
err = mapstructure.Decode(monitoringCfgMap, &monitoringCfg)
1012+
require.NoError(t, err)
1013+
edotSubprocessStreamID := fmt.Sprintf("%s-edot-collector", monitoringMetricsUnitID)
1014+
foundEdotSubprocessStream := false
1015+
for _, input := range monitoringCfg.Inputs {
1016+
assert.Equal(t, monitoringcfg.OtelRuntimeManager, input.RuntimeManager)
1017+
if !foundEdotSubprocessStream && input.ID == "metrics-monitoring-agent" {
1018+
for _, stream := range input.Streams {
1019+
if stream.ID == edotSubprocessStreamID {
1020+
foundEdotSubprocessStream = true
1021+
break
1022+
}
1023+
}
1024+
}
1025+
}
1026+
if tc.edotSubprocess {
1027+
require.True(t, foundEdotSubprocessStream, "edot subprocess stream not found")
1028+
} else {
1029+
require.False(t, foundEdotSubprocessStream, "edot subprocess stream found")
1030+
}
1031+
})
9961032
}
9971033
}
9981034

@@ -1066,7 +1102,7 @@ func TestMonitorReload(t *testing.T) {
10661102
monitorcfg.MonitorLogs = false
10671103
monitorcfg.MonitorMetrics = false
10681104

1069-
beatsMonitor := New(true, "", monitorcfg, nil)
1105+
beatsMonitor := New(true, "", monitorcfg, nil, false)
10701106
assert.Equal(t, beatsMonitor.config.C.MonitorLogs, false)
10711107
assert.Equal(t, beatsMonitor.config.C.MonitorLogs, false)
10721108

internal/pkg/agent/cmd/inspect.go

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ import (
2929
"github.com/elastic/elastic-agent/internal/pkg/config"
3030
"github.com/elastic/elastic-agent/internal/pkg/config/operations"
3131
"github.com/elastic/elastic-agent/internal/pkg/diagnostics"
32+
otelconfig "github.com/elastic/elastic-agent/internal/pkg/otel/config"
33+
"github.com/elastic/elastic-agent/internal/pkg/otel/manager"
3234
"github.com/elastic/elastic-agent/pkg/component"
3335
"github.com/elastic/elastic-agent/pkg/core/logger"
3436
"github.com/elastic/elastic-agent/pkg/utils"
@@ -179,7 +181,7 @@ func inspectConfig(ctx context.Context, cfgPath string, opts inspectConfigOpts,
179181
return fmt.Errorf("failed to detect inputs and outputs: %w", err)
180182
}
181183

182-
monitorFn, err := getMonitoringFn(ctx, cfg)
184+
monitorFn, err := getMonitoringFn(ctx, l, cfg)
183185
if err != nil {
184186
return fmt.Errorf("failed to get monitoring: %w", err)
185187
}
@@ -374,7 +376,7 @@ func getComponentsFromPolicy(ctx context.Context, l *logger.Logger, cfgPath stri
374376
return nil, err
375377
}
376378

377-
monitorFn, err := getMonitoringFn(ctx, m)
379+
monitorFn, err := getMonitoringFn(ctx, l, m)
378380
if err != nil {
379381
return nil, fmt.Errorf("failed to get monitoring: %w", err)
380382
}
@@ -393,7 +395,7 @@ func getComponentsFromPolicy(ctx context.Context, l *logger.Logger, cfgPath stri
393395
return comps, nil
394396
}
395397

396-
func getMonitoringFn(ctx context.Context, cfg map[string]interface{}) (component.GenerateMonitoringCfgFn, error) {
398+
func getMonitoringFn(ctx context.Context, logger *logger.Logger, cfg map[string]interface{}) (component.GenerateMonitoringCfgFn, error) {
397399
config, err := config.NewConfigFrom(cfg)
398400
if err != nil {
399401
return nil, err
@@ -408,8 +410,9 @@ func getMonitoringFn(ctx context.Context, cfg map[string]interface{}) (component
408410
if err != nil {
409411
return nil, fmt.Errorf("could not load agent info: %w", err)
410412
}
411-
412-
monitor := monitoring.New(agentCfg.Settings.V1MonitoringEnabled, agentCfg.Settings.DownloadConfig.OS(), agentCfg.Settings.MonitoringConfig, agentInfo)
413+
otelExecMode := otelconfig.GetExecutionModeFromConfig(logger, config)
414+
isOtelExecModeSubprocess := otelExecMode == manager.SubprocessExecutionMode
415+
monitor := monitoring.New(agentCfg.Settings.V1MonitoringEnabled, agentCfg.Settings.DownloadConfig.OS(), agentCfg.Settings.MonitoringConfig, agentInfo, isOtelExecModeSubprocess)
413416
return monitor.MonitoringConfig, nil
414417
}
415418

0 commit comments

Comments
 (0)