diff --git a/CHANGELOG.md b/CHANGELOG.md index 874cf265ba0..b0de1d27ddc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -35,6 +35,7 @@ * [BUGFIX] Tracing: Fix missing object storage span instrumentation. #5074 * [BUGFIX] Ingester: Ingesters returning empty response for metadata APIs. #5081 * [BUGFIX] Ingester: Fix panic when querying metadata from blocks that are being deleted. #5119 +* [BUGFIX] Ring: Fix case when dynamodb kv reaches the limit of 25 actions per batch call. #5136 * [FEATURE] Alertmanager: Add support for time_intervals. #5102 ## 1.14.0 2022-12-02 diff --git a/pkg/ring/kv/dynamodb/client.go b/pkg/ring/kv/dynamodb/client.go index ff51ac26556..34797b1516d 100644 --- a/pkg/ring/kv/dynamodb/client.go +++ b/pkg/ring/kv/dynamodb/client.go @@ -70,6 +70,7 @@ func NewClient(cfg Config, cc codec.Codec, logger log.Logger, registerer prometh ddbMetrics: ddbMetrics, staleData: make(map[string]staleData), } + level.Info(c.logger).Log("dynamodb kv initialized") return c, nil } diff --git a/pkg/ring/kv/dynamodb/dynamodb.go b/pkg/ring/kv/dynamodb/dynamodb.go index e33715d91f6..2f7f9269583 100644 --- a/pkg/ring/kv/dynamodb/dynamodb.go +++ b/pkg/ring/kv/dynamodb/dynamodb.go @@ -3,6 +3,7 @@ package dynamodb import ( "context" "fmt" + "math" "strconv" "time" @@ -13,6 +14,12 @@ import ( "github.com/go-kit/log" ) +const ( + // DdbBatchSizeLimit Current limit of 25 actions per batch + // https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_BatchWriteItem.html + DdbBatchSizeLimit = 25 +) + type dynamodbKey struct { primaryKey string sortKey string @@ -176,38 +183,51 @@ func (kv dynamodbKV) Batch(ctx context.Context, put map[dynamodbKey][]byte, dele return nil } - writeRequests := make([]*dynamodb.WriteRequest, 0, writeRequestSize) + writeRequestsSlices := make([][]*dynamodb.WriteRequest, int(math.Ceil(float64(writeRequestSize)/float64(DdbBatchSizeLimit)))) + for i := 0; i < len(writeRequestsSlices); i++ { + writeRequestsSlices[i] = make([]*dynamodb.WriteRequest, 0, DdbBatchSizeLimit) + } + + currIdx := 0 for key, data := range put { item := kv.generatePutItemRequest(key, data) - writeRequests = append(writeRequests, &dynamodb.WriteRequest{ + writeRequestsSlices[currIdx] = append(writeRequestsSlices[currIdx], &dynamodb.WriteRequest{ PutRequest: &dynamodb.PutRequest{ Item: item, }, }) + if len(writeRequestsSlices[currIdx]) == DdbBatchSizeLimit { + currIdx++ + } } for _, key := range delete { item := generateItemKey(key) - writeRequests = append(writeRequests, &dynamodb.WriteRequest{ + writeRequestsSlices[currIdx] = append(writeRequestsSlices[currIdx], &dynamodb.WriteRequest{ DeleteRequest: &dynamodb.DeleteRequest{ Key: item, }, }) + if len(writeRequestsSlices[currIdx]) == DdbBatchSizeLimit { + currIdx++ + } } - input := &dynamodb.BatchWriteItemInput{ - RequestItems: map[string][]*dynamodb.WriteRequest{ - *kv.tableName: writeRequests, - }, - } + for _, slice := range writeRequestsSlices { + input := &dynamodb.BatchWriteItemInput{ + RequestItems: map[string][]*dynamodb.WriteRequest{ + *kv.tableName: slice, + }, + } - resp, err := kv.ddbClient.BatchWriteItemWithContext(ctx, input) - if err != nil { - return err - } + resp, err := kv.ddbClient.BatchWriteItemWithContext(ctx, input) + if err != nil { + return err + } - if resp.UnprocessedItems != nil && len(resp.UnprocessedItems) > 0 { - return fmt.Errorf("error processing batch request for %s requests", resp.UnprocessedItems) + if resp.UnprocessedItems != nil && len(resp.UnprocessedItems) > 0 { + return fmt.Errorf("error processing batch request for %s requests", resp.UnprocessedItems) + } } return nil diff --git a/pkg/ring/kv/dynamodb/dynamodb_test.go b/pkg/ring/kv/dynamodb/dynamodb_test.go index 180b2e23102..b7e04c1164b 100644 --- a/pkg/ring/kv/dynamodb/dynamodb_test.go +++ b/pkg/ring/kv/dynamodb/dynamodb_test.go @@ -76,6 +76,59 @@ func Test_Batch(t *testing.T) { require.NoError(t, err) } +func Test_BatchSlices(t *testing.T) { + tableName := "TEST" + ddbKeyDelete := dynamodbKey{ + primaryKey: "PKDelete", + sortKey: "SKDelete", + } + numOfCalls := 0 + ddbClientMock := &mockDynamodb{ + batchWriteItem: func(input *dynamodb.BatchWriteItemInput) (*dynamodb.BatchWriteItemOutput, error) { + numOfCalls++ + return &dynamodb.BatchWriteItemOutput{}, nil + }, + } + ddb := newDynamodbClientMock(tableName, ddbClientMock, 5*time.Hour) + + for _, tc := range []struct { + name string + numOfExecutions int + expectedCalls int + }{ + // These tests follow each other (end state of KV in state is starting point in the next state). + { + name: "Test slice on lower bound", + numOfExecutions: 24, + expectedCalls: 1, + }, + { + name: "Test slice on exact size", + numOfExecutions: 25, + expectedCalls: 1, + }, + { + name: "Test slice on upper bound", + numOfExecutions: 26, + expectedCalls: 2, + }, + } { + t.Run(tc.name, func(t *testing.T) { + numOfCalls = 0 + delete := make([]dynamodbKey, 0, tc.numOfExecutions) + for i := 0; i < tc.numOfExecutions; i++ { + delete = append(delete, ddbKeyDelete) + } + + err := ddb.Batch(context.TODO(), nil, delete) + require.NoError(t, err) + require.EqualValues(t, tc.expectedCalls, numOfCalls) + + }) + } + +} + func Test_EmptyBatch(t *testing.T) { tableName := "TEST" ddbClientMock := &mockDynamodb{} @@ -156,7 +209,7 @@ func (m *mockDynamodb) PutItemWithContext(_ context.Context, input *dynamodb.Put return m.putItem(input), nil } -func (m *mockDynamodb) BatchWriteItemWithContext(_ context.Context, input *dynamodb.BatchWriteItemInput, _ ...request.Option) (*dynamodb.BatchWriteItemOutput, error) { +func (m *mockDynamodb) BatchWriteItemWithContext(ctx context.Context, input *dynamodb.BatchWriteItemInput, opts ...request.Option) (*dynamodb.BatchWriteItemOutput, error) { return m.batchWriteItem(input) }