Skip to content

Create an experimental HealthCheck GRPC Handler #6225

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
* [ENHANCEMENT] Distributor: Add new `cortex_reduced_resolution_histogram_samples_total` metric to track the number of histogram samples whose resolution was reduced. #6182
* [ENHANCEMENT] StoreGateway: Implement metadata API limit in queryable. #6195
* [ENHANCEMENT] Ingester: Add matchers to ingester LabelNames() and LabelNamesStream() RPC. #6209
* [ENHANCEMENT] Ingester/Store Gateway Clients: Introduce an experimental HealthCheck handler to quickly fail requests directed to unhealthy targets. #6225
* [BUGFIX] Runtime-config: Handle absolute file paths when working directory is not / #6224

## 1.18.0 2024-09-03
Expand Down
18 changes: 18 additions & 0 deletions docs/blocks-storage/querier.md
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,24 @@ querier:
# CLI flag: -querier.store-gateway-client.grpc-compression
[grpc_compression: <string> | default = ""]

# EXPERIMENTAL: If enabled, gRPC clients perform health checks for each
# target and fail the request if the target is marked as unhealthy.
healthcheck_config:
# The number of consecutive failed health checks required before
# considering a target unhealthy. 0 means disabled.
# CLI flag: -querier.store-gateway-client.unhealthy-threshold
[unhealthy_threshold: <int> | default = 0]

# The approximate amount of time between health checks of an individual
# target.
# CLI flag: -querier.store-gateway-client.interval
[interval: <duration> | default = 5s]

# The amount of time during which no response from a target means a failed
# health check.
# CLI flag: -querier.store-gateway-client.timeout
[timeout: <duration> | default = 1s]

# If enabled, store gateway query stats will be logged using `info` log level.
# CLI flag: -querier.store-gateway-query-stats-enabled
[store_gateway_query_stats: <boolean> | default = true]
Expand Down
36 changes: 36 additions & 0 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -3099,6 +3099,24 @@ grpc_client_config:
# CLI flag: -ingester.client.tls-insecure-skip-verify
[tls_insecure_skip_verify: <boolean> | default = false]

# EXPERIMENTAL: If enabled, gRPC clients perform health checks for each target
# and fail the request if the target is marked as unhealthy.
healthcheck_config:
# The number of consecutive failed health checks required before considering
# a target unhealthy. 0 means disabled.
# CLI flag: -ingester.client.unhealthy-threshold
[unhealthy_threshold: <int> | default = 0]

# The approximate amount of time between health checks of an individual
# target.
# CLI flag: -ingester.client.interval
[interval: <duration> | default = 5s]

# The amount of time during which no response from a target means a failed
# health check.
# CLI flag: -ingester.client.timeout
[timeout: <duration> | default = 1s]

# Max inflight push requests that this ingester client can handle. This limit is
# per-ingester-client. Additional requests will be rejected. 0 = unlimited.
# CLI flag: -ingester.client.max-inflight-push-requests
Expand Down Expand Up @@ -3815,6 +3833,24 @@ store_gateway_client:
# CLI flag: -querier.store-gateway-client.grpc-compression
[grpc_compression: <string> | default = ""]

# EXPERIMENTAL: If enabled, gRPC clients perform health checks for each target
# and fail the request if the target is marked as unhealthy.
healthcheck_config:
# The number of consecutive failed health checks required before considering
# a target unhealthy. 0 means disabled.
# CLI flag: -querier.store-gateway-client.unhealthy-threshold
[unhealthy_threshold: <int> | default = 0]

# The approximate amount of time between health checks of an individual
# target.
# CLI flag: -querier.store-gateway-client.interval
[interval: <duration> | default = 5s]

# The amount of time during which no response from a target means a failed
# health check.
# CLI flag: -querier.store-gateway-client.timeout
[timeout: <duration> | default = 1s]

# If enabled, store gateway query stats will be logged using `info` log level.
# CLI flag: -querier.store-gateway-query-stats-enabled
[store_gateway_query_stats: <boolean> | default = true]
Expand Down
20 changes: 18 additions & 2 deletions pkg/cortex/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ import (
"github.com/cortexproject/cortex/pkg/scheduler"
"github.com/cortexproject/cortex/pkg/storage/bucket"
"github.com/cortexproject/cortex/pkg/storegateway"
"github.com/cortexproject/cortex/pkg/util/grpcclient"
util_log "github.com/cortexproject/cortex/pkg/util/log"
"github.com/cortexproject/cortex/pkg/util/modules"
"github.com/cortexproject/cortex/pkg/util/runtimeconfig"
Expand All @@ -65,6 +66,7 @@ const (
Server string = "server"
Distributor string = "distributor"
DistributorService string = "distributor-service"
GrpcClientService string = "grpcclient-service"
Ingester string = "ingester"
IngesterService string = "ingester-service"
Flusher string = "flusher"
Expand Down Expand Up @@ -230,6 +232,19 @@ func (t *Cortex) initDistributorService() (serv services.Service, err error) {
return t.Distributor, nil
}

// initGrpcClientServices creates the health-check interceptors used by the
// gRPC clients and attaches them to each client config that enables them.
//
// One instance is built from util_log.Logger. It is stored on the ingester
// client config and/or the store-gateway client config only when that
// config's UnhealthyThreshold is greater than zero. The same instance is
// returned as the module's service in every case.
func (t *Cortex) initGrpcClientServices() (serv services.Service, err error) {
	interceptors := grpcclient.NewHealthCheckInterceptors(util_log.Logger)

	ingesterHC := &t.Cfg.IngesterClient.GRPCClientConfig.HealthCheckConfig
	storeGatewayHC := &t.Cfg.Querier.StoreGatewayClient.HealthCheckConfig

	if ingesterHC.UnhealthyThreshold > 0 {
		ingesterHC.HealthCheckInterceptors = interceptors
	}
	if storeGatewayHC.UnhealthyThreshold > 0 {
		storeGatewayHC.HealthCheckInterceptors = interceptors
	}

	return interceptors, nil
}

func (t *Cortex) initDistributor() (serv services.Service, err error) {
t.API.RegisterDistributor(t.Distributor, t.Cfg.Distributor)

Expand Down Expand Up @@ -754,6 +769,7 @@ func (t *Cortex) setupModuleManager() error {
mm.RegisterModule(OverridesExporter, t.initOverridesExporter)
mm.RegisterModule(Distributor, t.initDistributor)
mm.RegisterModule(DistributorService, t.initDistributorService, modules.UserInvisibleModule)
mm.RegisterModule(GrpcClientService, t.initGrpcClientServices, modules.UserInvisibleModule)
mm.RegisterModule(Ingester, t.initIngester)
mm.RegisterModule(IngesterService, t.initIngesterService, modules.UserInvisibleModule)
mm.RegisterModule(Flusher, t.initFlusher)
Expand Down Expand Up @@ -782,14 +798,14 @@ func (t *Cortex) setupModuleManager() error {
Ring: {API, RuntimeConfig, MemberlistKV},
Overrides: {RuntimeConfig},
OverridesExporter: {RuntimeConfig},
Distributor: {DistributorService, API},
Distributor: {DistributorService, API, GrpcClientService},
DistributorService: {Ring, Overrides},
Ingester: {IngesterService, Overrides, API},
IngesterService: {Overrides, RuntimeConfig, MemberlistKV},
Flusher: {Overrides, API},
Queryable: {Overrides, DistributorService, Overrides, Ring, API, StoreQueryable, MemberlistKV},
Querier: {TenantFederation},
StoreQueryable: {Overrides, Overrides, MemberlistKV},
StoreQueryable: {Overrides, Overrides, MemberlistKV, GrpcClientService},
QueryFrontendTripperware: {API, Overrides},
QueryFrontend: {QueryFrontendTripperware},
QueryScheduler: {API, Overrides},
Expand Down
4 changes: 2 additions & 2 deletions pkg/ingester/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,8 @@ func (c *closableHealthAndIngesterClient) Close() error {

// Config is the configuration struct for the ingester client.
//
// GRPCClientConfig is read from the grpc_client_config YAML key and
// MaxInflightPushRequests from max_inflight_push_requests.
//
// NOTE(review): the two field pairs below look like the before/after sides of
// a diff (grpcclient.Config vs grpcclient.ConfigWithHealthCheck) — confirm
// which pair is current.
type Config struct {
GRPCClientConfig grpcclient.Config `yaml:"grpc_client_config"`
MaxInflightPushRequests int64 `yaml:"max_inflight_push_requests"`
GRPCClientConfig grpcclient.ConfigWithHealthCheck `yaml:"grpc_client_config"`
MaxInflightPushRequests int64 `yaml:"max_inflight_push_requests"`
}

// RegisterFlags registers configuration settings used by the ingester client config.
Expand Down
33 changes: 19 additions & 14 deletions pkg/querier/store_gateway_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
"github.com/cortexproject/cortex/pkg/util/tls"
)

func newStoreGatewayClientFactory(clientCfg grpcclient.Config, reg prometheus.Registerer) client.PoolFactory {
func newStoreGatewayClientFactory(clientCfg grpcclient.ConfigWithHealthCheck, reg prometheus.Registerer) client.PoolFactory {
requestDuration := promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
Namespace: "cortex",
Name: "storegateway_client_request_duration_seconds",
Expand All @@ -31,7 +31,7 @@ func newStoreGatewayClientFactory(clientCfg grpcclient.Config, reg prometheus.Re
}
}

func dialStoreGatewayClient(clientCfg grpcclient.Config, addr string, requestDuration *prometheus.HistogramVec) (*storeGatewayClient, error) {
func dialStoreGatewayClient(clientCfg grpcclient.ConfigWithHealthCheck, addr string, requestDuration *prometheus.HistogramVec) (*storeGatewayClient, error) {
opts, err := clientCfg.DialOption(grpcclient.Instrument(requestDuration))
if err != nil {
return nil, err
Expand Down Expand Up @@ -69,15 +69,18 @@ func (c *storeGatewayClient) RemoteAddress() string {

func newStoreGatewayClientPool(discovery client.PoolServiceDiscovery, clientConfig ClientConfig, logger log.Logger, reg prometheus.Registerer) *client.Pool {
// We prefer sane defaults instead of exposing further config options.
clientCfg := grpcclient.Config{
MaxRecvMsgSize: 100 << 20,
MaxSendMsgSize: 16 << 20,
GRPCCompression: clientConfig.GRPCCompression,
RateLimit: 0,
RateLimitBurst: 0,
BackoffOnRatelimits: false,
TLSEnabled: clientConfig.TLSEnabled,
TLS: clientConfig.TLS,
clientCfg := grpcclient.ConfigWithHealthCheck{
Config: grpcclient.Config{
MaxRecvMsgSize: 100 << 20,
MaxSendMsgSize: 16 << 20,
GRPCCompression: clientConfig.GRPCCompression,
RateLimit: 0,
RateLimitBurst: 0,
BackoffOnRatelimits: false,
TLSEnabled: clientConfig.TLSEnabled,
TLS: clientConfig.TLS,
},
HealthCheckConfig: clientConfig.HealthCheckConfig,
}
poolCfg := client.PoolConfig{
CheckInterval: time.Minute,
Expand All @@ -96,13 +99,15 @@ func newStoreGatewayClientPool(discovery client.PoolServiceDiscovery, clientConf
}

// ClientConfig is the configuration of the gRPC client used to connect to the
// store-gateway: a TLS toggle, inlined TLS client options, the gRPC
// compression setting and the health check settings.
//
// NOTE(review): the first three fields are repeated below together with
// HealthCheckConfig; this looks like the before/after sides of a diff —
// confirm which set is current.
type ClientConfig struct {
TLSEnabled bool `yaml:"tls_enabled"`
TLS tls.ClientConfig `yaml:",inline"`
GRPCCompression string `yaml:"grpc_compression"`
TLSEnabled bool `yaml:"tls_enabled"`
TLS tls.ClientConfig `yaml:",inline"`
GRPCCompression string `yaml:"grpc_compression"`
HealthCheckConfig grpcclient.HealthCheckConfig `yaml:"healthcheck_config" doc:"description=EXPERIMENTAL: If enabled, gRPC clients perform health checks for each target and fail the request if the target is marked as unhealthy."`
}

// RegisterFlagsWithPrefix registers the store-gateway client flags on f, with
// every flag name starting with prefix. It covers the TLS toggle, the gRPC
// compression setting, the TLS client options and the health check options.
func (cfg *ClientConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
	// Options owned directly by ClientConfig.
	f.StringVar(&cfg.GRPCCompression, prefix+".grpc-compression", "", "Use compression when sending messages. Supported values are: 'gzip', 'snappy' and '' (disable compression)")
	f.BoolVar(&cfg.TLSEnabled, prefix+".tls-enabled", cfg.TLSEnabled, "Enable TLS for gRPC client connecting to store-gateway.")

	// Nested configs register their own flags under the same prefix.
	cfg.HealthCheckConfig.RegisterFlagsWithPrefix(prefix, f)
	cfg.TLS.RegisterFlagsWithPrefix(prefix, f)
}
2 changes: 1 addition & 1 deletion pkg/querier/store_gateway_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func Test_newStoreGatewayClientFactory(t *testing.T) {

// Create a client factory and query back the mocked service
// with different clients.
cfg := grpcclient.Config{}
cfg := grpcclient.ConfigWithHealthCheck{}
flagext.DefaultValues(&cfg)

reg := prometheus.NewPedanticRegistry()
Expand Down
19 changes: 19 additions & 0 deletions pkg/util/grpcclient/grpcclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,21 @@ type Config struct {
SignWriteRequestsEnabled bool `yaml:"-"`
}

// ConfigWithHealthCheck is a gRPC client Config extended with health check
// settings. Config is embedded and inlined in YAML, so its keys sit at the
// same level as healthcheck_config.
type ConfigWithHealthCheck struct {
Config `yaml:",inline"`
HealthCheckConfig HealthCheckConfig `yaml:"healthcheck_config" doc:"description=EXPERIMENTAL: If enabled, gRPC clients perform health checks for each target and fail the request if the target is marked as unhealthy."`
}

// RegisterFlags registers the config flags on f, using an empty prefix and an
// empty default gRPC compression.
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
cfg.RegisterFlagsWithPrefix("", "", f)
}

// RegisterFlagsWithPrefix registers the embedded Config flags and then the
// health check flags on f, both under the same prefix.
// defaultGrpcCompression is forwarded to the embedded Config only.
func (cfg *ConfigWithHealthCheck) RegisterFlagsWithPrefix(prefix, defaultGrpcCompression string, f *flag.FlagSet) {
cfg.Config.RegisterFlagsWithPrefix(prefix, defaultGrpcCompression, f)
cfg.HealthCheckConfig.RegisterFlagsWithPrefix(prefix, f)
}

// RegisterFlagsWithPrefix registers flags with prefix.
func (cfg *Config) RegisterFlagsWithPrefix(prefix, defaultGrpcCompression string, f *flag.FlagSet) {
f.IntVar(&cfg.MaxRecvMsgSize, prefix+".grpc-max-recv-msg-size", 100<<20, "gRPC client max receive message size (bytes).")
Expand Down Expand Up @@ -75,6 +85,15 @@ func (cfg *Config) CallOptions() []grpc.CallOption {
return opts
}

// DialOption returns the config as a list of grpc.DialOption values.
//
// When HealthCheckInterceptors is nil the call is a plain pass-through to the
// embedded Config. Otherwise the health check unary and stream interceptors
// are appended after the caller-supplied ones before delegating.
func (cfg *ConfigWithHealthCheck) DialOption(unaryClientInterceptors []grpc.UnaryClientInterceptor, streamClientInterceptors []grpc.StreamClientInterceptor) ([]grpc.DialOption, error) {
	// Health checking not wired in: nothing to add.
	if cfg.HealthCheckConfig.HealthCheckInterceptors == nil {
		return cfg.Config.DialOption(unaryClientInterceptors, streamClientInterceptors)
	}

	unaryHealthCheck := cfg.HealthCheckConfig.UnaryHealthCheckInterceptor(cfg)
	streamHealthCheck := cfg.HealthCheckConfig.StreamClientInterceptor(cfg)

	unaryClientInterceptors = append(unaryClientInterceptors, unaryHealthCheck)
	streamClientInterceptors = append(streamClientInterceptors, streamHealthCheck)

	return cfg.Config.DialOption(unaryClientInterceptors, streamClientInterceptors)
}

// DialOption returns the config as a grpc.DialOptions.
func (cfg *Config) DialOption(unaryClientInterceptors []grpc.UnaryClientInterceptor, streamClientInterceptors []grpc.StreamClientInterceptor) ([]grpc.DialOption, error) {
var opts []grpc.DialOption
Expand Down
Loading
Loading