From de632dcd90df79c86e24572c70ed49fd8aff8f18 Mon Sep 17 00:00:00 2001 From: Adam Karpowich Date: Wed, 31 Jul 2024 15:34:01 -0400 Subject: [PATCH 01/14] optimize memory allocation in failure path --- .../tracer/connection/failure/matching.go | 31 ++++++++++++++++--- 1 file changed, 27 insertions(+), 4 deletions(-) diff --git a/pkg/network/tracer/connection/failure/matching.go b/pkg/network/tracer/connection/failure/matching.go index 074b7ea56bce4d..dd438cb6047dbb 100644 --- a/pkg/network/tracer/connection/failure/matching.go +++ b/pkg/network/tracer/connection/failure/matching.go @@ -22,6 +22,7 @@ import ( "github.com/DataDog/datadog-agent/pkg/network/tracer/connection/util" "github.com/DataDog/datadog-agent/pkg/telemetry" "github.com/DataDog/datadog-agent/pkg/util/log" + ddsync "github.com/DataDog/datadog-agent/pkg/util/sync" ) var ( @@ -33,6 +34,8 @@ var ( telemetryModuleName = "network_tracer__tcp_failure" mapTTL = 10 * time.Millisecond.Nanoseconds() + + tuplePool = ddsync.NewDefaultTypedPool[ebpf.ConnTuple]() ) var failureTelemetry = struct { @@ -49,6 +52,12 @@ type FailedConnStats struct { Expiry int64 } +func (t FailedConnStats) reset() { + for k := range t.CountByErrCode { + delete(t.CountByErrCode, k) + } +} + // String returns a string representation of the failedConnStats func (t FailedConnStats) String() string { return fmt.Sprintf( @@ -64,6 +73,7 @@ type FailedConns struct { FailedConnMap map[ebpf.ConnTuple]*FailedConnStats failureTuple *ebpf.ConnTuple mapCleaner *ddebpf.MapCleaner[ebpf.ConnTuple, int64] + pool *ddsync.TypedPool[FailedConnStats] sync.RWMutex } @@ -72,6 +82,11 @@ func NewFailedConns(m *manager.Manager) *FailedConns { fc := &FailedConns{ FailedConnMap: make(map[ebpf.ConnTuple]*FailedConnStats), failureTuple: &ebpf.ConnTuple{}, + pool: ddsync.NewTypedPool(func() *FailedConnStats { + return &FailedConnStats{ + CountByErrCode: make(map[uint32]uint32), + } + }), } fc.setupMapCleaner(m) return fc @@ -89,9 +104,8 @@ func (fc *FailedConns) upsertConn(failedConn *ebpf.FailedConn) { stats, ok := fc.FailedConnMap[connTuple] if !ok { - stats = &FailedConnStats{ - CountByErrCode: make(map[uint32]uint32), - } + stats = fc.pool.Get() + stats.reset() fc.FailedConnMap[connTuple] = stats } @@ -110,9 +124,11 @@ func (fc *FailedConns) MatchFailedConn(conn *network.ConnectionStats) { util.ConnStatsToTuple(conn, fc.failureTuple) fc.RLock() - defer fc.RUnlock() + foundMatch := false + var failedConn *FailedConnStats if failedConn, ok := fc.FailedConnMap[*fc.failureTuple]; ok { + foundMatch = true // found matching failed connection conn.TCPFailures = make(map[uint32]uint32) @@ -121,6 +137,13 @@ func (fc *FailedConns) MatchFailedConn(conn *network.ConnectionStats) { conn.TCPFailures[errCode] += count } } + fc.RUnlock() + if foundMatch { + fc.Lock() + delete(fc.FailedConnMap, *fc.failureTuple) + fc.pool.Put(failedConn) + fc.Unlock() + } } // RemoveExpired removes expired failed connections from the map From 0ee6f5e95a679ddcc715762380c00c86be7be146 Mon Sep 17 00:00:00 2001 From: Adam Karpowich Date: Wed, 31 Jul 2024 16:12:31 -0400 Subject: [PATCH 02/14] add new config for max failures buffered --- cmd/system-probe/config/adjust_npm.go | 8 ++++++ pkg/config/setup/system_probe.go | 1 + pkg/network/config/config.go | 5 ++++ pkg/network/config/config_test.go | 17 +++++++++++ .../failure/failed_conn_consumer.go | 4 +-- .../tracer/connection/failure/matching.go | 28 ++++++++++++------- pkg/network/tracer/connection/tracer.go | 2 +- 7 files changed, 52 insertions(+), 13 deletions(-) diff --git a/cmd/system-probe/config/adjust_npm.go b/cmd/system-probe/config/adjust_npm.go index acabe451b564cc..789076fac7239b 100644 --- a/cmd/system-probe/config/adjust_npm.go +++ b/cmd/system-probe/config/adjust_npm.go @@ -65,6 +65,14 @@ func adjustNetwork(cfg config.Config) { return nil }) limitMaxInt64(cfg, spNS("max_closed_connections_buffered"), math.MaxUint32) + // also ensure that max_failed_connections_buffered is equal to max_tracked_connections if the former is not set + validateInt64(cfg, spNS("max_failed_connections_buffered"), cfg.GetInt64(spNS("max_tracked_connections")), func(v int64) error { + if v <= 0 { + return fmt.Errorf("must be a positive value") + } + return nil + }) + limitMaxInt64(cfg, spNS("max_failed_connections_buffered"), math.MaxUint32) limitMaxInt(cfg, spNS("offset_guess_threshold"), maxOffsetThreshold) diff --git a/pkg/config/setup/system_probe.go b/pkg/config/setup/system_probe.go index c35ef3477cc5a9..ef7ed827080bee 100644 --- a/pkg/config/setup/system_probe.go +++ b/pkg/config/setup/system_probe.go @@ -181,6 +181,7 @@ func InitSystemProbeConfig(cfg pkgconfigmodel.Config) { cfg.BindEnvAndSetDefault(join(spNS, "max_tracked_connections"), 65536) cfg.BindEnv(join(spNS, "max_closed_connections_buffered")) + cfg.BindEnv(join(spNS, "max_failed_connections_buffered")) cfg.BindEnvAndSetDefault(join(spNS, "closed_connection_flush_threshold"), 0) cfg.BindEnvAndSetDefault(join(spNS, "closed_channel_size"), 500) cfg.BindEnvAndSetDefault(join(spNS, "max_connection_state_buffered"), 75000) diff --git a/pkg/network/config/config.go b/pkg/network/config/config.go index 8aa1366e385d17..695e5736cdd614 100644 --- a/pkg/network/config/config.go +++ b/pkg/network/config/config.go @@ -165,6 +165,10 @@ type Config struct { // get flushed on every client request (default 30s check interval) MaxClosedConnectionsBuffered uint32 + // MaxClosedConnectionsBuffered represents the maximum number of closed connections we'll buffer in memory. These closed connections + // get flushed on every client request (default 30s check interval) + MaxFailedConnectionsBuffered uint32 + // ClosedConnectionFlushThreshold represents the number of closed connections stored before signalling // the agent to flush the connections. This value only valid on Windows ClosedConnectionFlushThreshold int @@ -340,6 +344,7 @@ func New() *Config { TCPFailedConnectionsEnabled: cfg.GetBool(join(netNS, "enable_tcp_failed_connections")), MaxTrackedConnections: uint32(cfg.GetInt64(join(spNS, "max_tracked_connections"))), MaxClosedConnectionsBuffered: uint32(cfg.GetInt64(join(spNS, "max_closed_connections_buffered"))), + MaxFailedConnectionsBuffered: uint32(cfg.GetInt64(join(spNS, "max_failed_connections_buffered"))), ClosedConnectionFlushThreshold: cfg.GetInt(join(spNS, "closed_connection_flush_threshold")), ClosedChannelSize: cfg.GetInt(join(spNS, "closed_channel_size")), MaxConnectionsStateBuffered: cfg.GetInt(join(spNS, "max_connection_state_buffered")), diff --git a/pkg/network/config/config_test.go b/pkg/network/config/config_test.go index b7134305c180e4..bce6d472476ae5 100644 --- a/pkg/network/config/config_test.go +++ b/pkg/network/config/config_test.go @@ -1181,6 +1181,23 @@ func TestMaxClosedConnectionsBuffered(t *testing.T) { }) } +func TestMaxFailedConnectionsBuffered(t *testing.T) { + maxTrackedConnections := New().MaxTrackedConnections + + t.Run("value set", func(t *testing.T) { + aconfig.ResetSystemProbeConfig(t) + t.Setenv("DD_SYSTEM_PROBE_CONFIG_MAX_FAILED_CONNECTIONS_BUFFERED", fmt.Sprintf("%d", maxTrackedConnections-1)) + cfg := New() + require.Equal(t, maxTrackedConnections-1, cfg.MaxFailedConnectionsBuffered) + }) + + t.Run("value not set", func(t *testing.T) { + aconfig.ResetSystemProbeConfig(t) + cfg := New() + require.Equal(t, cfg.MaxTrackedConnections, cfg.MaxFailedConnectionsBuffered) + }) +} + func TestMaxHTTPStatsBuffered(t *testing.T) { t.Run("via deprecated YAML", func(t *testing.T) { aconfig.ResetSystemProbeConfig(t) diff --git a/pkg/network/tracer/connection/failure/failed_conn_consumer.go b/pkg/network/tracer/connection/failure/failed_conn_consumer.go index 2a3358d2f5bd55..b82dbd5afc76bf 100644 --- a/pkg/network/tracer/connection/failure/failed_conn_consumer.go +++ b/pkg/network/tracer/connection/failure/failed_conn_consumer.go @@ -40,11 +40,11 @@ type TCPFailedConnConsumer struct { } // NewFailedConnConsumer creates a new TCPFailedConnConsumer -func NewFailedConnConsumer(eventHandler ddebpf.EventHandler, m *manager.Manager) *TCPFailedConnConsumer { +func NewFailedConnConsumer(eventHandler ddebpf.EventHandler, m *manager.Manager, maxFailedConnsBuffered uint32) *TCPFailedConnConsumer { return &TCPFailedConnConsumer{ eventHandler: eventHandler, closed: make(chan struct{}), - FailedConns: NewFailedConns(m), + FailedConns: NewFailedConns(m, maxFailedConnsBuffered), } } diff --git a/pkg/network/tracer/connection/failure/matching.go b/pkg/network/tracer/connection/failure/matching.go index dd438cb6047dbb..188caf5df2bdce 100644 --- a/pkg/network/tracer/connection/failure/matching.go +++ b/pkg/network/tracer/connection/failure/matching.go @@ -39,11 +39,13 @@ var ( ) var failureTelemetry = struct { - failedConnOrphans telemetry.Counter - failedConnMatches telemetry.Counter + failedConnMatches telemetry.Counter + failedConnOrphans telemetry.Counter + failedConnsDropped telemetry.Counter }{ - telemetry.NewCounter(telemetryModuleName, "orphans", []string{}, "Counter measuring the number of orphans after associating failed connections with a closed connection"), telemetry.NewCounter(telemetryModuleName, "matches", []string{"type"}, "Counter measuring the number of successful matches of failed connections with closed connections"), + telemetry.NewCounter(telemetryModuleName, "orphans", []string{}, "Counter measuring the number of orphans after associating failed connections with a closed connection"), + telemetry.NewCounter(telemetryModuleName, "dropped", []string{}, "Counter measuring the number of dropped failed connections"), } // FailedConnStats is a wrapper to help document the purpose of the underlying map @@ -70,18 +72,20 @@ type FailedConnMap map[ebpf.ConnTuple]*FailedConnStats // FailedConns is a struct to hold failed connections type FailedConns struct { - FailedConnMap map[ebpf.ConnTuple]*FailedConnStats - failureTuple *ebpf.ConnTuple - mapCleaner *ddebpf.MapCleaner[ebpf.ConnTuple, int64] - pool *ddsync.TypedPool[FailedConnStats] + FailedConnMap map[ebpf.ConnTuple]*FailedConnStats + maxFailuresBuffered uint32 + failureTuple *ebpf.ConnTuple + mapCleaner *ddebpf.MapCleaner[ebpf.ConnTuple, int64] + pool *ddsync.TypedPool[FailedConnStats] sync.RWMutex } // NewFailedConns returns a new FailedConns struct -func NewFailedConns(m *manager.Manager) *FailedConns { +func NewFailedConns(m *manager.Manager, maxFailedConnsBuffered uint32) *FailedConns { fc := &FailedConns{ - FailedConnMap: make(map[ebpf.ConnTuple]*FailedConnStats), - failureTuple: &ebpf.ConnTuple{}, + FailedConnMap: make(map[ebpf.ConnTuple]*FailedConnStats), + maxFailuresBuffered: maxFailedConnsBuffered, + failureTuple: &ebpf.ConnTuple{}, pool: ddsync.NewTypedPool(func() *FailedConnStats { return &FailedConnStats{ CountByErrCode: make(map[uint32]uint32), @@ -97,6 +101,10 @@ func (fc *FailedConns) upsertConn(failedConn *ebpf.FailedConn) { if _, exists := allowListErrs[failedConn.Reason]; !exists { return } + if len(fc.FailedConnMap) >= int(fc.maxFailuresBuffered) { + failureTelemetry.failedConnsDropped.Inc() + return + } connTuple := failedConn.Tup fc.Lock() diff --git a/pkg/network/tracer/connection/tracer.go b/pkg/network/tracer/connection/tracer.go index e308327e38de53..e3e3efd2c770e1 100644 --- a/pkg/network/tracer/connection/tracer.go +++ b/pkg/network/tracer/connection/tracer.go @@ -291,7 +291,7 @@ func NewTracer(config *config.Config, _ telemetryComponent.Component) (Tracer, e config.TCPFailedConnectionsEnabled = false } if config.FailedConnectionsSupported() { - failedConnConsumer = failure.NewFailedConnConsumer(failedConnsHandler, m) + failedConnConsumer = failure.NewFailedConnConsumer(failedConnsHandler, m, config.MaxFailedConnectionsBuffered) } tr := &tracer{ From d01e843595779fda6776130dd2e080a3da5d64ea Mon Sep 17 00:00:00 2001 From: Adam Karpowich Date: Wed, 31 Jul 2024 16:15:11 -0400 Subject: [PATCH 03/14] rename stats pool --- pkg/network/tracer/connection/failure/matching.go | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/pkg/network/tracer/connection/failure/matching.go b/pkg/network/tracer/connection/failure/matching.go index 188caf5df2bdce..34eb7a21fca43b 100644 --- a/pkg/network/tracer/connection/failure/matching.go +++ b/pkg/network/tracer/connection/failure/matching.go @@ -34,8 +34,6 @@ var ( telemetryModuleName = "network_tracer__tcp_failure" mapTTL = 10 * time.Millisecond.Nanoseconds() - - tuplePool = ddsync.NewDefaultTypedPool[ebpf.ConnTuple]() ) var failureTelemetry = struct { @@ -76,7 +74,7 @@ type FailedConns struct { maxFailuresBuffered uint32 failureTuple *ebpf.ConnTuple mapCleaner *ddebpf.MapCleaner[ebpf.ConnTuple, int64] - pool *ddsync.TypedPool[FailedConnStats] + statsPool *ddsync.TypedPool[FailedConnStats] sync.RWMutex } @@ -86,7 +84,7 @@ func NewFailedConns(m *manager.Manager, maxFailedConnsBuffered uint32) *FailedCo FailedConnMap: make(map[ebpf.ConnTuple]*FailedConnStats), maxFailuresBuffered: maxFailedConnsBuffered, failureTuple: &ebpf.ConnTuple{}, - pool: ddsync.NewTypedPool(func() *FailedConnStats { + statsPool: ddsync.NewTypedPool(func() *FailedConnStats { return &FailedConnStats{ CountByErrCode: make(map[uint32]uint32), } @@ -112,7 +110,7 @@ func (fc *FailedConns) upsertConn(failedConn *ebpf.FailedConn) { stats, ok := fc.FailedConnMap[connTuple] if !ok { - stats = fc.pool.Get() + stats = fc.statsPool.Get() stats.reset() fc.FailedConnMap[connTuple] = stats } @@ -149,7 +147,7 @@ func (fc *FailedConns) MatchFailedConn(conn *network.ConnectionStats) { if foundMatch { fc.Lock() delete(fc.FailedConnMap, *fc.failureTuple) - fc.pool.Put(failedConn) + fc.statsPool.Put(failedConn) fc.Unlock() } } From e2800bc03331688918e315f0b783837263ae9083 Mon Sep 17 00:00:00 2001 From: Adam Karpowich Date: Wed, 31 Jul 2024 16:16:43 -0400 Subject: [PATCH 04/14] move comment --- pkg/network/tracer/connection/failure/matching.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/network/tracer/connection/failure/matching.go b/pkg/network/tracer/connection/failure/matching.go index 34eb7a21fca43b..bbff9d22ecc587 100644 --- a/pkg/network/tracer/connection/failure/matching.go +++ b/pkg/network/tracer/connection/failure/matching.go @@ -134,8 +134,8 @@ func (fc *FailedConns) MatchFailedConn(conn *network.ConnectionStats) { var failedConn *FailedConnStats if failedConn, ok := fc.FailedConnMap[*fc.failureTuple]; ok { - foundMatch = true // found matching failed connection + foundMatch = true conn.TCPFailures = make(map[uint32]uint32) for errCode, count := range failedConn.CountByErrCode { From 4bf94023d47076ed5290a6cea804f46c374e31a8 Mon Sep 17 00:00:00 2001 From: Adam Karpowich Date: Wed, 31 Jul 2024 16:18:37 -0400 Subject: [PATCH 05/14] remove double check for allowed failures. this is done in eBPF now --- pkg/network/tracer/connection/failure/matching.go | 9 --------- 1 file changed, 9 deletions(-) diff --git a/pkg/network/tracer/connection/failure/matching.go b/pkg/network/tracer/connection/failure/matching.go index bbff9d22ecc587..aa8de533b7ac31 100644 --- a/pkg/network/tracer/connection/failure/matching.go +++ b/pkg/network/tracer/connection/failure/matching.go @@ -26,12 +26,6 @@ import ( ) var ( - allowListErrs = map[uint32]struct{}{ - ebpf.TCPFailureConnReset: {}, // Connection reset by peer - ebpf.TCPFailureConnTimeout: {}, // Connection timed out - ebpf.TCPFailureConnRefused: {}, // Connection refused - } - telemetryModuleName = "network_tracer__tcp_failure" mapTTL = 10 * time.Millisecond.Nanoseconds() ) @@ -96,9 +90,6 @@ func NewFailedConns(m *manager.Manager, maxFailedConnsBuffered uint32) *FailedCo // upsertConn adds or updates the failed connection in the failed connection map func (fc *FailedConns) upsertConn(failedConn *ebpf.FailedConn) { - if _, exists := allowListErrs[failedConn.Reason]; !exists { - return - } if len(fc.FailedConnMap) >= int(fc.maxFailuresBuffered) { failureTelemetry.failedConnsDropped.Inc() return From 83b802d38e42e3a9d050547cf737a7f59f11e2e7 Mon Sep 17 00:00:00 2001 From: Adam Karpowich Date: Wed, 31 Jul 2024 16:22:26 -0400 Subject: [PATCH 06/14] fix comment --- pkg/network/config/config.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/network/config/config.go b/pkg/network/config/config.go index 695e5736cdd614..e949970ecf39ae 100644 --- a/pkg/network/config/config.go +++ b/pkg/network/config/config.go @@ -165,8 +165,8 @@ type Config struct { // get flushed on every client request (default 30s check interval) MaxClosedConnectionsBuffered uint32 - // MaxClosedConnectionsBuffered represents the maximum number of closed connections we'll buffer in memory. These closed connections - // get flushed on every client request (default 30s check interval) + // MaxFailedConnectionsBuffered represents the maximum number of failed connections we'll buffer in memory. These connections will be + // removed from memory as they are matched to closed connections MaxFailedConnectionsBuffered uint32 // ClosedConnectionFlushThreshold represents the number of closed connections stored before signalling From 76915c3a91335bde3d91cd5dd593bd35a8b7d013 Mon Sep 17 00:00:00 2001 From: Adam Karpowich Date: Wed, 31 Jul 2024 17:25:10 -0400 Subject: [PATCH 07/14] get rid of sync pool, assign failure map directly to conn map and delete failure entry on match --- .../tracer/connection/failure/matching.go | 24 +++++++------------ 1 file changed, 8 insertions(+), 16 deletions(-) diff --git a/pkg/network/tracer/connection/failure/matching.go b/pkg/network/tracer/connection/failure/matching.go index aa8de533b7ac31..078da41dd86623 100644 --- a/pkg/network/tracer/connection/failure/matching.go +++ b/pkg/network/tracer/connection/failure/matching.go @@ -9,11 +9,12 @@ package failure import ( "fmt" - "strconv" "sync" + "syscall" "time" manager "github.com/DataDog/ebpf-manager" + "golang.org/x/sys/unix" ddebpf "github.com/DataDog/datadog-agent/pkg/ebpf" "github.com/DataDog/datadog-agent/pkg/network" @@ -22,7 +23,6 @@ import ( "github.com/DataDog/datadog-agent/pkg/network/tracer/connection/util" "github.com/DataDog/datadog-agent/pkg/telemetry" "github.com/DataDog/datadog-agent/pkg/util/log" - ddsync "github.com/DataDog/datadog-agent/pkg/util/sync" ) var ( @@ -68,7 +68,6 @@ type FailedConns struct { maxFailuresBuffered uint32 failureTuple *ebpf.ConnTuple mapCleaner *ddebpf.MapCleaner[ebpf.ConnTuple, int64] - statsPool *ddsync.TypedPool[FailedConnStats] sync.RWMutex } @@ -78,11 +77,6 @@ func NewFailedConns(m *manager.Manager, maxFailedConnsBuffered uint32) *FailedCo FailedConnMap: make(map[ebpf.ConnTuple]*FailedConnStats), maxFailuresBuffered: maxFailedConnsBuffered, failureTuple: &ebpf.ConnTuple{}, - statsPool: ddsync.NewTypedPool(func() *FailedConnStats { - return &FailedConnStats{ - CountByErrCode: make(map[uint32]uint32), - } - }), } fc.setupMapCleaner(m) return fc @@ -101,8 +95,9 @@ func (fc *FailedConns) upsertConn(failedConn *ebpf.FailedConn) { stats, ok := fc.FailedConnMap[connTuple] if !ok { - stats = fc.statsPool.Get() - stats.reset() + stats = &FailedConnStats{ + CountByErrCode: make(map[uint32]uint32), + } fc.FailedConnMap[connTuple] = stats } @@ -122,23 +117,20 @@ func (fc *FailedConns) MatchFailedConn(conn *network.ConnectionStats) { fc.RLock() foundMatch := false - var failedConn *FailedConnStats if failedConn, ok := fc.FailedConnMap[*fc.failureTuple]; ok { // found matching failed connection foundMatch = true - conn.TCPFailures = make(map[uint32]uint32) + conn.TCPFailures = failedConn.CountByErrCode - for errCode, count := range failedConn.CountByErrCode { - failureTelemetry.failedConnMatches.Add(1, strconv.Itoa(int(errCode))) - conn.TCPFailures[errCode] += count + for errCode := range failedConn.CountByErrCode { + failureTelemetry.failedConnMatches.Add(1, unix.ErrnoName(syscall.Errno(errCode))) } } fc.RUnlock() if foundMatch { fc.Lock() delete(fc.FailedConnMap, *fc.failureTuple) - fc.statsPool.Put(failedConn) fc.Unlock() } } From 16f12fe6222eb84d6cae6748df6b9283abf7a3b5 Mon Sep 17 00:00:00 2001 From: Adam Karpowich Date: Wed, 31 Jul 2024 17:26:10 -0400 Subject: [PATCH 08/14] remove unused method --- pkg/network/tracer/connection/failure/matching.go | 6 ------ 1 file changed, 6 deletions(-) diff --git a/pkg/network/tracer/connection/failure/matching.go b/pkg/network/tracer/connection/failure/matching.go index 078da41dd86623..3ebcc0a7b989a6 100644 --- a/pkg/network/tracer/connection/failure/matching.go +++ b/pkg/network/tracer/connection/failure/matching.go @@ -46,12 +46,6 @@ type FailedConnStats struct { Expiry int64 } -func (t FailedConnStats) reset() { - for k := range t.CountByErrCode { - delete(t.CountByErrCode, k) - } -} - // String returns a string representation of the failedConnStats func (t FailedConnStats) String() string { return fmt.Sprintf( From 262e28f9a30a2c30ac0499f8a875b8a9a7412df3 Mon Sep 17 00:00:00 2001 From: Adam Karpowich Date: Wed, 31 Jul 2024 17:31:25 -0400 Subject: [PATCH 09/14] assign map entry to nil after passing it to connection object --- pkg/network/tracer/connection/failure/matching.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/network/tracer/connection/failure/matching.go b/pkg/network/tracer/connection/failure/matching.go index 3ebcc0a7b989a6..400c9b578c2264 100644 --- a/pkg/network/tracer/connection/failure/matching.go +++ b/pkg/network/tracer/connection/failure/matching.go @@ -120,6 +120,7 @@ func (fc *FailedConns) MatchFailedConn(conn *network.ConnectionStats) { for errCode := range failedConn.CountByErrCode { failureTelemetry.failedConnMatches.Add(1, unix.ErrnoName(syscall.Errno(errCode))) } + failedConn.CountByErrCode = nil } fc.RUnlock() if foundMatch { From f139cf9d1ba7d36ff8b694f9f0c0c863da1e4b07 Mon Sep 17 00:00:00 2001 From: Adam Karpowich Date: Wed, 31 Jul 2024 19:49:01 -0400 Subject: [PATCH 10/14] use write lock for matching --- .../tracer/connection/failure/matching.go | 16 +++------------- 1 file changed, 3 insertions(+), 13 deletions(-) diff --git a/pkg/network/tracer/connection/failure/matching.go b/pkg/network/tracer/connection/failure/matching.go index 400c9b578c2264..58753bc03c021d 100644 --- a/pkg/network/tracer/connection/failure/matching.go +++ b/pkg/network/tracer/connection/failure/matching.go @@ -101,32 +101,22 @@ func (fc *FailedConns) upsertConn(failedConn *ebpf.FailedConn) { // MatchFailedConn increments the failed connection counters for a given connection based on the failed connection map func (fc *FailedConns) MatchFailedConn(conn *network.ConnectionStats) { - if fc == nil { - return - } - if conn.Type != network.TCP { + if fc == nil || conn.Type != network.TCP { return } util.ConnStatsToTuple(conn, fc.failureTuple) - fc.RLock() - foundMatch := false + fc.Lock() + defer fc.Unlock() if failedConn, ok := fc.FailedConnMap[*fc.failureTuple]; ok { // found matching failed connection - foundMatch = true conn.TCPFailures = failedConn.CountByErrCode for errCode := range failedConn.CountByErrCode { failureTelemetry.failedConnMatches.Add(1, unix.ErrnoName(syscall.Errno(errCode))) } - failedConn.CountByErrCode = nil - } - fc.RUnlock() - if foundMatch { - fc.Lock() delete(fc.FailedConnMap, *fc.failureTuple) - fc.Unlock() } } From 3a6f3a8ff4b98faa222fe3c098dd9e42a588034d Mon Sep 17 00:00:00 2001 From: Adam Karpowich Date: Thu, 1 Aug 2024 09:39:00 -0400 Subject: [PATCH 11/14] move configs to network ns, use errors.new --- cmd/system-probe/config/adjust_npm.go | 11 ++++++----- pkg/config/setup/system_probe.go | 2 +- pkg/network/config/config.go | 2 +- pkg/network/config/config_test.go | 2 +- 4 files changed, 9 insertions(+), 8 deletions(-) diff --git a/cmd/system-probe/config/adjust_npm.go b/cmd/system-probe/config/adjust_npm.go index 789076fac7239b..58cfe1c3ac8a3e 100644 --- a/cmd/system-probe/config/adjust_npm.go +++ b/cmd/system-probe/config/adjust_npm.go @@ -6,6 +6,7 @@ package config import ( + "errors" "fmt" "math" "runtime" @@ -50,7 +51,7 @@ func adjustNetwork(cfg config.Config) { validateInt64(cfg, spNS("max_tracked_connections"), defaultMaxTrackedConnections, func(v int64) error { if v <= 0 { - return fmt.Errorf("must be a positive value") + return errors.New("must be a positive value") } return nil }) @@ -60,19 +61,19 @@ func adjustNetwork(cfg config.Config) { // closed connections in environments with mostly short-lived connections validateInt64(cfg, spNS("max_closed_connections_buffered"), cfg.GetInt64(spNS("max_tracked_connections")), func(v int64) error { if v <= 0 { - return fmt.Errorf("must be a positive value") + return errors.New("must be a positive value") } return nil }) limitMaxInt64(cfg, spNS("max_closed_connections_buffered"), math.MaxUint32) // also ensure that max_failed_connections_buffered is equal to max_tracked_connections if the former is not set - validateInt64(cfg, spNS("max_failed_connections_buffered"), cfg.GetInt64(spNS("max_tracked_connections")), func(v int64) error { + validateInt64(cfg, netNS("max_failed_connections_buffered"), cfg.GetInt64(spNS("max_tracked_connections")), func(v int64) error { if v <= 0 { - return fmt.Errorf("must be a positive value") + return errors.New("must be a positive value") } return nil }) - limitMaxInt64(cfg, spNS("max_failed_connections_buffered"), math.MaxUint32) + limitMaxInt64(cfg, netNS("max_failed_connections_buffered"), math.MaxUint32) limitMaxInt(cfg, spNS("offset_guess_threshold"), maxOffsetThreshold) diff --git a/pkg/config/setup/system_probe.go b/pkg/config/setup/system_probe.go index ef7ed827080bee..21870d884151c7 100644 --- a/pkg/config/setup/system_probe.go +++ b/pkg/config/setup/system_probe.go @@ -181,7 +181,7 @@ func InitSystemProbeConfig(cfg pkgconfigmodel.Config) { cfg.BindEnvAndSetDefault(join(spNS, "max_tracked_connections"), 65536) cfg.BindEnv(join(spNS, "max_closed_connections_buffered")) - cfg.BindEnv(join(spNS, "max_failed_connections_buffered")) + cfg.BindEnv(join(netNS, "max_failed_connections_buffered")) cfg.BindEnvAndSetDefault(join(spNS, "closed_connection_flush_threshold"), 0) cfg.BindEnvAndSetDefault(join(spNS, "closed_channel_size"), 500) cfg.BindEnvAndSetDefault(join(spNS, "max_connection_state_buffered"), 75000) diff --git a/pkg/network/config/config.go b/pkg/network/config/config.go index e949970ecf39ae..4689ebf0f27b68 100644 --- a/pkg/network/config/config.go +++ b/pkg/network/config/config.go @@ -344,7 +344,7 @@ func New() *Config { TCPFailedConnectionsEnabled: cfg.GetBool(join(netNS, "enable_tcp_failed_connections")), MaxTrackedConnections: uint32(cfg.GetInt64(join(spNS, "max_tracked_connections"))), MaxClosedConnectionsBuffered: uint32(cfg.GetInt64(join(spNS, "max_closed_connections_buffered"))), - MaxFailedConnectionsBuffered: uint32(cfg.GetInt64(join(spNS, "max_failed_connections_buffered"))), + MaxFailedConnectionsBuffered: uint32(cfg.GetInt64(join(netNS, "max_failed_connections_buffered"))), ClosedConnectionFlushThreshold: cfg.GetInt(join(spNS, "closed_connection_flush_threshold")), ClosedChannelSize: cfg.GetInt(join(spNS, "closed_channel_size")), MaxConnectionsStateBuffered: cfg.GetInt(join(spNS, "max_connection_state_buffered")), diff --git a/pkg/network/config/config_test.go b/pkg/network/config/config_test.go index bce6d472476ae5..f1abf3879398fb 100644 --- a/pkg/network/config/config_test.go +++ b/pkg/network/config/config_test.go @@ -1186,7 +1186,7 @@ func TestMaxFailedConnectionsBuffered(t *testing.T) { t.Run("value set", func(t *testing.T) { aconfig.ResetSystemProbeConfig(t) - t.Setenv("DD_SYSTEM_PROBE_CONFIG_MAX_FAILED_CONNECTIONS_BUFFERED", fmt.Sprintf("%d", maxTrackedConnections-1)) + t.Setenv("DD_NETWORK_CONFIG_MAX_FAILED_CONNECTIONS_BUFFERED", fmt.Sprintf("%d", maxTrackedConnections-1)) cfg := New() require.Equal(t, maxTrackedConnections-1, cfg.MaxFailedConnectionsBuffered) }) From 1635cadd8b911c733a7dd0f89bd4485b14b155a0 Mon Sep 17 00:00:00 2001 From: Adam Karpowich Date: Thu, 1 Aug 2024 14:07:11 -0400 Subject: [PATCH 12/14] use standard mutex --- pkg/network/tracer/connection/failure/matching.go | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/pkg/network/tracer/connection/failure/matching.go b/pkg/network/tracer/connection/failure/matching.go index 58753bc03c021d..8447824ec7db40 100644 --- a/pkg/network/tracer/connection/failure/matching.go +++ b/pkg/network/tracer/connection/failure/matching.go @@ -62,7 +62,7 @@ type FailedConns struct { maxFailuresBuffered uint32 failureTuple *ebpf.ConnTuple mapCleaner *ddebpf.MapCleaner[ebpf.ConnTuple, int64] - sync.RWMutex + sync.Mutex } // NewFailedConns returns a new FailedConns struct @@ -78,15 +78,15 @@ func NewFailedConns(m *manager.Manager, maxFailedConnsBuffered uint32) *FailedCo // upsertConn adds or updates the failed connection in the failed connection map func (fc *FailedConns) upsertConn(failedConn *ebpf.FailedConn) { + fc.Lock() + defer fc.Unlock() + if len(fc.FailedConnMap) >= int(fc.maxFailuresBuffered) { failureTelemetry.failedConnsDropped.Inc() return } connTuple := failedConn.Tup - fc.Lock() - defer fc.Unlock() - stats, ok := fc.FailedConnMap[connTuple] if !ok { stats = &FailedConnStats{ @@ -101,13 +101,14 @@ func (fc *FailedConns) upsertConn(failedConn *ebpf.FailedConn) { // MatchFailedConn increments the failed connection counters for a given connection based on the failed connection map func (fc *FailedConns) MatchFailedConn(conn *network.ConnectionStats) { + fc.Lock() + defer fc.Unlock() + if fc == nil || conn.Type != network.TCP { return } - util.ConnStatsToTuple(conn, fc.failureTuple) - fc.Lock() - defer fc.Unlock() + util.ConnStatsToTuple(conn, fc.failureTuple) if failedConn, ok := fc.FailedConnMap[*fc.failureTuple]; ok { // found matching failed connection From 6fdc8d1a85880565996ace73fce14faaecb098d5 Mon Sep 17 00:00:00 2001 From: Adam Karpowich Date: Thu, 1 Aug 2024 14:20:25 -0400 Subject: [PATCH 13/14] fix locks --- pkg/network/tracer/connection/failure/matching.go | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/pkg/network/tracer/connection/failure/matching.go b/pkg/network/tracer/connection/failure/matching.go index 8447824ec7db40..70c91d4dc61ed7 100644 --- a/pkg/network/tracer/connection/failure/matching.go +++ b/pkg/network/tracer/connection/failure/matching.go @@ -78,6 +78,10 @@ func NewFailedConns(m *manager.Manager, maxFailedConnsBuffered uint32) *FailedCo // upsertConn adds or updates the failed connection in the failed connection map func (fc *FailedConns) upsertConn(failedConn *ebpf.FailedConn) { + if fc == nil { + return + } + fc.Lock() defer fc.Unlock() @@ -101,10 +105,14 @@ func (fc *FailedConns) upsertConn(failedConn *ebpf.FailedConn) { // MatchFailedConn increments the failed connection counters for a given connection based on the failed connection map func (fc *FailedConns) MatchFailedConn(conn *network.ConnectionStats) { + if fc == nil { + return + } + fc.Lock() defer fc.Unlock() - if fc == nil || conn.Type != network.TCP { + if conn.Type != network.TCP { return } From 5f5f6424c81a7f85289a960a3dad6c2375c6b74c Mon Sep 17 00:00:00 2001 From: Adam Karpowich Date: Thu, 1 Aug 2024 14:25:21 -0400 Subject: [PATCH 14/14] reorg checks --- pkg/network/tracer/connection/failure/matching.go | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/pkg/network/tracer/connection/failure/matching.go b/pkg/network/tracer/connection/failure/matching.go index 70c91d4dc61ed7..41d894af81ad92 100644 --- a/pkg/network/tracer/connection/failure/matching.go +++ b/pkg/network/tracer/connection/failure/matching.go @@ -105,17 +105,13 @@ func (fc *FailedConns) upsertConn(failedConn *ebpf.FailedConn) { // MatchFailedConn increments the failed connection counters for a given connection based on the failed connection map func (fc *FailedConns) MatchFailedConn(conn *network.ConnectionStats) { - if fc == nil { + if fc == nil || conn.Type != network.TCP { return } fc.Lock() defer fc.Unlock() - if conn.Type != network.TCP { - return - } - util.ConnStatsToTuple(conn, fc.failureTuple) if failedConn, ok := fc.FailedConnMap[*fc.failureTuple]; ok {