Skip to content

Commit 5829790

Browse files
feat: add standalone monitoring server in supervised EDOT
1 parent a4da5e0 commit 5829790

File tree

7 files changed

+198
-56
lines changed

7 files changed

+198
-56
lines changed

internal/pkg/agent/application/monitoring/v1_monitor.go

Lines changed: 34 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import (
3030
"github.com/elastic/elastic-agent/internal/pkg/agent/errors"
3131
"github.com/elastic/elastic-agent/internal/pkg/config"
3232
monitoringCfg "github.com/elastic/elastic-agent/internal/pkg/core/monitoring/config"
33+
otelMonitoring "github.com/elastic/elastic-agent/internal/pkg/otel/monitoring"
3334
)
3435

3536
const (
@@ -58,7 +59,6 @@ const (
5859
monitoringOutput = "monitoring"
5960
defaultMonitoringNamespace = "default"
6061
agentName = "elastic-agent"
61-
edotCollectorName = "elastic-agent/collector"
6262
metricBeatName = "metricbeat"
6363
fileBeatName = "filebeat"
6464

@@ -74,9 +74,6 @@ const (
7474
// metricset stream failure threshold before the stream is marked as DEGRADED
7575
// to avoid marking the agent degraded for transient errors, we set the default threshold to 5
7676
defaultMetricsStreamFailureThreshold = uint(5)
77-
78-
beatsMonitoringComponentInfoID = "beat/" + monitoringMetricsUnitID
79-
httpMonitoringComponentInfoID = "http/" + monitoringMetricsUnitID
8077
)
8178

8279
var (
@@ -355,6 +352,16 @@ func (b *BeatsMonitor) Prepare(unit string) error {
355352
return nil
356353
}
357354

355+
// Returns true if any component in the list uses the otel runtime.
356+
func usingOtelRuntime(componentInfos []componentInfo) bool {
357+
for _, ci := range componentInfos {
358+
if ci.RuntimeManager == component.OtelRuntimeManager {
359+
return true
360+
}
361+
}
362+
return false
363+
}
364+
358365
// Cleanup removes files that were created for monitoring.
359366
func (b *BeatsMonitor) Cleanup(unit string) error {
360367
if !b.Enabled() {
@@ -424,12 +431,12 @@ func (b *BeatsMonitor) getComponentInfos(components []component.Component, compo
424431
if b.config.C.MonitorMetrics {
425432
componentInfos = append(componentInfos,
426433
componentInfo{
427-
ID: beatsMonitoringComponentInfoID,
434+
ID: fmt.Sprintf("beat/%s", monitoringMetricsUnitID),
428435
BinaryName: metricBeatName,
429436
RuntimeManager: component.RuntimeManager(b.config.C.RuntimeManager),
430437
},
431438
componentInfo{
432-
ID: httpMonitoringComponentInfoID,
439+
ID: fmt.Sprintf("http/%s", monitoringMetricsUnitID),
433440
BinaryName: metricBeatName,
434441
RuntimeManager: component.RuntimeManager(b.config.C.RuntimeManager),
435442
})
@@ -674,7 +681,27 @@ func (b *BeatsMonitor) getHttpStreams(
674681
}
675682
httpStreams = append(httpStreams, agentStream)
676683

677-
var edotSubprocessEndpoints []interface{}
684+
if usingOtelRuntime(componentInfos) && b.isOtelRuntimeSubprocess {
685+
edotSubprocessStream := map[string]any{
686+
idKey: fmt.Sprintf("%s-edot-collector", monitoringMetricsUnitID),
687+
"data_stream": map[string]interface{}{
688+
"type": "metrics",
689+
"dataset": dataset,
690+
"namespace": monitoringNamespace,
691+
},
692+
"metricsets": []interface{}{"json"},
693+
"path": "/stats",
694+
"hosts": []interface{}{PrefixedEndpoint(otelMonitoring.EDOTMonitoringEndpoint())},
695+
"namespace": "agent",
696+
"period": metricsCollectionIntervalString,
697+
"index": indexName,
698+
"processors": processorsForAgentHttpStream(agentName, otelMonitoring.EDOTComponentID, otelMonitoring.EDOTComponentID, monitoringNamespace, dataset, b.agentInfo),
699+
}
700+
if failureThreshold != nil {
701+
edotSubprocessStream[failureThresholdKey] = *failureThreshold
702+
}
703+
httpStreams = append(httpStreams, edotSubprocessStream)
704+
}
678705

679706
for _, compInfo := range componentInfos {
680707
binaryName := compInfo.BinaryName
@@ -683,12 +710,6 @@ func (b *BeatsMonitor) getHttpStreams(
683710
}
684711

685712
endpoints := []interface{}{PrefixedEndpoint(BeatsMonitoringEndpoint(compInfo.ID))}
686-
if compInfo.RuntimeManager == component.OtelRuntimeManager && compInfo.ID == httpMonitoringComponentInfoID && b.isOtelRuntimeSubprocess {
687-
// when Otel runtime is running as subprocess, we need to monitor it as a separate stream, thus we utilise the already
688-
// exposed endpoint of httpMonitoringComponentInfoID.
689-
edotSubprocessEndpoints = endpoints
690-
}
691-
692713
name := sanitizeName(binaryName)
693714

694715
// Do not create http streams if runtime-manager is otel and binary is of beat type
@@ -741,30 +762,6 @@ func (b *BeatsMonitor) getHttpStreams(
741762
}
742763
}
743764

744-
if edotSubprocessEndpoints != nil {
745-
// Otel runtime subprocess metrics are collected using the same processors as the elastic-agent since we want only the
746-
// system resources.
747-
edotSubprocessStream := map[string]any{
748-
idKey: fmt.Sprintf("%s-edot-collector", monitoringMetricsUnitID),
749-
"data_stream": map[string]interface{}{
750-
"type": "metrics",
751-
"dataset": dataset,
752-
"namespace": monitoringNamespace,
753-
},
754-
"metricsets": []interface{}{"json"},
755-
"path": "/stats",
756-
"hosts": edotSubprocessEndpoints,
757-
"namespace": "agent",
758-
"period": metricsCollectionIntervalString,
759-
"index": indexName,
760-
"processors": processorsForAgentHttpStream(agentName, edotCollectorName, edotCollectorName, monitoringNamespace, dataset, b.agentInfo),
761-
}
762-
if failureThreshold != nil {
763-
edotSubprocessStream[failureThresholdKey] = *failureThreshold
764-
}
765-
httpStreams = append(httpStreams, edotSubprocessStream)
766-
}
767-
768765
return httpStreams
769766
}
770767

internal/pkg/agent/cmd/otel.go

Lines changed: 31 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"github.com/elastic/elastic-agent/internal/pkg/otel"
2424
"github.com/elastic/elastic-agent/internal/pkg/otel/agentprovider"
2525
"github.com/elastic/elastic-agent/internal/pkg/otel/manager"
26+
"github.com/elastic/elastic-agent/internal/pkg/otel/monitoring"
2627
"github.com/elastic/elastic-agent/internal/pkg/release"
2728
"github.com/elastic/elastic-agent/pkg/core/logger"
2829
)
@@ -45,10 +46,14 @@ func newOtelCommandWithArgs(args []string, streams *cli.IOStreams) *cobra.Comman
4546
if err != nil {
4647
return err
4748
}
49+
supervisedMonitoringURL, err := cmd.Flags().GetString(manager.OtelSupervisedMonitoringURLFlagName)
50+
if err != nil {
51+
return err
52+
}
4853
if err := prepareEnv(); err != nil {
4954
return err
5055
}
51-
return RunCollector(cmd.Context(), cfgFiles, supervised, supervisedLoggingLevel)
56+
return RunCollector(cmd.Context(), cfgFiles, supervised, supervisedLoggingLevel, supervisedMonitoringURL)
5257
},
5358
PreRun: func(c *cobra.Command, args []string) {
5459
// hide inherited flags not to bloat help with flags not related to otel
@@ -76,7 +81,7 @@ func hideInheritedFlags(c *cobra.Command) {
7681
})
7782
}
7883

79-
func RunCollector(cmdCtx context.Context, configFiles []string, supervised bool, supervisedLoggingLevel string) error {
84+
func RunCollector(cmdCtx context.Context, configFiles []string, supervised bool, supervisedLoggingLevel string, supervisedMonitoringURL string) error {
8085
settings, err := prepareCollectorSettings(configFiles, supervised, supervisedLoggingLevel)
8186
if err != nil {
8287
return fmt.Errorf("failed to prepare collector settings: %w", err)
@@ -89,6 +94,15 @@ func RunCollector(cmdCtx context.Context, configFiles []string, supervised bool,
8994
service.WaitExecutionDone()
9095
}()
9196

97+
if supervisedMonitoringURL != "" {
98+
server, err := monitoring.NewServer(settings.log, supervisedMonitoringURL)
99+
if err != nil {
100+
return fmt.Errorf("error create monitoring server: %w", err)
101+
}
102+
server.Start()
103+
defer server.Stop()
104+
}
105+
92106
service.BeforeRun()
93107
defer service.Cleanup()
94108

@@ -102,18 +116,23 @@ func RunCollector(cmdCtx context.Context, configFiles []string, supervised bool,
102116
defer cancel()
103117
go service.ProcessWindowsControlEvents(stopCollector)
104118

105-
return otel.Run(ctx, stop, settings)
119+
return otel.Run(ctx, stop, settings.otelSettings)
120+
}
121+
122+
type edotSettings struct {
123+
log *logger.Logger
124+
otelSettings *otelcol.CollectorSettings
106125
}
107126

108-
func prepareCollectorSettings(configFiles []string, supervised bool, supervisedLoggingLevel string) (*otelcol.CollectorSettings, error) {
109-
var settings *otelcol.CollectorSettings
127+
func prepareCollectorSettings(configFiles []string, supervised bool, supervisedLoggingLevel string) (edotSettings, error) {
128+
var settings edotSettings
110129
if supervised {
111130
// add stdin config provider
112131
configProvider, err := agentprovider.NewBufferProvider(os.Stdin)
113132
if err != nil {
114-
return nil, fmt.Errorf("failed to create config provider: %w", err)
133+
return settings, fmt.Errorf("failed to create config provider: %w", err)
115134
}
116-
settings = otel.NewSettings(release.Version(), []string{configProvider.URI()},
135+
settings.otelSettings = otel.NewSettings(release.Version(), []string{configProvider.URI()},
117136
otel.WithConfigProviderFactory(configProvider.NewFactory()),
118137
)
119138

@@ -138,20 +157,21 @@ func prepareCollectorSettings(configFiles []string, supervised bool, supervisedL
138157

139158
l, err := logger.NewFromConfig("edot", defaultCfg, defaultEventLogCfg, false)
140159
if err != nil {
141-
return nil, fmt.Errorf("failed to create logger: %w", err)
160+
return settings, fmt.Errorf("failed to create logger: %w", err)
142161
}
162+
settings.log = l
143163

144164
if logLevelSettingErr != nil {
145165
l.Warnf("Fallback to default logging level due to: %v", logLevelSettingErr)
146166
}
147167

148-
settings.LoggingOptions = []zap.Option{zap.WrapCore(func(zapcore.Core) zapcore.Core {
168+
settings.otelSettings.LoggingOptions = []zap.Option{zap.WrapCore(func(zapcore.Core) zapcore.Core {
149169
return l.Core()
150170
})}
151171

152-
settings.DisableGracefulShutdown = false
172+
settings.otelSettings.DisableGracefulShutdown = false
153173
} else {
154-
settings = otel.NewSettings(release.Version(), configFiles)
174+
settings.otelSettings = otel.NewSettings(release.Version(), configFiles)
155175
}
156176
return settings, nil
157177
}

internal/pkg/agent/cmd/otel_flags.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,11 @@ func setupOtelFlags(flags *pflag.FlagSet) {
3939
// but look above, so we explicitly ignore it
4040
_ = flags.MarkHidden(manager.OtelSupervisedLoggingLevelFlagName)
4141

42+
flags.String(manager.OtelSupervisedMonitoringURLFlagName, "", "Set the monitoring path")
43+
// the only error we can get here is that the flag does not exist
44+
// but look above, so we explicitly ignore it
45+
_ = flags.MarkHidden(manager.OtelSupervisedMonitoringURLFlagName)
46+
4247
goFlags := new(flag.FlagSet)
4348
featuregate.GlobalRegistry().RegisterFlags(goFlags)
4449

internal/pkg/agent/cmd/otel_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,23 +30,23 @@ func TestPrepareCollectorSettings(t *testing.T) {
3030
settings, err := prepareCollectorSettings(nil, true, "info")
3131
require.NoError(t, err, "failed to prepare collector settings")
3232
require.NotNil(t, settings, "settings should not be nil")
33-
require.NotNil(t, settings.ConfigProviderSettings.ResolverSettings.URIs, "URIs should not be nil")
33+
require.NotNil(t, settings.otelSettings.ConfigProviderSettings.ResolverSettings.URIs, "URIs should not be nil")
3434
agentProviderURIFound := false
35-
for _, uri := range settings.ConfigProviderSettings.ResolverSettings.URIs {
35+
for _, uri := range settings.otelSettings.ConfigProviderSettings.ResolverSettings.URIs {
3636
agentProviderURIFound = strings.Contains(uri, agentprovider.AgentConfigProviderSchemeName)
3737
if agentProviderURIFound {
3838
break
3939
}
4040
}
4141
require.True(t, agentProviderURIFound, "agentprovider Scheme not found in the URIS of ConfigProviderSettings")
42-
require.NotNil(t, settings.LoggingOptions, "loggingOptions should not be nil for supervised mode")
42+
require.NotNil(t, settings.otelSettings.LoggingOptions, "loggingOptions should not be nil for supervised mode")
4343
})
4444

4545
t.Run("returns valid settings in standalone mode", func(t *testing.T) {
4646
settings, err := prepareCollectorSettings([]string{"fake-config.yaml"}, false, "info")
4747
require.NoError(t, err, "failed to prepare collector settings")
4848
require.NotNil(t, settings, "settings should not be nil")
49-
require.Contains(t, settings.ConfigProviderSettings.ResolverSettings.URIs, "fake-config.yaml", "fake-config.yaml not found in the URIS of ConfigProviderSettings")
49+
require.Contains(t, settings.otelSettings.ConfigProviderSettings.ResolverSettings.URIs, "fake-config.yaml", "fake-config.yaml not found in the URIS of ConfigProviderSettings")
5050
})
5151

5252
t.Run("fails when supervised mode has invalid config from stdin", func(t *testing.T) {
@@ -61,7 +61,7 @@ func TestPrepareCollectorSettings(t *testing.T) {
6161

6262
settings, err := prepareCollectorSettings(nil, true, "info")
6363
require.Error(t, err)
64-
require.Nil(t, settings)
64+
require.Nil(t, settings.otelSettings)
6565
})
6666

6767
t.Run("doesn't fail when unsupervised mode has invalid config from stdin", func(t *testing.T) {

internal/pkg/otel/manager/execution_subprocess.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,14 +24,16 @@ import (
2424

2525
"github.com/elastic/elastic-agent-libs/logp"
2626

27+
"github.com/elastic/elastic-agent/internal/pkg/otel/monitoring"
2728
runtimeLogger "github.com/elastic/elastic-agent/pkg/component/runtime"
2829
"github.com/elastic/elastic-agent/pkg/core/logger"
2930
"github.com/elastic/elastic-agent/pkg/core/process"
3031
)
3132

3233
const (
33-
OtelSetSupervisedFlagName = "supervised"
34-
OtelSupervisedLoggingLevelFlagName = "supervised.logging.level"
34+
OtelSetSupervisedFlagName = "supervised"
35+
OtelSupervisedLoggingLevelFlagName = "supervised.logging.level"
36+
OtelSupervisedMonitoringURLFlagName = "supervised.monitoring.url"
3537
)
3638

3739
func newSubprocessExecution(logLevel logp.Level, collectorPath string) (*subprocessExecution, error) {
@@ -51,6 +53,7 @@ func newSubprocessExecution(logLevel logp.Level, collectorPath string) (*subproc
5153
"otel",
5254
fmt.Sprintf("--%s", OtelSetSupervisedFlagName),
5355
fmt.Sprintf("--%s=%s", OtelSupervisedLoggingLevelFlagName, logLevel.String()),
56+
fmt.Sprintf("--%s=%s", OtelSupervisedMonitoringURLFlagName, monitoring.EDOTMonitoringEndpoint()),
5457
},
5558
logLevel: logLevel,
5659
healthCheckExtensionID: healthCheckExtensionID,

internal/pkg/otel/manager/testing/testing.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,10 @@ func main() {
4545
})
4646
}
4747

48+
monitoringURL := os.Getenv("TEST_SUPERVISED_COLLECTOR_MONITORING_URL")
49+
4850
exitCode := 0
49-
err = cmd.RunCollector(ctx, nil, true, "debug")
51+
err = cmd.RunCollector(ctx, nil, true, "debug", monitoringURL)
5052
if err != nil && !errors.Is(err, context.Canceled) {
5153
exitCode = 1
5254
}

0 commit comments

Comments
 (0)