diff --git a/development/tsdb-blocks-storage-s3/config/cortex.yaml b/development/tsdb-blocks-storage-s3/config/cortex.yaml index 4d80166b81a..c1d479afb39 100644 --- a/development/tsdb-blocks-storage-s3/config/cortex.yaml +++ b/development/tsdb-blocks-storage-s3/config/cortex.yaml @@ -66,3 +66,6 @@ compactor: store: consul consul: host: consul:8500 + +frontend_worker: + address: "query-frontend:9007" diff --git a/development/tsdb-blocks-storage-s3/docker-compose.yml b/development/tsdb-blocks-storage-s3/docker-compose.yml index 9104f6b4fe7..eee8a02c098 100644 --- a/development/tsdb-blocks-storage-s3/docker-compose.yml +++ b/development/tsdb-blocks-storage-s3/docker-compose.yml @@ -108,3 +108,17 @@ services: - 8006:8006 volumes: - ./config:/cortex/config + + query-frontend: + build: + context: . + dockerfile: dev.dockerfile + image: cortex + command: ["sh", "-c", "sleep 3 && exec ./cortex -config.file=./config/cortex.yaml -target=query-frontend -server.http-listen-port=8007 -server.grpc-listen-port=9007"] + depends_on: + - consul + - minio + ports: + - 8007:8007 + volumes: + - ./config:/cortex/config diff --git a/integration/alertmanager_test.go b/integration/alertmanager_test.go index 52466f0536d..6d36191d42e 100644 --- a/integration/alertmanager_test.go +++ b/integration/alertmanager_test.go @@ -2,9 +2,6 @@ package main import ( "context" - "io/ioutil" - "os" - "path/filepath" "testing" "github.com/stretchr/testify/require" @@ -18,16 +15,9 @@ func TestAlertmanager(t *testing.T) { require.NoError(t, err) defer s.Close() - alertmanagerDir := filepath.Join(s.SharedDir(), "alertmanager_configs") - require.NoError(t, os.Mkdir(alertmanagerDir, os.ModePerm)) + require.NoError(t, writeFileToSharedDir(s, "alertmanager_configs/user-1.yaml", []byte(cortexAlertmanagerUserConfigYaml))) - require.NoError(t, ioutil.WriteFile( - filepath.Join(alertmanagerDir, "user-1.yaml"), - []byte(cortexAlertmanagerUserConfigYaml), - os.ModePerm), - ) - - alertmanager := e2ecortex.NewAlertmanager("alertmanager", AlertmanagerConfigs, "") + alertmanager := e2ecortex.NewAlertmanager("alertmanager", AlertmanagerFlags, "") require.NoError(t, s.StartAndWaitReady(alertmanager)) require.NoError(t, alertmanager.WaitSumMetrics(e2e.Equals(1), "cortex_alertmanager_configs")) diff --git a/integration/api_config_test.go b/integration/api_config_test.go index 0c57cfd9df2..9cf9a97445a 100644 --- a/integration/api_config_test.go +++ b/integration/api_config_test.go @@ -25,7 +25,7 @@ func TestConfigAPIEndpoint(t *testing.T) { "-config.file": filepath.Join(e2e.ContainerSharedDir, cortexConfigFile), } - cortex1 := e2ecortex.NewSingleBinary("cortex-1", flags, "", 9009) + cortex1 := e2ecortex.NewSingleBinary("cortex-1", flags, "", 9009, 9095) require.NoError(t, s.StartAndWaitReady(cortex1)) // Get config from /config API endpoint. @@ -40,6 +40,6 @@ func TestConfigAPIEndpoint(t *testing.T) { // Start again Cortex in single binary with the exported config // and ensure it starts (pass the readiness probe). require.NoError(t, writeFileToSharedDir(s, cortexConfigFile, body)) - cortex2 := e2ecortex.NewSingleBinary("cortex-2", flags, "", 9009) + cortex2 := e2ecortex.NewSingleBinary("cortex-2", flags, "", 9009, 9095) require.NoError(t, s.StartAndWaitReady(cortex2)) } diff --git a/integration/backward_compatibility_test.go b/integration/backward_compatibility_test.go index 67f0b482ae6..17af1b5edaa 100644 --- a/integration/backward_compatibility_test.go +++ b/integration/backward_compatibility_test.go @@ -31,9 +31,9 @@ func TestBackwardCompatibilityWithChunksStorage(t *testing.T) { // Start Cortex components (ingester running on previous version). require.NoError(t, writeFileToSharedDir(s, cortexSchemaConfigFile, []byte(cortexSchemaConfigYaml))) - tableManager := e2ecortex.NewTableManager("table-manager", ChunksStorage, previousVersionImage) - ingester1 := e2ecortex.NewIngester("ingester-1", consul.NetworkHTTPEndpoint(), ChunksStorage, "") - distributor := e2ecortex.NewDistributor("distributor", consul.NetworkHTTPEndpoint(), ChunksStorage, "") + tableManager := e2ecortex.NewTableManager("table-manager", ChunksStorageFlags, previousVersionImage) + ingester1 := e2ecortex.NewIngester("ingester-1", consul.NetworkHTTPEndpoint(), ChunksStorageFlags, "") + distributor := e2ecortex.NewDistributor("distributor", consul.NetworkHTTPEndpoint(), ChunksStorageFlags, "") require.NoError(t, s.StartAndWaitReady(distributor, ingester1, tableManager)) // Wait until the first table-manager sync has completed, so that we're @@ -54,7 +54,7 @@ func TestBackwardCompatibilityWithChunksStorage(t *testing.T) { require.NoError(t, err) require.Equal(t, 200, res.StatusCode) - ingester2 := e2ecortex.NewIngester("ingester-2", consul.NetworkHTTPEndpoint(), mergeFlags(ChunksStorage, map[string]string{ + ingester2 := e2ecortex.NewIngester("ingester-2", consul.NetworkHTTPEndpoint(), mergeFlags(ChunksStorageFlags, map[string]string{ "-ingester.join-after": "10s", }), "") // Start ingester-2 on new version, to ensure the transfer is backward compatible. @@ -66,7 +66,7 @@ func TestBackwardCompatibilityWithChunksStorage(t *testing.T) { // Query the new ingester both with the old and the new querier. for _, image := range []string{previousVersionImage, ""} { - querier := e2ecortex.NewQuerier("querier", consul.NetworkHTTPEndpoint(), ChunksStorage, image) + querier := e2ecortex.NewQuerier("querier", consul.NetworkHTTPEndpoint(), ChunksStorageFlags, image) require.NoError(t, s.StartAndWaitReady(querier)) // Wait until the querier has updated the ring. diff --git a/integration/configs.go b/integration/configs.go index 47e90a8e4c6..6ba0d6815cc 100644 --- a/integration/configs.go +++ b/integration/configs.go @@ -3,6 +3,8 @@ package main import ( "fmt" "path/filepath" + "strings" + "text/template" "github.com/cortexproject/cortex/integration/e2e" e2edb "github.com/cortexproject/cortex/integration/e2e/db" @@ -33,13 +35,13 @@ receivers: ) var ( - AlertmanagerConfigs = map[string]string{ + AlertmanagerFlags = map[string]string{ "-alertmanager.storage.local.path": filepath.Join(e2e.ContainerSharedDir, "alertmanager_configs"), "-alertmanager.storage.type": "local", "-alertmanager.web.external-url": "http://localhost/api/prom", } - BlocksStorage = map[string]string{ + BlocksStorageFlags = map[string]string{ "-store.engine": "tsdb", "-experimental.tsdb.backend": "s3", "-experimental.tsdb.block-ranges-period": "1m", @@ -53,10 +55,90 @@ var ( "-experimental.tsdb.s3.insecure": "true", } - ChunksStorage = map[string]string{ + BlocksStorageConfig = buildConfigFromTemplate(` +storage: + engine: tsdb + +tsdb: + backend: s3 + block_ranges_period: ["1m"] + retention_period: 5m + ship_interval: 1m + + bucket_store: + sync_interval: 5s + + s3: + bucket_name: cortex + access_key_id: {{.MinioAccessKey}} + secret_access_key: {{.MinioSecretKey}} + endpoint: {{.MinioEndpoint}} + insecure: true +`, struct { + MinioAccessKey string + MinioSecretKey string + MinioEndpoint string + }{ + MinioAccessKey: e2edb.MinioAccessKey, + MinioSecretKey: e2edb.MinioSecretKey, + MinioEndpoint: fmt.Sprintf("%s-minio-9000:9000", networkName), + }) + + ChunksStorageFlags = map[string]string{ "-dynamodb.url": fmt.Sprintf("dynamodb://u:p@%s-dynamodb.:8000", networkName), "-dynamodb.poll-interval": "1m", "-config-yaml": filepath.Join(e2e.ContainerSharedDir, cortexSchemaConfigFile), "-table-manager.retention-period": "168h", } + + ChunksStorageConfig = buildConfigFromTemplate(` +storage: + aws: + dynamodbconfig: + dynamodb: {{.DynamoDBURL}} + +table_manager: + dynamodb_poll_interval: 1m + retention_period: 168h + +schema: +{{.SchemaConfig}} +`, struct { + DynamoDBURL string + SchemaConfig string + }{ + DynamoDBURL: fmt.Sprintf("dynamodb://u:p@%s-dynamodb.:8000", networkName), + SchemaConfig: indentConfig(cortexSchemaConfigYaml, 2), + }) ) + +func buildConfigFromTemplate(tmpl string, data interface{}) string { + t, err := template.New("config").Parse(tmpl) + if err != nil { + panic(err) + } + + w := &strings.Builder{} + if err = t.Execute(w, data); err != nil { + panic(err) + } + + return w.String() +} + +func indentConfig(config string, indentation int) string { + output := strings.Builder{} + + for _, line := range strings.Split(config, "\n") { + if line == "" { + output.WriteString("\n") + continue + } + + output.WriteString(strings.Repeat(" ", indentation)) + output.WriteString(line) + output.WriteString("\n") + } + + return output.String() +} diff --git a/integration/e2e/util.go b/integration/e2e/util.go index 9962b95d5c3..e0efff31916 100644 --- a/integration/e2e/util.go +++ b/integration/e2e/util.go @@ -17,6 +17,10 @@ func RunCommandAndGetOutput(name string, args ...string) ([]byte, error) { return cmd.CombinedOutput() } +func EmptyFlags() map[string]string { + return map[string]string{} +} + func MergeFlags(inputs ...map[string]string) map[string]string { output := map[string]string{} diff --git a/integration/e2ecortex/service.go b/integration/e2ecortex/service.go new file mode 100644 index 00000000000..db3e740175a --- /dev/null +++ b/integration/e2ecortex/service.go @@ -0,0 +1,33 @@ +package e2ecortex + +import "github.com/cortexproject/cortex/integration/e2e" + +// CortexService represents a Cortex service with at least an HTTP and GRPC port exposed. +type CortexService struct { + *e2e.HTTPService + + grpcPort int +} + +func NewCortexService( + name string, + image string, + command *e2e.Command, + readiness *e2e.ReadinessProbe, + httpPort int, + grpcPort int, + otherPorts ...int, +) *CortexService { + return &CortexService{ + HTTPService: e2e.NewHTTPService(name, image, command, readiness, httpPort, otherPorts...), + grpcPort: grpcPort, + } +} + +func (s *CortexService) GRPCEndpoint() string { + return s.Endpoint(s.grpcPort) +} + +func (s *CortexService) NetworkGRPCEndpoint() string { + return s.NetworkEndpoint(s.grpcPort) +} diff --git a/integration/e2ecortex/services.go b/integration/e2ecortex/services.go index 08eee5f60d2..d91ff2df0fa 100644 --- a/integration/e2ecortex/services.go +++ b/integration/e2ecortex/services.go @@ -2,13 +2,14 @@ package e2ecortex import ( "os" + "path/filepath" "github.com/cortexproject/cortex/integration/e2e" ) const ( - HTTPPort = 80 - GRPCPort = 9095 + httpPort = 80 + grpcPort = 9095 ) // GetDefaultImage returns the Docker image to use to run Cortex. @@ -22,12 +23,20 @@ func GetDefaultImage() string { return "quay.io/cortexproject/cortex:latest" } -func NewDistributor(name string, consulAddress string, flags map[string]string, image string) *e2e.HTTPService { +func NewDistributor(name string, consulAddress string, flags map[string]string, image string) *CortexService { + return NewDistributorWithConfigFile(name, consulAddress, "", flags, image) +} + +func NewDistributorWithConfigFile(name, consulAddress, configFile string, flags map[string]string, image string) *CortexService { + if configFile != "" { + flags["-config.file"] = filepath.Join(e2e.ContainerSharedDir, configFile) + } + if image == "" { image = GetDefaultImage() } - return e2e.NewHTTPService( + return NewCortexService( name, image, e2e.NewCommandWithoutEntrypoint("cortex", e2e.BuildArgs(e2e.MergeFlags(map[string]string{ @@ -39,22 +48,31 @@ func NewDistributor(name string, consulAddress string, flags map[string]string, "-ring.store": "consul", "-consul.hostname": consulAddress, }, flags))...), - e2e.NewReadinessProbe(HTTPPort, "/ring", 200), - HTTPPort, + e2e.NewReadinessProbe(httpPort, "/ring", 200), + httpPort, + grpcPort, ) } -func NewQuerier(name string, consulAddress string, flags map[string]string, image string) *e2e.HTTPService { +func NewQuerier(name string, consulAddress string, flags map[string]string, image string) *CortexService { + return NewQuerierWithConfigFile(name, consulAddress, "", flags, image) +} + +func NewQuerierWithConfigFile(name, consulAddress, configFile string, flags map[string]string, image string) *CortexService { + if configFile != "" { + flags["-config.file"] = filepath.Join(e2e.ContainerSharedDir, configFile) + } + if image == "" { image = GetDefaultImage() } - return e2e.NewHTTPService( + return NewCortexService( name, image, e2e.NewCommandWithoutEntrypoint("cortex", e2e.BuildArgs(e2e.MergeFlags(map[string]string{ "-target": "querier", - "-log.level": "info", // TODO warn + "-log.level": "warn", "-distributor.replication-factor": "1", // Configure the ingesters ring backend "-ring.store": "consul", @@ -65,17 +83,25 @@ func NewQuerier(name string, consulAddress string, flags map[string]string, imag "-querier.frontend-client.backoff-retries": "1", "-querier.worker-parallelism": "1", }, flags))...), - e2e.NewReadinessProbe(HTTPPort, "/ready", 204), - HTTPPort, + e2e.NewReadinessProbe(httpPort, "/ready", 204), + httpPort, + grpcPort, ) } -func NewIngester(name string, consulAddress string, flags map[string]string, image string) *e2e.HTTPService { +func NewIngester(name string, consulAddress string, flags map[string]string, image string) *CortexService { + return NewIngesterWithConfigFile(name, consulAddress, "", flags, image) +} + +func NewIngesterWithConfigFile(name, consulAddress, configFile string, flags map[string]string, image string) *CortexService { + if configFile != "" { + flags["-config.file"] = filepath.Join(e2e.ContainerSharedDir, configFile) + } if image == "" { image = GetDefaultImage() } - return e2e.NewHTTPService( + return NewCortexService( name, image, e2e.NewCommandWithoutEntrypoint("cortex", e2e.BuildArgs(e2e.MergeFlags(map[string]string{ @@ -91,17 +117,26 @@ func NewIngester(name string, consulAddress string, flags map[string]string, ima "-ring.store": "consul", "-consul.hostname": consulAddress, }, flags))...), - e2e.NewReadinessProbe(HTTPPort, "/ready", 204), - HTTPPort, + e2e.NewReadinessProbe(httpPort, "/ready", 204), + httpPort, + grpcPort, ) } -func NewTableManager(name string, flags map[string]string, image string) *e2e.HTTPService { +func NewTableManager(name string, flags map[string]string, image string) *CortexService { + return NewTableManagerWithConfigFile(name, "", flags, image) +} + +func NewTableManagerWithConfigFile(name, configFile string, flags map[string]string, image string) *CortexService { + if configFile != "" { + flags["-config.file"] = filepath.Join(e2e.ContainerSharedDir, configFile) + } + if image == "" { image = GetDefaultImage() } - return e2e.NewHTTPService( + return NewCortexService( name, image, e2e.NewCommandWithoutEntrypoint("cortex", e2e.BuildArgs(e2e.MergeFlags(map[string]string{ @@ -109,36 +144,45 @@ func NewTableManager(name string, flags map[string]string, image string) *e2e.HT "-log.level": "warn", }, flags))...), // The table-manager doesn't expose a readiness probe, so we just check if the / returns 404 - e2e.NewReadinessProbe(HTTPPort, "/", 404), - HTTPPort, + e2e.NewReadinessProbe(httpPort, "/", 404), + httpPort, + grpcPort, ) } -func NewQueryFrontend(name string, flags map[string]string, image string) *e2e.HTTPService { +func NewQueryFrontend(name string, flags map[string]string, image string) *CortexService { + return NewQueryFrontendWithConfigFile(name, "", flags, image) +} + +func NewQueryFrontendWithConfigFile(name, configFile string, flags map[string]string, image string) *CortexService { + if configFile != "" { + flags["-config.file"] = filepath.Join(e2e.ContainerSharedDir, configFile) + } + if image == "" { image = GetDefaultImage() } - return e2e.NewHTTPService( + return NewCortexService( name, image, e2e.NewCommandWithoutEntrypoint("cortex", e2e.BuildArgs(e2e.MergeFlags(map[string]string{ "-target": "query-frontend", - "-log.level": "info", // TODO warn + "-log.level": "warn", }, flags))...), // The query-frontend doesn't expose a readiness probe, so we just check if the / returns 404 - e2e.NewReadinessProbe(HTTPPort, "/", 404), - HTTPPort, - GRPCPort, + e2e.NewReadinessProbe(httpPort, "/", 404), + httpPort, + grpcPort, ) } -func NewSingleBinary(name string, flags map[string]string, image string, httpPort int, otherPorts ...int) *e2e.HTTPService { +func NewSingleBinary(name string, flags map[string]string, image string, httpPort, grpcPort int, otherPorts ...int) *CortexService { if image == "" { image = GetDefaultImage() } - return e2e.NewHTTPService( + return NewCortexService( name, image, e2e.NewCommandWithoutEntrypoint("cortex", e2e.BuildArgs(e2e.MergeFlags(map[string]string{ @@ -146,16 +190,17 @@ func NewSingleBinary(name string, flags map[string]string, image string, httpPor }, flags))...), e2e.NewReadinessProbe(httpPort, "/ready", 204), httpPort, + grpcPort, otherPorts..., ) } -func NewAlertmanager(name string, flags map[string]string, image string) *e2e.HTTPService { +func NewAlertmanager(name string, flags map[string]string, image string) *CortexService { if image == "" { image = GetDefaultImage() } - return e2e.NewHTTPService( + return NewCortexService( name, image, e2e.NewCommandWithoutEntrypoint("cortex", e2e.BuildArgs(e2e.MergeFlags(map[string]string{ @@ -163,7 +208,8 @@ func NewAlertmanager(name string, flags map[string]string, image string) *e2e.HT "-log.level": "warn", }, flags))...), // The alertmanager doesn't expose a readiness probe, so we just check if the / returns 404 - e2e.NewReadinessProbe(HTTPPort, "/", 404), - HTTPPort, + e2e.NewReadinessProbe(httpPort, "/", 404), + httpPort, + grpcPort, ) } diff --git a/integration/getting_started_single_process_config_test.go b/integration/getting_started_single_process_config_test.go index b78a6fc3878..5eaab726e92 100644 --- a/integration/getting_started_single_process_config_test.go +++ b/integration/getting_started_single_process_config_test.go @@ -26,10 +26,10 @@ func TestGettingStartedSingleProcessConfig(t *testing.T) { "-config.file": filepath.Join(e2e.ContainerSharedDir, cortexConfigFile), } - cortex := e2ecortex.NewSingleBinary("cortex-1", flags, "", 9009) + cortex := e2ecortex.NewSingleBinary("cortex-1", flags, "", 9009, 9095) require.NoError(t, s.StartAndWaitReady(cortex)) - c, err := e2ecortex.NewClient(cortex.Endpoint(9009), cortex.Endpoint(9009), "", "user-1") + c, err := e2ecortex.NewClient(cortex.HTTPEndpoint(), cortex.HTTPEndpoint(), "", "user-1") require.NoError(t, err) // Push some series to Cortex. diff --git a/integration/ingester_flush_test.go b/integration/ingester_flush_test.go index bbc53bcc5c1..49506731d45 100644 --- a/integration/ingester_flush_test.go +++ b/integration/ingester_flush_test.go @@ -30,12 +30,12 @@ func TestIngesterFlushWithChunksStorage(t *testing.T) { // Start Cortex components. require.NoError(t, writeFileToSharedDir(s, cortexSchemaConfigFile, []byte(cortexSchemaConfigYaml))) - tableManager := e2ecortex.NewTableManager("table-manager", ChunksStorage, "") - ingester1 := e2ecortex.NewIngester("ingester-1", consul.NetworkHTTPEndpoint(), mergeFlags(ChunksStorage, map[string]string{ + tableManager := e2ecortex.NewTableManager("table-manager", ChunksStorageFlags, "") + ingester1 := e2ecortex.NewIngester("ingester-1", consul.NetworkHTTPEndpoint(), mergeFlags(ChunksStorageFlags, map[string]string{ "-ingester.max-transfer-retries": "0", }), "") - querier := e2ecortex.NewQuerier("querier", consul.NetworkHTTPEndpoint(), ChunksStorage, "") - distributor := e2ecortex.NewDistributor("distributor", consul.NetworkHTTPEndpoint(), ChunksStorage, "") + querier := e2ecortex.NewQuerier("querier", consul.NetworkHTTPEndpoint(), ChunksStorageFlags, "") + distributor := e2ecortex.NewDistributor("distributor", consul.NetworkHTTPEndpoint(), ChunksStorageFlags, "") require.NoError(t, s.StartAndWaitReady(distributor, querier, ingester1, tableManager)) // Wait until the first table-manager sync has completed, so that we're diff --git a/integration/ingester_hand_over_test.go b/integration/ingester_hand_over_test.go index 8d71006131e..019f9d37e32 100644 --- a/integration/ingester_hand_over_test.go +++ b/integration/ingester_hand_over_test.go @@ -14,20 +14,20 @@ import ( ) func TestIngesterHandOverWithBlocksStorage(t *testing.T) { - runIngesterHandOverTest(t, BlocksStorage, func(t *testing.T, s *e2e.Scenario) { - minio := e2edb.NewMinio(9000, BlocksStorage["-experimental.tsdb.s3.bucket-name"]) + runIngesterHandOverTest(t, BlocksStorageFlags, func(t *testing.T, s *e2e.Scenario) { + minio := e2edb.NewMinio(9000, BlocksStorageFlags["-experimental.tsdb.s3.bucket-name"]) require.NoError(t, s.StartAndWaitReady(minio)) }) } func TestIngesterHandOverWithChunksStorage(t *testing.T) { - runIngesterHandOverTest(t, ChunksStorage, func(t *testing.T, s *e2e.Scenario) { + runIngesterHandOverTest(t, ChunksStorageFlags, func(t *testing.T, s *e2e.Scenario) { dynamo := e2edb.NewDynamoDB() require.NoError(t, s.StartAndWaitReady(dynamo)) require.NoError(t, writeFileToSharedDir(s, cortexSchemaConfigFile, []byte(cortexSchemaConfigYaml))) - tableManager := e2ecortex.NewTableManager("table-manager", ChunksStorage, "") + tableManager := e2ecortex.NewTableManager("table-manager", ChunksStorageFlags, "") require.NoError(t, s.StartAndWaitReady(tableManager)) // Wait until the first table-manager sync has completed, so that we're diff --git a/integration/integration_memberlist_single_binary_test.go b/integration/integration_memberlist_single_binary_test.go index bfdec60181e..c07ea00d2f6 100644 --- a/integration/integration_memberlist_single_binary_test.go +++ b/integration/integration_memberlist_single_binary_test.go @@ -55,7 +55,7 @@ func TestSingleBinaryWithMemberlist(t *testing.T) { require.NoError(t, s.Stop(cortex3)) } -func newSingleBinary(name string, join string) *e2e.HTTPService { +func newSingleBinary(name string, join string) *e2ecortex.CortexService { flags := map[string]string{ "-target": "all", // single-binary mode "-log.level": "warn", @@ -77,7 +77,7 @@ func newSingleBinary(name string, join string) *e2e.HTTPService { serv := e2ecortex.NewSingleBinary( name, - mergeFlags(ChunksStorage, flags), + mergeFlags(ChunksStorageFlags, flags), "", 80, 8000, diff --git a/integration/query_frontend_test.go b/integration/query_frontend_test.go index 5c3f80532c4..e0b74102c94 100644 --- a/integration/query_frontend_test.go +++ b/integration/query_frontend_test.go @@ -16,30 +16,66 @@ import ( "github.com/cortexproject/cortex/integration/e2ecortex" ) -func TestQueryFrontendWithBlocksStorage(t *testing.T) { - runQueryFrontendTest(t, BlocksStorage, func(t *testing.T, s *e2e.Scenario) { - minio := e2edb.NewMinio(9000, BlocksStorage["-experimental.tsdb.s3.bucket-name"]) +type queryFrontendSetup func(t *testing.T, s *e2e.Scenario) (configFile string, flags map[string]string) + +func TestQueryFrontendWithBlocksStorageViaFlags(t *testing.T) { + runQueryFrontendTest(t, func(t *testing.T, s *e2e.Scenario) (configFile string, flags map[string]string) { + minio := e2edb.NewMinio(9000, BlocksStorageFlags["-experimental.tsdb.s3.bucket-name"]) + require.NoError(t, s.StartAndWaitReady(minio)) + + return "", BlocksStorageFlags + }) +} + +func TestQueryFrontendWithBlocksStorageViaConfigFile(t *testing.T) { + runQueryFrontendTest(t, func(t *testing.T, s *e2e.Scenario) (configFile string, flags map[string]string) { + require.NoError(t, writeFileToSharedDir(s, cortexConfigFile, []byte(BlocksStorageConfig))) + + minio := e2edb.NewMinio(9000, BlocksStorageFlags["-experimental.tsdb.s3.bucket-name"]) require.NoError(t, s.StartAndWaitReady(minio)) + + return cortexConfigFile, e2e.EmptyFlags() }) } -func TestQueryFrontendWithChunksStorage(t *testing.T) { - runQueryFrontendTest(t, ChunksStorage, func(t *testing.T, s *e2e.Scenario) { +func TestQueryFrontendWithChunksStorageViaFlags(t *testing.T) { + runQueryFrontendTest(t, func(t *testing.T, s *e2e.Scenario) (configFile string, flags map[string]string) { + require.NoError(t, writeFileToSharedDir(s, cortexSchemaConfigFile, []byte(cortexSchemaConfigYaml))) + dynamo := e2edb.NewDynamoDB() require.NoError(t, s.StartAndWaitReady(dynamo)) + tableManager := e2ecortex.NewTableManager("table-manager", ChunksStorageFlags, "") + require.NoError(t, s.StartAndWaitReady(tableManager)) + + // Wait until the first table-manager sync has completed, so that we're + // sure the tables have been created. + require.NoError(t, tableManager.WaitSumMetrics(e2e.Greater(0), "cortex_dynamo_sync_tables_seconds")) + + return "", ChunksStorageFlags + }) +} + +func TestQueryFrontendWithChunksStorageViaConfigFile(t *testing.T) { + runQueryFrontendTest(t, func(t *testing.T, s *e2e.Scenario) (configFile string, flags map[string]string) { + require.NoError(t, writeFileToSharedDir(s, cortexConfigFile, []byte(ChunksStorageConfig))) require.NoError(t, writeFileToSharedDir(s, cortexSchemaConfigFile, []byte(cortexSchemaConfigYaml))) - tableManager := e2ecortex.NewTableManager("table-manager", ChunksStorage, "") + dynamo := e2edb.NewDynamoDB() + require.NoError(t, s.StartAndWaitReady(dynamo)) + + tableManager := e2ecortex.NewTableManagerWithConfigFile("table-manager", cortexConfigFile, e2e.EmptyFlags(), "") require.NoError(t, s.StartAndWaitReady(tableManager)) // Wait until the first table-manager sync has completed, so that we're // sure the tables have been created. require.NoError(t, tableManager.WaitSumMetrics(e2e.Greater(0), "cortex_dynamo_sync_tables_seconds")) + + return cortexConfigFile, e2e.EmptyFlags() }) } -func runQueryFrontendTest(t *testing.T, flags map[string]string, setup func(t *testing.T, s *e2e.Scenario)) { +func runQueryFrontendTest(t *testing.T, setup queryFrontendSetup) { const numUsers = 10 const numQueriesPerUser = 10 @@ -50,18 +86,18 @@ func runQueryFrontendTest(t *testing.T, flags map[string]string, setup func(t *t consul := e2edb.NewConsul() require.NoError(t, s.StartAndWaitReady(consul)) - setup(t, s) + configFile, flags := setup(t, s) // Start Cortex components. - queryFrontend := e2ecortex.NewQueryFrontend("query-frontend", flags, "") - ingester := e2ecortex.NewIngester("ingester", consul.NetworkHTTPEndpoint(), flags, "") - distributor := e2ecortex.NewDistributor("distributor", consul.NetworkHTTPEndpoint(), flags, "") + queryFrontend := e2ecortex.NewQueryFrontendWithConfigFile("query-frontend", configFile, flags, "") + ingester := e2ecortex.NewIngesterWithConfigFile("ingester", consul.NetworkHTTPEndpoint(), configFile, flags, "") + distributor := e2ecortex.NewDistributorWithConfigFile("distributor", consul.NetworkHTTPEndpoint(), configFile, flags, "") require.NoError(t, s.StartAndWaitReady(queryFrontend, distributor, ingester)) // Start the querier after the query-frontend otherwise we're not // able to get the query-frontend network endpoint. - querier := e2ecortex.NewQuerier("querier", consul.NetworkHTTPEndpoint(), mergeFlags(flags, map[string]string{ - "-querier.frontend-address": queryFrontend.NetworkEndpoint(e2ecortex.GRPCPort), + querier := e2ecortex.NewQuerierWithConfigFile("querier", consul.NetworkHTTPEndpoint(), configFile, mergeFlags(flags, map[string]string{ + "-querier.frontend-address": queryFrontend.NetworkGRPCEndpoint(), }), "") require.NoError(t, s.StartAndWaitReady(querier)) diff --git a/integration/util.go b/integration/util.go index 33d3665a83d..d2147120f7c 100644 --- a/integration/util.go +++ b/integration/util.go @@ -28,8 +28,15 @@ func getCortexProjectDir() string { } func writeFileToSharedDir(s *e2e.Scenario, dst string, content []byte) error { + dst = filepath.Join(s.SharedDir(), dst) + + // Ensure the entire path of directories exist. + if err := os.MkdirAll(filepath.Dir(dst), os.ModePerm); err != nil { + return err + } + return ioutil.WriteFile( - filepath.Join(s.SharedDir(), dst), + dst, content, os.ModePerm) }