Skip to content

Commit 92d21ec

Browse files
author
Dan Straw
committed
feat: add ReturnValuesOnConditionCheckFailure to DynamoDB idempotency to return a copy of the item on failure and avoid a subsequent get aws-powertools#3327
1 parent 377f9bb commit 92d21ec

File tree

11 files changed

+235
-168
lines changed

11 files changed

+235
-168
lines changed

aws_lambda_powertools/utilities/idempotency/base.py

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,10 @@
1414
IdempotencyValidationError,
1515
)
1616
from aws_lambda_powertools.utilities.idempotency.persistence.base import (
17-
STATUS_CONSTANTS,
1817
BasePersistenceLayer,
18+
)
19+
from aws_lambda_powertools.utilities.idempotency.persistence.datarecord import (
20+
STATUS_CONSTANTS,
1921
DataRecord,
2022
)
2123
from aws_lambda_powertools.utilities.idempotency.serialization.base import (
@@ -118,11 +120,15 @@ def _process_idempotency(self):
118120
data=self.data,
119121
remaining_time_in_millis=self._get_remaining_time_in_millis(),
120122
)
121-
except IdempotencyKeyError:
123+
except (IdempotencyKeyError, IdempotencyValidationError):
122124
raise
123-
except IdempotencyItemAlreadyExistsError:
124-
# Now we know the item already exists, we can retrieve it
125-
record = self._get_idempotency_record()
125+
except IdempotencyItemAlreadyExistsError as exc:
126+
# We now know the item exists
127+
# Attempt to retrieve the record from the exception where ReturnValuesOnConditionCheckFailure is supported
128+
record = exc.old_data_record
129+
if record is None:
130+
# Perform a GET on the record
131+
record = self._get_idempotency_record()
126132
if record is not None:
127133
return self._handle_for_status(record)
128134
except Exception as exc:

aws_lambda_powertools/utilities/idempotency/exceptions.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55

66
from typing import Optional, Union
77

8+
from aws_lambda_powertools.utilities.idempotency.persistence.datarecord import DataRecord
9+
810

911
class BaseError(Exception):
1012
"""
@@ -30,6 +32,20 @@ class IdempotencyItemAlreadyExistsError(BaseError):
3032
Item attempting to be inserted into persistence store already exists and is not expired
3133
"""
3234

35+
def __init__(self, *args: Optional[Union[str, Exception]], old_data_record: Optional[DataRecord] = None):
36+
self.message = str(args[0]) if args else ""
37+
self.details = "".join(str(arg) for arg in args[1:]) if args[1:] else None
38+
self.old_data_record = old_data_record
39+
40+
def __str__(self):
41+
"""
42+
Return all arguments formatted or original message
43+
"""
44+
old_data_record = f" from [{(str(self.old_data_record))}]" if self.old_data_record else ""
45+
details = f" - ({self.details})" if self.details else ""
46+
47+
return f"{self.message}{details}{old_data_record}"
48+
3349

3450
class IdempotencyItemNotFoundError(BaseError):
3551
"""

aws_lambda_powertools/utilities/idempotency/persistence/base.py

Lines changed: 25 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
import os
99
import warnings
1010
from abc import ABC, abstractmethod
11-
from types import MappingProxyType
1211
from typing import Any, Dict, Optional
1312

1413
import jmespath
@@ -18,95 +17,18 @@
1817
from aws_lambda_powertools.shared.json_encoder import Encoder
1918
from aws_lambda_powertools.utilities.idempotency.config import IdempotencyConfig
2019
from aws_lambda_powertools.utilities.idempotency.exceptions import (
21-
IdempotencyInvalidStatusError,
2220
IdempotencyItemAlreadyExistsError,
2321
IdempotencyKeyError,
2422
IdempotencyValidationError,
2523
)
24+
from aws_lambda_powertools.utilities.idempotency.persistence.datarecord import (
25+
STATUS_CONSTANTS,
26+
DataRecord,
27+
)
2628
from aws_lambda_powertools.utilities.jmespath_utils import PowertoolsFunctions
2729

2830
logger = logging.getLogger(__name__)
2931

30-
STATUS_CONSTANTS = MappingProxyType({"INPROGRESS": "INPROGRESS", "COMPLETED": "COMPLETED", "EXPIRED": "EXPIRED"})
31-
32-
33-
class DataRecord:
34-
"""
35-
Data Class for idempotency records.
36-
"""
37-
38-
def __init__(
39-
self,
40-
idempotency_key: str,
41-
status: str = "",
42-
expiry_timestamp: Optional[int] = None,
43-
in_progress_expiry_timestamp: Optional[int] = None,
44-
response_data: str = "",
45-
payload_hash: str = "",
46-
) -> None:
47-
"""
48-
49-
Parameters
50-
----------
51-
idempotency_key: str
52-
hashed representation of the idempotent data
53-
status: str, optional
54-
status of the idempotent record
55-
expiry_timestamp: int, optional
56-
time before the record should expire, in seconds
57-
in_progress_expiry_timestamp: int, optional
58-
time before the record should expire while in the INPROGRESS state, in seconds
59-
payload_hash: str, optional
60-
hashed representation of payload
61-
response_data: str, optional
62-
response data from previous executions using the record
63-
"""
64-
self.idempotency_key = idempotency_key
65-
self.payload_hash = payload_hash
66-
self.expiry_timestamp = expiry_timestamp
67-
self.in_progress_expiry_timestamp = in_progress_expiry_timestamp
68-
self._status = status
69-
self.response_data = response_data
70-
71-
@property
72-
def is_expired(self) -> bool:
73-
"""
74-
Check if data record is expired
75-
76-
Returns
77-
-------
78-
bool
79-
Whether the record is currently expired or not
80-
"""
81-
return bool(self.expiry_timestamp and int(datetime.datetime.now().timestamp()) > self.expiry_timestamp)
82-
83-
@property
84-
def status(self) -> str:
85-
"""
86-
Get status of data record
87-
88-
Returns
89-
-------
90-
str
91-
"""
92-
if self.is_expired:
93-
return STATUS_CONSTANTS["EXPIRED"]
94-
elif self._status in STATUS_CONSTANTS.values():
95-
return self._status
96-
else:
97-
raise IdempotencyInvalidStatusError(self._status)
98-
99-
def response_json_as_dict(self) -> Optional[dict]:
100-
"""
101-
Get response data deserialized to python dict
102-
103-
Returns
104-
-------
105-
Optional[dict]
106-
previous response data deserialized
107-
"""
108-
return json.loads(self.response_data) if self.response_data else None
109-
11032

11133
class BasePersistenceLayer(ABC):
11234
"""
@@ -260,6 +182,27 @@ def _validate_payload(self, data: Dict[str, Any], data_record: DataRecord) -> No
260182
if data_record.payload_hash != data_hash:
261183
raise IdempotencyValidationError("Payload does not match stored record for this event key")
262184

185+
def _validate_hashed_payload(self, old_data_record: DataRecord, data_record: DataRecord) -> None:
186+
"""
187+
Validate that the hashed data provided matches the payload_hash stored data record
188+
189+
Parameters
190+
----------
191+
old_data_record: DataRecord
192+
DataRecord instance fetched from Dynamo
193+
data_record: DataRecord
194+
DataRecord instance which failed insert into Dynamo
195+
196+
Raises
197+
----------
198+
IdempotencyValidationError
199+
Payload doesn't match the stored record for the given idempotency key
200+
201+
"""
202+
if self.payload_validation_enabled:
203+
if old_data_record.payload_hash != data_record.payload_hash:
204+
raise IdempotencyValidationError("Hashed payload does not match stored record for this event key")
205+
263206
def _get_expiry_timestamp(self) -> int:
264207
"""
265208
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
"""
2+
Data Class for idempotency records.
3+
"""
4+
5+
import datetime
6+
import json
7+
import logging
8+
from types import MappingProxyType
9+
from typing import Optional
10+
11+
logger = logging.getLogger(__name__)
12+
13+
STATUS_CONSTANTS = MappingProxyType({"INPROGRESS": "INPROGRESS", "COMPLETED": "COMPLETED", "EXPIRED": "EXPIRED"})
14+
15+
16+
class DataRecord:
17+
"""
18+
Data Class for idempotency records.
19+
"""
20+
21+
def __init__(
22+
self,
23+
idempotency_key: str,
24+
status: str = "",
25+
expiry_timestamp: Optional[int] = None,
26+
in_progress_expiry_timestamp: Optional[int] = None,
27+
response_data: str = "",
28+
payload_hash: str = "",
29+
) -> None:
30+
"""
31+
32+
Parameters
33+
----------
34+
idempotency_key: str
35+
hashed representation of the idempotent data
36+
status: str, optional
37+
status of the idempotent record
38+
expiry_timestamp: int, optional
39+
time before the record should expire, in seconds
40+
in_progress_expiry_timestamp: int, optional
41+
time before the record should expire while in the INPROGRESS state, in seconds
42+
payload_hash: str, optional
43+
hashed representation of payload
44+
response_data: str, optional
45+
response data from previous executions using the record
46+
"""
47+
self.idempotency_key = idempotency_key
48+
self.payload_hash = payload_hash
49+
self.expiry_timestamp = expiry_timestamp
50+
self.in_progress_expiry_timestamp = in_progress_expiry_timestamp
51+
self._status = status
52+
self.response_data = response_data
53+
54+
@property
55+
def is_expired(self) -> bool:
56+
"""
57+
Check if data record is expired
58+
59+
Returns
60+
-------
61+
bool
62+
Whether the record is currently expired or not
63+
"""
64+
return bool(self.expiry_timestamp and int(datetime.datetime.now().timestamp()) > self.expiry_timestamp)
65+
66+
@property
67+
def status(self) -> str:
68+
"""
69+
Get status of data record
70+
71+
Returns
72+
-------
73+
str
74+
"""
75+
if self.is_expired:
76+
return STATUS_CONSTANTS["EXPIRED"]
77+
if self._status in STATUS_CONSTANTS.values():
78+
return self._status
79+
80+
from aws_lambda_powertools.utilities.idempotency.exceptions import IdempotencyInvalidStatusError
81+
82+
raise IdempotencyInvalidStatusError(self._status)
83+
84+
def response_json_as_dict(self) -> Optional[dict]:
85+
"""
86+
Get response data deserialized to python dict
87+
88+
Returns
89+
-------
90+
Optional[dict]
91+
previous response data deserialized
92+
"""
93+
return json.loads(self.response_data) if self.response_data else None

aws_lambda_powertools/utilities/idempotency/persistence/dynamodb.py

Lines changed: 40 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,9 @@
1515
from aws_lambda_powertools.utilities.idempotency.exceptions import (
1616
IdempotencyItemAlreadyExistsError,
1717
IdempotencyItemNotFoundError,
18+
IdempotencyValidationError,
1819
)
19-
from aws_lambda_powertools.utilities.idempotency.persistence.base import (
20+
from aws_lambda_powertools.utilities.idempotency.persistence.datarecord import (
2021
STATUS_CONSTANTS,
2122
DataRecord,
2223
)
@@ -234,16 +235,51 @@ def _put_record(self, data_record: DataRecord) -> None:
234235
":now_in_millis": {"N": str(int(now.timestamp() * 1000))},
235236
":inprogress": {"S": STATUS_CONSTANTS["INPROGRESS"]},
236237
},
238+
**(
239+
{"ReturnValuesOnConditionCheckFailure": "ALL_OLD"} # type: ignore
240+
if self.boto3_supports_condition_check_failure(boto3.__version__)
241+
else {}
242+
),
237243
)
238244
except ClientError as exc:
239245
error_code = exc.response.get("Error", {}).get("Code")
240246
if error_code == "ConditionalCheckFailedException":
247+
old_data_record = self._item_to_data_record(exc.response["Item"]) if "Item" in exc.response else None
248+
if old_data_record is not None:
249+
logger.debug(
250+
f"Failed to put record for already existing idempotency key: "
251+
f"{data_record.idempotency_key} with status: {old_data_record.status}, "
252+
f"expiry_timestamp: {old_data_record.expiry_timestamp}, "
253+
f"and in_progress_expiry_timestamp: {old_data_record.in_progress_expiry_timestamp}",
254+
)
255+
self._save_to_cache(data_record=old_data_record)
256+
257+
try:
258+
self._validate_hashed_payload(old_data_record=old_data_record, data_record=data_record)
259+
except IdempotencyValidationError as ive:
260+
raise ive from exc
261+
262+
raise IdempotencyItemAlreadyExistsError(old_data_record=old_data_record) from exc
263+
241264
logger.debug(
242265
f"Failed to put record for already existing idempotency key: {data_record.idempotency_key}",
243266
)
244-
raise IdempotencyItemAlreadyExistsError from exc
245-
else:
246-
raise
267+
raise IdempotencyItemAlreadyExistsError() from exc
268+
269+
raise
270+
271+
@staticmethod
272+
def boto3_supports_condition_check_failure(boto3_version: str):
273+
version = boto3_version.split(".")
274+
# Only supported in boto3 1.26.164 and above
275+
if len(version) >= 3 and int(version[0]) == 1 and int(version[1]) == 26 and int(version[2]) >= 164:
276+
return True
277+
if len(version) >= 2 and int(version[0]) == 1 and int(version[1]) > 26:
278+
return True
279+
if int(version[0]) > 1:
280+
return True
281+
282+
return False
247283

248284
def _update_record(self, data_record: DataRecord):
249285
logger.debug(f"Updating record for idempotency key: {data_record.idempotency_key}")

docs/utilities/idempotency.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -100,8 +100,8 @@ If you're not [changing the default configuration for the DynamoDB persistence l
100100

101101
???+ info "Info: DynamoDB"
102102
Each function invocation will generally make 2 requests to DynamoDB. If the
103-
result returned by your Lambda is less than 1kb, you can expect 2 WCUs per invocation. For retried invocations, you will
104-
see 1WCU and 1RCU. Review the [DynamoDB pricing documentation](https://aws.amazon.com/dynamodb/pricing/){target="_blank"} to
103+
result returned by your Lambda is less than 1kb, you can expect 2 WCUs per invocation.
104+
Review the [DynamoDB pricing documentation](https://aws.amazon.com/dynamodb/pricing/){target="_blank"} to
105105
estimate the cost.
106106

107107
### Idempotent decorator

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ aws-encryption-sdk = { version = "^3.1.1", optional = true }
5151
coverage = {extras = ["toml"], version = "^7.2"}
5252
pytest = "^7.4.3"
5353
black = "^23.3"
54-
boto3 = "^1.18"
54+
boto3 = "^1.26.164"
5555
isort = "^5.11.5"
5656
pytest-cov = "^4.1.0"
5757
pytest-mock = "^3.11.1"

0 commit comments

Comments
 (0)