Skip to content

Commit 3b7987e

Browse files
committed
add log exporter
1 parent 448cc74 commit 3b7987e

File tree

5 files changed

+440
-10
lines changed

5 files changed

+440
-10
lines changed

pkg/engine/engine.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -459,5 +459,7 @@ func exportCapture(ctx context.Context, capture *xcap.Capture, plan *physical.Pl
459459
return parentID, ok
460460
})
461461

462+
xcap.ExportLog(capture, logger)
463+
462464
return xcap.ExportTrace(ctx, capture, logger)
463465
}

pkg/xcap/aggregation.go

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -10,18 +10,28 @@ type AggregatedObservation struct {
1010
// Record folds a single observation into this aggregated observation, using the
1111
// aggregation type of the observation's statistic, and then increments Count.
1212
func (a *AggregatedObservation) Record(obs Observation) {
13-
stat := obs.statistic()
14-
val := obs.value()
13+
a.aggregate(obs.statistic().Aggregation(), obs.value())
14+
a.Count++
15+
}
16+
17+
// Merge aggregates another AggregatedObservation into this one.
18+
func (a *AggregatedObservation) Merge(other *AggregatedObservation) {
19+
if other == nil {
20+
return
21+
}
22+
a.aggregate(a.Statistic.Aggregation(), other.Value)
23+
a.Count += other.Count
24+
}
1525

16-
switch stat.Aggregation() {
26+
func (a *AggregatedObservation) aggregate(aggType AggregationType, val any) {
27+
switch aggType {
1728
case AggregationTypeSum:
1829
switch v := val.(type) {
1930
case int64:
2031
a.Value = a.Value.(int64) + v
2132
case float64:
2233
a.Value = a.Value.(float64) + v
2334
}
24-
2535
case AggregationTypeMin:
2636
switch v := val.(type) {
2737
case int64:
@@ -33,7 +43,6 @@ func (a *AggregatedObservation) Record(obs Observation) {
3343
a.Value = v
3444
}
3545
}
36-
3746
case AggregationTypeMax:
3847
switch v := val.(type) {
3948
case int64:
@@ -50,16 +59,12 @@ func (a *AggregatedObservation) Record(obs Observation) {
5059
a.Value = v
5160
}
5261
}
53-
5462
case AggregationTypeLast:
55-
// Last value overwrites
5663
a.Value = val
57-
5864
case AggregationTypeFirst:
65+
// Keep the first value, don't update
5966
if a.Value == nil {
6067
a.Value = val
6168
}
6269
}
63-
64-
a.Count++
6570
}

pkg/xcap/exporter.go

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,3 +101,73 @@ func observationToAttribute(key StatisticKey, obs *AggregatedObservation) attrib
101101
// Fallback: convert to string
102102
return attrKey.String(fmt.Sprintf("%v", obs.Value))
103103
}
104+
105+
// ExportLog exports a Capture as a structured log line with aggregated statistics.
106+
func ExportLog(capture *Capture, logger log.Logger) {
107+
if capture == nil || logger == nil {
108+
return
109+
}
110+
111+
summary := summarizeObservations(capture)
112+
level.Info(logger).Log(summary.toLogValues()...)
113+
}
114+
115+
// summarizeObservations collects and summarizes observations from the capture.
116+
func summarizeObservations(capture *Capture) *observations {
117+
if capture == nil {
118+
return nil
119+
}
120+
121+
collect := newObservationCollector(capture)
122+
result := newObservations()
123+
124+
// collect observations from all DataObjScan regions. observations from
125+
// child regions are rolled-up to include dataset reader and bucket stats.
126+
// streamView is excluded as it is handled separately below.
127+
result.merge(
128+
collect.fromRegions("DataObjScan", true, "streamsView.init").
129+
filter(
130+
// object store calls
131+
"bucket.get", "bucket.getrange", "bucket.attributes",
132+
// dataset reader stats
133+
"row.max", "rows.after.pruning", "read.calls",
134+
"primary.pages.downloaded", "secondary.pages.downloaded",
135+
"primary.pages.compressed.bytes", "secondary.pages.compressed.bytes",
136+
"primary.rows.read", "secondary.rows.read",
137+
"primary.row.read.bytes", "secondary.row.read.bytes",
138+
"pages.scanned", "pages.cache.hit",
139+
"pages.download.requests", "pages.download.duration.ns",
140+
).
141+
prefix("logs_dataset_").
142+
normalizeKeys(),
143+
)
144+
145+
// metastore index and resolved section stats
146+
result.merge(
147+
collect.fromRegions("ObjectMetastore.Sections", true).
148+
filter("metastore.index.objects", "metastore.resolved.sections").
149+
normalizeKeys(),
150+
)
151+
152+
// metastore bucket and dataset reader stats
153+
result.merge(
154+
collect.fromRegions("ObjectMetastore.Sections", true).
155+
filter("bucket.get", "bucket.getrange", "bucket.attributes",
156+
"primary.pages.downloaded", "secondary.pages.downloaded",
157+
"primary.pages.compressed.bytes", "secondary.pages.compressed.bytes").
158+
prefix("metastore_").
159+
normalizeKeys(),
160+
)
161+
162+
// streamsView bucket and dataset reader stats
163+
result.merge(
164+
collect.fromRegions("streamsView.init", true).
165+
filter("bucket.get", "bucket.getrange", "bucket.attributes",
166+
"primary.pages.downloaded", "secondary.pages.downloaded",
167+
"primary.pages.compressed.bytes", "secondary.pages.compressed.bytes").
168+
prefix("streams_").
169+
normalizeKeys(),
170+
)
171+
172+
return result
173+
}

pkg/xcap/summary.go

Lines changed: 231 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,231 @@
1+
package xcap
2+
3+
import (
4+
"sort"
5+
"strings"
6+
"time"
7+
8+
"github.com/dustin/go-humanize"
9+
)
10+
11+
// observations holds aggregated observations that can be transformed and merged.
12+
//
13+
// The transformation methods (filter, prefix, normalizeKeys) never write to the receiver's map. A no-op call
14+
// returns the receiver itself, and any new instance shares the receiver's *AggregatedObservation pointers.
15+
type observations struct {
16+
data map[StatisticKey]*AggregatedObservation // aggregated observation per statistic key
17+
}
18+
19+
// newObservations creates an empty observations.
20+
func newObservations() *observations {
21+
return &observations{data: make(map[StatisticKey]*AggregatedObservation)}
22+
}
23+
24+
// filter returns a new observations containing only entries with matching stat names.
25+
func (o *observations) filter(names ...string) *observations {
26+
if len(names) == 0 || o == nil {
27+
return o
28+
}
29+
30+
nameSet := make(map[string]struct{}, len(names))
31+
for _, n := range names {
32+
nameSet[n] = struct{}{}
33+
}
34+
35+
result := newObservations()
36+
for k, obs := range o.data {
37+
if _, ok := nameSet[k.Name]; ok {
38+
result.data[k] = obs
39+
}
40+
}
41+
return result
42+
}
43+
44+
// prefix returns a new observations with all stat names prefixed.
45+
func (o *observations) prefix(p string) *observations {
46+
if p == "" || o == nil {
47+
return o
48+
}
49+
50+
result := newObservations()
51+
for k, obs := range o.data {
52+
newKey := StatisticKey{
53+
Name: p + k.Name,
54+
DataType: k.DataType,
55+
Aggregation: k.Aggregation,
56+
}
57+
result.data[newKey] = obs
58+
}
59+
return result
60+
}
61+
62+
// normalizeKeys returns a new observations with dots replaced by underscores in stat names.
63+
func (o *observations) normalizeKeys() *observations {
64+
if o == nil {
65+
return o
66+
}
67+
68+
result := newObservations()
69+
for k, obs := range o.data {
70+
newKey := StatisticKey{
71+
Name: strings.ReplaceAll(k.Name, ".", "_"),
72+
DataType: k.DataType,
73+
Aggregation: k.Aggregation,
74+
}
75+
result.data[newKey] = obs
76+
}
77+
return result
78+
}
79+
80+
// merge merges another observations into this one.
81+
func (o *observations) merge(other *observations) {
82+
if other == nil {
83+
return
84+
}
85+
for k, obs := range other.data {
86+
if existing, ok := o.data[k]; ok {
87+
existing.Merge(obs)
88+
} else {
89+
o.data[k] = &AggregatedObservation{
90+
Statistic: obs.Statistic,
91+
Value: obs.Value,
92+
Count: obs.Count,
93+
}
94+
}
95+
}
96+
}
97+
98+
// ToLogValues converts observations to a slice suitable for go-kit/log.
99+
// Keys are sorted for deterministic output.
100+
func (o *observations) toLogValues() []any {
101+
if o == nil {
102+
return nil
103+
}
104+
105+
// Collect key-value pairs for sorting by name.
106+
type kv struct {
107+
name string
108+
value any
109+
}
110+
pairs := make([]kv, 0, len(o.data))
111+
for k, obs := range o.data {
112+
pairs = append(pairs, kv{name: k.Name, value: obs.Value})
113+
}
114+
sort.Slice(pairs, func(i, j int) bool {
115+
return strings.Compare(pairs[i].name, pairs[j].name) < 0
116+
})
117+
118+
result := make([]any, 0, len(pairs)*2)
119+
for _, p := range pairs {
120+
value := p.value
121+
122+
// Format bytes values (keys ending with "_bytes")
123+
if strings.HasSuffix(p.name, "_bytes") {
124+
switch val := value.(type) {
125+
case uint64:
126+
value = humanize.Bytes(val)
127+
case int64:
128+
value = humanize.Bytes(uint64(val))
129+
}
130+
}
131+
132+
// Format duration values (keys ending with "duration_ns")
133+
if strings.HasSuffix(p.name, "duration_ns") {
134+
switch val := value.(type) {
135+
case int64:
136+
value = time.Duration(val).String()
137+
case uint64:
138+
value = time.Duration(val).String()
139+
}
140+
}
141+
142+
result = append(result, p.name, value)
143+
}
144+
return result
145+
}
146+
147+
// observationCollector provides methods to collect observations from a Capture.
148+
type observationCollector struct {
149+
capture *Capture // capture whose regions are read
150+
childrenMap map[identifier][]*Region // regions grouped by their parentID
151+
}
152+
153+
// newObservationCollector creates a new collector for gathering observations from the given capture.
154+
func newObservationCollector(capture *Capture) *observationCollector {
155+
if capture == nil {
156+
return nil
157+
}
158+
159+
// Build parent -> children map
160+
childrenMap := make(map[identifier][]*Region)
161+
for _, r := range capture.Regions() {
162+
childrenMap[r.parentID] = append(childrenMap[r.parentID], r)
163+
}
164+
165+
return &observationCollector{
166+
capture: capture,
167+
childrenMap: childrenMap,
168+
}
169+
}
170+
171+
// fromRegions collects observations from regions with the given name.
172+
// If rollUp is true, each region's stats include all its descendant stats
173+
// aggregated according to each stat's aggregation type.
174+
func (c *observationCollector) fromRegions(name string, rollUp bool, excluded ...string) *observations {
175+
if c == nil {
176+
return newObservations()
177+
}
178+
179+
excludedSet := make(map[string]struct{}, len(excluded))
180+
for _, name := range excluded {
181+
excludedSet[name] = struct{}{}
182+
}
183+
184+
result := newObservations()
185+
for _, region := range c.capture.Regions() {
186+
if region.name != name {
187+
continue
188+
}
189+
190+
var obs *observations
191+
if rollUp {
192+
obs = c.rollUpObservations(region, excludedSet)
193+
} else {
194+
obs = c.getRegionObservations(region)
195+
}
196+
197+
result.merge(obs)
198+
}
199+
200+
return result
201+
}
202+
203+
// getRegionObservations returns a copy of a region's observations.
204+
func (c *observationCollector) getRegionObservations(region *Region) *observations {
205+
result := newObservations()
206+
for k, obs := range region.observations {
207+
result.data[k] = &AggregatedObservation{
208+
Statistic: obs.Statistic,
209+
Value: obs.Value,
210+
Count: obs.Count,
211+
}
212+
}
213+
return result
214+
}
215+
216+
// rollUpObservations computes observations for a region including all its descendants.
217+
// Stats are aggregated according to their aggregation type.
218+
func (c *observationCollector) rollUpObservations(region *Region, excludedSet map[string]struct{}) *observations {
219+
result := c.getRegionObservations(region)
220+
221+
// Recursively aggregate from children.
222+
for _, child := range c.childrenMap[region.id] {
223+
// Skip children with excluded names.
224+
if _, excluded := excludedSet[child.name]; excluded {
225+
continue
226+
}
227+
result.merge(c.rollUpObservations(child, excludedSet))
228+
}
229+
230+
return result
231+
}

0 commit comments

Comments
 (0)