diff --git a/internal/pkg/agent/application/application.go b/internal/pkg/agent/application/application.go index 009ddc16bd7..cf114f316cd 100644 --- a/internal/pkg/agent/application/application.go +++ b/internal/pkg/agent/application/application.go @@ -35,6 +35,7 @@ import ( "github.com/elastic/elastic-agent/internal/pkg/fleetapi/acker/lazy" "github.com/elastic/elastic-agent/internal/pkg/fleetapi/acker/retrier" fleetclient "github.com/elastic/elastic-agent/internal/pkg/fleetapi/client" + otelconfig "github.com/elastic/elastic-agent/internal/pkg/otel/config" otelmanager "github.com/elastic/elastic-agent/internal/pkg/otel/manager" "github.com/elastic/elastic-agent/internal/pkg/queue" "github.com/elastic/elastic-agent/internal/pkg/release" @@ -122,13 +123,16 @@ func New( override(cfg) } + otelExecMode := otelconfig.GetExecutionModeFromConfig(log, rawConfig) + isOtelExecModeSubprocess := otelExecMode == otelmanager.SubprocessExecutionMode + // monitoring is not supported in bootstrap mode https://github.com/elastic/elastic-agent/issues/1761 isMonitoringSupported := !disableMonitoring && cfg.Settings.V1MonitoringEnabled upgrader, err := upgrade.NewUpgrader(log, cfg.Settings.DownloadConfig, cfg.Settings.Upgrade, agentInfo, new(upgrade.AgentWatcherHelper)) if err != nil { return nil, nil, nil, fmt.Errorf("failed to create upgrader: %w", err) } - monitor := monitoring.New(isMonitoringSupported, cfg.Settings.DownloadConfig.OS(), cfg.Settings.MonitoringConfig, agentInfo) + monitor := monitoring.New(isMonitoringSupported, cfg.Settings.DownloadConfig.OS(), cfg.Settings.MonitoringConfig, agentInfo, isOtelExecModeSubprocess) runtime, err := runtime.NewManager( log, @@ -240,7 +244,7 @@ func New( return nil, nil, nil, errors.New(err, "failed to initialize composable controller") } - otelManager, err := otelmanager.NewOTelManager(log.Named("otel_manager"), logLevel, baseLogger, otelmanager.EmbeddedExecutionMode, agentInfo, monitor.ComponentMonitoringConfig, cfg.Settings.ProcessConfig.StopTimeout) + otelManager, err := otelmanager.NewOTelManager(log.Named("otel_manager"), logLevel, baseLogger, otelExecMode, agentInfo, monitor.ComponentMonitoringConfig, cfg.Settings.ProcessConfig.StopTimeout) if err != nil { return nil, nil, nil, fmt.Errorf("failed to create otel manager: %w", err) } diff --git a/internal/pkg/agent/application/monitoring/v1_monitor.go b/internal/pkg/agent/application/monitoring/v1_monitor.go index d02b859ccaa..dccf920d61c 100644 --- a/internal/pkg/agent/application/monitoring/v1_monitor.go +++ b/internal/pkg/agent/application/monitoring/v1_monitor.go @@ -30,6 +30,7 @@ import ( "github.com/elastic/elastic-agent/internal/pkg/agent/errors" "github.com/elastic/elastic-agent/internal/pkg/config" monitoringCfg "github.com/elastic/elastic-agent/internal/pkg/core/monitoring/config" + otelMonitoring "github.com/elastic/elastic-agent/internal/pkg/otel/monitoring" ) const ( @@ -84,10 +85,11 @@ var ( // BeatsMonitor provides config values for monitoring of agent clients (beats, endpoint, etc) // by injecting the monitoring config into an existing fleet config type BeatsMonitor struct { - enabled bool // feature flag disabling whole v1 monitoring story - config *monitoringConfig - operatingSystem string - agentInfo info.Agent + enabled bool // feature flag disabling whole v1 monitoring story + config *monitoringConfig + operatingSystem string + agentInfo info.Agent + isOtelRuntimeSubprocess bool } // componentInfo is the information necessary to generate monitoring configuration for a component. We don't just use @@ -106,14 +108,15 @@ type monitoringConfig struct { } // New creates a new BeatsMonitor instance. -func New(enabled bool, operatingSystem string, cfg *monitoringCfg.MonitoringConfig, agentInfo info.Agent) *BeatsMonitor { +func New(enabled bool, operatingSystem string, cfg *monitoringCfg.MonitoringConfig, agentInfo info.Agent, isOtelRuntimeSubprocess bool) *BeatsMonitor { return &BeatsMonitor{ enabled: enabled, config: &monitoringConfig{ C: cfg, }, - operatingSystem: operatingSystem, - agentInfo: agentInfo, + operatingSystem: operatingSystem, + agentInfo: agentInfo, + isOtelRuntimeSubprocess: isOtelRuntimeSubprocess, } } @@ -349,6 +352,16 @@ func (b *BeatsMonitor) Prepare(unit string) error { return nil } +// Returns true if any component in the list uses the otel runtime. +func usingOtelRuntime(componentInfos []componentInfo) bool { + for _, ci := range componentInfos { + if ci.RuntimeManager == component.OtelRuntimeManager { + return true + } + } + return false +} + // Cleanup removes files that were created for monitoring. func (b *BeatsMonitor) Cleanup(unit string) error { if !b.Enabled() { @@ -661,13 +674,35 @@ func (b *BeatsMonitor) getHttpStreams( "namespace": "agent", "period": metricsCollectionIntervalString, "index": indexName, - "processors": processorsForAgentHttpStream(monitoringNamespace, dataset, b.agentInfo), + "processors": processorsForAgentHttpStream(agentName, agentName, agentName, monitoringNamespace, dataset, b.agentInfo), } if failureThreshold != nil { agentStream[failureThresholdKey] = *failureThreshold } httpStreams = append(httpStreams, agentStream) + if usingOtelRuntime(componentInfos) && b.isOtelRuntimeSubprocess { + edotSubprocessStream := map[string]any{ + idKey: fmt.Sprintf("%s-edot-collector", monitoringMetricsUnitID), + "data_stream": map[string]interface{}{ + "type": "metrics", + "dataset": dataset, + "namespace": monitoringNamespace, + }, + "metricsets": []interface{}{"json"}, + "path": "/stats", + "hosts": []interface{}{PrefixedEndpoint(otelMonitoring.EDOTMonitoringEndpoint())}, + "namespace": "agent", + "period": metricsCollectionIntervalString, + "index": indexName, + "processors": processorsForAgentHttpStream(agentName, otelMonitoring.EDOTComponentID, otelMonitoring.EDOTComponentID, monitoringNamespace, dataset, b.agentInfo), + } + if failureThreshold != nil { + edotSubprocessStream[failureThresholdKey] = *failureThreshold + } + httpStreams = append(httpStreams, edotSubprocessStream) + } + for _, compInfo := range componentInfos { binaryName := compInfo.BinaryName if !isSupportedMetricsBinary(binaryName) { @@ -933,20 +968,20 @@ func processorsForHttpStream(binaryName, unitID, dataset string, agentInfo info. } // processorsForAgentHttpStream returns the processors used for the agent metric stream in the beats input. -func processorsForAgentHttpStream(namespace, dataset string, agentInfo info.Agent) []any { +func processorsForAgentHttpStream(binaryName, processName, unitID, namespace, dataset string, agentInfo info.Agent) []any { return []interface{}{ addDataStreamFieldsProcessor(dataset, namespace), addEventFieldsProcessor(dataset), - addElasticAgentFieldsProcessor(agentName, agentInfo), + addElasticAgentFieldsProcessor(processName, agentInfo), addAgentFieldsProcessor(agentInfo.AgentID()), addCopyFieldsProcessor(httpCopyRules(), true, false), dropFieldsProcessor([]any{"http"}, true), - addComponentFieldsProcessor(agentName, agentName), + addComponentFieldsProcessor(binaryName, unitID), } } // addElasticAgentFieldsProcessor returns a processor definition that adds agent information in an `elastic_agent` field. -func addElasticAgentFieldsProcessor(binaryName string, agentInfo info.Agent) map[string]any { +func addElasticAgentFieldsProcessor(processName string, agentInfo info.Agent) map[string]any { return map[string]any{ "add_fields": map[string]any{ "target": "elastic_agent", @@ -954,7 +989,7 @@ func addElasticAgentFieldsProcessor(binaryName string, agentInfo info.Agent) map "id": agentInfo.AgentID(), "version": agentInfo.Version(), "snapshot": agentInfo.Snapshot(), - "process": binaryName, + "process": processName, }, }, } diff --git a/internal/pkg/agent/application/monitoring/v1_monitor_test.go b/internal/pkg/agent/application/monitoring/v1_monitor_test.go index 8ab48f0d9e3..18c708eb905 100644 --- a/internal/pkg/agent/application/monitoring/v1_monitor_test.go +++ b/internal/pkg/agent/application/monitoring/v1_monitor_test.go @@ -7,6 +7,7 @@ package monitoring import ( "context" "encoding/json" + "fmt" "os" "path/filepath" "runtime" @@ -932,67 +933,102 @@ func TestMonitoringConfigForBeatsReceivers(t *testing.T) { } func TestMonitoringWithOtelRuntime(t *testing.T) { - agentInfo, err := info.NewAgentInfo(context.Background(), false) - require.NoError(t, err, "Error creating agent info") - - cfg := &monitoringConfig{ - C: &monitoringcfg.MonitoringConfig{ - Enabled: true, - MonitorLogs: true, - MonitorMetrics: true, - Namespace: "test", - HTTP: &monitoringcfg.MonitoringHTTPConfig{ - Enabled: false, - }, - RuntimeManager: monitoringcfg.OtelRuntimeManager, - }, - } - - policy := map[string]any{ - "agent": map[string]any{ - "monitoring": map[string]any{ - "metrics": true, - "logs": false, - }, + for _, tc := range []struct { + name string + edotSubprocess bool + }{ + { + name: "otel runtime inprocess", + edotSubprocess: false, }, - "outputs": map[string]any{ - "default": map[string]any{}, + { + name: "otel runtime subprocess", + edotSubprocess: true, }, - } + } { + t.Run(tc.name, func(t *testing.T) { + agentInfo, err := info.NewAgentInfo(context.Background(), false) + require.NoError(t, err, "Error creating agent info") - b := &BeatsMonitor{ - enabled: true, - config: cfg, - agentInfo: agentInfo, - } + cfg := &monitoringConfig{ + C: &monitoringcfg.MonitoringConfig{ + Enabled: true, + MonitorLogs: true, + MonitorMetrics: true, + Namespace: "test", + HTTP: &monitoringcfg.MonitoringHTTPConfig{ + Enabled: false, + }, + RuntimeManager: monitoringcfg.OtelRuntimeManager, + }, + } - components := []component.Component{ - { - ID: "filestream-receiver", - InputSpec: &component.InputRuntimeSpec{ - Spec: component.InputSpec{ - Command: &component.CommandSpec{ - Name: "filebeat", + policy := map[string]any{ + "agent": map[string]any{ + "monitoring": map[string]any{ + "metrics": true, + "logs": false, }, }, - }, - RuntimeManager: component.OtelRuntimeManager, - }, - } - monitoringCfgMap, err := b.MonitoringConfig(policy, components, map[string]uint64{}) - require.NoError(t, err) + "outputs": map[string]any{ + "default": map[string]any{}, + }, + } - // Verify that if we're using filebeat receiver, there's no filebeat input - var monitoringCfg struct { - Inputs []struct { - ID string - RuntimeManager string `mapstructure:"_runtime_experimental"` - } - } - err = mapstructure.Decode(monitoringCfgMap, &monitoringCfg) - require.NoError(t, err) - for _, input := range monitoringCfg.Inputs { - assert.Equal(t, monitoringcfg.OtelRuntimeManager, input.RuntimeManager) + b := &BeatsMonitor{ + enabled: true, + config: cfg, + agentInfo: agentInfo, + isOtelRuntimeSubprocess: tc.edotSubprocess, + } + + components := []component.Component{ + { + ID: "filestream-receiver", + InputSpec: &component.InputRuntimeSpec{ + Spec: component.InputSpec{ + Command: &component.CommandSpec{ + Name: "filebeat", + }, + }, + }, + RuntimeManager: component.OtelRuntimeManager, + }, + } + monitoringCfgMap, err := b.MonitoringConfig(policy, components, map[string]uint64{}) + require.NoError(t, err) + + // Verify that if we're using filebeat receiver, there's no filebeat input + var monitoringCfg struct { + Inputs []struct { + ID string + RuntimeManager string `mapstructure:"_runtime_experimental"` + Streams []struct { + ID string `mapstructure:"id"` + } `mapstructure:"streams"` + } + } + err = mapstructure.Decode(monitoringCfgMap, &monitoringCfg) + require.NoError(t, err) + edotSubprocessStreamID := fmt.Sprintf("%s-edot-collector", monitoringMetricsUnitID) + foundEdotSubprocessStream := false + for _, input := range monitoringCfg.Inputs { + assert.Equal(t, monitoringcfg.OtelRuntimeManager, input.RuntimeManager) + if !foundEdotSubprocessStream && input.ID == "metrics-monitoring-agent" { + for _, stream := range input.Streams { + if stream.ID == edotSubprocessStreamID { + foundEdotSubprocessStream = true + break + } + } + } + } + if tc.edotSubprocess { + require.True(t, foundEdotSubprocessStream, "edot subprocess stream not found") + } else { + require.False(t, foundEdotSubprocessStream, "edot subprocess stream found") + } + }) } } @@ -1066,7 +1102,7 @@ func TestMonitorReload(t *testing.T) { monitorcfg.MonitorLogs = false monitorcfg.MonitorMetrics = false - beatsMonitor := New(true, "", monitorcfg, nil) + beatsMonitor := New(true, "", monitorcfg, nil, false) assert.Equal(t, beatsMonitor.config.C.MonitorLogs, false) assert.Equal(t, beatsMonitor.config.C.MonitorLogs, false) diff --git a/internal/pkg/agent/cmd/inspect.go b/internal/pkg/agent/cmd/inspect.go index b71096626f1..ef5f24f3bae 100644 --- a/internal/pkg/agent/cmd/inspect.go +++ b/internal/pkg/agent/cmd/inspect.go @@ -29,6 +29,8 @@ import ( "github.com/elastic/elastic-agent/internal/pkg/config" "github.com/elastic/elastic-agent/internal/pkg/config/operations" "github.com/elastic/elastic-agent/internal/pkg/diagnostics" + otelconfig "github.com/elastic/elastic-agent/internal/pkg/otel/config" + "github.com/elastic/elastic-agent/internal/pkg/otel/manager" "github.com/elastic/elastic-agent/pkg/component" "github.com/elastic/elastic-agent/pkg/core/logger" "github.com/elastic/elastic-agent/pkg/utils" @@ -179,7 +181,7 @@ func inspectConfig(ctx context.Context, cfgPath string, opts inspectConfigOpts, return fmt.Errorf("failed to detect inputs and outputs: %w", err) } - monitorFn, err := getMonitoringFn(ctx, cfg) + monitorFn, err := getMonitoringFn(ctx, l, cfg) if err != nil { return fmt.Errorf("failed to get monitoring: %w", err) } @@ -374,7 +376,7 @@ func getComponentsFromPolicy(ctx context.Context, l *logger.Logger, cfgPath stri return nil, err } - monitorFn, err := getMonitoringFn(ctx, m) + monitorFn, err := getMonitoringFn(ctx, l, m) if err != nil { return nil, fmt.Errorf("failed to get monitoring: %w", err) } @@ -393,7 +395,7 @@ func getComponentsFromPolicy(ctx context.Context, l *logger.Logger, cfgPath stri return comps, nil } -func getMonitoringFn(ctx context.Context, cfg map[string]interface{}) (component.GenerateMonitoringCfgFn, error) { +func getMonitoringFn(ctx context.Context, logger *logger.Logger, cfg map[string]interface{}) (component.GenerateMonitoringCfgFn, error) { config, err := config.NewConfigFrom(cfg) if err != nil { return nil, err @@ -408,8 +410,9 @@ func getMonitoringFn(ctx context.Context, cfg map[string]interface{}) (component if err != nil { return nil, fmt.Errorf("could not load agent info: %w", err) } - - monitor := monitoring.New(agentCfg.Settings.V1MonitoringEnabled, agentCfg.Settings.DownloadConfig.OS(), agentCfg.Settings.MonitoringConfig, agentInfo) + otelExecMode := otelconfig.GetExecutionModeFromConfig(logger, config) + isOtelExecModeSubprocess := otelExecMode == manager.SubprocessExecutionMode + monitor := monitoring.New(agentCfg.Settings.V1MonitoringEnabled, agentCfg.Settings.DownloadConfig.OS(), agentCfg.Settings.MonitoringConfig, agentInfo, isOtelExecModeSubprocess) return monitor.MonitoringConfig, nil } diff --git a/internal/pkg/agent/cmd/otel.go b/internal/pkg/agent/cmd/otel.go index c6b6b7dfda9..3abf91ace16 100644 --- a/internal/pkg/agent/cmd/otel.go +++ b/internal/pkg/agent/cmd/otel.go @@ -23,6 +23,7 @@ import ( "github.com/elastic/elastic-agent/internal/pkg/otel" "github.com/elastic/elastic-agent/internal/pkg/otel/agentprovider" "github.com/elastic/elastic-agent/internal/pkg/otel/manager" + "github.com/elastic/elastic-agent/internal/pkg/otel/monitoring" "github.com/elastic/elastic-agent/internal/pkg/release" "github.com/elastic/elastic-agent/pkg/core/logger" ) @@ -45,10 +46,14 @@ func newOtelCommandWithArgs(args []string, streams *cli.IOStreams) *cobra.Comman if err != nil { return err } + supervisedMonitoringURL, err := cmd.Flags().GetString(manager.OtelSupervisedMonitoringURLFlagName) + if err != nil { + return err + } if err := prepareEnv(); err != nil { return err } - return RunCollector(cmd.Context(), cfgFiles, supervised, supervisedLoggingLevel) + return RunCollector(cmd.Context(), cfgFiles, supervised, supervisedLoggingLevel, supervisedMonitoringURL) }, PreRun: func(c *cobra.Command, args []string) { // hide inherited flags not to bloat help with flags not related to otel @@ -76,7 +81,7 @@ func hideInheritedFlags(c *cobra.Command) { }) } -func RunCollector(cmdCtx context.Context, configFiles []string, supervised bool, supervisedLoggingLevel string) error { +func RunCollector(cmdCtx context.Context, configFiles []string, supervised bool, supervisedLoggingLevel string, supervisedMonitoringURL string) error { settings, err := prepareCollectorSettings(configFiles, supervised, supervisedLoggingLevel) if err != nil { return fmt.Errorf("failed to prepare collector settings: %w", err) @@ -89,6 +94,17 @@ func RunCollector(cmdCtx context.Context, configFiles []string, supervised bool, service.WaitExecutionDone() }() + if supervisedMonitoringURL != "" { + server, err := monitoring.NewServer(settings.log, supervisedMonitoringURL) + if err != nil { + return fmt.Errorf("error create monitoring server: %w", err) + } + server.Start() + defer func() { + _ = server.Stop() + }() + } + service.BeforeRun() defer service.Cleanup() @@ -102,18 +118,23 @@ func RunCollector(cmdCtx context.Context, configFiles []string, supervised bool, defer cancel() go service.ProcessWindowsControlEvents(stopCollector) - return otel.Run(ctx, stop, settings) + return otel.Run(ctx, stop, settings.otelSettings) +} + +type edotSettings struct { + log *logger.Logger + otelSettings *otelcol.CollectorSettings } -func prepareCollectorSettings(configFiles []string, supervised bool, supervisedLoggingLevel string) (*otelcol.CollectorSettings, error) { - var settings *otelcol.CollectorSettings +func prepareCollectorSettings(configFiles []string, supervised bool, supervisedLoggingLevel string) (edotSettings, error) { + var settings edotSettings if supervised { // add stdin config provider configProvider, err := agentprovider.NewBufferProvider(os.Stdin) if err != nil { - return nil, fmt.Errorf("failed to create config provider: %w", err) + return settings, fmt.Errorf("failed to create config provider: %w", err) } - settings = otel.NewSettings(release.Version(), []string{configProvider.URI()}, + settings.otelSettings = otel.NewSettings(release.Version(), []string{configProvider.URI()}, otel.WithConfigProviderFactory(configProvider.NewFactory()), ) @@ -138,20 +159,21 @@ func prepareCollectorSettings(configFiles []string, supervised bool, supervisedL l, err := logger.NewFromConfig("edot", defaultCfg, defaultEventLogCfg, false) if err != nil { - return nil, fmt.Errorf("failed to create logger: %w", err) + return settings, fmt.Errorf("failed to create logger: %w", err) } + settings.log = l if logLevelSettingErr != nil { l.Warnf("Fallback to default logging level due to: %v", logLevelSettingErr) } - settings.LoggingOptions = []zap.Option{zap.WrapCore(func(zapcore.Core) zapcore.Core { + settings.otelSettings.LoggingOptions = []zap.Option{zap.WrapCore(func(zapcore.Core) zapcore.Core { return l.Core() })} - settings.DisableGracefulShutdown = false + settings.otelSettings.DisableGracefulShutdown = false } else { - settings = otel.NewSettings(release.Version(), configFiles) + settings.otelSettings = otel.NewSettings(release.Version(), configFiles) } return settings, nil } diff --git a/internal/pkg/agent/cmd/otel_flags.go b/internal/pkg/agent/cmd/otel_flags.go index e5818eee19a..e43269068a9 100644 --- a/internal/pkg/agent/cmd/otel_flags.go +++ b/internal/pkg/agent/cmd/otel_flags.go @@ -39,6 +39,11 @@ func setupOtelFlags(flags *pflag.FlagSet) { // but look above, so we explicitly ignore it _ = flags.MarkHidden(manager.OtelSupervisedLoggingLevelFlagName) + flags.String(manager.OtelSupervisedMonitoringURLFlagName, "", "Set the monitoring path") + // the only error we can get here is that the flag does not exist + // but look above, so we explicitly ignore it + _ = flags.MarkHidden(manager.OtelSupervisedMonitoringURLFlagName) + goFlags := new(flag.FlagSet) featuregate.GlobalRegistry().RegisterFlags(goFlags) diff --git a/internal/pkg/agent/cmd/otel_test.go b/internal/pkg/agent/cmd/otel_test.go index 073fd46d58b..e900cac9a1a 100644 --- a/internal/pkg/agent/cmd/otel_test.go +++ b/internal/pkg/agent/cmd/otel_test.go @@ -30,23 +30,23 @@ func TestPrepareCollectorSettings(t *testing.T) { settings, err := prepareCollectorSettings(nil, true, "info") require.NoError(t, err, "failed to prepare collector settings") require.NotNil(t, settings, "settings should not be nil") - require.NotNil(t, settings.ConfigProviderSettings.ResolverSettings.URIs, "URIs should not be nil") + require.NotNil(t, settings.otelSettings.ConfigProviderSettings.ResolverSettings.URIs, "URIs should not be nil") agentProviderURIFound := false - for _, uri := range settings.ConfigProviderSettings.ResolverSettings.URIs { + for _, uri := range settings.otelSettings.ConfigProviderSettings.ResolverSettings.URIs { agentProviderURIFound = strings.Contains(uri, agentprovider.AgentConfigProviderSchemeName) if agentProviderURIFound { break } } require.True(t, agentProviderURIFound, "agentprovider Scheme not found in the URIS of ConfigProviderSettings") - require.NotNil(t, settings.LoggingOptions, "loggingOptions should not be nil for supervised mode") + require.NotNil(t, settings.otelSettings.LoggingOptions, "loggingOptions should not be nil for supervised mode") }) t.Run("returns valid settings in standalone mode", func(t *testing.T) { settings, err := prepareCollectorSettings([]string{"fake-config.yaml"}, false, "info") require.NoError(t, err, "failed to prepare collector settings") require.NotNil(t, settings, "settings should not be nil") - require.Contains(t, settings.ConfigProviderSettings.ResolverSettings.URIs, "fake-config.yaml", "fake-config.yaml not found in the URIS of ConfigProviderSettings") + require.Contains(t, settings.otelSettings.ConfigProviderSettings.ResolverSettings.URIs, "fake-config.yaml", "fake-config.yaml not found in the URIS of ConfigProviderSettings") }) t.Run("fails when supervised mode has invalid config from stdin", func(t *testing.T) { @@ -61,7 +61,7 @@ func TestPrepareCollectorSettings(t *testing.T) { settings, err := prepareCollectorSettings(nil, true, "info") require.Error(t, err) - require.Nil(t, settings) + require.Nil(t, settings.otelSettings) }) t.Run("doesn't fail when unsupervised mode has invalid config from stdin", func(t *testing.T) { diff --git a/internal/pkg/otel/config/config.go b/internal/pkg/otel/config/config.go new file mode 100644 index 00000000000..f3d5dae6d44 --- /dev/null +++ b/internal/pkg/otel/config/config.go @@ -0,0 +1,46 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License 2.0; +// you may not use this file except in compliance with the Elastic License 2.0. + +package config + +import ( + "github.com/elastic/elastic-agent-libs/logp" + "github.com/elastic/elastic-agent/internal/pkg/config" + "github.com/elastic/elastic-agent/internal/pkg/otel/manager" +) + +const defaultExecMode = manager.EmbeddedExecutionMode + +type execModeConfig struct { + Agent struct { + Features struct { + Otel *struct { + SubprocessExecution *bool `json:"subprocess_execution,omitempty" yaml:"subprocess_execution,omitempty" config:"subprocess_execution,omitempty"` + } `json:"otel,omitempty" yaml:"otel,omitempty" config:"otel,omitempty"` + } `json:"features" yaml:"features" config:"features"` + } `json:"agent" yaml:"agent" config:"agent"` +} + +// GetExecutionModeFromConfig returns the execution mode of the OTel runtime based on the config. +func GetExecutionModeFromConfig(log *logp.Logger, conf *config.Config) manager.ExecutionMode { + var c execModeConfig + if err := conf.UnpackTo(&c); err != nil { + log.Warnf("failed to unpack config when getting otel runtime execution mode: %v", err) + return defaultExecMode + } + + if c.Agent.Features.Otel == nil { + return defaultExecMode + } + + if c.Agent.Features.Otel.SubprocessExecution == nil { + return defaultExecMode + } + + if *c.Agent.Features.Otel.SubprocessExecution { + return manager.SubprocessExecutionMode + } else { + return manager.EmbeddedExecutionMode + } +} diff --git a/internal/pkg/otel/manager/execution_subprocess.go b/internal/pkg/otel/manager/execution_subprocess.go index ba2394af004..300e43b70b8 100644 --- a/internal/pkg/otel/manager/execution_subprocess.go +++ b/internal/pkg/otel/manager/execution_subprocess.go @@ -24,14 +24,16 @@ import ( "github.com/elastic/elastic-agent-libs/logp" + "github.com/elastic/elastic-agent/internal/pkg/otel/monitoring" runtimeLogger "github.com/elastic/elastic-agent/pkg/component/runtime" "github.com/elastic/elastic-agent/pkg/core/logger" "github.com/elastic/elastic-agent/pkg/core/process" ) const ( - OtelSetSupervisedFlagName = "supervised" - OtelSupervisedLoggingLevelFlagName = "supervised.logging.level" + OtelSetSupervisedFlagName = "supervised" + OtelSupervisedLoggingLevelFlagName = "supervised.logging.level" + OtelSupervisedMonitoringURLFlagName = "supervised.monitoring.url" ) func newSubprocessExecution(logLevel logp.Level, collectorPath string) (*subprocessExecution, error) { @@ -51,6 +53,7 @@ func newSubprocessExecution(logLevel logp.Level, collectorPath string) (*subproc "otel", fmt.Sprintf("--%s", OtelSetSupervisedFlagName), fmt.Sprintf("--%s=%s", OtelSupervisedLoggingLevelFlagName, logLevel.String()), + fmt.Sprintf("--%s=%s", OtelSupervisedMonitoringURLFlagName, monitoring.EDOTMonitoringEndpoint()), }, logLevel: logLevel, healthCheckExtensionID: healthCheckExtensionID, diff --git a/internal/pkg/otel/manager/testing/testing.go b/internal/pkg/otel/manager/testing/testing.go index e7afd25650d..f517f8f55c7 100644 --- a/internal/pkg/otel/manager/testing/testing.go +++ b/internal/pkg/otel/manager/testing/testing.go @@ -45,8 +45,10 @@ func main() { }) } + monitoringURL := os.Getenv("TEST_SUPERVISED_COLLECTOR_MONITORING_URL") + exitCode := 0 - err = cmd.RunCollector(ctx, nil, true, "debug") + err = cmd.RunCollector(ctx, nil, true, "debug", monitoringURL) if err != nil && !errors.Is(err, context.Canceled) { exitCode = 1 } diff --git a/internal/pkg/otel/monitoring/monitoring.go b/internal/pkg/otel/monitoring/monitoring.go new file mode 100644 index 00000000000..82e8248b9a3 --- /dev/null +++ b/internal/pkg/otel/monitoring/monitoring.go @@ -0,0 +1,134 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License 2.0; +// you may not use this file except in compliance with the Elastic License 2.0. + +package monitoring + +import ( + "encoding/json" + "fmt" + "net/http" + "net/url" + "os" + "path/filepath" + "runtime" + "strings" + + "github.com/gofrs/uuid/v5" + "github.com/gorilla/mux" + + "github.com/elastic/elastic-agent-libs/api" + "github.com/elastic/elastic-agent-libs/logp" + "github.com/elastic/elastic-agent-libs/monitoring" + "github.com/elastic/elastic-agent-system-metrics/report" + "github.com/elastic/elastic-agent/internal/pkg/agent/application/paths" + "github.com/elastic/elastic-agent/pkg/utils" + "github.com/elastic/elastic-agent/version" +) + +// EDOTComponentID is the component ID for the EDOT collector. +const EDOTComponentID = "elastic-agent/collector" + +// EDOTMonitoringEndpoint returns the monitoring endpoint for the EDOT collector. +func EDOTMonitoringEndpoint() string { + return utils.SocketURLWithFallback(EDOTComponentID, paths.TempDir()) +} + +// NewServer creates a new server exposing metrics and process information. +func NewServer(log *logp.Logger, host string) (*api.Server, error) { + ephemeralID, err := generateEphemeralID() + if err != nil { + return nil, err + } + + if err := report.SetupMetricsOptions(report.MetricOptions{ + Name: EDOTComponentID, + Version: version.GetDefaultVersion(), + Logger: log, + EphemeralID: ephemeralID, + SystemMetrics: monitoring.Default.GetOrCreateRegistry("system"), + ProcessMetrics: monitoring.Default.GetOrCreateRegistry("beat"), + }); err != nil { + return nil, fmt.Errorf("failed to setup metrics: %w", err) + } + + r := mux.NewRouter() + r.HandleFunc("/stats", func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json; charset=utf-8") + + data := monitoring.CollectStructSnapshot( + monitoring.GetNamespace("stats").GetRegistry(), + monitoring.Full, + false, + ) + + bytes, err := json.Marshal(data) + var content string + if err != nil { + content = fmt.Sprintf("Not valid json: %v", err) + } else { + content = string(bytes) + } + fmt.Fprint(w, content) + }) + + mux := http.NewServeMux() + mux.Handle("/", r) + + err = createMonitoringPath(host) + if err != nil { + return nil, fmt.Errorf("failed to create monitoring path: %w", err) + } + + srvCfg := api.DefaultConfig() + srvCfg.Enabled = true + srvCfg.Host = host + srvCfg.Port = 0 + apiServer, err := api.NewFromConfig(log, mux, srvCfg) + if err != nil { + return nil, fmt.Errorf("failed to create api server: %w", err) + } + return apiServer, nil +} + +// createMonitoringPath checks and creates the drop path if it doesn't exist. +func createMonitoringPath(drop string) error { + if drop == "" || runtime.GOOS == "windows" || isHttpUrl(drop) { + return nil + } + + path := strings.TrimPrefix(drop, "unix://") + if strings.HasSuffix(path, ".sock") { + path = filepath.Dir(path) + } + + _, err := os.Stat(path) + if err != nil { + if !os.IsNotExist(err) { + return err + } + + // create + if err := os.MkdirAll(path, 0775); err != nil { + return err + } + } + + return os.Chown(path, os.Geteuid(), os.Getegid()) +} + +// isHttpUrl checks if the given string is a valid HTTP URL. +func isHttpUrl(s string) bool { + u, err := url.Parse(strings.TrimSpace(s)) + return err == nil && (u.Scheme == "http" || u.Scheme == "https") && u.Host != "" +} + +// generateEphemeralID generates a random UUID. +func generateEphemeralID() (string, error) { + uid, err := uuid.NewV4() + if err != nil { + return "", fmt.Errorf("error while generating UUID for agent: %w", err) + } + + return uid.String(), nil +}