From 6d16ccf392cdf948bd6ae33dd65fe3bd0cb0aaf6 Mon Sep 17 00:00:00 2001 From: Thor Date: Wed, 28 Aug 2019 13:14:42 -0500 Subject: [PATCH] s3.buckets : allow spreading chunks across buckets Signed-off-by: Thor --- pkg/chunk/aws/dynamodb_storage_client.go | 2 ++ pkg/chunk/aws/s3_storage_client.go | 38 +++++++++++++++++++----- 2 files changed, 32 insertions(+), 8 deletions(-) diff --git a/pkg/chunk/aws/dynamodb_storage_client.go b/pkg/chunk/aws/dynamodb_storage_client.go index 42498ed0123..a81ef7323bc 100644 --- a/pkg/chunk/aws/dynamodb_storage_client.go +++ b/pkg/chunk/aws/dynamodb_storage_client.go @@ -126,6 +126,7 @@ func (cfg *DynamoDBConfig) RegisterFlags(f *flag.FlagSet) { type StorageConfig struct { DynamoDBConfig S3 flagext.URLValue + BucketNames string S3ForcePathStyle bool } @@ -136,6 +137,7 @@ func (cfg *StorageConfig) RegisterFlags(f *flag.FlagSet) { f.Var(&cfg.S3, "s3.url", "S3 endpoint URL with escaped Key and Secret encoded. "+ "If only region is specified as a host, proper endpoint will be deduced. Use inmemory:/// to use a mock in-memory implementation.") f.BoolVar(&cfg.S3ForcePathStyle, "s3.force-path-style", false, "Set this to `true` to force the request to use path-style addressing.") + f.StringVar(&cfg.BucketNames, "s3.buckets", "", "Comma separated list of bucket names to evenly distribute chunks over. Overrides any buckets specified in s3.url flag") } type dynamoDBStorageClient struct { diff --git a/pkg/chunk/aws/s3_storage_client.go b/pkg/chunk/aws/s3_storage_client.go index dfb74a42584..7b05128e4a1 100644 --- a/pkg/chunk/aws/s3_storage_client.go +++ b/pkg/chunk/aws/s3_storage_client.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "fmt" + "hash/fnv" "io/ioutil" "strings" @@ -33,8 +34,8 @@ func init() { } type s3ObjectClient struct { - bucketName string - S3 s3iface.S3API + bucketNames []string + S3 s3iface.S3API } // NewS3ObjectClient makes a new S3-backed ObjectClient. 
@@ -51,10 +52,13 @@ func NewS3ObjectClient(cfg StorageConfig, schemaCfg chunk.SchemaConfig) (chunk.O s3Config = s3Config.WithMaxRetries(0) // We do our own retries, so we can monitor them s3Client := s3.New(session.New(s3Config)) - bucketName := strings.TrimPrefix(cfg.S3.URL.Path, "/") + bucketNames := []string{strings.TrimPrefix(cfg.S3.URL.Path, "/")} + if cfg.BucketNames != "" { + bucketNames = strings.Split(cfg.BucketNames, ",") // comma separated list of bucket names + } client := s3ObjectClient{ - S3: s3Client, - bucketName: bucketName, + S3: s3Client, + bucketNames: bucketNames, } return client, nil } @@ -68,11 +72,16 @@ func (a s3ObjectClient) GetChunks(ctx context.Context, chunks []chunk.Chunk) ([] func (a s3ObjectClient) getChunk(ctx context.Context, decodeContext *chunk.DecodeContext, c chunk.Chunk) (chunk.Chunk, error) { var resp *s3.GetObjectOutput + + // Map the key into a bucket + key := c.ExternalKey() + bucket := a.bucketFromKey(key) + err := instrument.CollectedRequest(ctx, "S3.GetObject", s3RequestDuration, instrument.ErrorCode, func(ctx context.Context) error { var err error resp, err = a.S3.GetObjectWithContext(ctx, &s3.GetObjectInput{ - Bucket: aws.String(a.bucketName), - Key: aws.String(c.ExternalKey()), + Bucket: aws.String(bucket), + Key: aws.String(key), }) return err }) @@ -128,9 +137,22 @@ func (a s3ObjectClient) putS3Chunk(ctx context.Context, key string, buf []byte) return instrument.CollectedRequest(ctx, "S3.PutObject", s3RequestDuration, instrument.ErrorCode, func(ctx context.Context) error { _, err := a.S3.PutObjectWithContext(ctx, &s3.PutObjectInput{ Body: bytes.NewReader(buf), - Bucket: aws.String(a.bucketName), + Bucket: aws.String(a.bucketFromKey(key)), Key: aws.String(key), }) return err }) } + +// bucketFromKey maps a key to a bucket name +func (a s3ObjectClient) bucketFromKey(key string) string { + if len(a.bucketNames) == 0 { + return "" + } + + hasher := fnv.New32a() + hasher.Write([]byte(key)) + hash := hasher.Sum32() + + 
+	return a.bucketNames[hash%uint32(len(a.bucketNames))]
+}