
Commit 834bc2b

feat: implement retry mechanism for log processing
1 parent 13d885f commit 834bc2b

File tree

1 file changed: +175 −68 lines changed


logger.go

Lines changed: 175 additions & 68 deletions
@@ -407,6 +407,8 @@ type logQueuer struct {
 	loggerTTL time.Duration
 	loggers   map[string]agentLoggerLifecycle
 	logCache  logCache
+
+	retries map[string]*retryState
 }
 
 func (l *logQueuer) work(ctx context.Context) {
@@ -427,87 +429,120 @@ func (l *logQueuer) work(ctx context.Context) {
 		}
 	}
 }
 
+func (l *logQueuer) newLogger(ctx context.Context, log agentLog, queuedLogs []agentsdk.Log) (agentLoggerLifecycle, error) {
+	client := agentsdk.New(l.coderURL)
+	client.SetSessionToken(log.agentToken)
+	logger := l.logger.With(slog.F("resource_name", log.resourceName))
+	client.SDK.SetLogger(logger)
+
+	_, err := client.PostLogSource(ctx, agentsdk.PostLogSourceRequest{
+		ID:          sourceUUID,
+		Icon:        "/icon/k8s.png",
+		DisplayName: "Kubernetes",
+	})
+	if err != nil {
+		// This shouldn't fail sending the log, as it only affects how they
+		// appear.
+		logger.Error(ctx, "post log source", slog.Error(err))
+		l.scheduleRetry(ctx, log.agentToken)
+		return agentLoggerLifecycle{}, err
+	}
+
+	ls := agentsdk.NewLogSender(logger)
+	sl := ls.GetScriptLogger(sourceUUID)
+
+	gracefulCtx, gracefulCancel := context.WithCancel(context.Background())
+
+	// connect to Agent v2.0 API, since we don't need features added later.
+	// This maximizes compatibility.
+	arpc, err := client.ConnectRPC20(gracefulCtx)
+	if err != nil {
+		logger.Error(ctx, "drpc connect", slog.Error(err))
+		gracefulCancel()
+		l.scheduleRetry(ctx, log.agentToken)
+		return agentLoggerLifecycle{}, err
+	}
+	go func() {
+		err := ls.SendLoop(gracefulCtx, arpc)
+		// if the send loop exits on its own without the context
+		// canceling, timeout the logger and force it to recreate.
+		if err != nil && ctx.Err() == nil {
+			l.loggerTimeout(log.agentToken)
+		}
+	}()
+
+	closeTimer := l.clock.AfterFunc(l.loggerTTL, func() {
+		logger.Info(ctx, "logger timeout firing")
+		l.loggerTimeout(log.agentToken)
+	})
+	lifecycle := agentLoggerLifecycle{
+		scriptLogger: sl,
+		close: func() {
+			// We could be stopping for reasons other than the timeout. If
+			// so, stop the timer.
+			closeTimer.Stop()
+			defer gracefulCancel()
+			timeout := l.clock.AfterFunc(5*time.Second, gracefulCancel)
+			defer timeout.Stop()
+			logger.Info(ctx, "logger closing")
+
+			if err := sl.Flush(gracefulCtx); err != nil {
+				// ctx err
+				logger.Warn(gracefulCtx, "timeout reached while flushing")
+				return
+			}
+
+			if err := ls.WaitUntilEmpty(gracefulCtx); err != nil {
+				// ctx err
+				logger.Warn(gracefulCtx, "timeout reached while waiting for log queue to empty")
+			}
+
+			_ = arpc.DRPCConn().Close()
+			client.SDK.HTTPClient.CloseIdleConnections()
+		},
+	}
+	lifecycle.closeTimer = closeTimer
+	return lifecycle, nil
+}
+
 func (l *logQueuer) processLog(ctx context.Context, log agentLog) {
 	l.mu.Lock()
 	defer l.mu.Unlock()
-	queuedLogs := l.logCache.push(log)
+
+	queuedLogs := l.logCache.get(log.agentToken)
+	if isAgentLogEmpty(log) {
+		if queuedLogs == nil {
+			return
+		}
+	} else {
+		queuedLogs = l.logCache.push(log)
+	}
+
 	lgr, ok := l.loggers[log.agentToken]
 	if !ok {
-		client := agentsdk.New(l.coderURL)
-		client.SetSessionToken(log.agentToken)
-		logger := l.logger.With(slog.F("resource_name", log.resourceName))
-		client.SDK.SetLogger(logger)
-
-		_, err := client.PostLogSource(ctx, agentsdk.PostLogSourceRequest{
-			ID:          sourceUUID,
-			Icon:        "/icon/k8s.png",
-			DisplayName: "Kubernetes",
-		})
-		if err != nil {
-			// This shouldn't fail sending the log, as it only affects how they
-			// appear.
-			logger.Error(ctx, "post log source", slog.Error(err))
+		// skip if we're in a retry cooldown window
+		if rs := l.retries[log.agentToken]; rs != nil && rs.timer != nil {
+			return
 		}
 
-		ls := agentsdk.NewLogSender(logger)
-		sl := ls.GetScriptLogger(sourceUUID)
-
-		gracefulCtx, gracefulCancel := context.WithCancel(context.Background())
-
-		// connect to Agent v2.0 API, since we don't need features added later.
-		// This maximizes compatibility.
-		arpc, err := client.ConnectRPC20(gracefulCtx)
+		var err error
+		lgr, err = l.newLogger(ctx, log, queuedLogs)
 		if err != nil {
-			logger.Error(ctx, "drpc connect", slog.Error(err))
-			gracefulCancel()
+			l.scheduleRetry(ctx, log.agentToken)
 			return
 		}
-		go func() {
-			err := ls.SendLoop(gracefulCtx, arpc)
-			// if the send loop exits on its own without the context
-			// canceling, timeout the logger and force it to recreate.
-			if err != nil && ctx.Err() == nil {
-				l.loggerTimeout(log.agentToken)
-			}
-		}()
-
-		closeTimer := l.clock.AfterFunc(l.loggerTTL, func() {
-			logger.Info(ctx, "logger timeout firing")
-			l.loggerTimeout(log.agentToken)
-		})
-		lifecycle := agentLoggerLifecycle{
-			scriptLogger: sl,
-			close: func() {
-				// We could be stopping for reasons other than the timeout. If
-				// so, stop the timer.
-				closeTimer.Stop()
-				defer gracefulCancel()
-				timeout := l.clock.AfterFunc(5*time.Second, gracefulCancel)
-				defer timeout.Stop()
-				logger.Info(ctx, "logger closing")
-
-				if err := sl.Flush(gracefulCtx); err != nil {
-					// ctx err
-					logger.Warn(gracefulCtx, "timeout reached while flushing")
-					return
-				}
-
-				if err := ls.WaitUntilEmpty(gracefulCtx); err != nil {
-					// ctx err
-					logger.Warn(gracefulCtx, "timeout reached while waiting for log queue to empty")
-				}
-
-				_ = arpc.DRPCConn().Close()
-				client.SDK.HTTPClient.CloseIdleConnections()
-			},
-		}
-		lifecycle.closeTimer = closeTimer
-		l.loggers[log.agentToken] = lifecycle
-		lgr = lifecycle
+		l.loggers[log.agentToken] = lgr
 	}
 
 	lgr.resetCloseTimer(l.loggerTTL)
-	_ = lgr.scriptLogger.Send(ctx, queuedLogs...)
+	if len(queuedLogs) == 0 {
+		return
+	}
+	if err := lgr.scriptLogger.Send(ctx, queuedLogs...); err != nil {
+		l.scheduleRetry(ctx, log.agentToken)
+		return
+	}
+	l.clearRetry(log.agentToken)
 	l.logCache.delete(log.agentToken)
 }
 
@@ -518,6 +553,8 @@ func (l *logQueuer) processDelete(log agentLog) {
 		delete(l.loggers, log.agentToken)
 
 	}
+	l.clearRetry(log.agentToken)
+	l.logCache.delete(log.agentToken)
 	l.mu.Unlock()
 
 	if ok {
@@ -549,6 +586,64 @@ func (l *agentLoggerLifecycle) resetCloseTimer(ttl time.Duration) {
 	}
 }
 
+// retryState tracks exponential backoff for an agent token.
+type retryState struct {
+	delay time.Duration
+	timer *quartz.Timer
+}
+
+func (l *logQueuer) ensureRetryMap() {
+	if l.retries == nil {
+		l.retries = make(map[string]*retryState)
+	}
+}
+
+func (l *logQueuer) scheduleRetry(ctx context.Context, token string) {
+	l.ensureRetryMap()
+
+	rs := l.retries[token]
+	if rs == nil {
+		rs = &retryState{delay: time.Second}
+		l.retries[token] = rs
+	}
+
+	if rs.timer != nil {
+		return
+	}
+
+	if rs.delay < time.Second {
+		rs.delay = time.Second
+	} else if rs.delay > 30*time.Second {
+		rs.delay = 30 * time.Second
+	}
+
+	l.logger.Info(ctx, "scheduling retry", slog.F("delay", rs.delay.String()))
+
+	rs.timer = l.clock.AfterFunc(rs.delay, func() {
+		l.mu.Lock()
+		if cur := l.retries[token]; cur != nil {
+			cur.timer = nil
+		}
+		l.mu.Unlock()
+
+		l.q <- agentLog{op: opLog, agentToken: token}
+	})
+
+	rs.delay *= 2
+	if rs.delay > 30*time.Second {
+		rs.delay = 30 * time.Second
+	}
+}
+
+func (l *logQueuer) clearRetry(token string) {
+	if rs := l.retries[token]; rs != nil {
+		if rs.timer != nil {
+			rs.timer.Stop()
+		}
+		delete(l.retries, token)
+	}
+}
+
 func newColor(value ...color.Attribute) *color.Color {
 	c := color.New(value...)
 	c.EnableColor()
@@ -572,3 +667,15 @@ func (l *logCache) push(log agentLog) []agentsdk.Log {
 func (l *logCache) delete(token string) {
 	delete(l.logs, token)
 }
+
+func (l *logCache) get(token string) []agentsdk.Log {
+	logs, ok := l.logs[token]
+	if !ok {
+		return nil
+	}
+	return logs
+}
+
+func isAgentLogEmpty(log agentLog) bool {
+	return log.resourceName == "" && log.log.Output == "" && log.log.CreatedAt.IsZero()
+}
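Aside (not part of the commit): scheduleRetry arms the first retry one second after a failure, doubles the delay each time it arms a timer, and clamps the delay at 30 seconds. A minimal standalone Go sketch that replays that doubling-and-clamp arithmetic:

package main

import (
	"fmt"
	"time"
)

func main() {
	// Mirrors retryState{delay: time.Second} plus the post-arm update in
	// scheduleRetry: rs.delay *= 2, clamped to 30 * time.Second.
	delay := time.Second
	for attempt := 1; attempt <= 8; attempt++ {
		fmt.Printf("retry %d fires after %s\n", attempt, delay)
		delay *= 2
		if delay > 30*time.Second {
			delay = 30 * time.Second
		}
	}
	// Prints: 1s, 2s, 4s, 8s, 16s, 30s, 30s, 30s.
}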
