Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions internal/pkg/agent/application/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/elastic/elastic-agent/internal/pkg/fleetapi/acker/lazy"
"github.com/elastic/elastic-agent/internal/pkg/fleetapi/acker/retrier"
fleetclient "github.com/elastic/elastic-agent/internal/pkg/fleetapi/client"
otelconfig "github.com/elastic/elastic-agent/internal/pkg/otel/config"
otelmanager "github.com/elastic/elastic-agent/internal/pkg/otel/manager"
"github.com/elastic/elastic-agent/internal/pkg/queue"
"github.com/elastic/elastic-agent/internal/pkg/release"
Expand Down Expand Up @@ -122,13 +123,16 @@ func New(
override(cfg)
}

otelExecMode := otelconfig.GetExecutionModeFromConfig(log, rawConfig)
isOtelExecModeSubprocess := otelExecMode == otelmanager.SubprocessExecutionMode

// monitoring is not supported in bootstrap mode https://github.com/elastic/elastic-agent/issues/1761
isMonitoringSupported := !disableMonitoring && cfg.Settings.V1MonitoringEnabled
upgrader, err := upgrade.NewUpgrader(log, cfg.Settings.DownloadConfig, cfg.Settings.Upgrade, agentInfo, new(upgrade.AgentWatcherHelper))
if err != nil {
return nil, nil, nil, fmt.Errorf("failed to create upgrader: %w", err)
}
monitor := monitoring.New(isMonitoringSupported, cfg.Settings.DownloadConfig.OS(), cfg.Settings.MonitoringConfig, agentInfo)
monitor := monitoring.New(isMonitoringSupported, cfg.Settings.DownloadConfig.OS(), cfg.Settings.MonitoringConfig, agentInfo, isOtelExecModeSubprocess)

runtime, err := runtime.NewManager(
log,
Expand Down Expand Up @@ -240,7 +244,7 @@ func New(
return nil, nil, nil, errors.New(err, "failed to initialize composable controller")
}

otelManager, err := otelmanager.NewOTelManager(log.Named("otel_manager"), logLevel, baseLogger, otelmanager.EmbeddedExecutionMode, agentInfo, monitor.ComponentMonitoringConfig, cfg.Settings.ProcessConfig.StopTimeout)
otelManager, err := otelmanager.NewOTelManager(log.Named("otel_manager"), logLevel, baseLogger, otelExecMode, agentInfo, monitor.ComponentMonitoringConfig, cfg.Settings.ProcessConfig.StopTimeout)
if err != nil {
return nil, nil, nil, fmt.Errorf("failed to create otel manager: %w", err)
}
Expand Down
61 changes: 48 additions & 13 deletions internal/pkg/agent/application/monitoring/v1_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/elastic/elastic-agent/internal/pkg/agent/errors"
"github.com/elastic/elastic-agent/internal/pkg/config"
monitoringCfg "github.com/elastic/elastic-agent/internal/pkg/core/monitoring/config"
otelMonitoring "github.com/elastic/elastic-agent/internal/pkg/otel/monitoring"
)

const (
Expand Down Expand Up @@ -84,10 +85,11 @@ var (
// BeatsMonitor provides config values for monitoring of agent clients (beats, endpoint, etc)
// by injecting the monitoring config into an existing fleet config
type BeatsMonitor struct {
enabled bool // feature flag disabling whole v1 monitoring story
config *monitoringConfig
operatingSystem string
agentInfo info.Agent
enabled bool // feature flag disabling whole v1 monitoring story
config *monitoringConfig
operatingSystem string
agentInfo info.Agent
isOtelRuntimeSubprocess bool
}

// componentInfo is the information necessary to generate monitoring configuration for a component. We don't just use
Expand All @@ -106,14 +108,15 @@ type monitoringConfig struct {
}

// New creates a new BeatsMonitor instance.
func New(enabled bool, operatingSystem string, cfg *monitoringCfg.MonitoringConfig, agentInfo info.Agent) *BeatsMonitor {
func New(enabled bool, operatingSystem string, cfg *monitoringCfg.MonitoringConfig, agentInfo info.Agent, isOtelRuntimeSubprocess bool) *BeatsMonitor {
return &BeatsMonitor{
enabled: enabled,
config: &monitoringConfig{
C: cfg,
},
operatingSystem: operatingSystem,
agentInfo: agentInfo,
operatingSystem: operatingSystem,
agentInfo: agentInfo,
isOtelRuntimeSubprocess: isOtelRuntimeSubprocess,
}
}

Expand Down Expand Up @@ -349,6 +352,16 @@ func (b *BeatsMonitor) Prepare(unit string) error {
return nil
}

// Returns true if any component in the list uses the otel runtime.
func usingOtelRuntime(componentInfos []componentInfo) bool {
for _, ci := range componentInfos {
if ci.RuntimeManager == component.OtelRuntimeManager {
return true
}
}
return false
}

// Cleanup removes files that were created for monitoring.
func (b *BeatsMonitor) Cleanup(unit string) error {
if !b.Enabled() {
Expand Down Expand Up @@ -661,13 +674,35 @@ func (b *BeatsMonitor) getHttpStreams(
"namespace": "agent",
"period": metricsCollectionIntervalString,
"index": indexName,
"processors": processorsForAgentHttpStream(monitoringNamespace, dataset, b.agentInfo),
"processors": processorsForAgentHttpStream(agentName, agentName, agentName, monitoringNamespace, dataset, b.agentInfo),
}
if failureThreshold != nil {
agentStream[failureThresholdKey] = *failureThreshold
}
httpStreams = append(httpStreams, agentStream)

if usingOtelRuntime(componentInfos) && b.isOtelRuntimeSubprocess {
edotSubprocessStream := map[string]any{
idKey: fmt.Sprintf("%s-edot-collector", monitoringMetricsUnitID),
"data_stream": map[string]interface{}{
"type": "metrics",
"dataset": dataset,
"namespace": monitoringNamespace,
},
"metricsets": []interface{}{"json"},
"path": "/stats",
"hosts": []interface{}{PrefixedEndpoint(otelMonitoring.EDOTMonitoringEndpoint())},
"namespace": "agent",
"period": metricsCollectionIntervalString,
"index": indexName,
"processors": processorsForAgentHttpStream(agentName, otelMonitoring.EDOTComponentID, otelMonitoring.EDOTComponentID, monitoringNamespace, dataset, b.agentInfo),
}
if failureThreshold != nil {
edotSubprocessStream[failureThresholdKey] = *failureThreshold
}
httpStreams = append(httpStreams, edotSubprocessStream)
}

for _, compInfo := range componentInfos {
binaryName := compInfo.BinaryName
if !isSupportedMetricsBinary(binaryName) {
Expand Down Expand Up @@ -933,28 +968,28 @@ func processorsForHttpStream(binaryName, unitID, dataset string, agentInfo info.
}

// processorsForAgentHttpStream returns the processors used for the agent metric stream in the beats input.
func processorsForAgentHttpStream(namespace, dataset string, agentInfo info.Agent) []any {
func processorsForAgentHttpStream(binaryName, processName, unitID, namespace, dataset string, agentInfo info.Agent) []any {
return []interface{}{
addDataStreamFieldsProcessor(dataset, namespace),
addEventFieldsProcessor(dataset),
addElasticAgentFieldsProcessor(agentName, agentInfo),
addElasticAgentFieldsProcessor(processName, agentInfo),
addAgentFieldsProcessor(agentInfo.AgentID()),
addCopyFieldsProcessor(httpCopyRules(), true, false),
dropFieldsProcessor([]any{"http"}, true),
addComponentFieldsProcessor(agentName, agentName),
addComponentFieldsProcessor(binaryName, unitID),
}
}

// addElasticAgentFieldsProcessor returns a processor definition that adds agent information in an `elastic_agent` field.
func addElasticAgentFieldsProcessor(binaryName string, agentInfo info.Agent) map[string]any {
func addElasticAgentFieldsProcessor(processName string, agentInfo info.Agent) map[string]any {
return map[string]any{
"add_fields": map[string]any{
"target": "elastic_agent",
"fields": map[string]any{
"id": agentInfo.AgentID(),
"version": agentInfo.Version(),
"snapshot": agentInfo.Snapshot(),
"process": binaryName,
"process": processName,
},
},
}
Expand Down
146 changes: 91 additions & 55 deletions internal/pkg/agent/application/monitoring/v1_monitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package monitoring
import (
"context"
"encoding/json"
"fmt"
"os"
"path/filepath"
"runtime"
Expand Down Expand Up @@ -932,67 +933,102 @@ func TestMonitoringConfigForBeatsReceivers(t *testing.T) {
}

func TestMonitoringWithOtelRuntime(t *testing.T) {
agentInfo, err := info.NewAgentInfo(context.Background(), false)
require.NoError(t, err, "Error creating agent info")

cfg := &monitoringConfig{
C: &monitoringcfg.MonitoringConfig{
Enabled: true,
MonitorLogs: true,
MonitorMetrics: true,
Namespace: "test",
HTTP: &monitoringcfg.MonitoringHTTPConfig{
Enabled: false,
},
RuntimeManager: monitoringcfg.OtelRuntimeManager,
},
}

policy := map[string]any{
"agent": map[string]any{
"monitoring": map[string]any{
"metrics": true,
"logs": false,
},
for _, tc := range []struct {
name string
edotSubprocess bool
}{
{
name: "otel runtime inprocess",
edotSubprocess: false,
},
"outputs": map[string]any{
"default": map[string]any{},
{
name: "otel runtime subprocess",
edotSubprocess: true,
},
}
} {
t.Run(tc.name, func(t *testing.T) {
agentInfo, err := info.NewAgentInfo(context.Background(), false)
require.NoError(t, err, "Error creating agent info")

b := &BeatsMonitor{
enabled: true,
config: cfg,
agentInfo: agentInfo,
}
cfg := &monitoringConfig{
C: &monitoringcfg.MonitoringConfig{
Enabled: true,
MonitorLogs: true,
MonitorMetrics: true,
Namespace: "test",
HTTP: &monitoringcfg.MonitoringHTTPConfig{
Enabled: false,
},
RuntimeManager: monitoringcfg.OtelRuntimeManager,
},
}

components := []component.Component{
{
ID: "filestream-receiver",
InputSpec: &component.InputRuntimeSpec{
Spec: component.InputSpec{
Command: &component.CommandSpec{
Name: "filebeat",
policy := map[string]any{
"agent": map[string]any{
"monitoring": map[string]any{
"metrics": true,
"logs": false,
},
},
},
RuntimeManager: component.OtelRuntimeManager,
},
}
monitoringCfgMap, err := b.MonitoringConfig(policy, components, map[string]uint64{})
require.NoError(t, err)
"outputs": map[string]any{
"default": map[string]any{},
},
}

// Verify that if we're using filebeat receiver, there's no filebeat input
var monitoringCfg struct {
Inputs []struct {
ID string
RuntimeManager string `mapstructure:"_runtime_experimental"`
}
}
err = mapstructure.Decode(monitoringCfgMap, &monitoringCfg)
require.NoError(t, err)
for _, input := range monitoringCfg.Inputs {
assert.Equal(t, monitoringcfg.OtelRuntimeManager, input.RuntimeManager)
b := &BeatsMonitor{
enabled: true,
config: cfg,
agentInfo: agentInfo,
isOtelRuntimeSubprocess: tc.edotSubprocess,
}

components := []component.Component{
{
ID: "filestream-receiver",
InputSpec: &component.InputRuntimeSpec{
Spec: component.InputSpec{
Command: &component.CommandSpec{
Name: "filebeat",
},
},
},
RuntimeManager: component.OtelRuntimeManager,
},
}
monitoringCfgMap, err := b.MonitoringConfig(policy, components, map[string]uint64{})
require.NoError(t, err)

// Verify that if we're using filebeat receiver, there's no filebeat input
var monitoringCfg struct {
Inputs []struct {
ID string
RuntimeManager string `mapstructure:"_runtime_experimental"`
Streams []struct {
ID string `mapstructure:"id"`
} `mapstructure:"streams"`
}
}
err = mapstructure.Decode(monitoringCfgMap, &monitoringCfg)
require.NoError(t, err)
edotSubprocessStreamID := fmt.Sprintf("%s-edot-collector", monitoringMetricsUnitID)
foundEdotSubprocessStream := false
for _, input := range monitoringCfg.Inputs {
assert.Equal(t, monitoringcfg.OtelRuntimeManager, input.RuntimeManager)
if !foundEdotSubprocessStream && input.ID == "metrics-monitoring-agent" {
for _, stream := range input.Streams {
if stream.ID == edotSubprocessStreamID {
foundEdotSubprocessStream = true
break
}
}
}
}
if tc.edotSubprocess {
require.True(t, foundEdotSubprocessStream, "edot subprocess stream not found")
} else {
require.False(t, foundEdotSubprocessStream, "edot subprocess stream found")
}
})
}
}

Expand Down Expand Up @@ -1066,7 +1102,7 @@ func TestMonitorReload(t *testing.T) {
monitorcfg.MonitorLogs = false
monitorcfg.MonitorMetrics = false

beatsMonitor := New(true, "", monitorcfg, nil)
beatsMonitor := New(true, "", monitorcfg, nil, false)
assert.Equal(t, beatsMonitor.config.C.MonitorLogs, false)
assert.Equal(t, beatsMonitor.config.C.MonitorLogs, false)

Expand Down
13 changes: 8 additions & 5 deletions internal/pkg/agent/cmd/inspect.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ import (
"github.com/elastic/elastic-agent/internal/pkg/config"
"github.com/elastic/elastic-agent/internal/pkg/config/operations"
"github.com/elastic/elastic-agent/internal/pkg/diagnostics"
otelconfig "github.com/elastic/elastic-agent/internal/pkg/otel/config"
"github.com/elastic/elastic-agent/internal/pkg/otel/manager"
"github.com/elastic/elastic-agent/pkg/component"
"github.com/elastic/elastic-agent/pkg/core/logger"
"github.com/elastic/elastic-agent/pkg/utils"
Expand Down Expand Up @@ -179,7 +181,7 @@ func inspectConfig(ctx context.Context, cfgPath string, opts inspectConfigOpts,
return fmt.Errorf("failed to detect inputs and outputs: %w", err)
}

monitorFn, err := getMonitoringFn(ctx, cfg)
monitorFn, err := getMonitoringFn(ctx, l, cfg)
if err != nil {
return fmt.Errorf("failed to get monitoring: %w", err)
}
Expand Down Expand Up @@ -374,7 +376,7 @@ func getComponentsFromPolicy(ctx context.Context, l *logger.Logger, cfgPath stri
return nil, err
}

monitorFn, err := getMonitoringFn(ctx, m)
monitorFn, err := getMonitoringFn(ctx, l, m)
if err != nil {
return nil, fmt.Errorf("failed to get monitoring: %w", err)
}
Expand All @@ -393,7 +395,7 @@ func getComponentsFromPolicy(ctx context.Context, l *logger.Logger, cfgPath stri
return comps, nil
}

func getMonitoringFn(ctx context.Context, cfg map[string]interface{}) (component.GenerateMonitoringCfgFn, error) {
func getMonitoringFn(ctx context.Context, logger *logger.Logger, cfg map[string]interface{}) (component.GenerateMonitoringCfgFn, error) {
config, err := config.NewConfigFrom(cfg)
if err != nil {
return nil, err
Expand All @@ -408,8 +410,9 @@ func getMonitoringFn(ctx context.Context, cfg map[string]interface{}) (component
if err != nil {
return nil, fmt.Errorf("could not load agent info: %w", err)
}

monitor := monitoring.New(agentCfg.Settings.V1MonitoringEnabled, agentCfg.Settings.DownloadConfig.OS(), agentCfg.Settings.MonitoringConfig, agentInfo)
otelExecMode := otelconfig.GetExecutionModeFromConfig(logger, config)
isOtelExecModeSubprocess := otelExecMode == manager.SubprocessExecutionMode
monitor := monitoring.New(agentCfg.Settings.V1MonitoringEnabled, agentCfg.Settings.DownloadConfig.OS(), agentCfg.Settings.MonitoringConfig, agentInfo, isOtelExecModeSubprocess)
return monitor.MonitoringConfig, nil
}

Expand Down
Loading
Loading