Skip to content

Commit 4a7f5b0

Browse files
thorfour authored and csmarchbanks committed
s3.buckets : allow spreading chunks across buckets (#1625)
Signed-off-by: Thor <[email protected]>
1 parent e7511e2 commit 4a7f5b0

File tree

2 files changed: +32 additions, -8 deletions

pkg/chunk/aws/dynamodb_storage_client.go

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -126,6 +126,7 @@ func (cfg *DynamoDBConfig) RegisterFlags(f *flag.FlagSet) {
126126
type StorageConfig struct {
127127
DynamoDBConfig
128128
S3 flagext.URLValue
129+
BucketNames string
129130
S3ForcePathStyle bool
130131
}
131132

@@ -136,6 +137,7 @@ func (cfg *StorageConfig) RegisterFlags(f *flag.FlagSet) {
136137
f.Var(&cfg.S3, "s3.url", "S3 endpoint URL with escaped Key and Secret encoded. "+
137138
"If only region is specified as a host, proper endpoint will be deduced. Use inmemory:///<bucket-name> to use a mock in-memory implementation.")
138139
f.BoolVar(&cfg.S3ForcePathStyle, "s3.force-path-style", false, "Set this to `true` to force the request to use path-style addressing.")
140+
f.StringVar(&cfg.BucketNames, "s3.buckets", "", "Comma separated list of bucket names to evenly distribute chunks over. Overrides any buckets specified in s3.url flag")
139141
}
140142

141143
type dynamoDBStorageClient struct {

pkg/chunk/aws/s3_storage_client.go

Lines changed: 30 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -4,6 +4,7 @@ import (
44
"bytes"
55
"context"
66
"fmt"
7+
"hash/fnv"
78
"io/ioutil"
89
"strings"
910

@@ -33,8 +34,8 @@ func init() {
3334
}
3435

3536
type s3ObjectClient struct {
36-
bucketName string
37-
S3 s3iface.S3API
37+
bucketNames []string
38+
S3 s3iface.S3API
3839
}
3940

4041
// NewS3ObjectClient makes a new S3-backed ObjectClient.
@@ -51,10 +52,13 @@ func NewS3ObjectClient(cfg StorageConfig, schemaCfg chunk.SchemaConfig) (chunk.O
5152

5253
s3Config = s3Config.WithMaxRetries(0) // We do our own retries, so we can monitor them
5354
s3Client := s3.New(session.New(s3Config))
54-
bucketName := strings.TrimPrefix(cfg.S3.URL.Path, "/")
55+
bucketNames := []string{strings.TrimPrefix(cfg.S3.URL.Path, "/")}
56+
if cfg.BucketNames != "" {
57+
bucketNames = strings.Split(cfg.BucketNames, ",") // comma separated list of bucket names
58+
}
5559
client := s3ObjectClient{
56-
S3: s3Client,
57-
bucketName: bucketName,
60+
S3: s3Client,
61+
bucketNames: bucketNames,
5862
}
5963
return client, nil
6064
}
@@ -68,11 +72,16 @@ func (a s3ObjectClient) GetChunks(ctx context.Context, chunks []chunk.Chunk) ([]
6872

6973
func (a s3ObjectClient) getChunk(ctx context.Context, decodeContext *chunk.DecodeContext, c chunk.Chunk) (chunk.Chunk, error) {
7074
var resp *s3.GetObjectOutput
75+
76+
// Map the key into a bucket
77+
key := c.ExternalKey()
78+
bucket := a.bucketFromKey(key)
79+
7180
err := instrument.CollectedRequest(ctx, "S3.GetObject", s3RequestDuration, instrument.ErrorCode, func(ctx context.Context) error {
7281
var err error
7382
resp, err = a.S3.GetObjectWithContext(ctx, &s3.GetObjectInput{
74-
Bucket: aws.String(a.bucketName),
75-
Key: aws.String(c.ExternalKey()),
83+
Bucket: aws.String(bucket),
84+
Key: aws.String(key),
7685
})
7786
return err
7887
})
@@ -128,9 +137,22 @@ func (a s3ObjectClient) putS3Chunk(ctx context.Context, key string, buf []byte)
128137
return instrument.CollectedRequest(ctx, "S3.PutObject", s3RequestDuration, instrument.ErrorCode, func(ctx context.Context) error {
129138
_, err := a.S3.PutObjectWithContext(ctx, &s3.PutObjectInput{
130139
Body: bytes.NewReader(buf),
131-
Bucket: aws.String(a.bucketName),
140+
Bucket: aws.String(a.bucketFromKey(key)),
132141
Key: aws.String(key),
133142
})
134143
return err
135144
})
136145
}
146+
147+
// bucketFromKey maps a key to a bucket name
148+
func (a s3ObjectClient) bucketFromKey(key string) string {
149+
if len(a.bucketNames) == 0 {
150+
return ""
151+
}
152+
153+
hasher := fnv.New32a()
154+
hasher.Write([]byte(key))
155+
hash := hasher.Sum32()
156+
157+
return a.bucketNames[hash%uint32(len(a.bucketNames))]
158+
}

0 commit comments

Comments
 (0)