diff --git a/CHANGELOG.md b/CHANGELOG.md index 22805ff3ee8..a47446ee212 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,6 +23,7 @@ * [FEATURE] Querier/Store-Gateway: Added `-blocks-storage.bucket-store.ignore-blocks-within` allowing to filter out the recently created blocks from being synced by queriers and store-gateways. #5166 * [FEATURE] AlertManager/Ruler: Added support for `keep_firing_for` on alerting rulers. * [FEATURE] Alertmanager: Add support for time_intervals. #5102 +* [FEATURE] Added `snappy-block` as an option for grpc compression #5215 * [ENHANCEMENT] Querier: limit series query to only ingesters if `start` param is not specified. #4976 * [ENHANCEMENT] Query-frontend/scheduler: add a new limit `frontend.max-outstanding-requests-per-tenant` for configuring queue size per tenant. Started deprecating two flags `-query-scheduler.max-outstanding-requests-per-tenant` and `-querier.max-outstanding-requests-per-tenant`, and change their value default to 0. Now if both the old flag and new flag are specified, the old flag's queue size will be picked. #5005 * [ENHANCEMENT] Query-tee: Add `/api/v1/query_exemplars` API endpoint support. #5010 diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index d598f651ea3..d24b22b9266 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -198,7 +198,7 @@ query_scheduler: [max_send_msg_size: | default = 16777216] # Use compression when sending messages. Supported values are: 'gzip', - # 'snappy', 'zstd' and '' (disable compression) + # 'snappy', 'snappy-block', 'zstd' and '' (disable compression) # CLI flag: -query-scheduler.grpc-client-config.grpc-compression [grpc_compression: | default = ""] @@ -2381,7 +2381,7 @@ grpc_client_config: [max_send_msg_size: | default = 16777216] # Use compression when sending messages. 
Supported values are: 'gzip', - # 'snappy', 'zstd' and '' (disable compression) + # 'snappy', 'snappy-block', 'zstd' and '' (disable compression) # CLI flag: -querier.frontend-client.grpc-compression [grpc_compression: | default = ""] @@ -2642,7 +2642,7 @@ grpc_client_config: [max_send_msg_size: | default = 16777216] # Use compression when sending messages. Supported values are: 'gzip', - # 'snappy', 'zstd' and '' (disable compression) + # 'snappy', 'snappy-block', 'zstd' and '' (disable compression) # CLI flag: -ingester.client.grpc-compression [grpc_compression: | default = ""] @@ -3435,7 +3435,7 @@ grpc_client_config: [max_send_msg_size: | default = 16777216] # Use compression when sending messages. Supported values are: 'gzip', - # 'snappy', 'zstd' and '' (disable compression) + # 'snappy', 'snappy-block', 'zstd' and '' (disable compression) # CLI flag: -frontend.grpc-client-config.grpc-compression [grpc_compression: | default = ""] @@ -3651,7 +3651,7 @@ ruler_client: [max_send_msg_size: | default = 16777216] # Use compression when sending messages. 
Supported values are: 'gzip', - # 'snappy', 'zstd' and '' (disable compression) + # 'snappy', 'snappy-block', 'zstd' and '' (disable compression) # CLI flag: -ruler.client.grpc-compression [grpc_compression: | default = ""] diff --git a/pkg/util/grpcclient/grpcclient.go b/pkg/util/grpcclient/grpcclient.go index c1cd4bf5d07..cfec645c938 100644 --- a/pkg/util/grpcclient/grpcclient.go +++ b/pkg/util/grpcclient/grpcclient.go @@ -13,6 +13,7 @@ import ( "github.com/cortexproject/cortex/pkg/util/backoff" "github.com/cortexproject/cortex/pkg/util/grpcencoding/snappy" + "github.com/cortexproject/cortex/pkg/util/grpcencoding/snappyblock" "github.com/cortexproject/cortex/pkg/util/grpcencoding/zstd" "github.com/cortexproject/cortex/pkg/util/tls" ) @@ -41,7 +42,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { f.IntVar(&cfg.MaxRecvMsgSize, prefix+".grpc-max-recv-msg-size", 100<<20, "gRPC client max receive message size (bytes).") f.IntVar(&cfg.MaxSendMsgSize, prefix+".grpc-max-send-msg-size", 16<<20, "gRPC client max send message size (bytes).") - f.StringVar(&cfg.GRPCCompression, prefix+".grpc-compression", "", "Use compression when sending messages. Supported values are: 'gzip', 'snappy', 'zstd' and '' (disable compression)") + f.StringVar(&cfg.GRPCCompression, prefix+".grpc-compression", "", "Use compression when sending messages. 
Supported values are: 'gzip', 'snappy', 'snappy-block', 'zstd' and '' (disable compression)") f.Float64Var(&cfg.RateLimit, prefix+".grpc-client-rate-limit", 0., "Rate limit for gRPC client; 0 means disabled.") f.IntVar(&cfg.RateLimitBurst, prefix+".grpc-client-rate-limit-burst", 0, "Rate limit burst for gRPC client.") f.BoolVar(&cfg.BackoffOnRatelimits, prefix+".backoff-on-ratelimits", false, "Enable backoff and retry when we hit ratelimits.") @@ -54,7 +55,7 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { func (cfg *Config) Validate(log log.Logger) error { switch cfg.GRPCCompression { - case gzip.Name, snappy.Name, zstd.Name, "": + case gzip.Name, snappy.Name, zstd.Name, snappyblock.Name, "": // valid default: return errors.Errorf("unsupported compression type: %s", cfg.GRPCCompression) diff --git a/pkg/util/grpcencoding/encoding_test.go b/pkg/util/grpcencoding/encoding_test.go new file mode 100644 index 00000000000..9888aee270e --- /dev/null +++ b/pkg/util/grpcencoding/encoding_test.go @@ -0,0 +1,178 @@ +package grpcencoding + +import ( + "bytes" + "io" + "io/ioutil" + "strings" + "testing" + + "github.com/cortexproject/cortex/pkg/util/grpcencoding/snappy" + "github.com/cortexproject/cortex/pkg/util/grpcencoding/snappyblock" + "github.com/cortexproject/cortex/pkg/util/grpcencoding/zstd" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/grpc/encoding" +) + +func TestCompressors(t *testing.T) { + testCases := []struct { + name string + }{ + { + name: snappy.Name, + }, + { + name: snappyblock.Name, + }, + { + name: zstd.Name, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + testCompress(tc.name, t) + }) + } +} + +func testCompress(name string, t *testing.T) { + c := encoding.GetCompressor(name) + assert.Equal(t, name, c.Name()) + + tests := []struct { + test string + input string + }{ + {"empty", ""}, + {"short", "hello 
world"}, + {"long", strings.Repeat("123456789", 2024)}, + } + for _, test := range tests { + t.Run(test.test, func(t *testing.T) { + buf := &bytes.Buffer{} + // Compress + w, err := c.Compress(buf) + require.NoError(t, err) + n, err := w.Write([]byte(test.input)) + require.NoError(t, err) + assert.Len(t, test.input, n) + err = w.Close() + require.NoError(t, err) + compressedBytes := buf.Bytes() + buf = bytes.NewBuffer(compressedBytes) + + // Decompress + r, err := c.Decompress(buf) + require.NoError(t, err) + out, err := io.ReadAll(r) + require.NoError(t, err) + assert.Equal(t, test.input, string(out)) + + if sizer, ok := c.(interface { + DecompressedSize(compressedBytes []byte) int + }); ok { + buf = bytes.NewBuffer(compressedBytes) + r, err := c.Decompress(buf) + require.NoError(t, err) + out, err := io.ReadAll(r) + require.NoError(t, err) + assert.Equal(t, len(out), sizer.DecompressedSize(compressedBytes)) + } + }) + } +} + +func BenchmarkCompress(b *testing.B) { + data := []byte(strings.Repeat("123456789", 1024)) + + testCases := []struct { + name string + }{ + { + name: snappy.Name, + }, + { + name: snappyblock.Name, + }, + { + name: zstd.Name, + }, + } + + for _, tc := range testCases { + b.Run(tc.name, func(b *testing.B) { + c := encoding.GetCompressor(tc.name) + b.ResetTimer() + for i := 0; i < b.N; i++ { + w, _ := c.Compress(io.Discard) + _, _ = w.Write(data) + _ = w.Close() + } + b.ReportAllocs() + }) + } +} + +func BenchmarkDecompress(b *testing.B) { + data := []byte(strings.Repeat("123456789", 1024)) + + testCases := []struct { + name string + }{ + { + name: snappy.Name, + }, + { + name: snappyblock.Name, + }, + { + name: zstd.Name, + }, + } + + for _, tc := range testCases { + b.Run(tc.name, func(b *testing.B) { + c := encoding.GetCompressor(tc.name) + var buf bytes.Buffer + w, _ := c.Compress(&buf) + _, _ = w.Write(data) + w.Close() + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, _, err := decompress(c, buf.Bytes(), 10000) + require.NoError(b, 
err) + } + b.ReportAllocs() + }) + } +} + +// This function was copied from: https://github.com/grpc/grpc-go/blob/70c52915099a3b30848d0cb22e2f8951dd5aed7f/rpc_util.go#L765 +func decompress(compressor encoding.Compressor, d []byte, maxReceiveMessageSize int) ([]byte, int, error) { + dcReader, err := compressor.Decompress(bytes.NewReader(d)) + if err != nil { + return nil, 0, err + } + if sizer, ok := compressor.(interface { + DecompressedSize(compressedBytes []byte) int + }); ok { + if size := sizer.DecompressedSize(d); size >= 0 { + if size > maxReceiveMessageSize { + return nil, size, nil + } + // size is used as an estimate to size the buffer, but we + // will read more data if available. + // +MinRead so ReadFrom will not reallocate if size is correct. + buf := bytes.NewBuffer(make([]byte, 0, size+bytes.MinRead)) + bytesRead, err := buf.ReadFrom(io.LimitReader(dcReader, int64(maxReceiveMessageSize)+1)) + return buf.Bytes(), int(bytesRead), err + } + } + // Read from LimitReader with limit max+1. So if the underlying + // reader is over limit, the result will be bigger than max. + d, err = ioutil.ReadAll(io.LimitReader(dcReader, int64(maxReceiveMessageSize)+1)) + return d, len(d), err +} diff --git a/pkg/util/grpcencoding/snappy/snappy.go b/pkg/util/grpcencoding/snappy/snappy.go index 6ffc4b678e9..fe01b4ca351 100644 --- a/pkg/util/grpcencoding/snappy/snappy.go +++ b/pkg/util/grpcencoding/snappy/snappy.go @@ -51,20 +51,6 @@ func (c *compressor) Decompress(r io.Reader) (io.Reader, error) { return reader{dr, &c.readersPool}, nil } -// If a Compressor implements DecompressedSize(compressedBytes []byte) int, -// gRPC will call it to determine the size of the buffer allocated for the -// result of decompression. -// Return -1 to indicate unknown size. -// -// This is an EXPERIMENTAL feature of grpc-go. 
-func (c *compressor) DecompressedSize(compressedBytes []byte) int { - decompressedSize, err := snappy.DecodedLen(compressedBytes) - if err != nil { - return -1 - } - return decompressedSize -} - type writeCloser struct { writer *snappy.Writer pool *sync.Pool diff --git a/pkg/util/grpcencoding/snappy/snappy_test.go b/pkg/util/grpcencoding/snappy/snappy_test.go deleted file mode 100644 index d288c95c215..00000000000 --- a/pkg/util/grpcencoding/snappy/snappy_test.go +++ /dev/null @@ -1,70 +0,0 @@ -package snappy - -import ( - "bytes" - "io" - "strings" - "testing" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -func TestSnappy(t *testing.T) { - c := newCompressor() - assert.Equal(t, "snappy", c.Name()) - - tests := []struct { - test string - input string - }{ - {"empty", ""}, - {"short", "hello world"}, - {"long", strings.Repeat("123456789", 1024)}, - } - for _, test := range tests { - t.Run(test.test, func(t *testing.T) { - var buf bytes.Buffer - // Compress - w, err := c.Compress(&buf) - require.NoError(t, err) - n, err := w.Write([]byte(test.input)) - require.NoError(t, err) - assert.Len(t, test.input, n) - err = w.Close() - require.NoError(t, err) - // Decompress - r, err := c.Decompress(&buf) - require.NoError(t, err) - out, err := io.ReadAll(r) - require.NoError(t, err) - assert.Equal(t, test.input, string(out)) - }) - } -} - -func BenchmarkSnappyCompress(b *testing.B) { - data := []byte(strings.Repeat("123456789", 1024)) - c := newCompressor() - b.ResetTimer() - for i := 0; i < b.N; i++ { - w, _ := c.Compress(io.Discard) - _, _ = w.Write(data) - _ = w.Close() - } -} - -func BenchmarkSnappyDecompress(b *testing.B) { - data := []byte(strings.Repeat("123456789", 1024)) - c := newCompressor() - var buf bytes.Buffer - w, _ := c.Compress(&buf) - _, _ = w.Write(data) - reader := bytes.NewReader(buf.Bytes()) - b.ResetTimer() - for i := 0; i < b.N; i++ { - r, _ := c.Decompress(reader) - _, _ = io.ReadAll(r) - _, _ = 
reader.Seek(0, io.SeekStart) - } -} diff --git a/pkg/util/grpcencoding/snappyblock/snappyblock.go b/pkg/util/grpcencoding/snappyblock/snappyblock.go new file mode 100644 index 00000000000..a40e8429ddc --- /dev/null +++ b/pkg/util/grpcencoding/snappyblock/snappyblock.go @@ -0,0 +1,143 @@ +package snappyblock + +import ( + "bytes" + "io" + "sync" + + "github.com/golang/snappy" + "google.golang.org/grpc/encoding" +) + +// Name is the name registered for the snappy compressor. +const Name = "snappy-block" + +func init() { + encoding.RegisterCompressor(newCompressor()) +} + +type compressor struct { + writersPool sync.Pool + readersPool sync.Pool +} + +func newCompressor() *compressor { + c := &compressor{} + c.readersPool = sync.Pool{ + New: func() interface{} { + return &reader{ + pool: &c.readersPool, + cbuff: bytes.NewBuffer(make([]byte, 0, 512)), + } + }, + } + c.writersPool = sync.Pool{ + New: func() interface{} { + return &writeCloser{ + pool: &c.writersPool, + buff: bytes.NewBuffer(make([]byte, 0, 512)), + } + }, + } + return c +} + +func (c *compressor) Name() string { + return Name +} + +func (c *compressor) Compress(w io.Writer) (io.WriteCloser, error) { + wr := c.writersPool.Get().(*writeCloser) + wr.Reset(w) + return wr, nil +} + +func (c *compressor) Decompress(r io.Reader) (io.Reader, error) { + dr := c.readersPool.Get().(*reader) + err := dr.Reset(r) + if err != nil { + return nil, err + } + + return dr, nil +} + +// DecompressedSize If a Compressor implements DecompressedSize(compressedBytes []byte) int, +// gRPC will call it to determine the size of the buffer allocated for the +// result of decompression. +// Return -1 to indicate unknown size. +// +// This is an EXPERIMENTAL feature of grpc-go. 
+func (c *compressor) DecompressedSize(compressedBytes []byte) int { + decompressedSize, err := snappy.DecodedLen(compressedBytes) + if err != nil { + return -1 + } + return decompressedSize +} + +type writeCloser struct { + i io.Writer + pool *sync.Pool + buff *bytes.Buffer + + dst []byte +} + +func (w *writeCloser) Reset(i io.Writer) { + w.i = i +} + +func (w *writeCloser) Write(p []byte) (n int, err error) { + return w.buff.Write(p) +} + +func (w *writeCloser) Close() error { + defer func() { + w.buff.Reset() + w.dst = w.dst[0:cap(w.dst)] + w.pool.Put(w) + }() + + if w.i != nil { + w.dst = snappy.Encode(w.dst, w.buff.Bytes()) + _, err := w.i.Write(w.dst) + return err + } + + return nil +} + +type reader struct { + pool *sync.Pool + cbuff *bytes.Buffer + dbuff *bytes.Buffer + dst []byte +} + +func (r *reader) Reset(ir io.Reader) error { + _, err := r.cbuff.ReadFrom(ir) + + if err != nil { + return err + } + + r.dst, err = snappy.Decode(r.dst, r.cbuff.Bytes()) + + if err != nil { + return err + } + + r.dbuff = bytes.NewBuffer(r.dst) + return nil +} + +func (r *reader) Read(p []byte) (n int, err error) { + n, err = r.dbuff.Read(p) + if err == io.EOF { + r.cbuff.Reset() + r.dst = r.dst[0:cap(r.dst)] + r.pool.Put(r) + } + return n, err +} diff --git a/pkg/util/grpcencoding/zstd/zstd.go b/pkg/util/grpcencoding/zstd/zstd.go index 7a6803c7e7e..c55c9f1a999 100644 --- a/pkg/util/grpcencoding/zstd/zstd.go +++ b/pkg/util/grpcencoding/zstd/zstd.go @@ -16,13 +16,17 @@ type compressor struct { } func init() { + encoding.RegisterCompressor(newCompressor()) +} + +func newCompressor() *compressor { enc, _ := zstd.NewWriter(nil) dec, _ := zstd.NewReader(nil) c := &compressor{ encoder: enc, decoder: dec, } - encoding.RegisterCompressor(c) + return c } // SetLevel updates the registered compressor to use a particular compression