Skip to content

Commit d1d0fac

Browse files
committed
add dynamodb kv with timeout enforced
Signed-off-by: yeya24 <[email protected]>
1 parent 60b5b09 commit d1d0fac

File tree

2 files changed

+46
-1
lines changed

2 files changed

+46
-1
lines changed

pkg/ring/kv/dynamodb/client.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ type Config struct {
2626
TTL time.Duration `yaml:"ttl"`
2727
PullerSyncTime time.Duration `yaml:"puller_sync_time"`
2828
MaxCasRetries int `yaml:"max_cas_retries"`
29+
Timeout time.Duration `yaml:"timeout"`
2930
}
3031

3132
type Client struct {
@@ -69,8 +70,13 @@ func NewClient(cfg Config, cc codec.Codec, logger log.Logger, registerer prometh
6970
MaxRetries: cfg.MaxCasRetries,
7071
}
7172

73+
var kv dynamoDbClient
74+
kv = dynamodbInstrumentation{kv: dynamoDB, ddbMetrics: ddbMetrics}
75+
if cfg.Timeout > 0 {
76+
kv = newDynamoDbKVWithTimeout(kv, cfg.Timeout)
77+
}
7278
c := &Client{
73-
kv: dynamodbInstrumentation{kv: dynamoDB, ddbMetrics: ddbMetrics},
79+
kv: kv,
7480
codec: cc,
7581
logger: ddbLog(logger),
7682
ddbMetrics: ddbMetrics,

pkg/ring/kv/dynamodb/dynamodb.go

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -259,6 +259,45 @@ func (kv dynamodbKV) generatePutItemRequest(key dynamodbKey, data []byte) map[st
259259
return item
260260
}
261261

262+
type dynamoDbKVWithTimeout struct {
263+
ddbClient dynamoDbClient
264+
timeout time.Duration
265+
}
266+
267+
func newDynamoDbKVWithTimeout(client dynamoDbClient, timeout time.Duration) *dynamoDbKVWithTimeout {
268+
return &dynamoDbKVWithTimeout{ddbClient: client, timeout: timeout}
269+
}
270+
271+
func (d *dynamoDbKVWithTimeout) List(ctx context.Context, key dynamodbKey) ([]string, float64, error) {
272+
ctx, cancel := context.WithTimeout(ctx, d.timeout)
273+
defer cancel()
274+
return d.ddbClient.List(ctx, key)
275+
}
276+
277+
func (d *dynamoDbKVWithTimeout) Query(ctx context.Context, key dynamodbKey, isPrefix bool) (map[string][]byte, float64, error) {
278+
ctx, cancel := context.WithTimeout(ctx, d.timeout)
279+
defer cancel()
280+
return d.ddbClient.Query(ctx, key, isPrefix)
281+
}
282+
283+
func (d *dynamoDbKVWithTimeout) Delete(ctx context.Context, key dynamodbKey) error {
284+
ctx, cancel := context.WithTimeout(ctx, d.timeout)
285+
defer cancel()
286+
return d.ddbClient.Delete(ctx, key)
287+
}
288+
289+
func (d *dynamoDbKVWithTimeout) Put(ctx context.Context, key dynamodbKey, data []byte) error {
290+
ctx, cancel := context.WithTimeout(ctx, d.timeout)
291+
defer cancel()
292+
return d.ddbClient.Put(ctx, key, data)
293+
}
294+
295+
func (d *dynamoDbKVWithTimeout) Batch(ctx context.Context, put map[dynamodbKey][]byte, delete []dynamodbKey) error {
296+
ctx, cancel := context.WithTimeout(ctx, d.timeout)
297+
defer cancel()
298+
return d.ddbClient.Batch(ctx, put, delete)
299+
}
300+
262301
func generateItemKey(key dynamodbKey) map[string]*dynamodb.AttributeValue {
263302
resp := map[string]*dynamodb.AttributeValue{
264303
primaryKey: {

0 commit comments

Comments
 (0)