
Commit c99d83c

implement maxRetries for log processing
1 parent dc8bde6 commit c99d83c

5 files changed: +108 additions, −5 deletions

helm/templates/service.yaml
Lines changed: 4 additions & 0 deletions

@@ -120,6 +120,10 @@ spec:
             - name: SSL_CERT_DIR
               value: {{ .Values.image.sslCertDir }}
           {{- end }}
+          {{- if .Values.maxRetries }}
+            - name: CODER_MAX_RETRIES
+              value: {{ .Values.maxRetries }}
+          {{- end }}
           {{- with .Values.securityContext }}
           securityContext:
             {{- toYaml . | nindent 12 }}

helm/values.yaml
Lines changed: 3 additions & 0 deletions

@@ -5,6 +5,9 @@ url: ""
 # If unspecified or empty it will watch all namespaces.
 namespaces: []
 
+# maxRetries -- Maximum retry attempts for failed log sends (logs are discarded after this limit)
+maxRetries: 10
+
 # volumes -- A list of extra volumes to add to the coder-logstream pod.
 volumes:
   # emptyDir: {}

logger.go
Lines changed: 26 additions & 4 deletions

@@ -34,6 +34,7 @@ type podEventLoggerOptions struct {
 
     logger      slog.Logger
     logDebounce time.Duration
+    maxRetries  int
 
     // The following fields are optional!
     namespaces []string
@@ -52,6 +53,10 @@ func newPodEventLogger(ctx context.Context, opts podEventLoggerOptions) (*podEve
         opts.clock = quartz.NewReal()
     }
 
+    if opts.maxRetries == 0 {
+        opts.maxRetries = 10
+    }
+
     logCh := make(chan agentLog, 512)
     ctx, cancelFunc := context.WithCancel(ctx)
     reporter := &podEventLogger{
@@ -75,6 +80,7 @@ func newPodEventLogger(ctx context.Context, opts podEventLoggerOptions) (*podEve
             logCache: logCache{
                 logs: map[string][]agentsdk.Log{},
             },
+            maxRetries: opts.maxRetries,
         },
     }
 
@@ -408,7 +414,8 @@ type logQueuer struct {
     loggers  map[string]agentLoggerLifecycle
     logCache logCache
 
-    retries map[string]*retryState
+    retries    map[string]*retryState
+    maxRetries int
 }
 
 func (l *logQueuer) work(ctx context.Context) {
@@ -588,8 +595,9 @@ func (l *agentLoggerLifecycle) resetCloseTimer(ttl time.Duration) {
 
 // retryState tracks exponential backoff for an agent token.
 type retryState struct {
-    delay time.Duration
-    timer *quartz.Timer
+    delay      time.Duration
+    timer      *quartz.Timer
+    retryCount int
 }
 
 func (l *logQueuer) scheduleRetry(ctx context.Context, token string) {
@@ -603,6 +611,18 @@ func (l *logQueuer) scheduleRetry(ctx context.Context, token string) {
         l.retries[token] = rs
     }
 
+    rs.retryCount++
+
+    // If we've reached the max retries, clear the retry state and delete the log cache.
+    if rs.retryCount >= l.maxRetries {
+        l.logger.Error(ctx, "max retries exceeded",
+            slog.F("retryCount", rs.retryCount),
+            slog.F("maxRetries", l.maxRetries))
+        l.clearRetry(token)
+        l.logCache.delete(token)
+        return
+    }
+
     if rs.timer != nil {
         return
     }
@@ -613,7 +633,9 @@ func (l *logQueuer) scheduleRetry(ctx context.Context, token string) {
         rs.delay = 30 * time.Second
     }
 
-    l.logger.Info(ctx, "scheduling retry", slog.F("delay", rs.delay.String()))
+    l.logger.Info(ctx, "scheduling retry",
+        slog.F("delay", rs.delay.String()),
+        slog.F("retryCount", rs.retryCount))
 
     rs.timer = l.clock.AfterFunc(rs.delay, func() {
         l.mu.Lock()
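
For reference, the retry flow added to logQueuer.scheduleRetry above can be read as the standalone sketch below. It is a simplified illustration rather than the committed code: the names retrySketch and next are hypothetical, the one-second starting delay and the doubling step are assumptions, and the real implementation keys retryState by agent token, holds the queue mutex, and arms a quartz timer for the next attempt.

package main

import (
    "fmt"
    "time"
)

// retrySketch mirrors the delay/retryCount pair that this commit adds to retryState.
type retrySketch struct {
    delay      time.Duration
    retryCount int
}

const maxRetries = 10 // the default applied in newPodEventLogger when the option is zero

// next returns the backoff before the next attempt, or ok=false once the retry
// budget is spent, at which point cached logs for the token are discarded.
func (rs *retrySketch) next() (delay time.Duration, ok bool) {
    rs.retryCount++
    if rs.retryCount >= maxRetries {
        return 0, false // clear retry state and delete the log cache
    }
    if rs.delay == 0 {
        rs.delay = time.Second // assumed initial delay, for illustration only
    } else {
        rs.delay *= 2 // exponential backoff (assumed doubling)
    }
    if rs.delay > 30*time.Second {
        rs.delay = 30 * time.Second // cap, as exercised by the existing backoff test
    }
    return rs.delay, true
}

func main() {
    rs := &retrySketch{}
    for {
        delay, ok := rs.next()
        if !ok {
            fmt.Println("max retries exceeded, discarding cached logs")
            return
        }
        fmt.Printf("attempt %d failed, retrying in %s\n", rs.retryCount, delay)
    }
}

Running the sketch prints nine scheduled retries (1s, 2s, 4s, ... capped at 30s) and then the discard message, which is the behaviour the logCache.delete call above is meant to produce.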

logger_test.go
Lines changed: 66 additions & 1 deletion

@@ -594,24 +594,30 @@ func Test_logQueuer(t *testing.T) {
             logCache: logCache{
                 logs: map[string][]agentsdk.Log{},
             },
+            maxRetries: 10,
         }
 
         ctx := context.Background()
         token := "test-token"
 
         // Set up a retry state with a large delay
         lq.retries = make(map[string]*retryState)
-        lq.retries[token] = &retryState{delay: 20 * time.Second}
+        lq.retries[token] = &retryState{
+            delay:      20 * time.Second,
+            retryCount: 0,
+        }
 
         // Schedule a retry - should cap at 30 seconds
         lq.scheduleRetry(ctx, token)
 
         rs := lq.retries[token]
+        require.NotNil(t, rs)
         require.Equal(t, 30*time.Second, rs.delay)
 
         // Schedule another retry - should stay at 30 seconds
         lq.scheduleRetry(ctx, token)
         rs = lq.retries[token]
+        require.NotNil(t, rs)
         require.Equal(t, 30*time.Second, rs.delay)
     })
 
@@ -627,6 +633,7 @@ func Test_logQueuer(t *testing.T) {
             logCache: logCache{
                 logs: map[string][]agentsdk.Log{},
             },
+            maxRetries: 2,
         }
 
         ctx := context.Background()
@@ -640,6 +647,64 @@
         lq.clearRetry(token)
         require.Nil(t, lq.retries[token])
     })
+
+    t.Run("MaxRetries", func(t *testing.T) {
+        t.Parallel()
+
+        // Create a failing API that will reject connections
+        failingAPI := newFailingAgentAPI(t)
+        agentURL, err := url.Parse(failingAPI.server.URL)
+        require.NoError(t, err)
+        clock := quartz.NewMock(t)
+        ttl := time.Second
+
+        ch := make(chan agentLog, 10)
+        logger := slogtest.Make(t, &slogtest.Options{
+            IgnoreErrors: true,
+        })
+        lq := &logQueuer{
+            logger:    logger,
+            clock:     clock,
+            q:         ch,
+            coderURL:  agentURL,
+            loggerTTL: ttl,
+            loggers:   map[string]agentLoggerLifecycle{},
+            logCache: logCache{
+                logs: map[string][]agentsdk.Log{},
+            },
+            maxRetries: 2,
+        }
+
+        ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+        defer cancel()
+        go lq.work(ctx)
+
+        token := "max-retry-token"
+        ch <- agentLog{
+            op:           opLog,
+            resourceName: "hello",
+            agentToken:   token,
+            log: agentsdk.Log{
+                CreatedAt: time.Now(),
+                Output:    "This is a log.",
+                Level:     codersdk.LogLevelInfo,
+            },
+        }
+
+        // Wait for retry state to be cleared after exceeding maxRetries
+        require.Eventually(t, func() bool {
+            lq.mu.Lock()
+            defer lq.mu.Unlock()
+            rs := lq.retries[token]
+            return rs == nil
+        }, testutil.WaitShort, testutil.IntervalFast)
+
+        // Verify cache is also cleared
+        lq.mu.Lock()
+        cachedLogs := lq.logCache.get(token)
+        lq.mu.Unlock()
+        require.Nil(t, cachedLogs)
+    })
 }
 
 func Test_logCache(t *testing.T) {

main.go
Lines changed: 9 additions & 0 deletions

@@ -5,6 +5,7 @@ import (
     "fmt"
     "net/url"
     "os"
+    "strconv"
     "strings"
 
     "cdr.dev/slog"
@@ -30,6 +31,7 @@ func root() *cobra.Command {
         kubeConfig    string
         namespacesStr string
         labelSelector string
+        maxRetriesStr string
     )
     cmd := &cobra.Command{
         Use: "coder-logstream-kube",
@@ -72,13 +74,19 @@
                 }
             }
 
+            maxRetries, err := strconv.Atoi(maxRetriesStr)
+            if err != nil {
+                return fmt.Errorf("parse max retries: %w", err)
+            }
+
             reporter, err := newPodEventLogger(cmd.Context(), podEventLoggerOptions{
                 coderURL:      parsedURL,
                 client:        client,
                 namespaces:    namespaces,
                 fieldSelector: fieldSelector,
                 labelSelector: labelSelector,
                 logger:        slog.Make(sloghuman.Sink(cmd.ErrOrStderr())).Leveled(slog.LevelDebug),
+                maxRetries:    maxRetries,
             })
             if err != nil {
                 return fmt.Errorf("create pod event reporter: %w", err)
@@ -97,6 +105,7 @@
     cmd.Flags().StringVarP(&namespacesStr, "namespaces", "n", os.Getenv("CODER_NAMESPACES"), "List of namespaces to use when listing pods")
     cmd.Flags().StringVarP(&fieldSelector, "field-selector", "f", "", "Field selector to use when listing pods")
     cmd.Flags().StringVarP(&labelSelector, "label-selector", "l", "", "Label selector to use when listing pods")
+    cmd.Flags().StringVarP(&maxRetriesStr, "max-retries", "m", os.Getenv("CODER_MAX_RETRIES"), "Maximum retry attempts for failed log sends (logs are discarded after this limit)")
 
     return cmd
 }
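
End to end, the limit flows from the chart value to the container environment to the CLI: the Helm template sets CODER_MAX_RETRIES from .Values.maxRetries, os.Getenv("CODER_MAX_RETRIES") seeds the default for --max-retries, and newPodEventLogger falls back to 10 when the option is zero. The sketch below shows that resolution order in one place; resolveMaxRetries is a hypothetical helper, and the empty-string and non-positive fallbacks are assumptions for illustration (the commit itself parses the flag value inline in root()).

package main

import (
    "fmt"
    "os"
    "strconv"
)

// resolveMaxRetries resolves the effective retry limit the way the commit wires it up:
// environment variable -> flag default -> fallback of 10. Hypothetical helper; the
// empty/zero handling here is an assumption mirroring newPodEventLogger's default.
func resolveMaxRetries() (int, error) {
    raw := os.Getenv("CODER_MAX_RETRIES") // injected by helm/templates/service.yaml when .Values.maxRetries is set
    if raw == "" {
        return 10, nil
    }
    n, err := strconv.Atoi(raw)
    if err != nil {
        return 0, fmt.Errorf("parse max retries: %w", err)
    }
    if n <= 0 {
        return 10, nil
    }
    return n, nil
}

func main() {
    n, err := resolveMaxRetries()
    if err != nil {
        fmt.Fprintln(os.Stderr, err)
        os.Exit(1)
    }
    fmt.Println("effective max retries:", n)
}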
