diff --git a/connector/dynamicroutingconnector/Makefile b/connector/dynamicroutingconnector/Makefile new file mode 100644 index 000000000..ded7a3609 --- /dev/null +++ b/connector/dynamicroutingconnector/Makefile @@ -0,0 +1 @@ +include ../../Makefile.Common diff --git a/connector/dynamicroutingconnector/config.go b/connector/dynamicroutingconnector/config.go new file mode 100644 index 000000000..2b43b5fe2 --- /dev/null +++ b/connector/dynamicroutingconnector/config.go @@ -0,0 +1,56 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package dynamicroutingconnector // import "github.com/elastic/opentelemetry-collector-components/connector/dynamicroutingconnector" + +import ( + "errors" + "sort" + "time" + + "go.opentelemetry.io/collector/pipeline" +) + +type Config struct { + // TODO(lahsivjar): Revisit the decision to route to default pipeline + // if NO metadata key results in empty str OR if primary key doesn't exist. + DefaultPipelines []pipeline.ID `mapstructure:"default_pipelines"` + EvaluationInterval time.Duration `mapstructure:"evalaution_interval"` + Pipelines [][]pipeline.ID `mapstructure:"pipelines"` + Thresholds []int `mapstructure:"thresholds"` + PrimaryMetadataKey string `mapstructure:"primary_metadata_key"` + MetadataKeys []string `mapstructure:"metadata_keys"` +} + +func (c *Config) Validate() error { + if len(c.Pipelines) == 0 { + return errors.New("atleast one pipeline needs to be defined") + } + if len(c.Pipelines)+1 != len(c.Thresholds) { + return errors.New("pipelines need to be defined for each threshold bucket, including +inf") + } + if !sort.IntsAreSorted(c.Thresholds) { + return errors.New("thresolds is expected to be in increasing order") + } + + for i := 1; i < len(c.Thresholds); i++ { + if c.Thresholds[i] == c.Thresholds[i-1] { + return errors.New("thresholds are expected to be unique") + } + } + return nil +} diff --git a/connector/dynamicroutingconnector/factory.go b/connector/dynamicroutingconnector/factory.go new file mode 100644 index 000000000..07ca5e3de --- /dev/null +++ b/connector/dynamicroutingconnector/factory.go @@ -0,0 +1,70 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package dynamicroutingconnector // import "github.com/elastic/opentelemetry-collector-components/connector/dynamicroutingconnector" + +import ( + "context" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/connector" + "go.opentelemetry.io/collector/consumer" + + "github.com/elastic/opentelemetry-collector-components/connector/dynamicroutingconnector/internal/metadata" +) + +// NewFactory returns a connector.Factory. +func NewFactory() connector.Factory { + return connector.NewFactory( + metadata.Type, + createDefaultConfig, + connector.WithTracesToTraces(createTracesToTraces, metadata.TracesToTracesStability), + connector.WithLogsToLogs(createLogsToLogs, metadata.LogsToLogsStability), + connector.WithMetricsToMetrics(createMetricsToMetrics, metadata.MetricsToMetricsStability), + ) +} + +func createTracesToTraces( + _ context.Context, + set connector.Settings, + cfg component.Config, + traces consumer.Traces, +) (connector.Traces, error) { + return newTracesConnector(set, cfg, traces) +} + +func createLogsToLogs( + _ context.Context, + set connector.Settings, + cfg component.Config, + logs consumer.Logs, +) (connector.Logs, error) { + return newLogsConnector(set, cfg, logs) +} + +func createMetricsToMetrics( + _ context.Context, + set connector.Settings, + cfg component.Config, + metrics consumer.Metrics, +) (connector.Metrics, error) { + return newMetricsConnector(set, cfg, metrics) +} + +func createDefaultConfig() component.Config { + return &Config{} +} diff --git a/connector/dynamicroutingconnector/generated_component_test.go b/connector/dynamicroutingconnector/generated_component_test.go new file mode 100644 index 000000000..fdc763b36 --- /dev/null +++ b/connector/dynamicroutingconnector/generated_component_test.go @@ -0,0 +1,107 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Code generated by mdatagen. DO NOT EDIT. + +package dynamicroutingconnector + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/confmap/confmaptest" + "go.opentelemetry.io/collector/connector" + "go.opentelemetry.io/collector/connector/connectortest" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/consumer/consumertest" + "go.opentelemetry.io/collector/pipeline" +) + +var typ = component.MustNewType("dynamicrouting") + +func TestComponentFactoryType(t *testing.T) { + require.Equal(t, typ, NewFactory().Type()) +} + +func TestComponentConfigStruct(t *testing.T) { + require.NoError(t, componenttest.CheckConfigStruct(NewFactory().CreateDefaultConfig())) +} + +func TestComponentLifecycle(t *testing.T) { + factory := NewFactory() + + tests := []struct { + createFn func(ctx context.Context, set connector.Settings, cfg component.Config) (component.Component, error) + name string + }{ + + { + name: "logs_to_logs", + createFn: func(ctx context.Context, set connector.Settings, cfg component.Config) (component.Component, error) { + router := connector.NewLogsRouter(map[pipeline.ID]consumer.Logs{pipeline.NewID(pipeline.SignalLogs): consumertest.NewNop()}) + return factory.CreateLogsToLogs(ctx, set, cfg, router) + }, + }, + + { + name: "metrics_to_metrics", + createFn: func(ctx context.Context, set connector.Settings, cfg component.Config) (component.Component, error) { + router := connector.NewMetricsRouter(map[pipeline.ID]consumer.Metrics{pipeline.NewID(pipeline.SignalMetrics): consumertest.NewNop()}) + return factory.CreateMetricsToMetrics(ctx, set, cfg, router) + }, + }, + + { + name: "traces_to_traces", + createFn: func(ctx context.Context, set connector.Settings, cfg component.Config) (component.Component, error) { + router := connector.NewTracesRouter(map[pipeline.ID]consumer.Traces{pipeline.NewID(pipeline.SignalTraces): consumertest.NewNop()}) + return factory.CreateTracesToTraces(ctx, set, cfg, router) + }, + }, + } + + cm, err := confmaptest.LoadConf("metadata.yaml") + require.NoError(t, err) + cfg := factory.CreateDefaultConfig() + sub, err := cm.Sub("tests::config") + require.NoError(t, err) + require.NoError(t, sub.Unmarshal(&cfg)) + + for _, tt := range tests { + t.Run(tt.name+"-shutdown", func(t *testing.T) { + c, err := tt.createFn(context.Background(), connectortest.NewNopSettings(typ), cfg) + require.NoError(t, err) + err = c.Shutdown(context.Background()) + require.NoError(t, err) + }) + t.Run(tt.name+"-lifecycle", func(t *testing.T) { + firstConnector, err := tt.createFn(context.Background(), connectortest.NewNopSettings(typ), cfg) + require.NoError(t, err) + host := componenttest.NewNopHost() + require.NoError(t, err) + require.NoError(t, firstConnector.Start(context.Background(), host)) + require.NoError(t, firstConnector.Shutdown(context.Background())) + secondConnector, err := tt.createFn(context.Background(), connectortest.NewNopSettings(typ), cfg) + require.NoError(t, err) + require.NoError(t, secondConnector.Start(context.Background(), host)) + require.NoError(t, secondConnector.Shutdown(context.Background())) + }) + } +} diff --git a/connector/dynamicroutingconnector/generated_package_test.go b/connector/dynamicroutingconnector/generated_package_test.go new file mode 100644 index 000000000..6303015d6 --- /dev/null +++ b/connector/dynamicroutingconnector/generated_package_test.go @@ -0,0 +1,30 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Code generated by mdatagen. DO NOT EDIT. + +package dynamicroutingconnector + +import ( + "testing" + + "go.uber.org/goleak" +) + +func TestMain(m *testing.M) { + goleak.VerifyTestMain(m) +} diff --git a/connector/dynamicroutingconnector/go.mod b/connector/dynamicroutingconnector/go.mod new file mode 100644 index 000000000..3971afbe1 --- /dev/null +++ b/connector/dynamicroutingconnector/go.mod @@ -0,0 +1,77 @@ +module github.com/elastic/opentelemetry-collector-components/connector/dynamicroutingconnector + +go 1.24.0 + +toolchain go1.24.2 + +require ( + github.com/axiomhq/hyperloglog v0.2.5 + github.com/cespare/xxhash/v2 v2.3.0 + github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.135.0 + github.com/stretchr/testify v1.11.1 + go.opentelemetry.io/collector/client v1.41.0 + go.opentelemetry.io/collector/component v1.41.0 + go.opentelemetry.io/collector/component/componenttest v0.135.0 + go.opentelemetry.io/collector/confmap v1.41.0 + go.opentelemetry.io/collector/connector v0.135.0 + go.opentelemetry.io/collector/connector/connectortest v0.135.0 + go.opentelemetry.io/collector/consumer v1.41.0 + go.opentelemetry.io/collector/consumer/consumertest v0.135.0 + go.opentelemetry.io/collector/pdata v1.41.0 + go.opentelemetry.io/collector/pipeline v1.41.0 + go.uber.org/goleak v1.3.0 + go.uber.org/zap v1.27.0 +) + +require ( + github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect + github.com/dgryski/go-metro v0.0.0-20180109044635-280f6062b5bc // indirect + github.com/go-logr/logr v1.4.3 // indirect + github.com/go-logr/stdr v1.2.2 // indirect + github.com/go-viper/mapstructure/v2 v2.4.0 // indirect + github.com/gobwas/glob v0.2.3 // indirect + github.com/gogo/protobuf v1.3.2 // indirect + github.com/google/uuid v1.6.0 // indirect + github.com/hashicorp/go-version v1.7.0 // indirect + github.com/json-iterator/go v1.1.12 // indirect + github.com/kamstrup/intmap v0.5.1 // indirect + github.com/knadh/koanf/maps v0.1.2 // indirect + github.com/knadh/koanf/providers/confmap v1.0.0 // indirect + github.com/knadh/koanf/v2 v2.2.2 // indirect + github.com/mitchellh/copystructure v1.2.0 // indirect + github.com/mitchellh/reflectwalk v1.0.2 // indirect + github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect + github.com/modern-go/reflect2 v1.0.3-0.20250322232337-35a7c28c31ee // indirect + github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.135.0 // indirect + github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect + go.opentelemetry.io/auto/sdk v1.1.0 // indirect + go.opentelemetry.io/collector/connector/xconnector v0.135.0 // indirect + go.opentelemetry.io/collector/consumer/xconsumer v0.135.0 // indirect + go.opentelemetry.io/collector/featuregate v1.41.0 // indirect + go.opentelemetry.io/collector/internal/fanoutconsumer v0.135.0 // indirect + go.opentelemetry.io/collector/internal/telemetry v0.135.0 // indirect + go.opentelemetry.io/collector/pdata/pprofile v0.135.0 // indirect + go.opentelemetry.io/collector/pipeline/xpipeline v0.135.0 // indirect + go.opentelemetry.io/contrib/bridges/otelzap v0.12.0 // indirect + go.opentelemetry.io/otel v1.38.0 // indirect + go.opentelemetry.io/otel/log v0.14.0 // indirect + go.opentelemetry.io/otel/metric v1.38.0 // indirect + go.opentelemetry.io/otel/sdk v1.38.0 // indirect + go.opentelemetry.io/otel/sdk/metric v1.38.0 // indirect + go.opentelemetry.io/otel/trace v1.38.0 // indirect + go.uber.org/multierr v1.11.0 // indirect + go.yaml.in/yaml/v3 v3.0.4 // indirect + golang.org/x/net v0.43.0 // indirect + golang.org/x/sys v0.35.0 // indirect + golang.org/x/text v0.28.0 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20250707201910-8d1bb00bc6a7 // indirect + google.golang.org/grpc v1.75.0 // indirect + google.golang.org/protobuf v1.36.8 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) + +replace github.com/elastic/opentelemetry-collector-components/internal/sharedcomponent => ../../internal/sharedcomponent + +replace github.com/elastic/opentelemetry-collector-components/processor/elastictraceprocessor => ../../processor/elastictraceprocessor + +replace github.com/elastic/opentelemetry-collector-components/processor/lsmintervalprocessor => ../../processor/lsmintervalprocessor diff --git a/connector/dynamicroutingconnector/go.sum b/connector/dynamicroutingconnector/go.sum new file mode 100644 index 000000000..3f4364a90 --- /dev/null +++ b/connector/dynamicroutingconnector/go.sum @@ -0,0 +1,185 @@ +github.com/axiomhq/hyperloglog v0.2.5 h1:Hefy3i8nAs8zAI/tDp+wE7N+Ltr8JnwiW3875pvl0N8= +github.com/axiomhq/hyperloglog v0.2.5/go.mod h1:DLUK9yIzpU5B6YFLjxTIcbHu1g4Y1WQb1m5RH3radaM= +github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= +github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= +github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dgryski/go-metro v0.0.0-20180109044635-280f6062b5bc h1:8WFBn63wegobsYAX0YjD+8suexZDga5CctH4CCTx2+8= +github.com/dgryski/go-metro v0.0.0-20180109044635-280f6062b5bc/go.mod h1:c9O8+fpSOX1DM8cPNSkX/qsBWdkD4yd2dpciOWQjpBw= +github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI= +github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= +github.com/go-viper/mapstructure/v2 v2.4.0 h1:EBsztssimR/CONLSZZ04E8qAkxNYq4Qp9LvH92wZUgs= +github.com/go-viper/mapstructure/v2 v2.4.0/go.mod h1:oJDH3BJKyqBA2TXFhDsKDGDTlndYOZ6rGS0BRZIxGhM= +github.com/gobwas/glob v0.2.3 h1:A4xDbljILXROh+kObIiy5kIaPYD8e96x1tgBhUI5J+Y= +github.com/gobwas/glob v0.2.3/go.mod h1:d3Ez4x06l9bZtSvzIay5+Yzi0fmZzPgnTbPcKjJAkT8= +github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= +github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= +github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= +github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= +github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= +github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= +github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/hashicorp/go-version v1.7.0 h1:5tqGy27NaOTB8yJKUZELlFAS/LTKJkrmONwQKeRZfjY= +github.com/hashicorp/go-version v1.7.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA= +github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= +github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= +github.com/kamstrup/intmap v0.5.1 h1:ENGAowczZA+PJPYYlreoqJvWgQVtAmX1l899WfYFVK0= +github.com/kamstrup/intmap v0.5.1/go.mod h1:gWUVWHKzWj8xpJVFf5GC0O26bWmv3GqdnIX/LMT6Aq4= +github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= +github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/knadh/koanf/maps v0.1.2 h1:RBfmAW5CnZT+PJ1CVc1QSJKf4Xu9kxfQgYVQSu8hpbo= +github.com/knadh/koanf/maps v0.1.2/go.mod h1:npD/QZY3V6ghQDdcQzl1W4ICNVTkohC8E73eI2xW4yI= +github.com/knadh/koanf/providers/confmap v1.0.0 h1:mHKLJTE7iXEys6deO5p6olAiZdG5zwp8Aebir+/EaRE= +github.com/knadh/koanf/providers/confmap v1.0.0/go.mod h1:txHYHiI2hAtF0/0sCmcuol4IDcuQbKTybiB1nOcUo1A= +github.com/knadh/koanf/v2 v2.2.2 h1:ghbduIkpFui3L587wavneC9e3WIliCgiCgdxYO/wd7A= +github.com/knadh/koanf/v2 v2.2.2/go.mod h1:abWQc0cBXLSF/PSOMCB/SK+T13NXDsPvOksbpi5e/9Q= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/mitchellh/copystructure v1.2.0 h1:vpKXTN4ewci03Vljg/q9QvCGUDttBOGBIa15WveJJGw= +github.com/mitchellh/copystructure v1.2.0/go.mod h1:qLl+cE2AmVv+CoeAwDPye/v+N2HKCj9FbZEVFJRxO9s= +github.com/mitchellh/reflectwalk v1.0.2 h1:G2LzWKi524PWgd3mLHV8Y5k7s6XUvT0Gef6zxSIeXaQ= +github.com/mitchellh/reflectwalk v1.0.2/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx0jmZXqmk4esnw= +github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= +github.com/modern-go/reflect2 v1.0.3-0.20250322232337-35a7c28c31ee h1:W5t00kpgFdJifH4BDsTlE89Zl93FEloxaWZfGcifgq8= +github.com/modern-go/reflect2 v1.0.3-0.20250322232337-35a7c28c31ee/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= +github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden v0.135.0 h1:BZF24aHpbal8sflQzMCHfFajaRfVlYHoBaOf4A6dSJY= +github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden v0.135.0/go.mod h1:2cUWeLDR5IRO55TUm+TSbeREL94Yx3blJH1HdWzfH+c= +github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.135.0 h1:ETIkUe0xdzc4aQXQwscKK+oqgvR6lza8ccEfWh7blVI= +github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.135.0/go.mod h1:qo7n5lhE8hGwMPPS+1pkiPshTCUzliUSc4EBPqUe4Ko= +github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.135.0 h1:JEXB0KNbq9lmJPObPfzCGfy4kDCXjUWNyDa+XDX2kGE= +github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.135.0/go.mod h1:AP7VhGu9pFIrEC2i6IddzBZ0ntllQKXAP9uzIO8eOn8= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= +github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII= +github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= +github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= +github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA= +go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A= +go.opentelemetry.io/collector/client v1.41.0 h1:guBmo07Dyb3wV5ApB2aIFY/Myl394T1XYn+E/YCCbnM= +go.opentelemetry.io/collector/client v1.41.0/go.mod h1:bY1Tbx/UBWWoMS/LDPwq7ftDE7ExvSy/Yknu0bU9dJc= +go.opentelemetry.io/collector/component v1.41.0 h1:NMvPlvfOSzhXPHWB6pTgrGaH6jg25ym1Oog8sTI813s= +go.opentelemetry.io/collector/component v1.41.0/go.mod h1:PA7vA3IxU5PRAbm96++sweaVzeoirBFZpRBs7XbbPEU= +go.opentelemetry.io/collector/component/componenttest v0.135.0 h1:OB6OmCWE1EwHwvV17RgvUeeDimSjHV7wrRGHcUVh06g= +go.opentelemetry.io/collector/component/componenttest v0.135.0/go.mod h1:9epxwkJW7ZXB1mTmCVF3JzfIoM0uhtnBTC2YWxrXczk= +go.opentelemetry.io/collector/confmap v1.41.0 h1:m2Z7uZ1W4KpUdIWmps3vSv9jAvKFIr4EO/yYdSZ4+lE= +go.opentelemetry.io/collector/confmap v1.41.0/go.mod h1:0nVs/u8BR6LZUjkMSOszBv1CSu4AGMoWv4c8zqu0ui0= +go.opentelemetry.io/collector/connector v0.135.0 h1:IpVRPHJ41UW028Zl+j0xcnjN4n0CmTuHkQg1RQwzujs= +go.opentelemetry.io/collector/connector v0.135.0/go.mod h1:+5eii69wBS8DW+zhqZmzbwWcptbbsNWEPvK5DQz4vw4= +go.opentelemetry.io/collector/connector/connectortest v0.135.0 h1:Zk8us8ckOweRGW7w/6KGu2Zz7Ncy0Q9Wh+FCTRG+VgY= +go.opentelemetry.io/collector/connector/connectortest v0.135.0/go.mod h1:3rY/2B1tXMUyrzVgq7/yLOS5dUf55NZfLo6UpfaTNWg= +go.opentelemetry.io/collector/connector/xconnector v0.135.0 h1:nVsUZGrkM12Ahl0j3E82Xu7qJXoN0XjZjeWKdy0TtLA= +go.opentelemetry.io/collector/connector/xconnector v0.135.0/go.mod h1:eMzQIQT28sEFK/lwkzA8KXEYZtlTIAZNOcTmVZR1Biw= +go.opentelemetry.io/collector/consumer v1.41.0 h1:sV77khNsZd5YR+vNtHIJaRcTXIlszNX7ZePpXRpm9PA= +go.opentelemetry.io/collector/consumer v1.41.0/go.mod h1:fDB3ZjVCv2+zFsF/6WSYBSX3pkux/qAYf2Tk/P6b9yA= +go.opentelemetry.io/collector/consumer/consumertest v0.135.0 h1:6WqoRyjvHcVuIrF7UbiPcOI7qx9uP3079pFlKeIngWk= +go.opentelemetry.io/collector/consumer/consumertest v0.135.0/go.mod h1:WcW7FyvELOklWjgjP+tUuR6Y8PoaOOnFiauubFzPbXg= +go.opentelemetry.io/collector/consumer/xconsumer v0.135.0 h1:JTqWWBHrs6MUPEvgGRwrVST8u3+L39mvHmsCZ2MIhro= +go.opentelemetry.io/collector/consumer/xconsumer v0.135.0/go.mod h1:zlIG7cEmgjlqAHCqpMOFX9kqzog0cNFsCR2A9r8DTQI= +go.opentelemetry.io/collector/featuregate v1.41.0 h1:CL4UMsMQj35nMJC3/jUu8VvYB4MHirbAX4B0Z/fCVLY= +go.opentelemetry.io/collector/featuregate v1.41.0/go.mod h1:A72x92glpH3zxekaUybml1vMSv94BH6jQRn5+/htcjw= +go.opentelemetry.io/collector/internal/fanoutconsumer v0.135.0 h1:EJNEL/p9ug5Hq/gM3zLNLYmqD4zGMyp/0oWx0q+hw3Y= +go.opentelemetry.io/collector/internal/fanoutconsumer v0.135.0/go.mod h1:AJeK3tsY6v4dH8UYaSAtYyXvdZUTKY74aVCY+h3M5BU= +go.opentelemetry.io/collector/internal/telemetry v0.135.0 h1:GnWqyy3jTSrmefzYPNamQ0ZIhRTJZFnRW6/rj8lc1sA= +go.opentelemetry.io/collector/internal/telemetry v0.135.0/go.mod h1:ryObkPVpAfn6SG16vKdy1ys3udwQCj5G6m6d5LJLhtc= +go.opentelemetry.io/collector/pdata v1.41.0 h1:2zurAaY0FkURbLa1x7f7ag6HaNZYZKSmI4wgzDegLgo= +go.opentelemetry.io/collector/pdata v1.41.0/go.mod h1:h0OghaTYe4oRvLxK31Ny7gkyjJ1p8oniM5MiCzluQjc= +go.opentelemetry.io/collector/pdata/pprofile v0.135.0 h1:+s7I7Tj28THWRhUeKEv5JnadKCPKLnzouG6x0N25dOQ= +go.opentelemetry.io/collector/pdata/pprofile v0.135.0/go.mod h1:VuxzZ5XT4cPyHfkSBLQ6YmKbGJ6T3VdG0ec0+yjIF94= +go.opentelemetry.io/collector/pdata/testdata v0.135.0 h1:bp+9wKAifJcoYdS+qTwtgcKPM129wIKLUGAAxKY4lck= +go.opentelemetry.io/collector/pdata/testdata v0.135.0/go.mod h1:w0gTft2xsn/adYgUGNBhDDjVhKCvvA9fHTKIbh7rx0o= +go.opentelemetry.io/collector/pipeline v1.41.0 h1:1WtWLkegP9vW4XrAlsDHI+JMPsN9tdzctMoTYzuol9g= +go.opentelemetry.io/collector/pipeline v1.41.0/go.mod h1:NdM+ZqkPe9KahtOXG28RHTRQu4m/FD1i3Ew4qCRdOr8= +go.opentelemetry.io/collector/pipeline/xpipeline v0.135.0 h1:SstI81OsKdc2t/4XVUOT4Goexz8Ub3hcVEFcbQzjCQU= +go.opentelemetry.io/collector/pipeline/xpipeline v0.135.0/go.mod h1:PHR3zAsx6sYOEYBc0zcNK30szcKd6svY+L+udanXh+U= +go.opentelemetry.io/contrib/bridges/otelzap v0.12.0 h1:FGre0nZh5BSw7G73VpT3xs38HchsfPsa2aZtMp0NPOs= +go.opentelemetry.io/contrib/bridges/otelzap v0.12.0/go.mod h1:X2PYPViI2wTPIMIOBjG17KNybTzsrATnvPJ02kkz7LM= +go.opentelemetry.io/otel v1.38.0 h1:RkfdswUDRimDg0m2Az18RKOsnI8UDzppJAtj01/Ymk8= +go.opentelemetry.io/otel v1.38.0/go.mod h1:zcmtmQ1+YmQM9wrNsTGV/q/uyusom3P8RxwExxkZhjM= +go.opentelemetry.io/otel/log v0.14.0 h1:2rzJ+pOAZ8qmZ3DDHg73NEKzSZkhkGIua9gXtxNGgrM= +go.opentelemetry.io/otel/log v0.14.0/go.mod h1:5jRG92fEAgx0SU/vFPxmJvhIuDU9E1SUnEQrMlJpOno= +go.opentelemetry.io/otel/log/logtest v0.14.0 h1:BGTqNeluJDK2uIHAY8lRqxjVAYfqgcaTbVk1n3MWe5A= +go.opentelemetry.io/otel/log/logtest v0.14.0/go.mod h1:IuguGt8XVP4XA4d2oEEDMVDBBCesMg8/tSGWDjuKfoA= +go.opentelemetry.io/otel/metric v1.38.0 h1:Kl6lzIYGAh5M159u9NgiRkmoMKjvbsKtYRwgfrA6WpA= +go.opentelemetry.io/otel/metric v1.38.0/go.mod h1:kB5n/QoRM8YwmUahxvI3bO34eVtQf2i4utNVLr9gEmI= +go.opentelemetry.io/otel/sdk v1.38.0 h1:l48sr5YbNf2hpCUj/FoGhW9yDkl+Ma+LrVl8qaM5b+E= +go.opentelemetry.io/otel/sdk v1.38.0/go.mod h1:ghmNdGlVemJI3+ZB5iDEuk4bWA3GkTpW+DOoZMYBVVg= +go.opentelemetry.io/otel/sdk/metric v1.38.0 h1:aSH66iL0aZqo//xXzQLYozmWrXxyFkBJ6qT5wthqPoM= +go.opentelemetry.io/otel/sdk/metric v1.38.0/go.mod h1:dg9PBnW9XdQ1Hd6ZnRz689CbtrUp0wMMs9iPcgT9EZA= +go.opentelemetry.io/otel/trace v1.38.0 h1:Fxk5bKrDZJUH+AMyyIXGcFAPah0oRcT+LuNtJrmcNLE= +go.opentelemetry.io/otel/trace v1.38.0/go.mod h1:j1P9ivuFsTceSWe1oY+EeW3sc+Pp42sO++GHkg4wwhs= +go.opentelemetry.io/proto/slim/otlp v1.7.1 h1:lZ11gEokjIWYM3JWOUrIILr2wcf6RX+rq5SPObV9oyc= +go.opentelemetry.io/proto/slim/otlp v1.7.1/go.mod h1:uZ6LJWa49eNM/EXnnvJGTTu8miokU8RQdnO980LJ57g= +go.opentelemetry.io/proto/slim/otlp/collector/profiles/v1development v0.0.1 h1:Tr/eXq6N7ZFjN+THBF/BtGLUz8dciA7cuzGRsCEkZ88= +go.opentelemetry.io/proto/slim/otlp/collector/profiles/v1development v0.0.1/go.mod h1:riqUmAOJFDFuIAzZu/3V6cOrTyfWzpgNJnG5UwrapCk= +go.opentelemetry.io/proto/slim/otlp/profiles/v1development v0.0.1 h1:z/oMlrCv3Kopwh/dtdRagJy+qsRRPA86/Ux3g7+zFXM= +go.opentelemetry.io/proto/slim/otlp/profiles/v1development v0.0.1/go.mod h1:C7EHYSIiaALi9RnNORCVaPCQDuJgJEn/XxkctaTez1E= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= +go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= +go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= +go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= +go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= +go.yaml.in/yaml/v3 v3.0.4 h1:tfq32ie2Jv2UxXFdLJdh3jXuOzWiL1fo0bu/FbuKpbc= +go.yaml.in/yaml/v3 v3.0.4/go.mod h1:DhzuOOF2ATzADvBadXxruRBLzYTpT36CKvDb3+aBEFg= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/net v0.43.0 h1:lat02VYK2j4aLzMzecihNvTlJNQUq316m2Mr9rnM6YE= +golang.org/x/net v0.43.0/go.mod h1:vhO1fvI4dGsIjh73sWfUVjj3N7CA9WkKJNQm2svM6Jg= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.35.0 h1:vz1N37gP5bs89s7He8XuIYXpyY0+QlsKmzipCbUtyxI= +golang.org/x/sys v0.35.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.28.0 h1:rhazDwis8INMIwQ4tpjLDzUhx6RlXqZNPEM0huQojng= +golang.org/x/text v0.28.0/go.mod h1:U8nCwOR8jO/marOQ0QbDiOngZVEBB7MAiitBuMjXiNU= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= +golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +gonum.org/v1/gonum v0.16.0 h1:5+ul4Swaf3ESvrOnidPp4GZbzf0mxVQpDCYUQE7OJfk= +gonum.org/v1/gonum v0.16.0/go.mod h1:fef3am4MQ93R2HHpKnLk4/Tbh/s0+wqD5nfa6Pnwy4E= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250707201910-8d1bb00bc6a7 h1:pFyd6EwwL2TqFf8emdthzeX+gZE1ElRq3iM8pui4KBY= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250707201910-8d1bb00bc6a7/go.mod h1:qQ0YXyHHx3XkvlzUtpXDkS29lDSafHMZBAZDc03LQ3A= +google.golang.org/grpc v1.75.0 h1:+TW+dqTd2Biwe6KKfhE5JpiYIBWq865PhKGSXiivqt4= +google.golang.org/grpc v1.75.0/go.mod h1:JtPAzKiq4v1xcAB2hydNlWI2RnF85XXcV0mhKXr2ecQ= +google.golang.org/protobuf v1.36.8 h1:xHScyCOEuuwZEc6UtSOvPbAT4zRh0xcNRYekJwfqyMc= +google.golang.org/protobuf v1.36.8/go.mod h1:fuxRtAxBytpl4zzqUh6/eyUujkJdNiuEkXntxiD/uRU= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/connector/dynamicroutingconnector/internal/metadata/generated_status.go b/connector/dynamicroutingconnector/internal/metadata/generated_status.go new file mode 100644 index 000000000..f4b056d60 --- /dev/null +++ b/connector/dynamicroutingconnector/internal/metadata/generated_status.go @@ -0,0 +1,35 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Code generated by mdatagen. DO NOT EDIT. + +package metadata + +import ( + "go.opentelemetry.io/collector/component" +) + +var ( + Type = component.MustNewType("dynamicrouting") + ScopeName = "github.com/elastic/opentelemetry-collector-components/connector/dynamicroutingconnector" +) + +const ( + LogsToLogsStability = component.StabilityLevelDevelopment + MetricsToMetricsStability = component.StabilityLevelDevelopment + TracesToTracesStability = component.StabilityLevelDevelopment +) diff --git a/connector/dynamicroutingconnector/logs.go b/connector/dynamicroutingconnector/logs.go new file mode 100644 index 000000000..9b3c22dd3 --- /dev/null +++ b/connector/dynamicroutingconnector/logs.go @@ -0,0 +1,75 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package dynamicroutingconnector // import "github.com/elastic/opentelemetry-collector-components/connector/dynamicroutingconnector" + +import ( + "context" + "errors" + "fmt" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/connector" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/pdata/plog" + "go.uber.org/zap" +) + +type logsConnector struct { + logger *zap.Logger + cfg *Config + router *router[consumer.Logs] +} + +func newLogsConnector( + set connector.Settings, + config component.Config, + logs consumer.Logs, +) (*logsConnector, error) { + cfg := config.(*Config) + lr, ok := logs.(connector.LogsRouterAndConsumer) + if !ok { + return nil, errors.New("expected connector to be a router and consumer") + } + + router, err := newRouter(cfg, set.TelemetrySettings, lr.Consumer) + if err != nil { + return nil, fmt.Errorf("failed to create router: %w", err) + } + + return &logsConnector{ + logger: set.Logger, + cfg: cfg, + router: router, + }, nil +} + +func (c *logsConnector) Start(ctx context.Context, host component.Host) error { + return c.router.Start(ctx, host) +} + +func (c *logsConnector) Shutdown(ctx context.Context) error { + return c.router.Shutdown(ctx) +} + +func (c *logsConnector) Capabilities() consumer.Capabilities { + return consumer.Capabilities{MutatesData: true} +} + +func (c *logsConnector) ConsumeLogs(ctx context.Context, ld plog.Logs) error { + return c.router.Process(ctx).ConsumeLogs(ctx, ld) +} diff --git a/connector/dynamicroutingconnector/metadata.yaml b/connector/dynamicroutingconnector/metadata.yaml new file mode 100644 index 000000000..336144d1c --- /dev/null +++ b/connector/dynamicroutingconnector/metadata.yaml @@ -0,0 +1,15 @@ +type: dynamicrouting +scope_name: github.com/elastic/opentelemetry-collector-components/connector/dynamicroutingconnector +github_project: elastic/opentelemetry-collector-components + +status: + class: connector + stability: + development: + - logs_to_logs + - metrics_to_metrics + - traces_to_traces + distributions: [] + +tests: + config: diff --git a/connector/dynamicroutingconnector/metrics.go b/connector/dynamicroutingconnector/metrics.go new file mode 100644 index 000000000..bf9bc4d6d --- /dev/null +++ b/connector/dynamicroutingconnector/metrics.go @@ -0,0 +1,75 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package dynamicroutingconnector // import "github.com/elastic/opentelemetry-collector-components/connector/dynamicroutingconnector" + +import ( + "context" + "errors" + "fmt" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/connector" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.uber.org/zap" +) + +type metricsConnector struct { + logger *zap.Logger + cfg *Config + router *router[consumer.Metrics] +} + +func newMetricsConnector( + set connector.Settings, + config component.Config, + metrics consumer.Metrics, +) (*metricsConnector, error) { + cfg := config.(*Config) + mr, ok := metrics.(connector.MetricsRouterAndConsumer) + if !ok { + return nil, errors.New("expected connector to be a router and consumer") + } + + router, err := newRouter(cfg, set.TelemetrySettings, mr.Consumer) + if err != nil { + return nil, fmt.Errorf("failed to create router: %w", err) + } + + return &metricsConnector{ + logger: set.Logger, + cfg: cfg, + router: router, + }, nil +} + +func (c *metricsConnector) Start(ctx context.Context, host component.Host) error { + return c.router.Start(ctx, host) +} + +func (c *metricsConnector) Shutdown(ctx context.Context) error { + return c.router.Shutdown(ctx) +} + +func (c *metricsConnector) Capabilities() consumer.Capabilities { + return consumer.Capabilities{MutatesData: true} +} + +func (c *metricsConnector) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error { + return c.router.Process(ctx).ConsumeMetrics(ctx, md) +} diff --git a/connector/dynamicroutingconnector/router.go b/connector/dynamicroutingconnector/router.go new file mode 100644 index 000000000..e033952ca --- /dev/null +++ b/connector/dynamicroutingconnector/router.go @@ -0,0 +1,235 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package dynamicroutingconnector // import "github.com/elastic/opentelemetry-collector-components/connector/dynamicroutingconnector" + +import ( + "context" + "fmt" + "slices" + "sort" + "strings" + "sync" + "time" + + "github.com/axiomhq/hyperloglog" + "github.com/cespare/xxhash/v2" + "go.opentelemetry.io/collector/client" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/pipeline" + "go.uber.org/zap" +) + +var _ component.Component = (*router[any])(nil) + +// consumerProvider is a function with a type parameter C (expected to be one +// of consumer.Traces, consumer.Metrics, or Consumer.Logs). returns a +// consumer for the given component ID(s). +type consumerProvider[C any] func(...pipeline.ID) (C, error) + +type router[C any] struct { + evaluationInterval time.Duration + primaryMetadataKey string + sortedMetadataKeys []string + defaultConsumer C + thresholds []int + consumers []C + + // TODO(lahsivjar): Fix the chaos of Start<>Shutdown orchestration + stop chan struct{} + stopped chan struct{} + + logger *zap.Logger + + dmu sync.RWMutex + decision map[string]*hyperloglog.Sketch + + mu sync.Mutex + m map[string]*hyperloglog.Sketch +} + +func newRouter[C any]( + cfg *Config, + settings component.TelemetrySettings, + provider consumerProvider[C], +) (*router[C], error) { + sortedMetadataKeys := slices.Clone(cfg.MetadataKeys) + slices.Sort(sortedMetadataKeys) + + consumers := make([]C, 0, len(cfg.Pipelines)) + for i, p := range cfg.Pipelines { + c, err := provider(p...) + if err != nil { + return nil, fmt.Errorf("failed to create consumer from provided pipelines at idx %d: %w", i, err) + } + consumers = append(consumers, c) + } + + var ( + err error + defaultConsumer C + ) + if len(cfg.DefaultPipelines) > 0 { + defaultConsumer, err = provider(cfg.DefaultPipelines...) + if err != nil { + return nil, fmt.Errorf("failed to create consumer from default pipelines: %w", err) + } + } + return &router[C]{ + evaluationInterval: cfg.EvaluationInterval, + primaryMetadataKey: cfg.PrimaryMetadataKey, + sortedMetadataKeys: sortedMetadataKeys, + defaultConsumer: defaultConsumer, + thresholds: cfg.Thresholds, + consumers: consumers, + stop: make(chan struct{}), + decision: make(map[string]*hyperloglog.Sketch), + m: make(map[string]*hyperloglog.Sketch), + }, nil +} + +func (r *router[C]) Start(ctx context.Context, _ component.Host) error { + // TODO(lahsivjar): Protect by lock to handle multiple calls to start + if r.stopped == nil { + r.stopped = make(chan struct{}) + } + + select { + case <-r.stop: + // Already signaled to be stopped + return nil + case <-r.stopped: + // Already stopped + return nil + default: + } + + go func() { + defer close(r.stopped) + // Use timers to ensure that decions are always atleast + // evaluation interval apart. + timer := time.NewTimer(r.evaluationInterval) + defer timer.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-r.stop: + // stop has been signaled + return + case <-timer.C: + } + + r.updateDecisions() + timer.Reset(r.evaluationInterval) + } + }() + return nil +} + +func (r *router[C]) Shutdown(ctx context.Context) error { + select { + case <-r.stop: + // Shutdown has already been called once + return nil + default: + } + + close(r.stop) + if r.stopped != nil { + // Start has ran before + select { + case <-ctx.Done(): + // shutdown context done + return fmt.Errorf("failed to shutdown due to context timeout while waiting for router to stop: %w", ctx.Err()) + case <-r.stopped: + // wait for evaluation goroutine to stop + } + } + return nil +} + +func (r *router[C]) Process(ctx context.Context) C { + return r.getNextConsumer(r.estimateCardinality(ctx)) +} + +func (r *router[C]) estimateCardinality(ctx context.Context) string { + clientMeta := client.FromContext(ctx).Metadata + var pk string + switch pks := clientMeta.Get(r.primaryMetadataKey); len(pks) { + case 0: + return "" + case 1: + pk = pks[0] + default: + pk = strings.Join(pks, ":") + } + + var hash xxhash.Digest + for _, k := range r.sortedMetadataKeys { + vs := clientMeta.Get(k) + if len(vs) == 0 { + continue + } + hash.WriteString(k) + for _, v := range vs { + hash.WriteString(":") + hash.WriteString(v) + } + } + + r.mu.Lock() + defer r.mu.Unlock() + + hll, ok := r.m[pk] + if !ok || hll == nil { + hll = hyperloglog.New() + r.m[pk] = hll + } + hll.InsertHash(hash.Sum64()) + return pk +} + +func (r *router[C]) getNextConsumer(pk string) C { + if pk == "" { + return r.defaultConsumer + } + + r.dmu.RLock() + defer r.dmu.RUnlock() + + hll, ok := r.decision[pk] + if !ok { + return r.defaultConsumer + } + return r.consumers[sort.SearchInts(r.thresholds, int(hll.Estimate()))] +} + +// updateDecisions updates the current cache to be used for decisions, discarding +// the previous decision map. +func (r *router[C]) updateDecisions() { + r.dmu.Lock() + defer r.dmu.Unlock() + r.mu.Lock() + defer r.mu.Unlock() + + r.decision = r.m + // TODO(lahsivjar): Map allocation can be optimized by reusing + // the older decision map. + r.m = make(map[string]*hyperloglog.Sketch) +} diff --git a/connector/dynamicroutingconnector/traces.go b/connector/dynamicroutingconnector/traces.go new file mode 100644 index 000000000..a44d8911c --- /dev/null +++ b/connector/dynamicroutingconnector/traces.go @@ -0,0 +1,75 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package dynamicroutingconnector // import "github.com/elastic/opentelemetry-collector-components/connector/dynamicroutingconnector" + +import ( + "context" + "errors" + "fmt" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/connector" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/pdata/ptrace" + "go.uber.org/zap" +) + +type tracesConnector struct { + logger *zap.Logger + cfg *Config + router *router[consumer.Traces] +} + +func newTracesConnector( + set connector.Settings, + config component.Config, + traces consumer.Traces, +) (*tracesConnector, error) { + cfg := config.(*Config) + tr, ok := traces.(connector.TracesRouterAndConsumer) + if !ok { + return nil, errors.New("expected connector to be a router and consumer") + } + + router, err := newRouter(cfg, set.TelemetrySettings, tr.Consumer) + if err != nil { + return nil, fmt.Errorf("failed to create router: %w", err) + } + + return &tracesConnector{ + logger: set.Logger, + cfg: cfg, + router: router, + }, nil +} + +func (c *tracesConnector) Start(ctx context.Context, host component.Host) error { + return c.router.Start(ctx, host) +} + +func (c *tracesConnector) Shutdown(ctx context.Context) error { + return c.router.Shutdown(ctx) +} + +func (c *tracesConnector) Capabilities() consumer.Capabilities { + return consumer.Capabilities{MutatesData: true} +} + +func (c *tracesConnector) ConsumeTraces(ctx context.Context, td ptrace.Traces) error { + return c.router.Process(ctx).ConsumeTraces(ctx, td) +} diff --git a/connector/dynamicroutingconnector/traces_test.go b/connector/dynamicroutingconnector/traces_test.go new file mode 100644 index 000000000..7897f790d --- /dev/null +++ b/connector/dynamicroutingconnector/traces_test.go @@ -0,0 +1,192 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package dynamicroutingconnector + +import ( + "context" + "testing" + "time" + + "github.com/elastic/opentelemetry-collector-components/connector/dynamicroutingconnector/internal/metadata" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/ptracetest" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/client" + "go.opentelemetry.io/collector/connector" + "go.opentelemetry.io/collector/connector/connectortest" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/consumer/consumertest" + "go.opentelemetry.io/collector/pdata/ptrace" + "go.opentelemetry.io/collector/pipeline" +) + +func TestTracesRouting(t *testing.T) { + pipelineDefault := pipeline.NewIDWithName(pipeline.SignalTraces, "default") + pipeline_0_2 := pipeline.NewIDWithName(pipeline.SignalTraces, "thershold_0_2") + pipeline_2_5 := pipeline.NewIDWithName(pipeline.SignalTraces, "thershold_2_5") + pipeline_5_inf := pipeline.NewIDWithName(pipeline.SignalTraces, "thershold_5_inf") + cfg := Config{ + DefaultPipelines: []pipeline.ID{pipelineDefault}, + Pipelines: [][]pipeline.ID{ + {pipeline_0_2}, + {pipeline_2_5}, + {pipeline_5_inf}, + }, + Thresholds: []int{2, 5}, + PrimaryMetadataKey: "x-tenant-id", + MetadataKeys: []string{"x-forwarded-for", "user-agent"}, + } + + for _, tc := range []struct { + name string + ctx context.Context + evaluationInterval time.Duration + initialData []ptrace.Traces + input ptrace.Traces + expectSinkDefault []ptrace.Traces + expectSink_0_2 []ptrace.Traces + expectSink_2_5 []ptrace.Traces + expectSink_5_inf []ptrace.Traces + }{ + { + name: "primay_key_missing", + ctx: t.Context(), + evaluationInterval: time.Second, + initialData: []ptrace.Traces{ + newTestTraces("1", "1", "1", "1"), + }, + input: newTestTraces("2", "2", "2", "2"), + expectSinkDefault: []ptrace.Traces{ + newTestTraces("1", "1", "1", "1"), + newTestTraces("2", "2", "2", "2"), + }, + }, + { + name: "metadata_attrs_missing", + ctx: client.NewContext( + t.Context(), + client.Info{ + Metadata: client.NewMetadata(map[string][]string{ + "x-forwarded-for": {"10.2.4.2"}, + "user-agent": {"otel-0.135.0"}, + }), + }, + ), + evaluationInterval: time.Second, + initialData: []ptrace.Traces{ + newTestTraces("1", "1", "1", "1"), + }, + input: newTestTraces("2", "2", "2", "2"), + expectSinkDefault: []ptrace.Traces{ + newTestTraces("1", "1", "1", "1"), + newTestTraces("2", "2", "2", "2"), + }, + }, + { + name: "happy_path", + ctx: client.NewContext( + t.Context(), + client.Info{ + Metadata: client.NewMetadata(map[string][]string{ + "x-tenant-id": {"tenant-1"}, + "x-forwarded-for": {"10.2.4.2"}, + "user-agent": {"otel-0.135.0"}, + }), + }, + ), + evaluationInterval: time.Second, + initialData: []ptrace.Traces{ + newTestTraces("1", "1", "1", "1"), + }, + input: newTestTraces("2", "2", "2", "2"), + expectSinkDefault: []ptrace.Traces{ + newTestTraces("1", "1", "1", "1"), + }, + expectSink_0_2: []ptrace.Traces{ + newTestTraces("2", "2", "2", "2"), + }, + }, + } { + t.Run(tc.name, func(t *testing.T) { + var sinkDefault, sink_0_2, sink_2_5, sink_5_inf consumertest.TracesSink + routerAndConsumer := connector.NewTracesRouter(map[pipeline.ID]consumer.Traces{ + pipelineDefault: &sinkDefault, + pipeline_0_2: &sink_0_2, + pipeline_2_5: &sink_2_5, + pipeline_5_inf: &sink_5_inf, + }) + + cfg.EvaluationInterval = tc.evaluationInterval + connector, err := NewFactory().CreateTracesToTraces( + t.Context(), + connectortest.NewNopSettings(metadata.Type), + &cfg, + routerAndConsumer.(consumer.Traces), + ) + require.NoError(t, err) + + ctx := t.Context() + if tc.ctx != nil { + ctx = tc.ctx + } + + router := connector.(*tracesConnector).router + for _, d := range tc.initialData { + require.NoError(t, connector.ConsumeTraces(ctx, d)) + } + // Update the decisions to be based on initial data + router.updateDecisions() + + require.NoError(t, connector.ConsumeTraces(ctx, tc.input)) + compareTracesSlice(t, tc.expectSinkDefault, sinkDefault.AllTraces()) + compareTracesSlice(t, tc.expectSink_0_2, sink_0_2.AllTraces()) + compareTracesSlice(t, tc.expectSink_2_5, sink_2_5.AllTraces()) + compareTracesSlice(t, tc.expectSink_5_inf, sink_5_inf.AllTraces()) + }) + } +} + +func newTestTraces(resourceIDs, scopeIDs, spanIDs, spanEventIDs string) ptrace.Traces { + td := ptrace.NewTraces() + for resourceN := 0; resourceN < len(resourceIDs); resourceN++ { + rs := td.ResourceSpans().AppendEmpty() + rs.Resource().Attributes().PutStr("resourceName", "resource"+string(resourceIDs[resourceN])) + for scopeN := 0; scopeN < len(scopeIDs); scopeN++ { + ss := rs.ScopeSpans().AppendEmpty() + ss.Scope().SetName("scope" + string(scopeIDs[scopeN])) + for spanN := 0; spanN < len(spanIDs); spanN++ { + s := ss.Spans().AppendEmpty() + s.SetName("span" + string(spanIDs[spanN])) + for spanEventN := 0; spanEventN < len(spanEventIDs); spanEventN++ { + se := s.Events().AppendEmpty() + se.Attributes().PutStr("spanEventName", "spanEvent"+string(spanEventIDs[spanEventN])) + } + } + } + } + return td +} + +func compareTracesSlice(t *testing.T, expected []ptrace.Traces, actual []ptrace.Traces) { + t.Helper() + + require.Equal(t, len(expected), len(actual)) + for i := 0; i < len(expected); i++ { + assert.NoError(t, ptracetest.CompareTraces(expected[i], actual[i])) + } +}