Skip to content

Commit 1734c75

Browse files
Add an ability to load an arbitrary list of Cortex modules (#3275)
* Add an ability to load an arbitrary list of Cortex modules Signed-off-by: Igor Novgorodov <[email protected]> * Fix module loading logic Signed-off-by: Igor Novgorodov <[email protected]> * Improve multiple module loading Signed-off-by: Igor Novgorodov <[email protected]> * Add forgotten files Signed-off-by: Igor Novgorodov <[email protected]> * Fix linter errors Signed-off-by: Igor Novgorodov <[email protected]> * Reduce state in ModuleManager Signed-off-by: Igor Novgorodov <[email protected]> * Fix pkg/cortex & pkg/util/modules tests, make initModules() stateless, update docs Signed-off-by: Igor Novgorodov <[email protected]> * Fix missed mess with &c.Target pointer cast Signed-off-by: Igor Novgorodov <[email protected]> * Cosmetic changes to minimize diff Signed-off-by: Igor Novgorodov <[email protected]> * Fix problems with PR #3287 Signed-off-by: Igor Novgorodov <[email protected]> * Update CHANGELOG.md Signed-off-by: Marco Pracucci <[email protected]> * Update pkg/cortex/cortex_test.go Signed-off-by: Marco Pracucci <[email protected]> * Fixed linter Signed-off-by: Marco Pracucci <[email protected]> Co-authored-by: Marco Pracucci <[email protected]>
1 parent 56bbaba commit 1734c75

File tree

12 files changed

+174
-38
lines changed

12 files changed

+174
-38
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
* [CHANGE] Increased default `-<prefix>.redis.timeout` from `100ms` to `500ms`. #3301
5050
* [FEATURE] Added support for shuffle-sharding queriers in the query-frontend. When configured (`-frontend.max-queriers-per-tenant` globally, or using per-tenant limit `max_queriers_per_tenant`), each tenants's requests will be handled by different set of queriers. #3113 #3257
5151
* [FEATURE] Query-frontend: added `compression` config to support results cache with compression. #3217
52+
* [ENHANCEMENT] Allow to specify multiple comma-separated Cortex services to `-target` CLI option (or its respective YAML config option). For example, `-target=all,compactor` can be used to start Cortex single-binary with compactor as well. #3275
5253
* [ENHANCEMENT] Expose additional HTTP configs for the S3 backend client. New flag are listed below: #3244
5354
- `-blocks-storage.s3.http.idle-conn-timeout`
5455
- `-blocks-storage.s3.http.response-header-timeout`

cmd/cortex/main.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -146,8 +146,13 @@ func main() {
146146
// In testing mode skip JAEGER setup to avoid panic due to
147147
// "duplicate metrics collector registration attempted"
148148
if !testMode {
149+
name := "cortex"
150+
if len(cfg.Target) == 1 {
151+
name += "-" + cfg.Target[0]
152+
}
153+
149154
// Setting the environment variable JAEGER_AGENT_HOST enables tracing.
150-
if trace, err := tracing.NewFromEnv("cortex-" + cfg.Target); err != nil {
155+
if trace, err := tracing.NewFromEnv(name); err != nil {
151156
level.Error(util.Logger).Log("msg", "Failed to setup tracing", "err", err.Error())
152157
} else {
153158
defer trace.Close()

docs/configuration/config-file-reference.md

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,10 @@ Where default_value is the value to use if the environment variable is undefined
5252
### Supported contents and default values of the config file
5353

5454
```yaml
55-
# The Cortex module to run. Use "-modules" command line flag to get a list of
56-
# available modules, and to see which modules are included in "All".
55+
# Comma-separated list of Cortex modules to load. The alias 'all' can be used in
56+
# the list to load a number of core modules and will enable single-binary mode.
57+
# Use '-modules' command line flag to get a list of available modules, and to
58+
# see which modules are included in 'all'.
5759
# CLI flag: -target
5860
[target: <string> | default = "all"]
5961

pkg/cortex/cortex.go

Lines changed: 22 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -71,10 +71,10 @@ import (
7171

7272
// Config is the root config for Cortex.
7373
type Config struct {
74-
Target string `yaml:"target"`
75-
AuthEnabled bool `yaml:"auth_enabled"`
76-
PrintConfig bool `yaml:"-"`
77-
HTTPPrefix string `yaml:"http_prefix"`
74+
Target flagext.StringSliceCSV `yaml:"target"`
75+
AuthEnabled bool `yaml:"auth_enabled"`
76+
PrintConfig bool `yaml:"-"`
77+
HTTPPrefix string `yaml:"http_prefix"`
7878

7979
API api.Config `yaml:"api"`
8080
Server server.Config `yaml:"server"`
@@ -109,7 +109,14 @@ type Config struct {
109109
func (c *Config) RegisterFlags(f *flag.FlagSet) {
110110
c.Server.MetricsNamespace = "cortex"
111111
c.Server.ExcludeRequestInLog = true
112-
f.StringVar(&c.Target, "target", All, "The Cortex module to run. Use \"-modules\" command line flag to get a list of available modules, and to see which modules are included in \"All\".")
112+
113+
// Set the default module list to 'all'
114+
c.Target = []string{All}
115+
116+
f.Var(&c.Target, "target", "Comma-separated list of Cortex modules to load. "+
117+
"The alias 'all' can be used in the list to load a number of core modules and will enable single-binary mode. "+
118+
"Use '-modules' command line flag to get a list of available modules, and to see which modules are included in 'all'.")
119+
113120
f.BoolVar(&c.AuthEnabled, "auth.enabled", true, "Set to false to disable auth.")
114121
f.BoolVar(&c.PrintConfig, "print.config", false, "Print the config and exit.")
115122
f.StringVar(&c.HTTPPrefix, "http.prefix", "/api/prom", "HTTP path prefix for Cortex API.")
@@ -203,6 +210,10 @@ func (c *Config) Validate(log log.Logger) error {
203210
return nil
204211
}
205212

213+
func (c *Config) isModuleEnabled(m string) bool {
214+
return util.StringsContain(c.Target, m)
215+
}
216+
206217
// validateYAMLEmptyNodes ensure that no empty node has been specified in the YAML config file.
207218
// When an empty node is defined in YAML, the YAML parser sets the whole struct to its zero value
208219
// and so we loose all default values. It's very difficult to detect this case for the user, so we
@@ -307,16 +318,18 @@ func (t *Cortex) setupThanosTracing() {
307318

308319
// Run starts Cortex running, and blocks until a Cortex stops.
309320
func (t *Cortex) Run() error {
310-
if !t.ModuleManager.IsUserVisibleModule(t.Cfg.Target) {
311-
level.Warn(util.Logger).Log("msg", "selected target is an internal module, is this intended?", "target", t.Cfg.Target)
321+
for _, module := range t.Cfg.Target {
322+
if !t.ModuleManager.IsUserVisibleModule(module) {
323+
level.Warn(util.Logger).Log("msg", "selected target is an internal module, is this intended?", "target", module)
324+
}
312325
}
313326

314-
serviceMap, err := t.ModuleManager.InitModuleServices(t.Cfg.Target)
327+
var err error
328+
t.ServiceMap, err = t.ModuleManager.InitModuleServices(t.Cfg.Target...)
315329
if err != nil {
316330
return err
317331
}
318332

319-
t.ServiceMap = serviceMap
320333
t.API.RegisterServiceMapHandler(http.HandlerFunc(t.servicesHandler))
321334

322335
// get all services, create service manager and tell it to start

pkg/cortex/cortex_test.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,13 +68,14 @@ func TestCortex(t *testing.T) {
6868
},
6969
},
7070
},
71-
Target: All,
71+
72+
Target: []string{All, Compactor},
7273
}
7374

7475
c, err := New(cfg)
7576
require.NoError(t, err)
7677

77-
serviceMap, err := c.ModuleManager.InitModuleServices(c.Cfg.Target)
78+
serviceMap, err := c.ModuleManager.InitModuleServices(cfg.Target...)
7879
require.NoError(t, err)
7980
require.NotNil(t, serviceMap)
8081

@@ -88,4 +89,7 @@ func TestCortex(t *testing.T) {
8889
require.NotNil(t, serviceMap[IngesterService])
8990
require.NotNil(t, serviceMap[Ring])
9091
require.NotNil(t, serviceMap[DistributorService])
92+
93+
// check that compactor is configured which is not part of Target=All
94+
require.NotNil(t, serviceMap[Compactor])
9195
}

pkg/cortex/modules.go

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ func (t *Cortex) initRuntimeConfig() (services.Service, error) {
135135
t.Cfg.LimitsConfig.RulerEvaluationDelay = t.Cfg.Ruler.EvaluationDelay
136136

137137
// No need to report if this field isn't going to be used.
138-
if t.Cfg.Target == All || t.Cfg.Target == Ruler {
138+
if t.Cfg.isModuleEnabled(Ruler) || t.Cfg.isModuleEnabled(All) {
139139
flagext.DeprecatedFlagsUsed.Inc()
140140
level.Warn(util.Logger).Log("msg", "Using DEPRECATED YAML config field ruler.evaluation_delay_duration, please use limits.ruler_evaluation_delay_duration instead.")
141141
}
@@ -173,7 +173,7 @@ func (t *Cortex) initDistributorService() (serv services.Service, err error) {
173173
// Check whether the distributor can join the distributors ring, which is
174174
// whenever it's not running as an internal dependency (ie. querier or
175175
// ruler's dependency)
176-
canJoinDistributorsRing := (t.Cfg.Target == All || t.Cfg.Target == Distributor)
176+
canJoinDistributorsRing := t.Cfg.isModuleEnabled(Distributor) || t.Cfg.isModuleEnabled(All)
177177

178178
t.Distributor, err = distributor.New(t.Cfg.Distributor, t.Cfg.IngesterClient, t.Overrides, t.Ring, canJoinDistributorsRing, prometheus.DefaultRegisterer)
179179
if err != nil {
@@ -222,12 +222,12 @@ func (t *Cortex) initQuerier() (serv services.Service, err error) {
222222
}, []string{"method", "route"})
223223

224224
// if we are not configured for single binary mode then the querier needs to register its paths externally
225-
registerExternally := t.Cfg.Target != All
225+
registerExternally := !t.Cfg.isModuleEnabled(All)
226226
handler := t.API.RegisterQuerier(queryable, engine, t.Distributor, registerExternally, t.TombstonesLoader, querierRequestDuration, receivedMessageSize, sentMessageSize, inflightRequests)
227227

228228
// single binary mode requires a properly configured worker. if the operator did not attempt to configure the
229229
// worker we will attempt an automatic configuration here
230-
if t.Cfg.Worker.Address == "" && t.Cfg.Target == All {
230+
if t.Cfg.Worker.Address == "" && t.Cfg.isModuleEnabled(All) {
231231
address := fmt.Sprintf("127.0.0.1:%d", t.Cfg.Server.GRPCListenPort)
232232
level.Warn(util.Logger).Log("msg", "Worker address is empty in single binary mode. Attempting automatic worker configuration. If queries are unresponsive consider configuring the worker explicitly.", "address", address)
233233
t.Cfg.Worker.Address = address
@@ -298,7 +298,7 @@ func initQueryableForEngine(engine string, cfg Config, chunkStore chunk.Store, l
298298
case storage.StorageEngineBlocks:
299299
// When running in single binary, if the blocks sharding is disabled and no custom
300300
// store-gateway address has been configured, we can set it to the running process.
301-
if cfg.Target == All && !cfg.StoreGateway.ShardingEnabled && cfg.Querier.StoreGatewayAddresses == "" {
301+
if cfg.isModuleEnabled(All) && !cfg.StoreGateway.ShardingEnabled && cfg.Querier.StoreGatewayAddresses == "" {
302302
cfg.Querier.StoreGatewayAddresses = fmt.Sprintf("127.0.0.1:%d", cfg.Server.GRPCListenPort)
303303
}
304304

@@ -510,13 +510,12 @@ func (t *Cortex) initRulerStorage() (serv services.Service, err error) {
510510
// unfortunately there is no way to generate a "default" config and compare default against actual
511511
// to determine if it's unconfigured. the following check, however, correctly tests this.
512512
// Single binary integration tests will break if this ever drifts
513-
if t.Cfg.Target == All && t.Cfg.Ruler.StoreConfig.IsDefaults() {
513+
if t.Cfg.isModuleEnabled(All) && t.Cfg.Ruler.StoreConfig.IsDefaults() {
514514
level.Info(util.Logger).Log("msg", "RulerStorage is not configured in single binary mode and will not be started.")
515515
return
516516
}
517517

518518
t.RulerStorage, err = ruler.NewRuleStorage(t.Cfg.Ruler.StoreConfig, rules.FileLoader{})
519-
520519
return
521520
}
522521

@@ -574,7 +573,7 @@ func (t *Cortex) initAlertManager() (serv services.Service, err error) {
574573
return
575574
}
576575

577-
t.API.RegisterAlertmanager(t.Alertmanager, t.Cfg.Target == AlertManager, t.Cfg.Alertmanager.EnableAPI)
576+
t.API.RegisterAlertmanager(t.Alertmanager, t.Cfg.isModuleEnabled(AlertManager), t.Cfg.Alertmanager.EnableAPI)
578577
return t.Alertmanager, nil
579578
}
580579

@@ -593,7 +592,7 @@ func (t *Cortex) initCompactor() (serv services.Service, err error) {
593592

594593
func (t *Cortex) initStoreGateway() (serv services.Service, err error) {
595594
if t.Cfg.Storage.Engine != storage.StorageEngineBlocks {
596-
if t.Cfg.Target != All {
595+
if !t.Cfg.isModuleEnabled(All) {
597596
return nil, fmt.Errorf("storage engine must be set to blocks to enable the store-gateway")
598597
}
599598
return nil, nil

pkg/cortex/server_service.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ func NewServerService(serv *server.Server, servicesToWaitFor func() []services.S
5454
return services.NewBasicService(nil, runFn, stoppingFn)
5555
}
5656

57+
// DisableSignalHandling puts a dummy signal handler
5758
func DisableSignalHandling(config *server.Config) {
5859
config.SignalHandler = make(ignoreSignalHandler)
5960
}

pkg/util/flagext/stringslicecsv.go

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
package flagext
2+
3+
import "strings"
4+
5+
// StringSliceCSV is a slice of strings that is parsed from a comma-separated string
6+
// It implements flag.Value and yaml Marshalers
7+
type StringSliceCSV []string
8+
9+
// String implements flag.Value
10+
func (v StringSliceCSV) String() string {
11+
return strings.Join(v, ",")
12+
}
13+
14+
// Set implements flag.Value
15+
func (v *StringSliceCSV) Set(s string) error {
16+
*v = strings.Split(s, ",")
17+
return nil
18+
}
19+
20+
// UnmarshalYAML implements yaml.Unmarshaler.
21+
func (v *StringSliceCSV) UnmarshalYAML(unmarshal func(interface{}) error) error {
22+
var s string
23+
if err := unmarshal(&s); err != nil {
24+
return err
25+
}
26+
27+
return v.Set(s)
28+
}
29+
30+
// MarshalYAML implements yaml.Marshaler.
31+
func (v StringSliceCSV) MarshalYAML() (interface{}, error) {
32+
return v.String(), nil
33+
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
package flagext
2+
3+
import (
4+
"testing"
5+
6+
"github.com/stretchr/testify/assert"
7+
"gopkg.in/yaml.v2"
8+
)
9+
10+
func Test_StringSliceCSV(t *testing.T) {
11+
type TestStruct struct {
12+
CSV StringSliceCSV `yaml:"csv"`
13+
}
14+
15+
var testStruct TestStruct
16+
s := "a,b,c,d"
17+
assert.Nil(t, testStruct.CSV.Set(s))
18+
19+
assert.Equal(t, []string{"a", "b", "c", "d"}, []string(testStruct.CSV))
20+
assert.Equal(t, s, testStruct.CSV.String())
21+
22+
expected := []byte(`csv: a,b,c,d
23+
`)
24+
25+
actual, err := yaml.Marshal(testStruct)
26+
assert.Nil(t, err)
27+
assert.Equal(t, expected, actual)
28+
29+
var testStruct2 TestStruct
30+
31+
err = yaml.Unmarshal(expected, &testStruct2)
32+
assert.Nil(t, err)
33+
assert.Equal(t, testStruct, testStruct2)
34+
}

pkg/util/modules/modules.go

Lines changed: 27 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -64,29 +64,45 @@ func (m *Manager) AddDependency(name string, dependsOn ...string) error {
6464
return nil
6565
}
6666

67-
// InitModuleServices initialises the target module by initialising all its dependencies
67+
// InitModuleServices initialises given modules by initialising all their dependencies
6868
// in the right order. Modules are wrapped in such a way that they start after their
6969
// dependencies have been started and stop before their dependencies are stopped.
70-
func (m *Manager) InitModuleServices(target string) (map[string]services.Service, error) {
71-
if _, ok := m.modules[target]; !ok {
72-
return nil, fmt.Errorf("unrecognised module name: %s", target)
70+
func (m *Manager) InitModuleServices(modules ...string) (map[string]services.Service, error) {
71+
servicesMap := map[string]services.Service{}
72+
initMap := map[string]bool{}
73+
74+
for _, module := range modules {
75+
if err := m.initModule(module, initMap, servicesMap); err != nil {
76+
return nil, err
77+
}
7378
}
7479

75-
servicesMap := map[string]services.Service{}
80+
return servicesMap, nil
81+
}
82+
83+
func (m *Manager) initModule(name string, initMap map[string]bool, servicesMap map[string]services.Service) error {
84+
if _, ok := m.modules[name]; !ok {
85+
return fmt.Errorf("unrecognised module name: %s", name)
86+
}
7687

7788
// initialize all of our dependencies first
78-
deps := m.orderedDeps(target)
79-
deps = append(deps, target) // lastly, initialize the requested module
89+
deps := m.orderedDeps(name)
90+
deps = append(deps, name) // lastly, initialize the requested module
8091

8192
for ix, n := range deps {
93+
// Skip already initialized modules
94+
if initMap[n] {
95+
continue
96+
}
97+
8298
mod := m.modules[n]
8399

84100
var serv services.Service
85101

86102
if mod.initFn != nil {
87103
s, err := mod.initFn()
88104
if err != nil {
89-
return nil, errors.Wrap(err, fmt.Sprintf("error initialising module: %s", n))
105+
return errors.Wrap(err, fmt.Sprintf("error initialising module: %s", n))
90106
}
91107

92108
if s != nil {
@@ -99,9 +115,11 @@ func (m *Manager) InitModuleServices(target string) (map[string]services.Service
99115
if serv != nil {
100116
servicesMap[n] = serv
101117
}
118+
119+
initMap[n] = true
102120
}
103121

104-
return servicesMap, nil
122+
return nil
105123
}
106124

107125
// UserVisibleModuleNames gets list of module names that are

0 commit comments

Comments
 (0)