Skip to content

Implementing Snappy Block Encoding #5215

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Mar 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
* [FEATURE] Querier/Store-Gateway: Added `-blocks-storage.bucket-store.ignore-blocks-within` allowing to filter out the recently created blocks from being synced by queriers and store-gateways. #5166
* [FEATURE] AlertManager/Ruler: Added support for `keep_firing_for` on alerting rulers.
* [FEATURE] Alertmanager: Add support for time_intervals. #5102
* [FEATURE] Added `snappy-block` as an option for grpc compression #5215
* [ENHANCEMENT] Querier: limit series query to only ingesters if `start` param is not specified. #4976
* [ENHANCEMENT] Query-frontend/scheduler: add a new limit `frontend.max-outstanding-requests-per-tenant` for configuring queue size per tenant. Started deprecating two flags `-query-scheduler.max-outstanding-requests-per-tenant` and `-querier.max-outstanding-requests-per-tenant`, and change their value default to 0. Now if both the old flag and new flag are specified, the old flag's queue size will be picked. #5005
* [ENHANCEMENT] Query-tee: Add `/api/v1/query_exemplars` API endpoint support. #5010
Expand Down
10 changes: 5 additions & 5 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ query_scheduler:
[max_send_msg_size: <int> | default = 16777216]

# Use compression when sending messages. Supported values are: 'gzip',
# 'snappy', 'zstd' and '' (disable compression)
# 'snappy', 'snappy-block', 'zstd' and '' (disable compression)
# CLI flag: -query-scheduler.grpc-client-config.grpc-compression
[grpc_compression: <string> | default = ""]

Expand Down Expand Up @@ -2381,7 +2381,7 @@ grpc_client_config:
[max_send_msg_size: <int> | default = 16777216]

# Use compression when sending messages. Supported values are: 'gzip',
# 'snappy', 'zstd' and '' (disable compression)
# 'snappy', 'snappy-block', 'zstd' and '' (disable compression)
# CLI flag: -querier.frontend-client.grpc-compression
[grpc_compression: <string> | default = ""]

Expand Down Expand Up @@ -2642,7 +2642,7 @@ grpc_client_config:
[max_send_msg_size: <int> | default = 16777216]

# Use compression when sending messages. Supported values are: 'gzip',
# 'snappy', 'zstd' and '' (disable compression)
# 'snappy', 'snappy-block', 'zstd' and '' (disable compression)
# CLI flag: -ingester.client.grpc-compression
[grpc_compression: <string> | default = ""]

Expand Down Expand Up @@ -3435,7 +3435,7 @@ grpc_client_config:
[max_send_msg_size: <int> | default = 16777216]

# Use compression when sending messages. Supported values are: 'gzip',
# 'snappy', 'zstd' and '' (disable compression)
# 'snappy', 'snappy-block', 'zstd' and '' (disable compression)
# CLI flag: -frontend.grpc-client-config.grpc-compression
[grpc_compression: <string> | default = ""]

Expand Down Expand Up @@ -3651,7 +3651,7 @@ ruler_client:
[max_send_msg_size: <int> | default = 16777216]

# Use compression when sending messages. Supported values are: 'gzip',
# 'snappy', 'zstd' and '' (disable compression)
# 'snappy', 'snappy-block', 'zstd' and '' (disable compression)
# CLI flag: -ruler.client.grpc-compression
[grpc_compression: <string> | default = ""]

Expand Down
5 changes: 3 additions & 2 deletions pkg/util/grpcclient/grpcclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

"github.com/cortexproject/cortex/pkg/util/backoff"
"github.com/cortexproject/cortex/pkg/util/grpcencoding/snappy"
"github.com/cortexproject/cortex/pkg/util/grpcencoding/snappyblock"
"github.com/cortexproject/cortex/pkg/util/grpcencoding/zstd"
"github.com/cortexproject/cortex/pkg/util/tls"
)
Expand Down Expand Up @@ -41,7 +42,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.IntVar(&cfg.MaxRecvMsgSize, prefix+".grpc-max-recv-msg-size", 100<<20, "gRPC client max receive message size (bytes).")
f.IntVar(&cfg.MaxSendMsgSize, prefix+".grpc-max-send-msg-size", 16<<20, "gRPC client max send message size (bytes).")
f.StringVar(&cfg.GRPCCompression, prefix+".grpc-compression", "", "Use compression when sending messages. Supported values are: 'gzip', 'snappy', 'zstd' and '' (disable compression)")
f.StringVar(&cfg.GRPCCompression, prefix+".grpc-compression", "", "Use compression when sending messages. Supported values are: 'gzip', 'snappy', 'snappy-block' ,'zstd' and '' (disable compression)")
f.Float64Var(&cfg.RateLimit, prefix+".grpc-client-rate-limit", 0., "Rate limit for gRPC client; 0 means disabled.")
f.IntVar(&cfg.RateLimitBurst, prefix+".grpc-client-rate-limit-burst", 0, "Rate limit burst for gRPC client.")
f.BoolVar(&cfg.BackoffOnRatelimits, prefix+".backoff-on-ratelimits", false, "Enable backoff and retry when we hit ratelimits.")
Expand All @@ -54,7 +55,7 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {

func (cfg *Config) Validate(log log.Logger) error {
switch cfg.GRPCCompression {
case gzip.Name, snappy.Name, zstd.Name, "":
case gzip.Name, snappy.Name, zstd.Name, snappyblock.Name, "":
// valid
default:
return errors.Errorf("unsupported compression type: %s", cfg.GRPCCompression)
Expand Down
178 changes: 178 additions & 0 deletions pkg/util/grpcencoding/encoding_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
package grpcencoding

import (
"bytes"
"io"
"io/ioutil"
"strings"
"testing"

"github.com/cortexproject/cortex/pkg/util/grpcencoding/snappy"
"github.com/cortexproject/cortex/pkg/util/grpcencoding/snappyblock"
"github.com/cortexproject/cortex/pkg/util/grpcencoding/zstd"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc/encoding"
)

// TestCompressors runs the round-trip checks implemented by testCompress once
// per compressor name listed below, each inside its own named subtest.
func TestCompressors(t *testing.T) {
	names := []string{
		snappy.Name,
		snappyblock.Name,
		zstd.Name,
	}

	for i := range names {
		name := names[i]
		t.Run(name, func(t *testing.T) {
			testCompress(name, t)
		})
	}
}

// testCompress looks up the compressor registered under name and verifies a
// compress/decompress round trip for an empty, a short and a long input.
//
// If the compressor also implements the optional
// DecompressedSize(compressedBytes []byte) int method, the size it reports
// for the compressed bytes must equal the length of the data produced by a
// second, independent decompression of the same bytes.
func testCompress(name string, t *testing.T) {
	c := encoding.GetCompressor(name)
	// Fail with a readable message instead of a nil dereference on c.Name()
	// when nothing is registered under this name.
	require.NotNil(t, c, "compressor %q is not registered", name)
	assert.Equal(t, name, c.Name())

	tests := []struct {
		test  string
		input string
	}{
		{"empty", ""},
		{"short", "hello world"},
		{"long", strings.Repeat("123456789", 2024)},
	}
	for _, test := range tests {
		t.Run(test.test, func(t *testing.T) {
			buf := &bytes.Buffer{}

			// Compress
			w, err := c.Compress(buf)
			require.NoError(t, err)
			n, err := w.Write([]byte(test.input))
			require.NoError(t, err)
			// The writer must report having consumed the whole input.
			assert.Len(t, test.input, n)
			err = w.Close()
			require.NoError(t, err)
			compressedBytes := buf.Bytes()
			buf = bytes.NewBuffer(compressedBytes)

			// Decompress
			r, err := c.Decompress(buf)
			require.NoError(t, err)
			out, err := io.ReadAll(r)
			require.NoError(t, err)
			assert.Equal(t, test.input, string(out))

			// Optional size hint: compare it against a fresh decompression
			// of the same compressed bytes.
			if sizer, ok := c.(interface {
				DecompressedSize(compressedBytes []byte) int
			}); ok {
				buf = bytes.NewBuffer(compressedBytes)
				r, err := c.Decompress(buf)
				require.NoError(t, err)
				out, err := io.ReadAll(r)
				require.NoError(t, err)
				assert.Equal(t, len(out), sizer.DecompressedSize(compressedBytes))
			}
		})
	}
}

// BenchmarkCompress measures, for each compressor listed below, the cost of
// compressing a fixed 9 KiB repetitive payload into io.Discard.
func BenchmarkCompress(b *testing.B) {
	data := []byte(strings.Repeat("123456789", 1024))

	testCases := []struct {
		name string
	}{
		{
			name: snappy.Name,
		},
		{
			name: snappyblock.Name,
		},
		{
			name: zstd.Name,
		},
	}

	for _, tc := range testCases {
		b.Run(tc.name, func(b *testing.B) {
			// Look up the compressor for this case; a hard-coded "snappy"
			// here would make every sub-benchmark measure the same codec.
			c := encoding.GetCompressor(tc.name)
			require.NotNil(b, c, "compressor %q is not registered", tc.name)

			b.ReportAllocs()
			b.ResetTimer()
			for i := 0; i < b.N; i++ {
				w, err := c.Compress(io.Discard)
				if err != nil {
					b.Fatal(err)
				}
				if _, err := w.Write(data); err != nil {
					b.Fatal(err)
				}
				if err := w.Close(); err != nil {
					b.Fatal(err)
				}
			}
		})
	}
}

// BenchmarkDecompress measures, for each compressor listed below, the cost of
// decompressing a fixed payload through the decompress helper, which is a
// copy of the grpc-go receive-path routine.
//
// The payload is compressed once, outside the timed region, and every step
// of that setup is checked so a failed compression cannot silently turn the
// benchmark into a measurement of an empty or truncated input.
func BenchmarkDecompress(b *testing.B) {
	data := []byte(strings.Repeat("123456789", 1024))

	testCases := []struct {
		name string
	}{
		{
			name: snappy.Name,
		},
		{
			name: snappyblock.Name,
		},
		{
			name: zstd.Name,
		},
	}

	for _, tc := range testCases {
		b.Run(tc.name, func(b *testing.B) {
			c := encoding.GetCompressor(tc.name)
			require.NotNil(b, c, "compressor %q is not registered", tc.name)

			var buf bytes.Buffer
			w, err := c.Compress(&buf)
			require.NoError(b, err)
			_, err = w.Write(data)
			require.NoError(b, err)
			require.NoError(b, w.Close())
			compressed := buf.Bytes()

			b.ReportAllocs()
			b.ResetTimer()
			for i := 0; i < b.N; i++ {
				// 10000 is the receive limit; the 9216-byte payload fits under it.
				_, _, err := decompress(c, compressed, 10000)
				require.NoError(b, err)
			}
		})
	}
}

// decompress runs the compressed payload d through compressor and returns the
// decompressed bytes together with their count.
//
// At most maxReceiveMessageSize+1 bytes are read, so a result longer than
// maxReceiveMessageSize tells the caller that the message is over the limit.
// When the compressor offers a non-negative DecompressedSize hint, that hint
// pre-sizes the output buffer; if the hint already exceeds
// maxReceiveMessageSize, nothing is read and (nil, hint, nil) is returned.
//
// This function was copied from: https://github.com/grpc/grpc-go/blob/70c52915099a3b30848d0cb22e2f8951dd5aed7f/rpc_util.go#L765
func decompress(compressor encoding.Compressor, d []byte, maxReceiveMessageSize int) ([]byte, int, error) {
	reader, err := compressor.Decompress(bytes.NewReader(d))
	if err != nil {
		return nil, 0, err
	}

	// Cap reads at max+1: one byte past the limit is enough to show that the
	// underlying stream is too large.
	limited := io.LimitReader(reader, int64(maxReceiveMessageSize)+1)

	// sizeHinter is the optional interface a compressor may implement to
	// report the decompressed size up front.
	type sizeHinter interface {
		DecompressedSize(compressedBytes []byte) int
	}

	hint := -1
	if hinter, ok := compressor.(sizeHinter); ok {
		hint = hinter.DecompressedSize(d)
	}

	// No usable hint: let ReadAll grow its own buffer.
	if hint < 0 {
		out, readErr := ioutil.ReadAll(limited)
		return out, len(out), readErr
	}

	if hint > maxReceiveMessageSize {
		return nil, hint, nil
	}

	// The hint is only an estimate for sizing the buffer; more data is read
	// if it is available. The extra bytes.MinRead of capacity keeps ReadFrom
	// from reallocating when the hint is exact.
	out := bytes.NewBuffer(make([]byte, 0, hint+bytes.MinRead))
	count, readErr := out.ReadFrom(limited)
	return out.Bytes(), int(count), readErr
}
14 changes: 0 additions & 14 deletions pkg/util/grpcencoding/snappy/snappy.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,20 +51,6 @@ func (c *compressor) Decompress(r io.Reader) (io.Reader, error) {
return reader{dr, &c.readersPool}, nil
}

// If a Compressor implements DecompressedSize(compressedBytes []byte) int,
// gRPC will call it to determine the size of the buffer allocated for the
// result of decompression.
// Return -1 to indicate unknown size.
//
// This is an EXPERIMENTAL feature of grpc-go.
func (c *compressor) DecompressedSize(compressedBytes []byte) int {
decompressedSize, err := snappy.DecodedLen(compressedBytes)
if err != nil {
return -1
}
return decompressedSize
}

type writeCloser struct {
writer *snappy.Writer
pool *sync.Pool
Expand Down
70 changes: 0 additions & 70 deletions pkg/util/grpcencoding/snappy/snappy_test.go

This file was deleted.

Loading