diff --git a/CHANGELOG.md b/CHANGELOG.md index a86e11d57d9..c4204e9667d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,7 @@ * [ENHANCEMENT] Distributor: Add new `cortex_reduced_resolution_histogram_samples_total` metric to track the number of histogram samples which resolution was reduced. #6182 * [ENHANCEMENT] StoreGateway: Implement metadata API limit in queryable. #6195 * [ENHANCEMENT] Ingester: Add matchers to ingester LabelNames() and LabelNamesStream() RPC. #6209 +* [ENHANCEMENT] Ingester/Store Gateway Clients: Introduce an experimental HealthCheck handler to quickly fail requests directed to unhealthy targets. #6225 * [BUGFIX] Runtime-config: Handle absolute file paths when working directory is not / #6224 ## 1.18.0 2024-09-03 diff --git a/docs/blocks-storage/querier.md b/docs/blocks-storage/querier.md index f0c5d897732..14cdcc400e1 100644 --- a/docs/blocks-storage/querier.md +++ b/docs/blocks-storage/querier.md @@ -204,6 +204,24 @@ querier: # CLI flag: -querier.store-gateway-client.grpc-compression [grpc_compression: | default = ""] + # EXPERIMENTAL: If enabled, gRPC clients perform health checks for each + # target and fail the request if the target is marked as unhealthy. + healthcheck_config: + # The number of consecutive failed health checks required before + # considering a target unhealthy. 0 means disabled. + # CLI flag: -querier.store-gateway-client.unhealthy-threshold + [unhealthy_threshold: | default = 0] + + # The approximate amount of time between health checks of an individual + # target. + # CLI flag: -querier.store-gateway-client.interval + [interval: | default = 5s] + + # The amount of time during which no response from a target means a failed + # health check. + # CLI flag: -querier.store-gateway-client.timeout + [timeout: | default = 1s] + # If enabled, store gateway query stats will be logged using `info` log level. # CLI flag: -querier.store-gateway-query-stats-enabled [store_gateway_query_stats: | default = true] diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 48329d1648f..c217d95c160 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -3099,6 +3099,24 @@ grpc_client_config: # CLI flag: -ingester.client.tls-insecure-skip-verify [tls_insecure_skip_verify: | default = false] + # EXPERIMENTAL: If enabled, gRPC clients perform health checks for each target + # and fail the request if the target is marked as unhealthy. + healthcheck_config: + # The number of consecutive failed health checks required before considering + # a target unhealthy. 0 means disabled. + # CLI flag: -ingester.client.unhealthy-threshold + [unhealthy_threshold: | default = 0] + + # The approximate amount of time between health checks of an individual + # target. + # CLI flag: -ingester.client.interval + [interval: | default = 5s] + + # The amount of time during which no response from a target means a failed + # health check. + # CLI flag: -ingester.client.timeout + [timeout: | default = 1s] + # Max inflight push requests that this ingester client can handle. This limit is # per-ingester-client. Additional requests will be rejected. 0 = unlimited. # CLI flag: -ingester.client.max-inflight-push-requests @@ -3815,6 +3833,24 @@ store_gateway_client: # CLI flag: -querier.store-gateway-client.grpc-compression [grpc_compression: | default = ""] + # EXPERIMENTAL: If enabled, gRPC clients perform health checks for each target + # and fail the request if the target is marked as unhealthy. + healthcheck_config: + # The number of consecutive failed health checks required before considering + # a target unhealthy. 0 means disabled. + # CLI flag: -querier.store-gateway-client.unhealthy-threshold + [unhealthy_threshold: | default = 0] + + # The approximate amount of time between health checks of an individual + # target. + # CLI flag: -querier.store-gateway-client.interval + [interval: | default = 5s] + + # The amount of time during which no response from a target means a failed + # health check. + # CLI flag: -querier.store-gateway-client.timeout + [timeout: | default = 1s] + # If enabled, store gateway query stats will be logged using `info` log level. # CLI flag: -querier.store-gateway-query-stats-enabled [store_gateway_query_stats: | default = true] diff --git a/pkg/cortex/modules.go b/pkg/cortex/modules.go index bcb7517b9e9..b1450d30b4d 100644 --- a/pkg/cortex/modules.go +++ b/pkg/cortex/modules.go @@ -48,6 +48,7 @@ import ( "github.com/cortexproject/cortex/pkg/scheduler" "github.com/cortexproject/cortex/pkg/storage/bucket" "github.com/cortexproject/cortex/pkg/storegateway" + "github.com/cortexproject/cortex/pkg/util/grpcclient" util_log "github.com/cortexproject/cortex/pkg/util/log" "github.com/cortexproject/cortex/pkg/util/modules" "github.com/cortexproject/cortex/pkg/util/runtimeconfig" @@ -65,6 +66,7 @@ const ( Server string = "server" Distributor string = "distributor" DistributorService string = "distributor-service" + GrpcClientService string = "grpcclient-service" Ingester string = "ingester" IngesterService string = "ingester-service" Flusher string = "flusher" @@ -230,6 +232,19 @@ func (t *Cortex) initDistributorService() (serv services.Service, err error) { return t.Distributor, nil } +func (t *Cortex) initGrpcClientServices() (serv services.Service, err error) { + s := grpcclient.NewHealthCheckInterceptors(util_log.Logger) + if t.Cfg.IngesterClient.GRPCClientConfig.HealthCheckConfig.UnhealthyThreshold > 0 { + t.Cfg.IngesterClient.GRPCClientConfig.HealthCheckConfig.HealthCheckInterceptors = s + } + + if t.Cfg.Querier.StoreGatewayClient.HealthCheckConfig.UnhealthyThreshold > 0 { + t.Cfg.Querier.StoreGatewayClient.HealthCheckConfig.HealthCheckInterceptors = s + } + + return s, nil +} + func (t *Cortex) initDistributor() (serv services.Service, err error) { t.API.RegisterDistributor(t.Distributor, t.Cfg.Distributor) @@ -754,6 +769,7 @@ func (t *Cortex) setupModuleManager() error { mm.RegisterModule(OverridesExporter, t.initOverridesExporter) mm.RegisterModule(Distributor, t.initDistributor) mm.RegisterModule(DistributorService, t.initDistributorService, modules.UserInvisibleModule) + mm.RegisterModule(GrpcClientService, t.initGrpcClientServices, modules.UserInvisibleModule) mm.RegisterModule(Ingester, t.initIngester) mm.RegisterModule(IngesterService, t.initIngesterService, modules.UserInvisibleModule) mm.RegisterModule(Flusher, t.initFlusher) @@ -782,14 +798,14 @@ func (t *Cortex) setupModuleManager() error { Ring: {API, RuntimeConfig, MemberlistKV}, Overrides: {RuntimeConfig}, OverridesExporter: {RuntimeConfig}, - Distributor: {DistributorService, API}, + Distributor: {DistributorService, API, GrpcClientService}, DistributorService: {Ring, Overrides}, Ingester: {IngesterService, Overrides, API}, IngesterService: {Overrides, RuntimeConfig, MemberlistKV}, Flusher: {Overrides, API}, Queryable: {Overrides, DistributorService, Overrides, Ring, API, StoreQueryable, MemberlistKV}, Querier: {TenantFederation}, - StoreQueryable: {Overrides, Overrides, MemberlistKV}, + StoreQueryable: {Overrides, Overrides, MemberlistKV, GrpcClientService}, QueryFrontendTripperware: {API, Overrides}, QueryFrontend: {QueryFrontendTripperware}, QueryScheduler: {API, Overrides}, diff --git a/pkg/ingester/client/client.go b/pkg/ingester/client/client.go index b437b95d01f..b1c5a8b28ab 100644 --- a/pkg/ingester/client/client.go +++ b/pkg/ingester/client/client.go @@ -111,8 +111,8 @@ func (c *closableHealthAndIngesterClient) Close() error { // Config is the configuration struct for the ingester client type Config struct { - GRPCClientConfig grpcclient.Config `yaml:"grpc_client_config"` - MaxInflightPushRequests int64 `yaml:"max_inflight_push_requests"` + GRPCClientConfig grpcclient.ConfigWithHealthCheck `yaml:"grpc_client_config"` + MaxInflightPushRequests int64 `yaml:"max_inflight_push_requests"` } // RegisterFlags registers configuration settings used by the ingester client config. diff --git a/pkg/querier/store_gateway_client.go b/pkg/querier/store_gateway_client.go index df246f000e2..088619599b3 100644 --- a/pkg/querier/store_gateway_client.go +++ b/pkg/querier/store_gateway_client.go @@ -17,7 +17,7 @@ import ( "github.com/cortexproject/cortex/pkg/util/tls" ) -func newStoreGatewayClientFactory(clientCfg grpcclient.Config, reg prometheus.Registerer) client.PoolFactory { +func newStoreGatewayClientFactory(clientCfg grpcclient.ConfigWithHealthCheck, reg prometheus.Registerer) client.PoolFactory { requestDuration := promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{ Namespace: "cortex", Name: "storegateway_client_request_duration_seconds", @@ -31,7 +31,7 @@ func newStoreGatewayClientFactory(clientCfg grpcclient.Config, reg prometheus.Re } } -func dialStoreGatewayClient(clientCfg grpcclient.Config, addr string, requestDuration *prometheus.HistogramVec) (*storeGatewayClient, error) { +func dialStoreGatewayClient(clientCfg grpcclient.ConfigWithHealthCheck, addr string, requestDuration *prometheus.HistogramVec) (*storeGatewayClient, error) { opts, err := clientCfg.DialOption(grpcclient.Instrument(requestDuration)) if err != nil { return nil, err @@ -69,15 +69,18 @@ func (c *storeGatewayClient) RemoteAddress() string { func newStoreGatewayClientPool(discovery client.PoolServiceDiscovery, clientConfig ClientConfig, logger log.Logger, reg prometheus.Registerer) *client.Pool { // We prefer sane defaults instead of exposing further config options. - clientCfg := grpcclient.Config{ - MaxRecvMsgSize: 100 << 20, - MaxSendMsgSize: 16 << 20, - GRPCCompression: clientConfig.GRPCCompression, - RateLimit: 0, - RateLimitBurst: 0, - BackoffOnRatelimits: false, - TLSEnabled: clientConfig.TLSEnabled, - TLS: clientConfig.TLS, + clientCfg := grpcclient.ConfigWithHealthCheck{ + Config: grpcclient.Config{ + MaxRecvMsgSize: 100 << 20, + MaxSendMsgSize: 16 << 20, + GRPCCompression: clientConfig.GRPCCompression, + RateLimit: 0, + RateLimitBurst: 0, + BackoffOnRatelimits: false, + TLSEnabled: clientConfig.TLSEnabled, + TLS: clientConfig.TLS, + }, + HealthCheckConfig: clientConfig.HealthCheckConfig, } poolCfg := client.PoolConfig{ CheckInterval: time.Minute, @@ -96,13 +99,15 @@ func newStoreGatewayClientPool(discovery client.PoolServiceDiscovery, clientConf } type ClientConfig struct { - TLSEnabled bool `yaml:"tls_enabled"` - TLS tls.ClientConfig `yaml:",inline"` - GRPCCompression string `yaml:"grpc_compression"` + TLSEnabled bool `yaml:"tls_enabled"` + TLS tls.ClientConfig `yaml:",inline"` + GRPCCompression string `yaml:"grpc_compression"` + HealthCheckConfig grpcclient.HealthCheckConfig `yaml:"healthcheck_config" doc:"description=EXPERIMENTAL: If enabled, gRPC clients perform health checks for each target and fail the request if the target is marked as unhealthy."` } func (cfg *ClientConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { f.BoolVar(&cfg.TLSEnabled, prefix+".tls-enabled", cfg.TLSEnabled, "Enable TLS for gRPC client connecting to store-gateway.") f.StringVar(&cfg.GRPCCompression, prefix+".grpc-compression", "", "Use compression when sending messages. Supported values are: 'gzip', 'snappy' and '' (disable compression)") cfg.TLS.RegisterFlagsWithPrefix(prefix, f) + cfg.HealthCheckConfig.RegisterFlagsWithPrefix(prefix, f) } diff --git a/pkg/querier/store_gateway_client_test.go b/pkg/querier/store_gateway_client_test.go index c1b306a7e1a..74d6c6f7df7 100644 --- a/pkg/querier/store_gateway_client_test.go +++ b/pkg/querier/store_gateway_client_test.go @@ -36,7 +36,7 @@ func Test_newStoreGatewayClientFactory(t *testing.T) { // Create a client factory and query back the mocked service // with different clients. - cfg := grpcclient.Config{} + cfg := grpcclient.ConfigWithHealthCheck{} flagext.DefaultValues(&cfg) reg := prometheus.NewPedanticRegistry() diff --git a/pkg/util/grpcclient/grpcclient.go b/pkg/util/grpcclient/grpcclient.go index edd767ad4d6..9816b7ab6c6 100644 --- a/pkg/util/grpcclient/grpcclient.go +++ b/pkg/util/grpcclient/grpcclient.go @@ -34,11 +34,21 @@ type Config struct { SignWriteRequestsEnabled bool `yaml:"-"` } +type ConfigWithHealthCheck struct { + Config `yaml:",inline"` + HealthCheckConfig HealthCheckConfig `yaml:"healthcheck_config" doc:"description=EXPERIMENTAL: If enabled, gRPC clients perform health checks for each target and fail the request if the target is marked as unhealthy."` +} + // RegisterFlags registers flags. func (cfg *Config) RegisterFlags(f *flag.FlagSet) { cfg.RegisterFlagsWithPrefix("", "", f) } +func (cfg *ConfigWithHealthCheck) RegisterFlagsWithPrefix(prefix, defaultGrpcCompression string, f *flag.FlagSet) { + cfg.Config.RegisterFlagsWithPrefix(prefix, defaultGrpcCompression, f) + cfg.HealthCheckConfig.RegisterFlagsWithPrefix(prefix, f) +} + // RegisterFlagsWithPrefix registers flags with prefix. func (cfg *Config) RegisterFlagsWithPrefix(prefix, defaultGrpcCompression string, f *flag.FlagSet) { f.IntVar(&cfg.MaxRecvMsgSize, prefix+".grpc-max-recv-msg-size", 100<<20, "gRPC client max receive message size (bytes).") @@ -75,6 +85,15 @@ func (cfg *Config) CallOptions() []grpc.CallOption { return opts } +func (cfg *ConfigWithHealthCheck) DialOption(unaryClientInterceptors []grpc.UnaryClientInterceptor, streamClientInterceptors []grpc.StreamClientInterceptor) ([]grpc.DialOption, error) { + if cfg.HealthCheckConfig.HealthCheckInterceptors != nil { + unaryClientInterceptors = append(unaryClientInterceptors, cfg.HealthCheckConfig.UnaryHealthCheckInterceptor(cfg)) + streamClientInterceptors = append(streamClientInterceptors, cfg.HealthCheckConfig.StreamClientInterceptor(cfg)) + } + + return cfg.Config.DialOption(unaryClientInterceptors, streamClientInterceptors) +} + // DialOption returns the config as a grpc.DialOptions. func (cfg *Config) DialOption(unaryClientInterceptors []grpc.UnaryClientInterceptor, streamClientInterceptors []grpc.StreamClientInterceptor) ([]grpc.DialOption, error) { var opts []grpc.DialOption diff --git a/pkg/util/grpcclient/health_check.go b/pkg/util/grpcclient/health_check.go new file mode 100644 index 00000000000..ad378e73b5c --- /dev/null +++ b/pkg/util/grpcclient/health_check.go @@ -0,0 +1,203 @@ +package grpcclient + +import ( + "context" + "errors" + "flag" + "fmt" + "sync" + "time" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/weaveworks/common/user" + "go.uber.org/atomic" + "google.golang.org/grpc" + "google.golang.org/grpc/health/grpc_health_v1" + + "github.com/cortexproject/cortex/pkg/util/services" +) + +var ( + unhealthyErr = errors.New("instance marked as unhealthy") +) + +type HealthCheckConfig struct { + *HealthCheckInterceptors `yaml:"-"` + + UnhealthyThreshold int `yaml:"unhealthy_threshold"` + Interval time.Duration `yaml:"interval"` + Timeout time.Duration `yaml:"timeout"` +} + +// RegisterFlagsWithPrefix for Config. +func (cfg *HealthCheckConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { + f.IntVar(&cfg.UnhealthyThreshold, prefix+".unhealthy-threshold", 0, "The number of consecutive failed health checks required before considering a target unhealthy. 0 means disabled.") + f.DurationVar(&cfg.Timeout, prefix+".timeout", 1*time.Second, "The amount of time during which no response from a target means a failed health check.") + f.DurationVar(&cfg.Interval, prefix+".interval", 5*time.Second, "The approximate amount of time between health checks of an individual target.") +} + +type healthCheckEntry struct { + address string + clientConfig *ConfigWithHealthCheck + + sync.RWMutex + unhealthyCount int + lastCheckTime atomic.Time + lastTickTime atomic.Time +} + +type HealthCheckInterceptors struct { + services.Service + logger log.Logger + + sync.RWMutex + activeInstances map[string]*healthCheckEntry + + instanceGcTimeout time.Duration + healthClientFactory func(cc grpc.ClientConnInterface) grpc_health_v1.HealthClient +} + +func NewHealthCheckInterceptors(logger log.Logger) *HealthCheckInterceptors { + h := &HealthCheckInterceptors{ + logger: logger, + instanceGcTimeout: 2 * time.Minute, + healthClientFactory: grpc_health_v1.NewHealthClient, + activeInstances: make(map[string]*healthCheckEntry), + } + + h.Service = services. + NewTimerService(time.Second, nil, h.iteration, nil).WithName("Grp Client HealthCheck Interceptors") + return h +} + +func (e *healthCheckEntry) isHealthy() bool { + e.RLock() + defer e.RUnlock() + return e.unhealthyCount < e.clientConfig.HealthCheckConfig.UnhealthyThreshold +} + +func (e *healthCheckEntry) recordHealth(err error) error { + e.Lock() + defer e.Unlock() + if err != nil { + e.unhealthyCount++ + } else { + e.unhealthyCount = 0 + } + + return err +} + +func (e *healthCheckEntry) tick() { + e.lastTickTime.Store(time.Now()) +} + +func (h *HealthCheckInterceptors) registeredInstances() []*healthCheckEntry { + h.RLock() + defer h.RUnlock() + r := make([]*healthCheckEntry, 0, len(h.activeInstances)) + for _, i := range h.activeInstances { + r = append(r, i) + } + + return r +} + +func (h *HealthCheckInterceptors) iteration(ctx context.Context) error { + level.Debug(h.logger).Log("msg", "Performing health check", "registeredInstances", len(h.registeredInstances())) + for _, instance := range h.registeredInstances() { + dialOpts, err := instance.clientConfig.Config.DialOption(nil, nil) + if err != nil { + return err + } + conn, err := grpc.NewClient(instance.address, dialOpts...) + c := h.healthClientFactory(conn) + if err != nil { + return err + } + + if time.Since(instance.lastTickTime.Load()) >= h.instanceGcTimeout { + h.Lock() + delete(h.activeInstances, instance.address) + h.Unlock() + continue + } + + if time.Since(instance.lastCheckTime.Load()) < instance.clientConfig.HealthCheckConfig.Interval { + continue + } + + instance.lastCheckTime.Store(time.Now()) + + go func(i *healthCheckEntry) { + if err := i.recordHealth(healthCheck(c, i.clientConfig.HealthCheckConfig.Timeout)); !i.isHealthy() { + level.Warn(h.logger).Log("msg", "instance marked as unhealthy", "address", i.address, "err", err) + } + if err := conn.Close(); err != nil { + level.Warn(h.logger).Log("msg", "error closing connection", "address", i.address, "err", err) + } + }(instance) + } + return nil +} + +func (h *HealthCheckInterceptors) getOrAddHealthCheckEntry(address string, clientConfig *ConfigWithHealthCheck) *healthCheckEntry { + h.RLock() + e := h.activeInstances[address] + h.RUnlock() + + if e != nil { + return e + } + + h.Lock() + defer h.Unlock() + + if _, ok := h.activeInstances[address]; !ok { + h.activeInstances[address] = &healthCheckEntry{ + address: address, + clientConfig: clientConfig, + } + } + + return h.activeInstances[address] +} + +func (h *HealthCheckInterceptors) StreamClientInterceptor(clientConfig *ConfigWithHealthCheck) grpc.StreamClientInterceptor { + return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) { + e := h.getOrAddHealthCheckEntry(cc.Target(), clientConfig) + e.tick() + if !e.isHealthy() { + return nil, unhealthyErr + } + + return streamer(ctx, desc, cc, method, opts...) + } +} + +func (h *HealthCheckInterceptors) UnaryHealthCheckInterceptor(clientConfig *ConfigWithHealthCheck) grpc.UnaryClientInterceptor { + return func(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { + e := h.getOrAddHealthCheckEntry(cc.Target(), clientConfig) + e.tick() + if !e.isHealthy() { + return unhealthyErr + } + return invoker(ctx, method, req, reply, cc, opts...) + } +} + +func healthCheck(client grpc_health_v1.HealthClient, timeout time.Duration) error { + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + ctx = user.InjectOrgID(ctx, "0") + + resp, err := client.Check(ctx, &grpc_health_v1.HealthCheckRequest{}) + if err != nil { + return err + } + if resp.Status != grpc_health_v1.HealthCheckResponse_SERVING { + return fmt.Errorf("failing healthcheck status: %s", resp.Status) + } + return nil +} diff --git a/pkg/util/grpcclient/health_check_test.go b/pkg/util/grpcclient/health_check_test.go new file mode 100644 index 00000000000..5483befa43f --- /dev/null +++ b/pkg/util/grpcclient/health_check_test.go @@ -0,0 +1,138 @@ +package grpcclient + +import ( + "context" + "errors" + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/require" + "go.uber.org/atomic" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/health/grpc_health_v1" + + utillog "github.com/cortexproject/cortex/pkg/util/log" + "github.com/cortexproject/cortex/pkg/util/services" + cortex_testutil "github.com/cortexproject/cortex/pkg/util/test" +) + +type healthClientMock struct { + grpc_health_v1.HealthClient + err atomic.Error +} + +func (h *healthClientMock) Check(ctx context.Context, in *grpc_health_v1.HealthCheckRequest, opts ...grpc.CallOption) (*grpc_health_v1.HealthCheckResponse, error) { + return &grpc_health_v1.HealthCheckResponse{ + Status: grpc_health_v1.HealthCheckResponse_SERVING, + }, h.err.Load() +} + +func TestNewHealthCheckService(t *testing.T) { + i := NewHealthCheckInterceptors(utillog.Logger) + + // set the gc timeout to 5 seconds + i.instanceGcTimeout = time.Second * 5 + + hMock := &healthClientMock{} + i.healthClientFactory = func(cc grpc.ClientConnInterface) grpc_health_v1.HealthClient { + return hMock + } + + require.NoError(t, services.StartAndAwaitRunning(context.Background(), i)) + defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck + + cfg := ConfigWithHealthCheck{ + HealthCheckConfig: HealthCheckConfig{ + UnhealthyThreshold: 2, + Interval: 0, + Timeout: time.Second, + }, + } + + client, err := grpc.NewClient("localhost:999", grpc.WithTransportCredentials(insecure.NewCredentials())) + require.NoError(t, err) + + ui := i.UnaryHealthCheckInterceptor(&cfg) + require.NoError(t, ui(context.Background(), "", struct{}{}, struct{}{}, client, + func(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, opts ...grpc.CallOption) error { + return nil + })) + + instances := i.registeredInstances() + require.Len(t, instances, 1) + + // Generate healthcheck error and wait instance to become unhealthy + hMock.err.Store(errors.New("some error")) + + cortex_testutil.Poll(t, 5*time.Second, false, func() interface{} { + return instances[0].isHealthy() + }) + + // Mark instance back to a healthy state + hMock.err.Store(nil) + cortex_testutil.Poll(t, 5*time.Second, true, func() interface{} { + return instances[0].isHealthy() + }) + + cortex_testutil.Poll(t, i.instanceGcTimeout*2, 0, func() interface{} { + return len(i.registeredInstances()) + }) +} + +func TestNewHealthCheckInterceptors(t *testing.T) { + i := NewHealthCheckInterceptors(utillog.Logger) + hMock := &healthClientMock{} + hMock.err.Store(fmt.Errorf("some error")) + cfg := ConfigWithHealthCheck{ + HealthCheckConfig: HealthCheckConfig{ + UnhealthyThreshold: 2, + Interval: 0, + Timeout: time.Second, + }, + } + i.healthClientFactory = func(cc grpc.ClientConnInterface) grpc_health_v1.HealthClient { + return hMock + } + + ui := i.UnaryHealthCheckInterceptor(&cfg) + ccUnhealthy, err := grpc.NewClient("localhost:999", grpc.WithTransportCredentials(insecure.NewCredentials())) + require.NoError(t, err) + ccHealthy, err := grpc.NewClient("localhost:111", grpc.WithTransportCredentials(insecure.NewCredentials())) + require.NoError(t, err) + invokedMap := map[string]int{} + + invoker := func(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, opts ...grpc.CallOption) error { + invokedMap[cc.Target()]++ + return nil + } + + //Should allow first call + require.NoError(t, ui(context.Background(), "", struct{}{}, struct{}{}, ccUnhealthy, invoker)) + + // first health check + require.NoError(t, i.iteration(context.Background())) + + //Should second call even with error + require.NoError(t, ui(context.Background(), "", struct{}{}, struct{}{}, ccUnhealthy, invoker)) + + require.Equal(t, invokedMap["localhost:999"], 2) + + // Second Healthcheck -> should mark as unhealthy + require.NoError(t, i.iteration(context.Background())) + + cortex_testutil.Poll(t, time.Second, true, func() interface{} { + return errors.Is(ui(context.Background(), "", struct{}{}, struct{}{}, ccUnhealthy, invoker), unhealthyErr) + }) + + // Other instances should remain healthy + require.NoError(t, ui(context.Background(), "", struct{}{}, struct{}{}, ccHealthy, invoker)) + + // Should mark the instance back to healthy + hMock.err.Store(nil) + require.NoError(t, i.iteration(context.Background())) + cortex_testutil.Poll(t, time.Second, true, func() interface{} { + return ui(context.Background(), "", struct{}{}, struct{}{}, ccUnhealthy, invoker) == nil + }) +}