Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
* [ENHANCEMENT] Distributor: Add new `cortex_reduced_resolution_histogram_samples_total` metric to track the number of histogram samples which resolution was reduced. #6182
* [ENHANCEMENT] StoreGateway: Implement metadata API limit in queryable. #6195
* [ENHANCEMENT] Ingester: Add matchers to ingester LabelNames() and LabelNamesStream() RPC. #6209
* [ENHANCEMENT] Ingester/Store Gateway Clients: Introduce an experimental HealthCheck handler to quickly fail requests directed to unhealthy targets. #6225
* [ENHANCEMENT] Ingester/Store Gateway Clients: Introduce an experimental HealthCheck handler to quickly fail requests directed to unhealthy targets. #6225 #6257
* [BUGFIX] Runtime-config: Handle absolute file paths when working directory is not / #6224

## 1.18.0 2024-09-03
Expand Down
47 changes: 28 additions & 19 deletions pkg/util/grpcclient/health_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"flag"
"fmt"
"io"
"sync"
"time"

Expand Down Expand Up @@ -55,15 +56,17 @@ type HealthCheckInterceptors struct {
activeInstances map[string]*healthCheckEntry

instanceGcTimeout time.Duration
healthClientFactory func(cc grpc.ClientConnInterface) grpc_health_v1.HealthClient
healthClientFactory func(cc *grpc.ClientConn) (grpc_health_v1.HealthClient, io.Closer)
}

func NewHealthCheckInterceptors(logger log.Logger) *HealthCheckInterceptors {
h := &HealthCheckInterceptors{
logger: logger,
instanceGcTimeout: 2 * time.Minute,
healthClientFactory: grpc_health_v1.NewHealthClient,
activeInstances: make(map[string]*healthCheckEntry),
logger: logger,
instanceGcTimeout: 2 * time.Minute,
healthClientFactory: func(cc *grpc.ClientConn) (grpc_health_v1.HealthClient, io.Closer) {
return grpc_health_v1.NewHealthClient(cc), cc
},
activeInstances: make(map[string]*healthCheckEntry),
}

h.Service = services.
Expand Down Expand Up @@ -107,16 +110,6 @@ func (h *HealthCheckInterceptors) registeredInstances() []*healthCheckEntry {
func (h *HealthCheckInterceptors) iteration(ctx context.Context) error {
level.Debug(h.logger).Log("msg", "Performing health check", "registeredInstances", len(h.registeredInstances()))
for _, instance := range h.registeredInstances() {
dialOpts, err := instance.clientConfig.Config.DialOption(nil, nil)
if err != nil {
return err
}
conn, err := grpc.NewClient(instance.address, dialOpts...)
c := h.healthClientFactory(conn)
if err != nil {
return err
}

if time.Since(instance.lastTickTime.Load()) >= h.instanceGcTimeout {
h.Lock()
delete(h.activeInstances, instance.address)
Expand All @@ -131,11 +124,27 @@ func (h *HealthCheckInterceptors) iteration(ctx context.Context) error {
instance.lastCheckTime.Store(time.Now())

go func(i *healthCheckEntry) {
if err := i.recordHealth(healthCheck(c, i.clientConfig.HealthCheckConfig.Timeout)); !i.isHealthy() {
level.Warn(h.logger).Log("msg", "instance marked as unhealthy", "address", i.address, "err", err)
dialOpts, err := i.clientConfig.Config.DialOption(nil, nil)
if err != nil {
level.Error(h.logger).Log("msg", "error creating dialOpts to perform healthcheck", "address", i.address, "err", err)
return
}
conn, err := grpc.NewClient(i.address, dialOpts...)
if err != nil {
level.Error(h.logger).Log("msg", "error creating client to perform healthcheck", "address", i.address, "err", err)
return
}
if err := conn.Close(); err != nil {
level.Warn(h.logger).Log("msg", "error closing connection", "address", i.address, "err", err)

client, closer := h.healthClientFactory(conn)

defer func() {
if err := closer.Close(); err != nil {
level.Warn(h.logger).Log("msg", "error closing connection", "address", i.address, "err", err)
}
}()

if err := i.recordHealth(healthCheck(client, i.clientConfig.HealthCheckConfig.Timeout)); !i.isHealthy() {
level.Warn(h.logger).Log("msg", "instance marked as unhealthy", "address", i.address, "err", err)
}
}(instance)
}
Expand Down
23 changes: 18 additions & 5 deletions pkg/util/grpcclient/health_check_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"io"
"testing"
"time"

Expand All @@ -20,7 +21,13 @@ import (

type healthClientMock struct {
grpc_health_v1.HealthClient
err atomic.Error
err atomic.Error
open atomic.Bool
}

func (h *healthClientMock) Close() error {
h.open.Store(false)
return nil
}

func (h *healthClientMock) Check(ctx context.Context, in *grpc_health_v1.HealthCheckRequest, opts ...grpc.CallOption) (*grpc_health_v1.HealthCheckResponse, error) {
Expand All @@ -36,8 +43,9 @@ func TestNewHealthCheckService(t *testing.T) {
i.instanceGcTimeout = time.Second * 5

hMock := &healthClientMock{}
i.healthClientFactory = func(cc grpc.ClientConnInterface) grpc_health_v1.HealthClient {
return hMock
i.healthClientFactory = func(cc *grpc.ClientConn) (grpc_health_v1.HealthClient, io.Closer) {
hMock.open.Store(true)
return hMock, hMock
}

require.NoError(t, services.StartAndAwaitRunning(context.Background(), i))
Expand Down Expand Up @@ -79,6 +87,8 @@ func TestNewHealthCheckService(t *testing.T) {
cortex_testutil.Poll(t, i.instanceGcTimeout*2, 0, func() interface{} {
return len(i.registeredInstances())
})

require.False(t, hMock.open.Load())
}

func TestNewHealthCheckInterceptors(t *testing.T) {
Expand All @@ -92,8 +102,9 @@ func TestNewHealthCheckInterceptors(t *testing.T) {
Timeout: time.Second,
},
}
i.healthClientFactory = func(cc grpc.ClientConnInterface) grpc_health_v1.HealthClient {
return hMock
i.healthClientFactory = func(cc *grpc.ClientConn) (grpc_health_v1.HealthClient, io.Closer) {
hMock.open.Store(true)
return hMock, hMock
}

ui := i.UnaryHealthCheckInterceptor(&cfg)
Expand All @@ -113,6 +124,7 @@ func TestNewHealthCheckInterceptors(t *testing.T) {

// first health check
require.NoError(t, i.iteration(context.Background()))
require.False(t, hMock.open.Load())

//Should second call even with error
require.NoError(t, ui(context.Background(), "", struct{}{}, struct{}{}, ccUnhealthy, invoker))
Expand All @@ -121,6 +133,7 @@ func TestNewHealthCheckInterceptors(t *testing.T) {

// Second Healthcheck -> should mark as unhealthy
require.NoError(t, i.iteration(context.Background()))
require.False(t, hMock.open.Load())

cortex_testutil.Poll(t, time.Second, true, func() interface{} {
return errors.Is(ui(context.Background(), "", struct{}{}, struct{}{}, ccUnhealthy, invoker), unhealthyErr)
Expand Down
Loading