Skip to content
Merged
13 changes: 11 additions & 2 deletions cmd/system-probe/config/adjust_npm.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package config

import (
"errors"
"fmt"
"math"
"runtime"
Expand Down Expand Up @@ -50,7 +51,7 @@ func adjustNetwork(cfg config.Config) {

validateInt64(cfg, spNS("max_tracked_connections"), defaultMaxTrackedConnections, func(v int64) error {
if v <= 0 {
return fmt.Errorf("must be a positive value")
return errors.New("must be a positive value")
}
return nil
})
Expand All @@ -60,11 +61,19 @@ func adjustNetwork(cfg config.Config) {
// closed connections in environments with mostly short-lived connections
validateInt64(cfg, spNS("max_closed_connections_buffered"), cfg.GetInt64(spNS("max_tracked_connections")), func(v int64) error {
if v <= 0 {
return fmt.Errorf("must be a positive value")
return errors.New("must be a positive value")
}
return nil
})
limitMaxInt64(cfg, spNS("max_closed_connections_buffered"), math.MaxUint32)
// also ensure that max_failed_connections_buffered is equal to max_tracked_connections if the former is not set
validateInt64(cfg, netNS("max_failed_connections_buffered"), cfg.GetInt64(spNS("max_tracked_connections")), func(v int64) error {
if v <= 0 {
return errors.New("must be a positive value")
}
return nil
})
limitMaxInt64(cfg, netNS("max_failed_connections_buffered"), math.MaxUint32)

limitMaxInt(cfg, spNS("offset_guess_threshold"), maxOffsetThreshold)

Expand Down
1 change: 1 addition & 0 deletions pkg/config/setup/system_probe.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ func InitSystemProbeConfig(cfg pkgconfigmodel.Config) {

cfg.BindEnvAndSetDefault(join(spNS, "max_tracked_connections"), 65536)
cfg.BindEnv(join(spNS, "max_closed_connections_buffered"))
cfg.BindEnv(join(netNS, "max_failed_connections_buffered"))
cfg.BindEnvAndSetDefault(join(spNS, "closed_connection_flush_threshold"), 0)
cfg.BindEnvAndSetDefault(join(spNS, "closed_channel_size"), 500)
cfg.BindEnvAndSetDefault(join(spNS, "max_connection_state_buffered"), 75000)
Expand Down
5 changes: 5 additions & 0 deletions pkg/network/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,10 @@ type Config struct {
// get flushed on every client request (default 30s check interval)
MaxClosedConnectionsBuffered uint32

// MaxFailedConnectionsBuffered represents the maximum number of failed connections we'll buffer in memory. These connections will be
// removed from memory as they are matched to closed connections
MaxFailedConnectionsBuffered uint32

// ClosedConnectionFlushThreshold represents the number of closed connections stored before signalling
// the agent to flush the connections. This value only valid on Windows
ClosedConnectionFlushThreshold int
Expand Down Expand Up @@ -340,6 +344,7 @@ func New() *Config {
TCPFailedConnectionsEnabled: cfg.GetBool(join(netNS, "enable_tcp_failed_connections")),
MaxTrackedConnections: uint32(cfg.GetInt64(join(spNS, "max_tracked_connections"))),
MaxClosedConnectionsBuffered: uint32(cfg.GetInt64(join(spNS, "max_closed_connections_buffered"))),
MaxFailedConnectionsBuffered: uint32(cfg.GetInt64(join(netNS, "max_failed_connections_buffered"))),
ClosedConnectionFlushThreshold: cfg.GetInt(join(spNS, "closed_connection_flush_threshold")),
ClosedChannelSize: cfg.GetInt(join(spNS, "closed_channel_size")),
MaxConnectionsStateBuffered: cfg.GetInt(join(spNS, "max_connection_state_buffered")),
Expand Down
17 changes: 17 additions & 0 deletions pkg/network/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1181,6 +1181,23 @@ func TestMaxClosedConnectionsBuffered(t *testing.T) {
})
}

func TestMaxFailedConnectionsBuffered(t *testing.T) {
maxTrackedConnections := New().MaxTrackedConnections

t.Run("value set", func(t *testing.T) {
aconfig.ResetSystemProbeConfig(t)
t.Setenv("DD_NETWORK_CONFIG_MAX_FAILED_CONNECTIONS_BUFFERED", fmt.Sprintf("%d", maxTrackedConnections-1))
cfg := New()
require.Equal(t, maxTrackedConnections-1, cfg.MaxFailedConnectionsBuffered)
})

t.Run("value not set", func(t *testing.T) {
aconfig.ResetSystemProbeConfig(t)
cfg := New()
require.Equal(t, cfg.MaxTrackedConnections, cfg.MaxFailedConnectionsBuffered)
})
}

func TestMaxHTTPStatsBuffered(t *testing.T) {
t.Run("via deprecated YAML", func(t *testing.T) {
aconfig.ResetSystemProbeConfig(t)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,11 @@ type TCPFailedConnConsumer struct {
}

// NewFailedConnConsumer creates a new TCPFailedConnConsumer
func NewFailedConnConsumer(eventHandler ddebpf.EventHandler, m *manager.Manager) *TCPFailedConnConsumer {
func NewFailedConnConsumer(eventHandler ddebpf.EventHandler, m *manager.Manager, maxFailedConnsBuffered uint32) *TCPFailedConnConsumer {
return &TCPFailedConnConsumer{
eventHandler: eventHandler,
closed: make(chan struct{}),
FailedConns: NewFailedConns(m),
FailedConns: NewFailedConns(m, maxFailedConnsBuffered),
}
}

Expand Down
62 changes: 32 additions & 30 deletions pkg/network/tracer/connection/failure/matching.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,12 @@ package failure

import (
"fmt"
"strconv"
"sync"
"syscall"
"time"

manager "github.com/DataDog/ebpf-manager"
"golang.org/x/sys/unix"

ddebpf "github.com/DataDog/datadog-agent/pkg/ebpf"
"github.com/DataDog/datadog-agent/pkg/network"
Expand All @@ -25,22 +26,18 @@ import (
)

var (
allowListErrs = map[uint32]struct{}{
ebpf.TCPFailureConnReset: {}, // Connection reset by peer
ebpf.TCPFailureConnTimeout: {}, // Connection timed out
ebpf.TCPFailureConnRefused: {}, // Connection refused
}

telemetryModuleName = "network_tracer__tcp_failure"
mapTTL = 10 * time.Millisecond.Nanoseconds()
)

var failureTelemetry = struct {
failedConnOrphans telemetry.Counter
failedConnMatches telemetry.Counter
failedConnMatches telemetry.Counter
failedConnOrphans telemetry.Counter
failedConnsDropped telemetry.Counter
}{
telemetry.NewCounter(telemetryModuleName, "orphans", []string{}, "Counter measuring the number of orphans after associating failed connections with a closed connection"),
telemetry.NewCounter(telemetryModuleName, "matches", []string{"type"}, "Counter measuring the number of successful matches of failed connections with closed connections"),
telemetry.NewCounter(telemetryModuleName, "orphans", []string{}, "Counter measuring the number of orphans after associating failed connections with a closed connection"),
telemetry.NewCounter(telemetryModuleName, "dropped", []string{}, "Counter measuring the number of dropped failed connections"),
}

// FailedConnStats is a wrapper to help document the purpose of the underlying map
Expand All @@ -61,32 +58,39 @@ type FailedConnMap map[ebpf.ConnTuple]*FailedConnStats

// FailedConns is a struct to hold failed connections
type FailedConns struct {
FailedConnMap map[ebpf.ConnTuple]*FailedConnStats
failureTuple *ebpf.ConnTuple
mapCleaner *ddebpf.MapCleaner[ebpf.ConnTuple, int64]
sync.RWMutex
FailedConnMap map[ebpf.ConnTuple]*FailedConnStats
maxFailuresBuffered uint32
failureTuple *ebpf.ConnTuple
mapCleaner *ddebpf.MapCleaner[ebpf.ConnTuple, int64]
sync.Mutex
}

// NewFailedConns returns a new FailedConns struct
func NewFailedConns(m *manager.Manager) *FailedConns {
func NewFailedConns(m *manager.Manager, maxFailedConnsBuffered uint32) *FailedConns {
fc := &FailedConns{
FailedConnMap: make(map[ebpf.ConnTuple]*FailedConnStats),
failureTuple: &ebpf.ConnTuple{},
FailedConnMap: make(map[ebpf.ConnTuple]*FailedConnStats),
maxFailuresBuffered: maxFailedConnsBuffered,
failureTuple: &ebpf.ConnTuple{},
}
fc.setupMapCleaner(m)
return fc
}

// upsertConn adds or updates the failed connection in the failed connection map
func (fc *FailedConns) upsertConn(failedConn *ebpf.FailedConn) {
if _, exists := allowListErrs[failedConn.Reason]; !exists {
if fc == nil {
return
}
connTuple := failedConn.Tup

fc.Lock()
defer fc.Unlock()

if len(fc.FailedConnMap) >= int(fc.maxFailuresBuffered) {
failureTelemetry.failedConnsDropped.Inc()
return
}
connTuple := failedConn.Tup

stats, ok := fc.FailedConnMap[connTuple]
if !ok {
stats = &FailedConnStats{
Expand All @@ -101,25 +105,23 @@ func (fc *FailedConns) upsertConn(failedConn *ebpf.FailedConn) {

// MatchFailedConn increments the failed connection counters for a given connection based on the failed connection map
func (fc *FailedConns) MatchFailedConn(conn *network.ConnectionStats) {
if fc == nil {
if fc == nil || conn.Type != network.TCP {
return
}
if conn.Type != network.TCP {
return
}
util.ConnStatsToTuple(conn, fc.failureTuple)

fc.RLock()
defer fc.RUnlock()
fc.Lock()
defer fc.Unlock()

util.ConnStatsToTuple(conn, fc.failureTuple)

if failedConn, ok := fc.FailedConnMap[*fc.failureTuple]; ok {
// found matching failed connection
conn.TCPFailures = make(map[uint32]uint32)
conn.TCPFailures = failedConn.CountByErrCode

for errCode, count := range failedConn.CountByErrCode {
failureTelemetry.failedConnMatches.Add(1, strconv.Itoa(int(errCode)))
conn.TCPFailures[errCode] += count
for errCode := range failedConn.CountByErrCode {
failureTelemetry.failedConnMatches.Add(1, unix.ErrnoName(syscall.Errno(errCode)))
}
delete(fc.FailedConnMap, *fc.failureTuple)
}
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/network/tracer/connection/tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ func NewTracer(config *config.Config, _ telemetryComponent.Component) (Tracer, e
config.TCPFailedConnectionsEnabled = false
}
if config.FailedConnectionsSupported() {
failedConnConsumer = failure.NewFailedConnConsumer(failedConnsHandler, m)
failedConnConsumer = failure.NewFailedConnConsumer(failedConnsHandler, m, config.MaxFailedConnectionsBuffered)
}

tr := &tracer{
Expand Down