Skip to content

Commit 11c2eda

Browse files
committed
Support to rewrite span kind
1 parent 9377090 commit 11c2eda

File tree

10 files changed

+138
-21
lines changed

10 files changed

+138
-21
lines changed

pkg/aliyunlog/config/config.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ type Configuration struct {
4747
MaxQueryDuration time.Duration
4848
InitResourceFlag bool
4949
TagAppendRuleFile string
50+
KindRewriteRuleFile string
5051
}
5152

5253
// LogstoreBuilder creates new sls.ClientInterface
@@ -85,10 +86,13 @@ func (c *Configuration) ApplyDefaults(source *Configuration) {
8586
if c.TagAppendRuleFile == "" {
8687
c.TagAppendRuleFile = source.TagAppendRuleFile
8788
}
89+
if c.KindRewriteRuleFile == "" {
90+
c.KindRewriteRuleFile = source.KindRewriteRuleFile
91+
}
8892
}
8993

9094
// NewClient return client, project, logstore, error
91-
func (c *Configuration) NewClient(logstoreType LogstoreType) (client sls.ClientInterface, project string, logstore string, aggLogstore string, initResourceFlag bool, tagAppendRuleFile string, err error) {
95+
func (c *Configuration) NewClient(logstoreType LogstoreType) (client sls.ClientInterface, project string, logstore string, aggLogstore string, initResourceFlag bool, tagAppendRuleFile string, kindRewriteRuleFile string,err error) {
9296
if c.AliCloudK8SFlag {
9397
shutdown := make(chan struct{}, 1)
9498
for i := 0; i < initEcsTokenTryMax; i++ {
@@ -98,7 +102,7 @@ func (c *Configuration) NewClient(logstoreType LogstoreType) (client sls.ClientI
98102
}
99103
}
100104
if err != nil {
101-
return nil, "", "", "", true, "", err
105+
return nil, "", "", "", true, "", "", err
102106
}
103107

104108
} else {
@@ -107,9 +111,9 @@ func (c *Configuration) NewClient(logstoreType LogstoreType) (client sls.ClientI
107111
// @todo set user agent
108112
//p.UserAgent = userAgent
109113
if logstoreType == SpanType {
110-
return client, c.Project, c.SpanLogstore, c.SpanAggLogstore, c.InitResourceFlag, c.TagAppendRuleFile, nil
114+
return client, c.Project, c.SpanLogstore, c.SpanAggLogstore, c.InitResourceFlag, c.TagAppendRuleFile, c.KindRewriteRuleFile,nil
111115
}
112-
return client, c.Project, c.DependencyLogstore, c.SpanAggLogstore, c.InitResourceFlag, c.TagAppendRuleFile, nil
116+
return client, c.Project, c.DependencyLogstore, c.SpanAggLogstore, c.InitResourceFlag, c.TagAppendRuleFile, c.KindRewriteRuleFile,nil
113117
}
114118

115119
func (c *Configuration) GetMaxQueryDuration() time.Duration {

plugin/storage/aliyunlog/options.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ const (
3333
suffixMaxQueryDuration = ".max-query-duration"
3434
suffixInitResourceFlag = ".init-resource-flag"
3535
suffixTagAppenderRule = ".tag-appender-rules"
36+
suffixKindRewriteRule = ".kind-rewrite-rules"
3637
)
3738

3839
// Options contains various type of AliCloud Log Service configs and provides the ability
@@ -65,6 +66,7 @@ func NewOptions(primaryNamespace string, otherNamespaces ...string) *Options {
6566
MaxQueryDuration: 24 * time.Hour,
6667
InitResourceFlag: true,
6768
TagAppendRuleFile: "",
69+
KindRewriteRuleFile: "",
6870
},
6971
namespace: primaryNamespace,
7072
},
@@ -127,6 +129,10 @@ func addFlags(flagSet *flag.FlagSet, nsConfig *namespaceConfig) {
127129
nsConfig.namespace+suffixTagAppenderRule,
128130
nsConfig.TagAppendRuleFile,
129131
"The file of rule which appending tag to span.")
132+
flagSet.String(
133+
nsConfig.namespace+suffixKindRewriteRule,
134+
nsConfig.KindRewriteRuleFile,
135+
"The file of rule which rewriting span kind.")
130136
}
131137

132138
// InitFromViper initializes Options with properties from viper
@@ -148,6 +154,7 @@ func initFromViper(cfg *namespaceConfig, v *viper.Viper) {
148154
cfg.MaxQueryDuration = v.GetDuration(cfg.namespace + suffixMaxQueryDuration)
149155
cfg.InitResourceFlag = v.GetBool(cfg.namespace + suffixInitResourceFlag)
150156
cfg.TagAppendRuleFile = v.GetString(cfg.namespace + suffixTagAppenderRule)
157+
cfg.KindRewriteRuleFile = v.GetString(cfg.namespace + suffixKindRewriteRule)
151158
}
152159

153160
// GetPrimary returns primary configuration.

plugin/storage/aliyunlogotel/factory.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ type Factory struct {
4545
depLogstore string
4646
initResourceFlag bool
4747
tagAppendRuleFile string
48+
kindRewriteRuleFile string
4849
}
4950

5051
// NewFactory creates a new Factory.
@@ -69,7 +70,7 @@ func (f *Factory) InitFromViper(v *viper.Viper) {
6970
func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger) error {
7071
f.metricsFactory, f.logger = metricsFactory, logger
7172
var err error
72-
f.client, f.spanProject, f.spanLogstore, f.spanAggLogstore, f.initResourceFlag, f.tagAppendRuleFile, err = f.primaryConfig.NewClient(config.SpanType)
73+
f.client, f.spanProject, f.spanLogstore, f.spanAggLogstore, f.initResourceFlag, f.tagAppendRuleFile,f.kindRewriteRuleFile, err = f.primaryConfig.NewClient(config.SpanType)
7374
if err != nil {
7475
return err
7576
}
@@ -113,7 +114,8 @@ func (f *Factory) CreateSpanWriter() (spanstore.Writer, error) {
113114
f.initResourceFlag,
114115
f.logger,
115116
f.metricsFactory,
116-
f.tagAppendRuleFile)
117+
f.tagAppendRuleFile,
118+
f.kindRewriteRuleFile)
117119
}
118120

119121
// CreateDependencyReader implements storage.Factory

plugin/storage/aliyunlogotel/options.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ const (
3434
suffixMaxQueryDuration = ".max-query-duration"
3535
suffixInitResourceFlag = ".init-resource-flag"
3636
suffixTagAppenderRule = ".tag-appender-rules"
37+
suffixKindRewriteRule = ".kind-rewrite-rules"
3738
)
3839

3940
// Options contains various type of AliCloud Log Service configs and provides the ability
@@ -66,6 +67,7 @@ func NewOptions(primaryNamespace string, otherNamespaces ...string) *Options {
6667
MaxQueryDuration: 24 * time.Hour,
6768
InitResourceFlag: true,
6869
TagAppendRuleFile: "",
70+
KindRewriteRuleFile: "",
6971
},
7072
namespace: primaryNamespace,
7173
},
@@ -132,6 +134,12 @@ func addFlags(flagSet *flag.FlagSet, nsConfig *namespaceConfig) {
132134
nsConfig.namespace+suffixTagAppenderRule,
133135
nsConfig.TagAppendRuleFile,
134136
"The file of rule which appending tag to span.")
137+
138+
flagSet.String(
139+
nsConfig.namespace+suffixKindRewriteRule,
140+
nsConfig.KindRewriteRuleFile,
141+
"The file of rule which rewrite span kind.")
142+
135143
}
136144

137145
// InitFromViper initializes Options with properties from viper
@@ -154,6 +162,7 @@ func initFromViper(cfg *namespaceConfig, v *viper.Viper) {
154162
cfg.MaxQueryDuration = v.GetDuration(cfg.namespace + suffixMaxQueryDuration)
155163
cfg.InitResourceFlag = v.GetBool(cfg.namespace + suffixInitResourceFlag)
156164
cfg.TagAppendRuleFile = v.GetString(cfg.namespace + suffixTagAppenderRule)
165+
cfg.KindRewriteRuleFile = v.GetString(cfg.namespace + suffixKindRewriteRule)
157166
}
158167

159168
// GetPrimary returns primary configuration.
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
package spanstore
2+
3+
import (
4+
"encoding/json"
5+
"go.uber.org/zap"
6+
"io/ioutil"
7+
"os"
8+
)
9+
10+
type KindRewriteRules interface {
11+
SpanKindRules() map[string]string
12+
OperationPrefixRules() map[string]string
13+
}
14+
15+
type kindRules struct {
16+
SpanTagRule map[string]string
17+
OperationNameRules map[string]string
18+
}
19+
20+
func (t kindRules) SpanKindRules() map[string]string {
21+
return t.SpanTagRule
22+
}
23+
24+
func (t kindRules) OperationPrefixRules() map[string]string {
25+
return t.OperationNameRules
26+
}
27+
28+
func initKindRewriteRules(ruleFile string) KindRewriteRules {
29+
var spanTagsRewriteRules = map[string]string{
30+
"db.instance": "client",
31+
"redis.key": "client",
32+
}
33+
34+
var operationPrefixRewriteRules = map[string]string{
35+
"elastic-POST": "client",
36+
}
37+
38+
data := initKindRewriteRule(ruleFile)
39+
40+
if d, ok := data[OperationWithPrefix]; ok {
41+
for k, v := range d {
42+
operationPrefixRewriteRules[k] = v
43+
}
44+
}
45+
46+
if d, ok := data[Tags]; ok {
47+
for k, v := range d {
48+
spanTagsRewriteRules[k] = v
49+
}
50+
}
51+
52+
d, _ := json.Marshal(spanTagsRewriteRules)
53+
d2, _ := json.Marshal(operationPrefixRewriteRules)
54+
logger.Info("The tag rewrite rules.", zap.String("KindRewriteRules", string(d)), zap.String("OperationNamePrefixAppendRules", string(d2)))
55+
56+
return &kindRules{
57+
SpanTagRule: spanTagsRewriteRules,
58+
OperationNameRules: operationPrefixRewriteRules,
59+
}
60+
}
61+
62+
func initKindRewriteRule(kindAppenderRuleFile string) map[string]map[string]string {
63+
if kindAppenderRuleFile != "" {
64+
if file, err := os.Open(kindAppenderRuleFile); err == nil {
65+
if data, e := ioutil.ReadAll(file); e == nil {
66+
logger.Info("The context of kind rewrite rule file.", zap.String("content", string(data)))
67+
tagMapping := make(map[string]map[string]string)
68+
if e1 := json.Unmarshal(data, &tagMapping); e1 == nil {
69+
return tagMapping
70+
} else {
71+
logger.Warn("Failed to pared the kind rewrite rule.", zap.Error(e1))
72+
}
73+
} else {
74+
logger.Warn("Failed to read the kind rewrite rule.", zap.Error(e))
75+
}
76+
} else {
77+
logger.Warn("the kind rewrite rule file is not exist.", zap.Error(err))
78+
}
79+
}
80+
return nil
81+
}

plugin/storage/aliyunlogotel/spanstore/additional_tag_appender.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ func initTagAppendRules(ruleFile string) TagAppendRules {
5353

5454
if d, ok := data[OperationWithPrefix]; ok {
5555
for k, v := range d {
56-
spanTagsAppendRules[k] = v
56+
operationPrefixAppendRules[k] = v
5757
}
5858
}
5959

plugin/storage/aliyunlogotel/spanstore/converter.go

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,8 @@ import (
2828
)
2929

3030
// FromSpan converts a model.Span to a log record
31-
func FromSpan(span *model.Span, topic, source string, file TagAppendRules) (*sls.LogGroup, error) {
32-
return converter{}.fromSpan(span, topic, source, file)
31+
func FromSpan(span *model.Span, topic, source string, file TagAppendRules, rule KindRewriteRules) (*sls.LogGroup, error) {
32+
return converter{}.fromSpan(span, topic, source, file, rule)
3333
}
3434

3535
// ToSpan converts a log record to a model.Span
@@ -44,8 +44,8 @@ func ToTraces(logs []map[string]string) ([]*model.Trace, error) {
4444

4545
type converter struct{}
4646

47-
func (c converter) fromSpan(span *model.Span, topic, source string, file TagAppendRules) (*sls.LogGroup, error) {
48-
logs, err := c.fromSpanToLogs(span, file)
47+
func (c converter) fromSpan(span *model.Span, topic, source string, file TagAppendRules, rule KindRewriteRules) (*sls.LogGroup, error) {
48+
logs, err := c.fromSpanToLogs(span, file, rule)
4949
if err != nil {
5050
return nil, err
5151
}
@@ -56,8 +56,8 @@ func (c converter) fromSpan(span *model.Span, topic, source string, file TagAppe
5656
}, nil
5757
}
5858

59-
func (c converter) fromSpanToLogs(span *model.Span, file TagAppendRules) ([]*sls.Log, error) {
60-
contents, err := c.fromSpanToLogContents(span, file)
59+
func (c converter) fromSpanToLogs(span *model.Span, file TagAppendRules, rule KindRewriteRules) ([]*sls.Log, error) {
60+
contents, err := c.fromSpanToLogContents(span, file, rule)
6161
if err != nil {
6262
return nil, err
6363
}
@@ -73,7 +73,7 @@ func TraceIDToString(t *model.TraceID) string {
7373
return fmt.Sprintf("%016x%016x", t.High, t.Low)
7474
}
7575

76-
func (c converter) fromSpanToLogContents(span *model.Span, rules TagAppendRules) ([]*sls.LogContent, error) {
76+
func (c converter) fromSpanToLogContents(span *model.Span, tagAppendRules TagAppendRules, kindRewriteRules KindRewriteRules) ([]*sls.LogContent, error) {
7777
contents := make([]*sls.LogContent, 0)
7878
contents = c.appendContents(contents, traceIDField, TraceIDToString(&span.TraceID))
7979
contents = c.appendContents(contents, spanIDField, span.SpanID.String())
@@ -88,18 +88,28 @@ func (c converter) fromSpanToLogContents(span *model.Span, rules TagAppendRules)
8888

8989
attributeMap := make(map[string]string)
9090
for _, tag := range span.Tags {
91-
if k, ok := rules.SpanTagRules()[tag.Key]; ok {
91+
if k, ok := tagAppendRules.SpanTagRules()[tag.Key]; ok {
9292
attributeMap[k.TagKey] = k.TagValue
9393
}
9494
attributeMap[tag.Key] = tag.AsString()
95+
96+
if k, ok := kindRewriteRules.SpanKindRules()[tag.Key]; ok {
97+
c.appendContents(contents, spanKindField, k)
98+
}
9599
}
96100

97-
for key, value := range rules.OperationPrefixRules() {
101+
for key, value := range tagAppendRules.OperationPrefixRules() {
98102
if strings.HasPrefix(span.OperationName, key) {
99103
attributeMap[value.TagKey] = value.TagValue
100104
}
101105
}
102106

107+
for key, value := range kindRewriteRules.OperationPrefixRules() {
108+
if strings.HasPrefix(span.OperationName, key) {
109+
c.appendContents(contents, spanKindField, value)
110+
}
111+
}
112+
103113
tagStr, _ := json.Marshal(attributeMap)
104114
contents = c.appendContents(contents, "attribute", string(tagStr))
105115

plugin/storage/aliyunlogotel/spanstore/converter_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,7 @@ func TestToSpan(t *testing.T) {
194194

195195
func TestFromSpan(t *testing.T) {
196196
span := getTestJaegerSpan()
197-
logGroup, err := FromSpan(span, "topic", "0.0.0.0", nil)
197+
logGroup, err := FromSpan(span, "topic", "0.0.0.0", nil, nil)
198198
assert.Nil(t, err)
199199
assert.Equal(t, "topic", *logGroup.Topic)
200200
assert.Equal(t, "0.0.0.0", *logGroup.Source)

plugin/storage/aliyunlogotel/spanstore/reader.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ const (
4545
warningsField = "statusMessage"
4646
serviceNameField = "service"
4747
processTagsPrefix = "process.tags."
48+
spanKindField = "kind"
4849

4950
defaultServiceLimit = 1000
5051
defaultOperationLimit = 1000

plugin/storage/aliyunlogotel/spanstore/writer.go

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,10 +39,11 @@ type SpanWriter struct {
3939
logger *zap.Logger
4040
writerMetrics spanWriterMetrics
4141
producer *producer.Producer
42-
appendTagRuleFile TagAppendRules
42+
appendTagRuleFile TagAppendRules
43+
rewriteKindRuleFile KindRewriteRules
4344
}
4445

45-
func NewSpanWriter(client sls.ClientInterface, project string, logstore string, initResourceFlag bool, logger *zap.Logger, metricsFactory metrics.Factory, file string) (*SpanWriter, error) {
46+
func NewSpanWriter(client sls.ClientInterface, project string, logstore string, initResourceFlag bool, logger *zap.Logger, metricsFactory metrics.Factory, appendTagFile string, rewriteKindFile string) (*SpanWriter, error) {
4647
ctx := context.Background()
4748

4849
if initResourceFlag {
@@ -65,7 +66,8 @@ func NewSpanWriter(client sls.ClientInterface, project string, logstore string,
6566
producerConfig.Endpoint = newClient.Endpoint
6667
producerInstance := producer.InitProducer(producerConfig)
6768
producerInstance.Start()
68-
appendTagRuleFile := initTagAppendRules(file);
69+
appendTagRuleFile := initTagAppendRules(appendTagFile);
70+
rewriteKindRuleFile := initKindRewriteRules(rewriteKindFile);
6971

7072
return &SpanWriter{
7173
ctx: ctx,
@@ -75,6 +77,7 @@ func NewSpanWriter(client sls.ClientInterface, project string, logstore string,
7577
logger: logger,
7678
producer: producerInstance,
7779
appendTagRuleFile: appendTagRuleFile,
80+
rewriteKindRuleFile: rewriteKindRuleFile,
7881
writerMetrics: spanWriterMetrics{
7982
putLogs: storageMetrics.NewWriteMetrics(metricsFactory, "putLogs"),
8083
},
@@ -87,7 +90,7 @@ func (s *SpanWriter) Close() error {
8790
}
8891

8992
func (s *SpanWriter) WriteSpan(span *model.Span) error {
90-
logGroup, err := FromSpan(span, "", "0.0.0.0", s.appendTagRuleFile)
93+
logGroup, err := FromSpan(span, "", "0.0.0.0", s.appendTagRuleFile, s.rewriteKindRuleFile)
9194
if err != nil {
9295
s.logError(span, err, "Failed to convert span to logGroup", s.logger)
9396
}

0 commit comments

Comments
 (0)