Skip to content

Enable 'snappy-block' compression on ingester clients by default #6148

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Aug 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
* [CHANGE] OTLP: Set `AddMetricSuffixes` to true to always enable metric name normalization. #6136
* [CHANGE] Querier: Deprecate and enable by default `querier.ingester-metadata-streaming` flag. #6147
* [CHANGE] QueryFrontend/QueryScheduler: Deprecate `-querier.max-outstanding-requests-per-tenant` and `-query-scheduler.max-outstanding-requests-per-tenant` flags. Use frontend.max-outstanding-requests-per-tenant instead. #6146
* [CHANGE] Ingesters: Enable 'snappy-block' compression on ingester clients by default. #6148
* [FEATURE] Ingester/Distributor: Experimental: Enable native histogram ingestion via `-blocks-storage.tsdb.enable-native-histograms` flag. #5986 #6010 #6020
* [FEATURE] Querier: Enable querying native histogram chunks. #5944 #6031
* [FEATURE] Query Frontend: Support native histogram in query frontend response. #5996 #6043
Expand Down
2 changes: 1 addition & 1 deletion docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -3020,7 +3020,7 @@ grpc_client_config:
# Use compression when sending messages. Supported values are: 'gzip',
# 'snappy', 'snappy-block' ,'zstd' and '' (disable compression)
# CLI flag: -ingester.client.grpc-compression
[grpc_compression: <string> | default = ""]
[grpc_compression: <string> | default = "snappy-block"]

# Rate limit for gRPC client; 0 means disabled.
# CLI flag: -ingester.client.grpc-client-rate-limit
Expand Down
93 changes: 74 additions & 19 deletions integration/backward_compatibility_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,53 @@ import (
"github.com/cortexproject/cortex/integration/e2ecortex"
)

type versionsImagesFlags struct {
flagsForOldImage func(map[string]string) map[string]string
flagsForNewImage func(map[string]string) map[string]string
}

var (
// If you change the image tag, remember to update it in the preloading done
// by GitHub Actions too (see .github/workflows/test-build-deploy.yml).
previousVersionImages = map[string]func(map[string]string) map[string]string{
"quay.io/cortexproject/cortex:v1.13.1": func(m map[string]string) map[string]string {
m["-ingester.stream-chunks-when-using-blocks"] = "true"
return m
previousVersionImages = map[string]*versionsImagesFlags{
"quay.io/cortexproject/cortex:v1.13.1": {
flagsForOldImage: func(m map[string]string) map[string]string {
m["-ingester.stream-chunks-when-using-blocks"] = "true"
return m
},
flagsForNewImage: func(m map[string]string) map[string]string {
m["-ingester.client.grpc-compression"] = "snappy"
return m
},
},
"quay.io/cortexproject/cortex:v1.13.2": {
flagsForOldImage: func(m map[string]string) map[string]string {
m["-ingester.stream-chunks-when-using-blocks"] = "true"
return m
},
flagsForNewImage: func(m map[string]string) map[string]string {
m["-ingester.client.grpc-compression"] = "snappy"
return m
},
},
"quay.io/cortexproject/cortex:v1.14.0": {
flagsForOldImage: func(m map[string]string) map[string]string {
return m
},
flagsForNewImage: func(m map[string]string) map[string]string {
m["-ingester.client.grpc-compression"] = "snappy"
return m
},
},
"quay.io/cortexproject/cortex:v1.13.2": func(m map[string]string) map[string]string {
m["-ingester.stream-chunks-when-using-blocks"] = "true"
return m
"quay.io/cortexproject/cortex:v1.14.1": {
flagsForOldImage: func(m map[string]string) map[string]string {
return m
},
flagsForNewImage: func(m map[string]string) map[string]string {
m["-ingester.client.grpc-compression"] = "snappy"
return m
},
},
"quay.io/cortexproject/cortex:v1.14.0": nil,
"quay.io/cortexproject/cortex:v1.14.1": nil,
"quay.io/cortexproject/cortex:v1.15.0": nil,
"quay.io/cortexproject/cortex:v1.15.1": nil,
"quay.io/cortexproject/cortex:v1.15.2": nil,
Expand All @@ -44,27 +77,41 @@ var (
)

func TestBackwardCompatibilityWithBlocksStorage(t *testing.T) {
for previousImage, flagsFn := range previousVersionImages {
for previousImage, imagesFlags := range previousVersionImages {
t.Run(fmt.Sprintf("Backward compatibility upgrading from %s", previousImage), func(t *testing.T) {
flags := blocksStorageFlagsWithFlushOnShutdown()
if flagsFn != nil {
flags = flagsFn(flags)
var flagsForNewImage func(map[string]string) map[string]string
if imagesFlags != nil {
if imagesFlags.flagsForOldImage != nil {
flags = imagesFlags.flagsForOldImage(flags)
}

if imagesFlags.flagsForNewImage != nil {
flagsForNewImage = imagesFlags.flagsForNewImage
}
}

runBackwardCompatibilityTestWithBlocksStorage(t, previousImage, flags)
runBackwardCompatibilityTestWithBlocksStorage(t, previousImage, flags, flagsForNewImage)
})
}
}

func TestNewDistributorsCanPushToOldIngestersWithReplication(t *testing.T) {
for previousImage, flagsFn := range previousVersionImages {
for previousImage, imagesFlags := range previousVersionImages {
t.Run(fmt.Sprintf("Backward compatibility upgrading from %s", previousImage), func(t *testing.T) {
flags := blocksStorageFlagsWithFlushOnShutdown()
if flagsFn != nil {
flags = flagsFn(flags)
var flagsForNewImage func(map[string]string) map[string]string
if imagesFlags != nil {
if imagesFlags.flagsForOldImage != nil {
flags = imagesFlags.flagsForOldImage(flags)
}

if imagesFlags.flagsForNewImage != nil {
flagsForNewImage = imagesFlags.flagsForNewImage
}
}

runNewDistributorsCanPushToOldIngestersWithReplication(t, previousImage, flags)
runNewDistributorsCanPushToOldIngestersWithReplication(t, previousImage, flags, flagsForNewImage)
})
}
}
Expand All @@ -75,7 +122,7 @@ func blocksStorageFlagsWithFlushOnShutdown() map[string]string {
})
}

func runBackwardCompatibilityTestWithBlocksStorage(t *testing.T, previousImage string, flagsForOldImage map[string]string) {
func runBackwardCompatibilityTestWithBlocksStorage(t *testing.T, previousImage string, flagsForOldImage map[string]string, flagsForNewImageFn func(map[string]string) map[string]string) {
s, err := e2e.NewScenario(networkName)
require.NoError(t, err)
defer s.Close()
Expand All @@ -87,6 +134,10 @@ func runBackwardCompatibilityTestWithBlocksStorage(t *testing.T, previousImage s

flagsForNewImage := blocksStorageFlagsWithFlushOnShutdown()

if flagsForNewImageFn != nil {
flagsForNewImage = flagsForNewImageFn(flagsForNewImage)
}

// Start other Cortex components (ingester running on previous version).
ingester1 := e2ecortex.NewIngester("ingester-1", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flagsForOldImage, previousImage)
distributor := e2ecortex.NewDistributor("distributor", "consul", consul.NetworkHTTPEndpoint(), flagsForNewImage, "")
Expand Down Expand Up @@ -127,7 +178,7 @@ func runBackwardCompatibilityTestWithBlocksStorage(t *testing.T, previousImage s
}

// Check for issues like https://github.com/cortexproject/cortex/issues/2356
func runNewDistributorsCanPushToOldIngestersWithReplication(t *testing.T, previousImage string, flagsForPreviousImage map[string]string) {
func runNewDistributorsCanPushToOldIngestersWithReplication(t *testing.T, previousImage string, flagsForPreviousImage map[string]string, flagsForNewImageFn func(map[string]string) map[string]string) {
s, err := e2e.NewScenario(networkName)
require.NoError(t, err)
defer s.Close()
Expand All @@ -141,6 +192,10 @@ func runNewDistributorsCanPushToOldIngestersWithReplication(t *testing.T, previo
"-distributor.replication-factor": "3",
})

if flagsForNewImageFn != nil {
flagsForNewImage = flagsForNewImageFn(flagsForNewImage)
}

// Start other Cortex components (ingester running on previous version).
ingester1 := e2ecortex.NewIngester("ingester-1", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flagsForPreviousImage, previousImage)
ingester2 := e2ecortex.NewIngester("ingester-2", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flagsForPreviousImage, previousImage)
Expand Down
2 changes: 1 addition & 1 deletion pkg/frontend/v2/frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.StringVar(&cfg.Addr, "frontend.instance-addr", "", "IP address to advertise to querier (via scheduler) (resolved via interfaces by default).")
f.IntVar(&cfg.Port, "frontend.instance-port", 0, "Port to advertise to querier (via scheduler) (defaults to server.grpc-listen-port).")

cfg.GRPCClientConfig.RegisterFlagsWithPrefix("frontend.grpc-client-config", f)
cfg.GRPCClientConfig.RegisterFlagsWithPrefix("frontend.grpc-client-config", "", f)
}

// Frontend implements GrpcRoundTripper. It queues HTTP requests,
Expand Down
3 changes: 2 additions & 1 deletion pkg/ingester/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/cortexproject/cortex/pkg/cortexpb"
"github.com/cortexproject/cortex/pkg/util/grpcclient"
"github.com/cortexproject/cortex/pkg/util/grpcencoding/snappyblock"

"github.com/go-kit/log"
"github.com/pkg/errors"
Expand Down Expand Up @@ -116,7 +117,7 @@ type Config struct {

// RegisterFlags registers configuration settings used by the ingester client config.
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
cfg.GRPCClientConfig.RegisterFlagsWithPrefix("ingester.client", f)
cfg.GRPCClientConfig.RegisterFlagsWithPrefix("ingester.client", snappyblock.Name, f)
f.Int64Var(&cfg.MaxInflightPushRequests, "ingester.client.max-inflight-push-requests", 0, "Max inflight push requests that this ingester client can handle. This limit is per-ingester-client. Additional requests will be rejected. 0 = unlimited.")
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/querier/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.BoolVar(&cfg.MatchMaxConcurrency, "querier.worker-match-max-concurrent", false, "Force worker concurrency to match the -querier.max-concurrent option. Overrides querier.worker-parallelism.")
f.StringVar(&cfg.QuerierID, "querier.id", "", "Querier ID, sent to frontend service to identify requests from the same querier. Defaults to hostname.")

cfg.GRPCClientConfig.RegisterFlagsWithPrefix("querier.frontend-client", f)
cfg.GRPCClientConfig.RegisterFlagsWithPrefix("querier.frontend-client", "", f)
}

func (cfg *Config) Validate(log log.Logger) error {
Expand Down
2 changes: 1 addition & 1 deletion pkg/ruler/ruler.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ func (cfg *Config) Validate(limits validation.Limits, log log.Logger) error {

// RegisterFlags adds the flags required to config this to the given FlagSet
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
cfg.ClientTLSConfig.RegisterFlagsWithPrefix("ruler.client", f)
cfg.ClientTLSConfig.RegisterFlagsWithPrefix("ruler.client", "", f)
cfg.Ring.RegisterFlags(f)
cfg.Notifier.RegisterFlags(f)

Expand Down
2 changes: 1 addition & 1 deletion pkg/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ type Config struct {
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
flagext.DeprecatedFlag(f, "query-scheduler.max-outstanding-requests-per-tenant", "Deprecated: Use frontend.max-outstanding-requests-per-tenant instead.", util_log.Logger)
f.DurationVar(&cfg.QuerierForgetDelay, "query-scheduler.querier-forget-delay", 0, "If a querier disconnects without sending notification about graceful shutdown, the query-scheduler will keep the querier in the tenant's shard until the forget delay has passed. This feature is useful to reduce the blast radius when shuffle-sharding is enabled.")
cfg.GRPCClientConfig.RegisterFlagsWithPrefix("query-scheduler.grpc-client-config", f)
cfg.GRPCClientConfig.RegisterFlagsWithPrefix("query-scheduler.grpc-client-config", "", f)
}

// NewScheduler creates a new Scheduler.
Expand Down
6 changes: 3 additions & 3 deletions pkg/util/grpcclient/grpcclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,14 @@ type Config struct {

// RegisterFlags registers flags.
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
cfg.RegisterFlagsWithPrefix("", f)
cfg.RegisterFlagsWithPrefix("", "", f)
}

// RegisterFlagsWithPrefix registers flags with prefix.
func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
func (cfg *Config) RegisterFlagsWithPrefix(prefix, defaultGrpcCompression string, f *flag.FlagSet) {
f.IntVar(&cfg.MaxRecvMsgSize, prefix+".grpc-max-recv-msg-size", 100<<20, "gRPC client max receive message size (bytes).")
f.IntVar(&cfg.MaxSendMsgSize, prefix+".grpc-max-send-msg-size", 16<<20, "gRPC client max send message size (bytes).")
f.StringVar(&cfg.GRPCCompression, prefix+".grpc-compression", "", "Use compression when sending messages. Supported values are: 'gzip', 'snappy', 'snappy-block' ,'zstd' and '' (disable compression)")
f.StringVar(&cfg.GRPCCompression, prefix+".grpc-compression", defaultGrpcCompression, "Use compression when sending messages. Supported values are: 'gzip', 'snappy', 'snappy-block' ,'zstd' and '' (disable compression)")
f.Float64Var(&cfg.RateLimit, prefix+".grpc-client-rate-limit", 0., "Rate limit for gRPC client; 0 means disabled.")
f.IntVar(&cfg.RateLimitBurst, prefix+".grpc-client-rate-limit-burst", 0, "Rate limit burst for gRPC client.")
f.BoolVar(&cfg.BackoffOnRatelimits, prefix+".backoff-on-ratelimits", false, "Enable backoff and retry when we hit ratelimits.")
Expand Down
Loading