Skip to content

Commit 42b0681

Browse files
committed
Use slice to batch writeRequests
Signed-off-by: Daniel Deluiggi <[email protected]>
1 parent 7a1f125 commit 42b0681

File tree

2 files changed

+45
-19
lines changed

2 files changed

+45
-19
lines changed

pkg/ring/kv/dynamodb/dynamodb.go

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -171,28 +171,31 @@ func (kv dynamodbKV) Put(ctx context.Context, key dynamodbKey, data []byte) erro
171171
}
172172

173173
func (kv dynamodbKV) Batch(ctx context.Context, put map[dynamodbKey][]byte, delete []dynamodbKey) error {
174-
var writeRequests []*dynamodb.WriteRequest
174+
writeRequestSize := len(put) + len(delete)
175+
if writeRequestSize == 0 {
176+
return nil
177+
}
178+
179+
writeRequests := make([]*dynamodb.WriteRequest, writeRequestSize)
180+
writeRequestsIndex := 0
175181
for key, data := range put {
176182
item := kv.generatePutItemRequest(key, data)
177-
writeRequests = append(writeRequests, &dynamodb.WriteRequest{
183+
writeRequests[writeRequestsIndex] = &dynamodb.WriteRequest{
178184
PutRequest: &dynamodb.PutRequest{
179185
Item: item,
180186
},
181-
})
187+
}
188+
writeRequestsIndex++
182189
}
183190

184191
for _, key := range delete {
185192
item := generateItemKey(key)
186-
187-
writeRequests = append(writeRequests, &dynamodb.WriteRequest{
193+
writeRequests[writeRequestsIndex] = &dynamodb.WriteRequest{
188194
DeleteRequest: &dynamodb.DeleteRequest{
189195
Key: item,
190196
},
191-
})
192-
}
193-
194-
if len(writeRequests) == 0 {
195-
return nil
197+
}
198+
writeRequestsIndex++
196199
}
197200

198201
input := &dynamodb.BatchWriteItemInput{
@@ -202,12 +205,15 @@ func (kv dynamodbKV) Batch(ctx context.Context, put map[dynamodbKey][]byte, dele
202205
}
203206

204207
resp, err := kv.ddbClient.BatchWriteItemWithContext(ctx, input)
208+
if err != nil {
209+
return err
210+
}
205211

206212
if resp.UnprocessedItems != nil && len(resp.UnprocessedItems) > 0 {
207213
return fmt.Errorf("error processing batch request for %s requests", resp.UnprocessedItems)
208214
}
209215

210-
return err
216+
return nil
211217
}
212218

213219
func (kv dynamodbKV) generatePutItemRequest(key dynamodbKey, data []byte) map[string]*dynamodb.AttributeValue {

pkg/ring/kv/dynamodb/dynamodb_test.go

Lines changed: 28 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package dynamodb
22

33
import (
44
"context"
5+
"fmt"
56
"strconv"
67
"testing"
78
"time"
@@ -60,13 +61,13 @@ func Test_Batch(t *testing.T) {
6061
delete := []dynamodbKey{ddbKeyDelete}
6162

6263
ddbClientMock := &mockDynamodb{
63-
batchWriteItem: func(input *dynamodb.BatchWriteItemInput) *dynamodb.BatchWriteItemOutput {
64+
batchWriteItem: func(input *dynamodb.BatchWriteItemInput) (*dynamodb.BatchWriteItemOutput, error) {
6465
require.NotNil(t, input.RequestItems[tableName])
65-
require.True(t, len(input.RequestItems[tableName]) == 2)
66+
require.EqualValues(t, 2, len(input.RequestItems[tableName]))
6667
require.True(t,
6768
(checkPutRequestForItem(input.RequestItems[tableName][0], ddbKeyUpdate) || checkPutRequestForItem(input.RequestItems[tableName][1], ddbKeyUpdate)) &&
6869
(checkDeleteRequestForItem(input.RequestItems[tableName][0], ddbKeyDelete) || checkDeleteRequestForItem(input.RequestItems[tableName][1], ddbKeyDelete)))
69-
return &dynamodb.BatchWriteItemOutput{}
70+
return &dynamodb.BatchWriteItemOutput{}, nil
7071
},
7172
}
7273

@@ -84,7 +85,7 @@ func Test_EmptyBatch(t *testing.T) {
8485
require.NoError(t, err)
8586
}
8687

87-
func Test_Batch_Error(t *testing.T) {
88+
func Test_Batch_UnprocessedItems(t *testing.T) {
8889
tableName := "TEST"
8990
ddbKeyDelete := dynamodbKey{
9091
primaryKey: "PKDelete",
@@ -93,14 +94,14 @@ func Test_Batch_Error(t *testing.T) {
9394
delete := []dynamodbKey{ddbKeyDelete}
9495

9596
ddbClientMock := &mockDynamodb{
96-
batchWriteItem: func(input *dynamodb.BatchWriteItemInput) *dynamodb.BatchWriteItemOutput {
97+
batchWriteItem: func(input *dynamodb.BatchWriteItemInput) (*dynamodb.BatchWriteItemOutput, error) {
9798
return &dynamodb.BatchWriteItemOutput{
9899
UnprocessedItems: map[string][]*dynamodb.WriteRequest{
99100
tableName: {&dynamodb.WriteRequest{
100101
PutRequest: &dynamodb.PutRequest{Item: generateItemKey(ddbKeyDelete)}},
101102
},
102103
},
103-
}
104+
}, nil
104105
},
105106
}
106107

@@ -109,6 +110,25 @@ func Test_Batch_Error(t *testing.T) {
109110
require.Errorf(t, err, "error processing batch dynamodb")
110111
}
111112

113+
func Test_Batch_Error(t *testing.T) {
114+
tableName := "TEST"
115+
ddbKeyDelete := dynamodbKey{
116+
primaryKey: "PKDelete",
117+
sortKey: "SKDelete",
118+
}
119+
delete := []dynamodbKey{ddbKeyDelete}
120+
121+
ddbClientMock := &mockDynamodb{
122+
batchWriteItem: func(input *dynamodb.BatchWriteItemInput) (*dynamodb.BatchWriteItemOutput, error) {
123+
return &dynamodb.BatchWriteItemOutput{}, fmt.Errorf("mocked error")
124+
},
125+
}
126+
127+
ddb := newDynamodbClientMock(tableName, ddbClientMock, 5*time.Hour)
128+
err := ddb.Batch(context.TODO(), nil, delete)
129+
require.Errorf(t, err, "mocked error")
130+
}
131+
112132
func checkPutRequestForItem(request *dynamodb.WriteRequest, key dynamodbKey) bool {
113133
return request.PutRequest != nil &&
114134
request.PutRequest.Item[primaryKey] != nil &&
@@ -127,7 +147,7 @@ func checkDeleteRequestForItem(request *dynamodb.WriteRequest, key dynamodbKey)
127147

128148
type mockDynamodb struct {
129149
putItem func(input *dynamodb.PutItemInput) *dynamodb.PutItemOutput
130-
batchWriteItem func(input *dynamodb.BatchWriteItemInput) *dynamodb.BatchWriteItemOutput
150+
batchWriteItem func(input *dynamodb.BatchWriteItemInput) (*dynamodb.BatchWriteItemOutput, error)
131151

132152
dynamodbiface.DynamoDBAPI
133153
}
@@ -137,7 +157,7 @@ func (m *mockDynamodb) PutItemWithContext(_ context.Context, input *dynamodb.Put
137157
}
138158

139159
func (m *mockDynamodb) BatchWriteItemWithContext(_ context.Context, input *dynamodb.BatchWriteItemInput, _ ...request.Option) (*dynamodb.BatchWriteItemOutput, error) {
140-
return m.batchWriteItem(input), nil
160+
return m.batchWriteItem(input)
141161
}
142162

143163
func newDynamodbClientMock(tableName string, mock *mockDynamodb, ttl time.Duration) *dynamodbKV {

0 commit comments

Comments
 (0)