Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions internal/pkg/agent/application/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,13 +122,16 @@ func New(
override(cfg)
}

otelExecMode := otelmanager.GetExecutionModeFromConfig(rawConfig)
isOtelSubprocessExecution := otelExecMode == otelmanager.SubprocessExecutionMode

// monitoring is not supported in bootstrap mode https://github.com/elastic/elastic-agent/issues/1761
isMonitoringSupported := !disableMonitoring && cfg.Settings.V1MonitoringEnabled
upgrader, err := upgrade.NewUpgrader(log, cfg.Settings.DownloadConfig, cfg.Settings.Upgrade, agentInfo, new(upgrade.AgentWatcherHelper))
if err != nil {
return nil, nil, nil, fmt.Errorf("failed to create upgrader: %w", err)
}
monitor := monitoring.New(isMonitoringSupported, cfg.Settings.DownloadConfig.OS(), cfg.Settings.MonitoringConfig, agentInfo)
monitor := monitoring.New(isMonitoringSupported, cfg.Settings.DownloadConfig.OS(), cfg.Settings.MonitoringConfig, agentInfo, isOtelSubprocessExecution)

runtime, err := runtime.NewManager(
log,
Expand Down Expand Up @@ -240,7 +243,7 @@ func New(
return nil, nil, nil, errors.New(err, "failed to initialize composable controller")
}

otelManager, err := otelmanager.NewOTelManager(log.Named("otel_manager"), logLevel, baseLogger, otelmanager.EmbeddedExecutionMode, agentInfo, monitor.ComponentMonitoringConfig, cfg.Settings.ProcessConfig.StopTimeout)
otelManager, err := otelmanager.NewOTelManager(log.Named("otel_manager"), logLevel, baseLogger, otelExecMode, agentInfo, monitor.ComponentMonitoringConfig, cfg.Settings.ProcessConfig.StopTimeout)
if err != nil {
return nil, nil, nil, fmt.Errorf("failed to create otel manager: %w", err)
}
Expand Down
68 changes: 53 additions & 15 deletions internal/pkg/agent/application/monitoring/v1_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ const (
monitoringOutput = "monitoring"
defaultMonitoringNamespace = "default"
agentName = "elastic-agent"
edotCollectorName = "elastic-agent/collector"
metricBeatName = "metricbeat"
fileBeatName = "filebeat"

Expand All @@ -73,6 +74,9 @@ const (
// metricset stream failure threshold before the stream is marked as DEGRADED
// to avoid marking the agent degraded for transient errors, we set the default threshold to 5
defaultMetricsStreamFailureThreshold = uint(5)

beatsMonitoringComponentInfoID = "beat/" + monitoringMetricsUnitID
httpMonitoringComponentInfoID = "http/" + monitoringMetricsUnitID
)

var (
Expand All @@ -84,10 +88,11 @@ var (
// BeatsMonitor provides config values for monitoring of agent clients (beats, endpoint, etc)
// by injecting the monitoring config into an existing fleet config
type BeatsMonitor struct {
enabled bool // feature flag disabling whole v1 monitoring story
config *monitoringConfig
operatingSystem string
agentInfo info.Agent
enabled bool // feature flag disabling whole v1 monitoring story
config *monitoringConfig
operatingSystem string
agentInfo info.Agent
isOtelRuntimeSubprocess bool
}

// componentInfo is the information necessary to generate monitoring configuration for a component. We don't just use
Expand All @@ -106,14 +111,15 @@ type monitoringConfig struct {
}

// New creates a new BeatsMonitor instance.
func New(enabled bool, operatingSystem string, cfg *monitoringCfg.MonitoringConfig, agentInfo info.Agent) *BeatsMonitor {
func New(enabled bool, operatingSystem string, cfg *monitoringCfg.MonitoringConfig, agentInfo info.Agent, isOtelRuntimeSubprocess bool) *BeatsMonitor {
return &BeatsMonitor{
enabled: enabled,
config: &monitoringConfig{
C: cfg,
},
operatingSystem: operatingSystem,
agentInfo: agentInfo,
operatingSystem: operatingSystem,
agentInfo: agentInfo,
isOtelRuntimeSubprocess: isOtelRuntimeSubprocess,
}
}

Expand Down Expand Up @@ -418,12 +424,12 @@ func (b *BeatsMonitor) getComponentInfos(components []component.Component, compo
if b.config.C.MonitorMetrics {
componentInfos = append(componentInfos,
componentInfo{
ID: fmt.Sprintf("beat/%s", monitoringMetricsUnitID),
ID: beatsMonitoringComponentInfoID,
BinaryName: metricBeatName,
RuntimeManager: component.RuntimeManager(b.config.C.RuntimeManager),
},
componentInfo{
ID: fmt.Sprintf("http/%s", monitoringMetricsUnitID),
ID: httpMonitoringComponentInfoID,
BinaryName: metricBeatName,
RuntimeManager: component.RuntimeManager(b.config.C.RuntimeManager),
})
Expand Down Expand Up @@ -661,20 +667,28 @@ func (b *BeatsMonitor) getHttpStreams(
"namespace": "agent",
"period": metricsCollectionIntervalString,
"index": indexName,
"processors": processorsForAgentHttpStream(monitoringNamespace, dataset, b.agentInfo),
"processors": processorsForAgentHttpStream(agentName, agentName, agentName, monitoringNamespace, dataset, b.agentInfo),
}
if failureThreshold != nil {
agentStream[failureThresholdKey] = *failureThreshold
}
httpStreams = append(httpStreams, agentStream)

var edotSubprocessEndpoints []interface{}

for _, compInfo := range componentInfos {
binaryName := compInfo.BinaryName
if !isSupportedMetricsBinary(binaryName) {
continue
}

endpoints := []interface{}{PrefixedEndpoint(BeatsMonitoringEndpoint(compInfo.ID))}
if compInfo.RuntimeManager == component.OtelRuntimeManager && compInfo.ID == httpMonitoringComponentInfoID && b.isOtelRuntimeSubprocess {
// when Otel runtime is running as subprocess, we need to monitor it as a separate stream, thus we utilise the already
// exposed endpoint of httpMonitoringComponentInfoID.
edotSubprocessEndpoints = endpoints
}

name := sanitizeName(binaryName)

// Do not create http streams if runtime-manager is otel and binary is of beat type
Expand Down Expand Up @@ -727,6 +741,30 @@ func (b *BeatsMonitor) getHttpStreams(
}
}

if edotSubprocessEndpoints != nil {
// Otel runtime subprocess metrics are collected using the same processors as the elastic-agent since we want only the
// system resources.
edotSubprocessStream := map[string]any{
idKey: fmt.Sprintf("%s-edot-collector", monitoringMetricsUnitID),
"data_stream": map[string]interface{}{
"type": "metrics",
"dataset": dataset,
"namespace": monitoringNamespace,
},
"metricsets": []interface{}{"json"},
"path": "/stats",
"hosts": edotSubprocessEndpoints,
"namespace": "agent",
"period": metricsCollectionIntervalString,
"index": indexName,
"processors": processorsForAgentHttpStream(agentName, edotCollectorName, edotCollectorName, monitoringNamespace, dataset, b.agentInfo),
}
if failureThreshold != nil {
edotSubprocessStream[failureThresholdKey] = *failureThreshold
}
httpStreams = append(httpStreams, edotSubprocessStream)
}

return httpStreams
}

Expand Down Expand Up @@ -933,28 +971,28 @@ func processorsForHttpStream(binaryName, unitID, dataset string, agentInfo info.
}

// processorsForAgentHttpStream returns the processors used for the agent metric stream in the beats input.
func processorsForAgentHttpStream(namespace, dataset string, agentInfo info.Agent) []any {
func processorsForAgentHttpStream(binaryName, processName, unitID, namespace, dataset string, agentInfo info.Agent) []any {
return []interface{}{
addDataStreamFieldsProcessor(dataset, namespace),
addEventFieldsProcessor(dataset),
addElasticAgentFieldsProcessor(agentName, agentInfo),
addElasticAgentFieldsProcessor(processName, agentInfo),
addAgentFieldsProcessor(agentInfo.AgentID()),
addCopyFieldsProcessor(httpCopyRules(), true, false),
dropFieldsProcessor([]any{"http"}, true),
addComponentFieldsProcessor(agentName, agentName),
addComponentFieldsProcessor(binaryName, unitID),
}
}

// addElasticAgentFieldsProcessor returns a processor definition that adds agent information in an `elastic_agent` field.
func addElasticAgentFieldsProcessor(binaryName string, agentInfo info.Agent) map[string]any {
func addElasticAgentFieldsProcessor(processName string, agentInfo info.Agent) map[string]any {
return map[string]any{
"add_fields": map[string]any{
"target": "elastic_agent",
"fields": map[string]any{
"id": agentInfo.AgentID(),
"version": agentInfo.Version(),
"snapshot": agentInfo.Snapshot(),
"process": binaryName,
"process": processName,
},
},
}
Expand Down
146 changes: 91 additions & 55 deletions internal/pkg/agent/application/monitoring/v1_monitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package monitoring
import (
"context"
"encoding/json"
"fmt"
"os"
"path/filepath"
"runtime"
Expand Down Expand Up @@ -932,67 +933,102 @@ func TestMonitoringConfigForBeatsReceivers(t *testing.T) {
}

func TestMonitoringWithOtelRuntime(t *testing.T) {
agentInfo, err := info.NewAgentInfo(context.Background(), false)
require.NoError(t, err, "Error creating agent info")

cfg := &monitoringConfig{
C: &monitoringcfg.MonitoringConfig{
Enabled: true,
MonitorLogs: true,
MonitorMetrics: true,
Namespace: "test",
HTTP: &monitoringcfg.MonitoringHTTPConfig{
Enabled: false,
},
RuntimeManager: monitoringcfg.OtelRuntimeManager,
},
}

policy := map[string]any{
"agent": map[string]any{
"monitoring": map[string]any{
"metrics": true,
"logs": false,
},
for _, tc := range []struct {
name string
edotSubprocess bool
}{
{
name: "otel runtime inprocess",
edotSubprocess: false,
},
"outputs": map[string]any{
"default": map[string]any{},
{
name: "otel runtime subprocess",
edotSubprocess: true,
},
}
} {
t.Run(tc.name, func(t *testing.T) {
agentInfo, err := info.NewAgentInfo(context.Background(), false)
require.NoError(t, err, "Error creating agent info")

b := &BeatsMonitor{
enabled: true,
config: cfg,
agentInfo: agentInfo,
}
cfg := &monitoringConfig{
C: &monitoringcfg.MonitoringConfig{
Enabled: true,
MonitorLogs: true,
MonitorMetrics: true,
Namespace: "test",
HTTP: &monitoringcfg.MonitoringHTTPConfig{
Enabled: false,
},
RuntimeManager: monitoringcfg.OtelRuntimeManager,
},
}

components := []component.Component{
{
ID: "filestream-receiver",
InputSpec: &component.InputRuntimeSpec{
Spec: component.InputSpec{
Command: &component.CommandSpec{
Name: "filebeat",
policy := map[string]any{
"agent": map[string]any{
"monitoring": map[string]any{
"metrics": true,
"logs": false,
},
},
},
RuntimeManager: component.OtelRuntimeManager,
},
}
monitoringCfgMap, err := b.MonitoringConfig(policy, components, map[string]uint64{})
require.NoError(t, err)
"outputs": map[string]any{
"default": map[string]any{},
},
}

// Verify that if we're using filebeat receiver, there's no filebeat input
var monitoringCfg struct {
Inputs []struct {
ID string
RuntimeManager string `mapstructure:"_runtime_experimental"`
}
}
err = mapstructure.Decode(monitoringCfgMap, &monitoringCfg)
require.NoError(t, err)
for _, input := range monitoringCfg.Inputs {
assert.Equal(t, monitoringcfg.OtelRuntimeManager, input.RuntimeManager)
b := &BeatsMonitor{
enabled: true,
config: cfg,
agentInfo: agentInfo,
isOtelRuntimeSubprocess: tc.edotSubprocess,
}

components := []component.Component{
{
ID: "filestream-receiver",
InputSpec: &component.InputRuntimeSpec{
Spec: component.InputSpec{
Command: &component.CommandSpec{
Name: "filebeat",
},
},
},
RuntimeManager: component.OtelRuntimeManager,
},
}
monitoringCfgMap, err := b.MonitoringConfig(policy, components, map[string]uint64{})
require.NoError(t, err)

// Verify that if we're using filebeat receiver, there's no filebeat input
var monitoringCfg struct {
Inputs []struct {
ID string
RuntimeManager string `mapstructure:"_runtime_experimental"`
Streams []struct {
ID string `mapstructure:"id"`
} `mapstructure:"streams"`
}
}
err = mapstructure.Decode(monitoringCfgMap, &monitoringCfg)
require.NoError(t, err)
edotSubprocessStreamID := fmt.Sprintf("%s-edot-collector", monitoringMetricsUnitID)
foundEdotSubprocessStream := false
for _, input := range monitoringCfg.Inputs {
assert.Equal(t, monitoringcfg.OtelRuntimeManager, input.RuntimeManager)
if !foundEdotSubprocessStream && input.ID == "metrics-monitoring-agent" {
for _, stream := range input.Streams {
if stream.ID == edotSubprocessStreamID {
foundEdotSubprocessStream = true
break
}
}
}
}
if tc.edotSubprocess {
require.True(t, foundEdotSubprocessStream, "edot subprocess stream not found")
} else {
require.False(t, foundEdotSubprocessStream, "edot subprocess stream found")
}
})
}
}

Expand Down Expand Up @@ -1066,7 +1102,7 @@ func TestMonitorReload(t *testing.T) {
monitorcfg.MonitorLogs = false
monitorcfg.MonitorMetrics = false

beatsMonitor := New(true, "", monitorcfg, nil)
beatsMonitor := New(true, "", monitorcfg, nil, false)
assert.Equal(t, beatsMonitor.config.C.MonitorLogs, false)
assert.Equal(t, beatsMonitor.config.C.MonitorLogs, false)

Expand Down
4 changes: 3 additions & 1 deletion internal/pkg/agent/cmd/inspect.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/elastic/elastic-agent/internal/pkg/config"
"github.com/elastic/elastic-agent/internal/pkg/config/operations"
"github.com/elastic/elastic-agent/internal/pkg/diagnostics"
otelmanager "github.com/elastic/elastic-agent/internal/pkg/otel/manager"
"github.com/elastic/elastic-agent/pkg/component"
"github.com/elastic/elastic-agent/pkg/core/logger"
"github.com/elastic/elastic-agent/pkg/utils"
Expand Down Expand Up @@ -409,7 +410,8 @@ func getMonitoringFn(ctx context.Context, cfg map[string]interface{}) (component
return nil, fmt.Errorf("could not load agent info: %w", err)
}

monitor := monitoring.New(agentCfg.Settings.V1MonitoringEnabled, agentCfg.Settings.DownloadConfig.OS(), agentCfg.Settings.MonitoringConfig, agentInfo)
isOtelSubprocessExecution := otelmanager.IsSubprocessExecution(config)
monitor := monitoring.New(agentCfg.Settings.V1MonitoringEnabled, agentCfg.Settings.DownloadConfig.OS(), agentCfg.Settings.MonitoringConfig, agentInfo, isOtelSubprocessExecution)
return monitor.MonitoringConfig, nil
}

Expand Down
Loading
Loading