Skip to content

Commit a2df7fa

Browse files
authored
feat: add telemetry scheduler (#1107)
1 parent 99f304e commit a2df7fa

30 files changed

+2195
-238
lines changed

.codecov.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,3 +16,4 @@ coverage:
1616
informational: true
1717
ignore:
1818
- "log_fallback.go"
19+
- "internal/testutils"

client.go

Lines changed: 77 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,10 @@ import (
1515

1616
"github.com/getsentry/sentry-go/internal/debug"
1717
"github.com/getsentry/sentry-go/internal/debuglog"
18+
httpInternal "github.com/getsentry/sentry-go/internal/http"
19+
"github.com/getsentry/sentry-go/internal/protocol"
20+
"github.com/getsentry/sentry-go/internal/ratelimit"
21+
"github.com/getsentry/sentry-go/internal/telemetry"
1822
)
1923

2024
// The identifier of the SDK.
@@ -252,6 +256,8 @@ type ClientOptions struct {
252256
// IMPORTANT: to not ignore any status codes, the option should be an empty slice and not nil. The nil option is
253257
// used for defaulting to 404 ignores.
254258
TraceIgnoreStatusCodes [][]int
259+
// DisableTelemetryBuffer disables the telemetry buffer layer for prioritizing events and uses the old transport layer.
260+
DisableTelemetryBuffer bool
255261
}
256262

257263
// Client is the underlying processor that is used by the main API and Hub
@@ -266,8 +272,9 @@ type Client struct {
266272
sdkVersion string
267273
// Transport is read-only. Replacing the transport of an existing client is
268274
// not supported, create a new client instead.
269-
Transport Transport
270-
batchLogger *BatchLogger
275+
Transport Transport
276+
batchLogger *BatchLogger
277+
telemetryBuffer *telemetry.Buffer
271278
}
272279

273280
// NewClient creates and returns an instance of Client configured using
@@ -371,12 +378,15 @@ func NewClient(options ClientOptions) (*Client, error) {
371378
sdkVersion: SDKVersion,
372379
}
373380

374-
if options.EnableLogs {
381+
client.setupTransport()
382+
383+
if !options.DisableTelemetryBuffer {
384+
client.setupTelemetryBuffer()
385+
} else if options.EnableLogs {
375386
client.batchLogger = NewBatchLogger(&client)
376387
client.batchLogger.Start()
377388
}
378389

379-
client.setupTransport()
380390
client.setupIntegrations()
381391

382392
return &client, nil
@@ -398,6 +408,52 @@ func (client *Client) setupTransport() {
398408
client.Transport = transport
399409
}
400410

411+
func (client *Client) setupTelemetryBuffer() {
412+
if client.options.DisableTelemetryBuffer {
413+
return
414+
}
415+
416+
if client.dsn == nil {
417+
debuglog.Println("Telemetry buffer disabled: no DSN configured")
418+
return
419+
}
420+
421+
// We currently disallow using a custom Transport with the new Telemetry Buffer, due to the difference in transport signatures.
422+
// This restriction should be lifted once the public Transport interface changes to the new signature.
423+
if client.options.Transport != nil {
424+
debuglog.Println("Cannot enable Telemetry Buffer with custom Transport: fallback to old transport")
425+
if client.options.EnableLogs {
426+
client.batchLogger = NewBatchLogger(client)
427+
client.batchLogger.Start()
428+
}
429+
return
430+
}
431+
432+
transport := httpInternal.NewAsyncTransport(httpInternal.TransportOptions{
433+
Dsn: client.options.Dsn,
434+
HTTPClient: client.options.HTTPClient,
435+
HTTPTransport: client.options.HTTPTransport,
436+
HTTPProxy: client.options.HTTPProxy,
437+
HTTPSProxy: client.options.HTTPSProxy,
438+
CaCerts: client.options.CaCerts,
439+
})
440+
client.Transport = &internalAsyncTransportAdapter{transport: transport}
441+
442+
storage := map[ratelimit.Category]telemetry.Storage[protocol.EnvelopeItemConvertible]{
443+
ratelimit.CategoryError: telemetry.NewRingBuffer[protocol.EnvelopeItemConvertible](ratelimit.CategoryError, 100, telemetry.OverflowPolicyDropOldest, 1, 0),
444+
ratelimit.CategoryTransaction: telemetry.NewRingBuffer[protocol.EnvelopeItemConvertible](ratelimit.CategoryTransaction, 1000, telemetry.OverflowPolicyDropOldest, 1, 0),
445+
ratelimit.CategoryLog: telemetry.NewRingBuffer[protocol.EnvelopeItemConvertible](ratelimit.CategoryLog, 10*100, telemetry.OverflowPolicyDropOldest, 100, 5*time.Second),
446+
ratelimit.CategoryMonitor: telemetry.NewRingBuffer[protocol.EnvelopeItemConvertible](ratelimit.CategoryMonitor, 100, telemetry.OverflowPolicyDropOldest, 1, 0),
447+
}
448+
449+
sdkInfo := &protocol.SdkInfo{
450+
Name: client.sdkIdentifier,
451+
Version: client.sdkVersion,
452+
}
453+
454+
client.telemetryBuffer = telemetry.NewBuffer(storage, transport, &client.dsn.Dsn, sdkInfo)
455+
}
456+
401457
func (client *Client) setupIntegrations() {
402458
integrations := []Integration{
403459
new(contextifyFramesIntegration),
@@ -538,7 +594,7 @@ func (client *Client) RecoverWithContext(
538594
// the network synchronously, configure it to use the HTTPSyncTransport in the
539595
// call to Init.
540596
func (client *Client) Flush(timeout time.Duration) bool {
541-
if client.batchLogger != nil {
597+
if client.batchLogger != nil || client.telemetryBuffer != nil {
542598
ctx, cancel := context.WithTimeout(context.Background(), timeout)
543599
defer cancel()
544600
return client.FlushWithContext(ctx)
@@ -562,6 +618,9 @@ func (client *Client) FlushWithContext(ctx context.Context) bool {
562618
if client.batchLogger != nil {
563619
client.batchLogger.Flush(ctx.Done())
564620
}
621+
if client.telemetryBuffer != nil {
622+
return client.telemetryBuffer.FlushWithContext(ctx)
623+
}
565624
return client.Transport.FlushWithContext(ctx)
566625
}
567626

@@ -570,6 +629,12 @@ func (client *Client) FlushWithContext(ctx context.Context) bool {
570629
// Close should be called after Flush and before terminating the program
571630
// otherwise some events may be lost.
572631
func (client *Client) Close() {
632+
if client.telemetryBuffer != nil {
633+
client.telemetryBuffer.Close(5 * time.Second)
634+
}
635+
if client.batchLogger != nil {
636+
client.batchLogger.Shutdown()
637+
}
573638
client.Transport.Close()
574639
}
575640

@@ -690,7 +755,13 @@ func (client *Client) processEvent(event *Event, hint *EventHint, scope EventMod
690755
}
691756
}
692757

693-
client.Transport.SendEvent(event)
758+
if client.telemetryBuffer != nil {
759+
if !client.telemetryBuffer.Add(event) {
760+
debuglog.Println("Event dropped: telemetry buffer full or unavailable")
761+
}
762+
} else {
763+
client.Transport.SendEvent(event)
764+
}
694765

695766
return &event.EventID
696767
}

client_test.go

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package sentry
22

33
import (
4+
"bytes"
45
"context"
56
"encoding/json"
67
"errors"
@@ -12,6 +13,7 @@ import (
1213
"testing"
1314
"time"
1415

16+
"github.com/getsentry/sentry-go/internal/debuglog"
1517
"github.com/google/go-cmp/cmp"
1618
"github.com/google/go-cmp/cmp/cmpopts"
1719
pkgErrors "github.com/pkg/errors"
@@ -49,6 +51,7 @@ func setupClientTest() (*Client, *MockScope, *MockTransport) {
4951
client, _ := NewClient(ClientOptions{
5052
Dsn: "http://whatever@example.com/1337",
5153
Transport: transport,
54+
// keep default buffers enabled
5255
Integrations: func(_ []Integration) []Integration {
5356
return []Integration{}
5457
},
@@ -1032,3 +1035,43 @@ func TestClientSetsUpTransport(t *testing.T) {
10321035
client, _ = NewClient(ClientOptions{})
10331036
require.IsType(t, &noopTransport{}, client.Transport)
10341037
}
1038+
1039+
func TestClient_SetupTelemetryBuffer_WithDSN(t *testing.T) {
1040+
client, err := NewClient(ClientOptions{
1041+
Dsn: "https://public@localhost/1",
1042+
})
1043+
if err != nil {
1044+
t.Fatalf("unexpected error: %v", err)
1045+
}
1046+
1047+
if client.telemetryBuffer == nil {
1048+
t.Fatal("expected telemetryBuffer to be initialized")
1049+
}
1050+
1051+
if _, ok := client.Transport.(*internalAsyncTransportAdapter); !ok {
1052+
t.Fatalf("expected internalAsyncTransportAdapter, got %T", client.Transport)
1053+
}
1054+
1055+
if !client.telemetryBuffer.Add(NewEvent()) {
1056+
t.Fatal("expected Add to succeed with default buffers")
1057+
}
1058+
}
1059+
1060+
func TestClient_SetupTelemetryBuffer_NoDSN(t *testing.T) {
1061+
var buf bytes.Buffer
1062+
debuglog.SetOutput(&buf)
1063+
defer debuglog.SetOutput(&bytes.Buffer{})
1064+
1065+
client, err := NewClient(ClientOptions{})
1066+
if err != nil {
1067+
t.Fatalf("unexpected error: %v", err)
1068+
}
1069+
1070+
if client.telemetryBuffer != nil {
1071+
t.Fatal("expected telemetryBuffer to be nil when DSN is missing")
1072+
}
1073+
1074+
if _, ok := client.Transport.(*noopTransport); !ok {
1075+
t.Fatalf("expected noopTransport, got %T", client.Transport)
1076+
}
1077+
}

hub_test.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"testing"
1010
"time"
1111

12+
"github.com/getsentry/sentry-go/internal/protocol"
1213
"github.com/google/go-cmp/cmp"
1314
"github.com/google/go-cmp/cmp/cmpopts"
1415
)
@@ -177,9 +178,9 @@ func TestConfigureScope(t *testing.T) {
177178
}
178179

179180
func TestLastEventID(t *testing.T) {
180-
uuid := EventID(uuid())
181-
hub := &Hub{lastEventID: uuid}
182-
assertEqual(t, uuid, hub.LastEventID())
181+
eventID := EventID(protocol.GenerateEventID())
182+
hub := &Hub{lastEventID: eventID}
183+
assertEqual(t, eventID, hub.LastEventID())
183184
}
184185

185186
func TestLastEventIDUpdatesAfterCaptures(t *testing.T) {

interfaces.go

Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -493,6 +493,70 @@ func (e *Event) ToEnvelopeWithTime(dsn *protocol.Dsn, sentAt time.Time) (*protoc
493493
return envelope, nil
494494
}
495495

496+
// ToEnvelopeItem converts the Event to a Sentry envelope item.
497+
func (e *Event) ToEnvelopeItem() (*protocol.EnvelopeItem, error) {
498+
eventBody, err := json.Marshal(e)
499+
if err != nil {
500+
// Try fallback: remove problematic fields and retry
501+
e.Breadcrumbs = nil
502+
e.Contexts = nil
503+
e.Extra = map[string]interface{}{
504+
"info": fmt.Sprintf("Could not encode original event as JSON. "+
505+
"Succeeded by removing Breadcrumbs, Contexts and Extra. "+
506+
"Please verify the data you attach to the scope. "+
507+
"Error: %s", err),
508+
}
509+
510+
eventBody, err = json.Marshal(e)
511+
if err != nil {
512+
return nil, fmt.Errorf("event could not be marshaled even with fallback: %w", err)
513+
}
514+
515+
DebugLogger.Printf("Event marshaling succeeded with fallback after removing problematic fields")
516+
}
517+
518+
// TODO: all event types should be abstracted to implement EnvelopeItemConvertible and convert themselves.
519+
var item *protocol.EnvelopeItem
520+
switch e.Type {
521+
case transactionType:
522+
item = protocol.NewEnvelopeItem(protocol.EnvelopeItemTypeTransaction, eventBody)
523+
case checkInType:
524+
item = protocol.NewEnvelopeItem(protocol.EnvelopeItemTypeCheckIn, eventBody)
525+
case logEvent.Type:
526+
item = protocol.NewLogItem(len(e.Logs), eventBody)
527+
default:
528+
item = protocol.NewEnvelopeItem(protocol.EnvelopeItemTypeEvent, eventBody)
529+
}
530+
531+
return item, nil
532+
}
533+
534+
// GetCategory returns the rate limit category for this event.
535+
func (e *Event) GetCategory() ratelimit.Category {
536+
return e.toCategory()
537+
}
538+
539+
// GetEventID returns the event ID.
540+
func (e *Event) GetEventID() string {
541+
return string(e.EventID)
542+
}
543+
544+
// GetSdkInfo returns SDK information for the envelope header.
545+
func (e *Event) GetSdkInfo() *protocol.SdkInfo {
546+
return &e.Sdk
547+
}
548+
549+
// GetDynamicSamplingContext returns trace context for the envelope header.
550+
func (e *Event) GetDynamicSamplingContext() map[string]string {
551+
trace := make(map[string]string)
552+
if dsc := e.sdkMetaData.dsc; dsc.HasEntries() {
553+
for k, v := range dsc.Entries {
554+
trace[k] = v
555+
}
556+
}
557+
return trace
558+
}
559+
496560
// TODO: Event.Contexts map[string]interface{} => map[string]EventContext,
497561
// to prevent accidentally storing T when we mean *T.
498562
// For example, the TraceContext must be stored as *TraceContext to pick up the
@@ -667,6 +731,69 @@ type Log struct {
667731
Attributes map[string]Attribute `json:"attributes,omitempty"`
668732
}
669733

734+
// ToEnvelopeItem converts the Log to a Sentry envelope item for batching.
735+
func (l *Log) ToEnvelopeItem() (*protocol.EnvelopeItem, error) {
736+
type logJSON struct {
737+
Timestamp *float64 `json:"timestamp,omitempty"`
738+
TraceID string `json:"trace_id,omitempty"`
739+
Level string `json:"level"`
740+
Severity int `json:"severity_number,omitempty"`
741+
Body string `json:"body,omitempty"`
742+
Attributes map[string]protocol.LogAttribute `json:"attributes,omitempty"`
743+
}
744+
745+
// Convert time.Time to seconds float if set
746+
var ts *float64
747+
if !l.Timestamp.IsZero() {
748+
sec := float64(l.Timestamp.UnixNano()) / 1e9
749+
ts = &sec
750+
}
751+
752+
attrs := make(map[string]protocol.LogAttribute, len(l.Attributes))
753+
for k, v := range l.Attributes {
754+
attrs[k] = protocol.LogAttribute{Value: v.Value, Type: string(v.Type)}
755+
}
756+
757+
logData, err := json.Marshal(logJSON{
758+
Timestamp: ts,
759+
TraceID: l.TraceID.String(),
760+
Level: string(l.Level),
761+
Severity: l.Severity,
762+
Body: l.Body,
763+
Attributes: attrs,
764+
})
765+
if err != nil {
766+
return nil, err
767+
}
768+
769+
return &protocol.EnvelopeItem{
770+
Header: &protocol.EnvelopeItemHeader{
771+
Type: protocol.EnvelopeItemTypeLog,
772+
},
773+
Payload: logData,
774+
}, nil
775+
}
776+
777+
// GetCategory returns the rate limit category for logs.
778+
func (l *Log) GetCategory() ratelimit.Category {
779+
return ratelimit.CategoryLog
780+
}
781+
782+
// GetEventID returns an empty string (the event ID is set when batching).
783+
func (l *Log) GetEventID() string {
784+
return ""
785+
}
786+
787+
// GetSdkInfo returns nil (SDK info set when batching).
788+
func (l *Log) GetSdkInfo() *protocol.SdkInfo {
789+
return nil
790+
}
791+
792+
// GetDynamicSamplingContext returns nil (trace context set when batching).
793+
func (l *Log) GetDynamicSamplingContext() map[string]string {
794+
return nil
795+
}
796+
670797
type AttrType string
671798

672799
const (

0 commit comments

Comments
 (0)