@@ -167,20 +167,36 @@ def _put_record(self, data_record: DataRecord) -> None:
167167 now = datetime .datetime .now ()
168168 try :
169169 logger .debug (f"Putting record for idempotency key: { data_record .idempotency_key } " )
170+
171+ condition_expression = "attribute_not_exists(#id) OR #now < :now"
172+ expression_attribute_names = {
173+ "#id" : self .key_attr ,
174+ "#now" : self .expiry_attr ,
175+ "#status" : self .status_attr ,
176+ }
177+ expression_attribute_values = {":now" : int (now .timestamp ())}
178+
179+ # When we want to expire_in_progress invocations, we check if the in_progress timestamp exists
180+ # and we are past that timestamp. We also make sure the status is INPROGRESS because we don't
181+ # want to repeat COMPLETE invocations.
182+ #
183+ # We put this in an if block because customers might want to disable the feature,
184+ # reverting to the old behavior that relies just on the idempotency key.
185+ if self .expires_in_progress :
186+ condition_expression += (
187+ " OR ("
188+ "attribute_exists(#in_progress_expiry) AND "
189+ "#in_progress_expiry < :now AND #status = :inprogress"
190+ ")"
191+ )
192+ expression_attribute_names ["#in_progress_expiry" ] = self .in_progress_expiry_attr
193+ expression_attribute_values [":inprogress" ] = STATUS_CONSTANTS ["INPROGRESS" ]
194+
170195 self .table .put_item (
171196 Item = item ,
172- ConditionExpression = (
173- "attribute_not_exists(#id) OR "
174- "#now < :now OR "
175- "(attribute_exists(#in_progress_expiry) AND #in_progress_expiry < :now AND #status = :inprogress)"
176- ),
177- ExpressionAttributeNames = {
178- "#id" : self .key_attr ,
179- "#now" : self .expiry_attr ,
180- "#in_progress_expiry" : self .in_progress_expiry_attr ,
181- "#status" : self .status_attr ,
182- },
183- ExpressionAttributeValues = {":now" : int (now .timestamp ()), ":inprogress" : STATUS_CONSTANTS ["INPROGRESS" ]},
197+ ConditionExpression = condition_expression ,
198+ ExpressionAttributeNames = expression_attribute_names ,
199+ ExpressionAttributeValues = expression_attribute_values ,
184200 )
185201 except self .table .meta .client .exceptions .ConditionalCheckFailedException :
186202 logger .debug (f"Failed to put record for already existing idempotency key: { data_record .idempotency_key } " )
@@ -200,16 +216,16 @@ def _update_record(self, data_record: DataRecord):
200216 "#status" : self .status_attr ,
201217 }
202218
203- if self .expires_in_progress :
204- update_expression += ", #in_progress_expiry = :in_progress_expiry"
205- expression_attr_values [":in_progress_expiry" ] = data_record .in_progress_expiry_timestamp
206- expression_attr_names ["#in_progress_expiry" ] = self .in_progress_expiry_attr
207-
208219 if self .payload_validation_enabled :
209220 update_expression += ", #validation_key = :validation_key"
210221 expression_attr_values [":validation_key" ] = data_record .payload_hash
211222 expression_attr_names ["#validation_key" ] = self .validation_key_attr
212223
224+ if self .expires_in_progress :
225+ update_expression += ", #in_progress_expiry = :in_progress_expiry"
226+ expression_attr_values [":in_progress_expiry" ] = data_record .in_progress_expiry_timestamp
227+ expression_attr_names ["#in_progress_expiry" ] = self .in_progress_expiry_attr
228+
213229 kwargs = {
214230 "Key" : self ._get_key (data_record .idempotency_key ),
215231 "UpdateExpression" : update_expression ,
0 commit comments