Skip to content

Commit 84477e8

Browse files
authored
Merge pull request #363 from hjgraca/idempotency-inprogressexpiration
2 parents 1378d75 + a234e37 commit 84477e8

File tree

13 files changed

+740
-67
lines changed

13 files changed

+740
-67
lines changed

docs/utilities/idempotency.md

Lines changed: 389 additions & 23 deletions
Large diffs are not rendered by default.

libraries/src/AWS.Lambda.Powertools.Idempotency/AWS.Lambda.Powertools.Idempotency.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
</ItemGroup>
3131

3232
<ItemGroup>
33-
<PackageReference Include="Amazon.Lambda.Core" Version="2.1.0" />
33+
<PackageReference Include="Amazon.Lambda.Core" Version="1.0.0" />
3434
<PackageReference Include="AWSSDK.DynamoDBv2" Version="3.7.104.1" />
3535
<PackageReference Include="JmesPath.Net" Version="1.0.308" />
3636
</ItemGroup>

libraries/src/AWS.Lambda.Powertools.Idempotency/Idempotency.cs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
*/
1515

1616
using System;
17+
using Amazon.Lambda.Core;
1718
using AWS.Lambda.Powertools.Common;
1819
using AWS.Lambda.Powertools.Idempotency.Persistence;
1920

@@ -85,6 +86,21 @@ public static void Configure(Action<IdempotencyBuilder> configurationAction)
8586
Instance.SetPersistenceStore(builder.Store);
8687
}
8788

89+
/// <summary>
90+
/// Holds ILambdaContext
91+
/// </summary>
92+
public ILambdaContext LambdaContext { get; private set; }
93+
94+
/// <summary>
95+
/// Can be used in a method which is not the handler to capture the Lambda context,
96+
/// to calculate the remaining time before the invocation times out.
97+
/// </summary>
98+
/// <param name="context"></param>
99+
public static void RegisterLambdaContext(ILambdaContext context)
100+
{
101+
Instance.LambdaContext = context;
102+
}
103+
88104
/// <summary>
89105
/// Create a builder that can be used to configure and create <see cref="Idempotency"/>
90106
/// </summary>

libraries/src/AWS.Lambda.Powertools.Idempotency/IdempotentAttribute.cs

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ protected internal sealed override T WrapSync<T>(Func<object[], T> target, objec
8989

9090
Task<T> ResultDelegate() => Task.FromResult(target(args));
9191

92-
var idempotencyHandler = new IdempotencyAspectHandler<T>(ResultDelegate, eventArgs.Method.Name, payload);
92+
var idempotencyHandler = new IdempotencyAspectHandler<T>(ResultDelegate, eventArgs.Method.Name, payload,GetContext(eventArgs));
9393
if (idempotencyHandler == null)
9494
{
9595
throw new Exception("Failed to create an instance of IdempotencyAspectHandler");
@@ -127,7 +127,7 @@ protected internal sealed override async Task<T> WrapAsync<T>(
127127

128128
Task<T> ResultDelegate() => target(args);
129129

130-
var idempotencyHandler = new IdempotencyAspectHandler<T>(ResultDelegate, eventArgs.Method.Name, payload);
130+
var idempotencyHandler = new IdempotencyAspectHandler<T>(ResultDelegate, eventArgs.Method.Name, payload, GetContext(eventArgs));
131131
if (idempotencyHandler == null)
132132
{
133133
throw new Exception("Failed to create an instance of IdempotencyAspectHandler");
@@ -172,4 +172,14 @@ private static bool IsPlacedOnRequestHandler(MethodBase method)
172172
//Check if method has two arguments and the second one is of type ILambdaContext
173173
return method.GetParameters().Length == 2 && method.GetParameters()[1].ParameterType == typeof(ILambdaContext);
174174
}
175+
176+
private static ILambdaContext GetContext(AspectEventArgs args)
177+
{
178+
if (IsPlacedOnRequestHandler(args.Method))
179+
{
180+
return (ILambdaContext)args.Args[1];
181+
}
182+
183+
return Idempotency.Instance.LambdaContext;
184+
}
175185
}

libraries/src/AWS.Lambda.Powertools.Idempotency/Internal/IdempotencyAspectHandler.cs

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
using System;
1717
using System.Text.Json;
1818
using System.Threading.Tasks;
19+
using Amazon.Lambda.Core;
1920
using AWS.Lambda.Powertools.Idempotency.Exceptions;
2021
using AWS.Lambda.Powertools.Idempotency.Persistence;
2122

@@ -35,6 +36,9 @@ internal class IdempotencyAspectHandler<T>
3536
/// Request payload
3637
/// </summary>
3738
private readonly JsonDocument _data;
39+
40+
private readonly ILambdaContext _lambdaContext;
41+
3842
/// <summary>
3943
/// Persistence store
4044
/// </summary>
@@ -46,13 +50,16 @@ internal class IdempotencyAspectHandler<T>
4650
/// <param name="target"></param>
4751
/// <param name="functionName"></param>
4852
/// <param name="payload"></param>
53+
/// <param name="lambdaContext"></param>
4954
public IdempotencyAspectHandler(
5055
Func<Task<T>> target,
5156
string functionName,
52-
JsonDocument payload)
57+
JsonDocument payload,
58+
ILambdaContext lambdaContext)
5359
{
5460
_target = target;
5561
_data = payload;
62+
_lambdaContext = lambdaContext;
5663
_persistenceStore = Idempotency.Instance.PersistenceStore;
5764
_persistenceStore.Configure(Idempotency.Instance.IdempotencyOptions, functionName);
5865
}
@@ -94,7 +101,7 @@ private async Task<T> ProcessIdempotency()
94101
{
95102
// We call saveInProgress first as an optimization for the most common case where no idempotent record
96103
// already exists. If it succeeds, there's no need to call getRecord.
97-
await _persistenceStore.SaveInProgress(_data, DateTimeOffset.UtcNow);
104+
await _persistenceStore.SaveInProgress(_data, DateTimeOffset.UtcNow, GetRemainingTimeInMillis());
98105
}
99106
catch (IdempotencyItemAlreadyExistsException)
100107
{
@@ -167,6 +174,10 @@ private Task<T> HandleForStatus(DataRecord record)
167174
case DataRecord.DataRecordStatus.EXPIRED:
168175
throw new IdempotencyInconsistentStateException("saveInProgress and getRecord return inconsistent results");
169176
case DataRecord.DataRecordStatus.INPROGRESS:
177+
if (record.InProgressExpiryTimestamp.HasValue && record.InProgressExpiryTimestamp.Value < DateTimeOffset.Now.ToUnixTimeMilliseconds())
178+
{
179+
throw new IdempotencyInconsistentStateException("Item should have been expired in-progress because it already time-outed.");
180+
}
170181
throw new IdempotencyAlreadyInProgressException("Execution already in progress with idempotency key: " +
171182
record.IdempotencyKey);
172183
case DataRecord.DataRecordStatus.COMPLETED:
@@ -234,4 +245,17 @@ private async Task<T> GetFunctionResponse()
234245

235246
return response;
236247
}
248+
249+
/// <summary>
250+
/// Tries to determine the remaining time available for the current lambda invocation.
251+
/// Currently, it only works if the idempotent handler decorator is used or using {Idempotency#registerLambdaContext(Context)}
252+
/// </summary>
253+
/// <returns>the remaining time in milliseconds or empty if the context was not provided/found</returns>
254+
private double? GetRemainingTimeInMillis() {
255+
if (_lambdaContext != null) {
256+
// why TotalMilliseconds? Because it must be the complete duration of the timespan expressed in milliseconds
257+
return _lambdaContext.RemainingTime.TotalMilliseconds;
258+
}
259+
return null;
260+
}
237261
}

libraries/src/AWS.Lambda.Powertools.Idempotency/Persistence/BasePersistenceStore.cs

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -113,22 +113,31 @@ public virtual async Task SaveSuccess(JsonDocument data, object result, DateTime
113113
/// </summary>
114114
/// <param name="data">Payload</param>
115115
/// <param name="now">The current date time</param>
116+
/// <param name="remainingTimeInMs">The remaining time from lambda execution</param>
116117
/// <exception cref="IdempotencyItemAlreadyExistsException"></exception>
117-
public virtual async Task SaveInProgress(JsonDocument data, DateTimeOffset now)
118+
public virtual async Task SaveInProgress(JsonDocument data, DateTimeOffset now, double? remainingTimeInMs)
118119
{
119120
var idempotencyKey = GetHashedIdempotencyKey(data);
120121

121122
if (RetrieveFromCache(idempotencyKey, now) != null)
122123
{
123124
throw new IdempotencyItemAlreadyExistsException();
124125
}
126+
127+
long? inProgressExpirationMsTimestamp = null;
128+
if (remainingTimeInMs.HasValue)
129+
{
130+
inProgressExpirationMsTimestamp = now.AddMilliseconds(remainingTimeInMs.Value).ToUnixTimeMilliseconds();
131+
}
125132

126133
var record = new DataRecord(
127134
idempotencyKey,
128135
DataRecord.DataRecordStatus.INPROGRESS,
129136
GetExpiryEpochSecond(now),
130137
null,
131-
GetHashedPayload(data)
138+
GetHashedPayload(data),
139+
inProgressExpirationMsTimestamp
140+
132141
);
133142
await PutRecord(record, now);
134143
}

libraries/src/AWS.Lambda.Powertools.Idempotency/Persistence/DataRecord.cs

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,27 +35,48 @@ public class DataRecord
3535
/// <param name="expiryTimestamp">Unix timestamp of when record expires</param>
3636
/// <param name="responseData">JSON serialized invocation results</param>
3737
/// <param name="payloadHash">A hash representation of the entire event</param>
38+
/// <param name="inProgressExpiryTimestamp">Unix timestamp of in-progress field for the remaining lambda execution time</param>
3839
public DataRecord(string idempotencyKey,
3940
DataRecordStatus status,
4041
long expiryTimestamp,
4142
string responseData,
42-
string payloadHash)
43+
string payloadHash,
44+
long? inProgressExpiryTimestamp = null)
4345
{
4446
IdempotencyKey = idempotencyKey;
4547
_status = status.ToString();
4648
ExpiryTimestamp = expiryTimestamp;
4749
ResponseData = responseData;
4850
PayloadHash = payloadHash;
51+
InProgressExpiryTimestamp = inProgressExpiryTimestamp;
4952
}
5053

5154
/// <summary>
5255
/// A hash representation of either the entire event or a specific configured subset of the event
5356
/// </summary>
5457
public string IdempotencyKey { get; }
5558
/// <summary>
56-
/// Unix timestamp of when record expires
59+
/// Unix timestamp of when record expires.
60+
/// This field is controlling how long the result of the idempotent
61+
/// event is cached. It is stored in _seconds since epoch_.
62+
/// DynamoDB's TTL mechanism is used to remove the record once the
63+
/// expiry has been reached, and subsequent execution of the request
64+
/// will be permitted. The user must configure this on their table.
5765
/// </summary>
5866
public long ExpiryTimestamp { get; }
67+
68+
/// <summary>
69+
/// The in-progress field is set to the remaining lambda execution time
70+
/// when the record is created.
71+
/// This field is stored in _milliseconds since epoch_.
72+
///
73+
/// This ensures that:
74+
/// 1/ other concurrently executing requests are blocked from starting
75+
/// 2/ if a lambda times out, subsequent requests will be allowed again, despite
76+
/// the fact that the idempotency record is already in the table
77+
/// </summary>
78+
public long? InProgressExpiryTimestamp { get; }
79+
5980
/// <summary>
6081
/// JSON serialized invocation results
6182
/// </summary>

libraries/src/AWS.Lambda.Powertools.Idempotency/Persistence/DynamoDBPersistenceStore.cs

Lines changed: 46 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,12 @@ public class DynamoDBPersistenceStore : BasePersistenceStore
5050
/// Expiry attribute
5151
/// </summary>
5252
private readonly string _expiryAttr;
53+
54+
/// <summary>
55+
/// In progress expiry attribute
56+
/// </summary>
57+
private readonly string _inProgressExpiryAttr;
58+
5359
/// <summary>
5460
/// Status attribute
5561
/// </summary>
@@ -75,6 +81,7 @@ public class DynamoDBPersistenceStore : BasePersistenceStore
7581
/// <param name="staticPkValue"></param>
7682
/// <param name="sortKeyAttr"></param>
7783
/// <param name="expiryAttr"></param>
84+
/// <param name="inProgressExpiryAttr"></param>
7885
/// <param name="statusAttr"></param>
7986
/// <param name="dataAttr"></param>
8087
/// <param name="validationAttr"></param>
@@ -84,6 +91,7 @@ internal DynamoDBPersistenceStore(string tableName,
8491
string staticPkValue,
8592
string sortKeyAttr,
8693
string expiryAttr,
94+
string inProgressExpiryAttr,
8795
string statusAttr,
8896
string dataAttr,
8997
string validationAttr,
@@ -94,6 +102,7 @@ internal DynamoDBPersistenceStore(string tableName,
94102
_staticPkValue = staticPkValue;
95103
_sortKeyAttr = sortKeyAttr;
96104
_expiryAttr = expiryAttr;
105+
_inProgressExpiryAttr = inProgressExpiryAttr;
97106
_statusAttr = statusAttr;
98107
_dataAttr = dataAttr;
99108
_validationAttr = validationAttr;
@@ -139,8 +148,7 @@ public override async Task<DataRecord> GetRecord(string idempotencyKey)
139148

140149
return ItemToRecord(response.Item);
141150
}
142-
143-
151+
144152
/// <inheritdoc />
145153
public override async Task PutRecord(DataRecord record, DateTimeOffset now)
146154
{
@@ -155,6 +163,13 @@ public override async Task PutRecord(DataRecord record, DateTimeOffset now)
155163
{ _statusAttr, new AttributeValue(record.Status.ToString()) }
156164
};
157165

166+
if (record.InProgressExpiryTimestamp.HasValue) {
167+
item.Add(_inProgressExpiryAttr, new AttributeValue
168+
{
169+
N = record.InProgressExpiryTimestamp.Value.ToString()
170+
});
171+
}
172+
158173
if (PayloadValidationEnabled)
159174
{
160175
item.Add(_validationAttr, new AttributeValue(record.PayloadHash));
@@ -165,18 +180,22 @@ public override async Task PutRecord(DataRecord record, DateTimeOffset now)
165180
var expressionAttributeNames = new Dictionary<string, string>
166181
{
167182
{"#id", _keyAttr},
168-
{"#expiry", _expiryAttr}
183+
{"#expiry", _expiryAttr},
184+
{"#in_progress_expiry", _inProgressExpiryAttr},
185+
{"#status", _statusAttr}
169186
};
170187

171188
var request = new PutItemRequest
172189
{
173190
TableName = _tableName,
174191
Item = item,
175-
ConditionExpression = "attribute_not_exists(#id) OR #expiry < :now",
192+
ConditionExpression = "attribute_not_exists(#id) OR #expiry < :now OR (attribute_exists(#in_progress_expiry) AND #in_progress_expiry < :now_milliseconds AND #status = :inprogress)",
176193
ExpressionAttributeNames = expressionAttributeNames,
177194
ExpressionAttributeValues = new Dictionary<string, AttributeValue>
178195
{
179-
{":now", new AttributeValue {N = now.ToUnixTimeSeconds().ToString()}}
196+
{":now", new AttributeValue {N = now.ToUnixTimeSeconds().ToString()}},
197+
{":now_milliseconds", new AttributeValue {N = now.ToUnixTimeMilliseconds().ToString()}},
198+
{":inprogress", new AttributeValue {S = Enum.GetName(DataRecord.DataRecordStatus.INPROGRESS) }}
180199
}
181200
};
182201
await _dynamoDbClient!.PutItemAsync(request);
@@ -247,12 +266,15 @@ private DataRecord ItemToRecord(Dictionary<string, AttributeValue> item)
247266
// data and validation payload may be null
248267
var hasDataAttribute = item.TryGetValue(_dataAttr, out var data);
249268
var hasValidationAttribute = item.TryGetValue(_validationAttr, out var validation);
269+
var hasInProgressExpiryAttr = item.TryGetValue(_inProgressExpiryAttr, out var inProgExp);
270+
250271

251272
return new DataRecord(item[_sortKeyAttr ?? _keyAttr].S,
252273
Enum.Parse<DataRecord.DataRecordStatus>(item[_statusAttr].S),
253274
long.Parse(item[_expiryAttr].N),
254275
hasDataAttribute ? data?.S : null,
255-
hasValidationAttribute ? validation?.S : null);
276+
hasValidationAttribute ? validation?.S : null,
277+
hasInProgressExpiryAttr ? long.Parse(inProgExp.N) : null);
256278
}
257279

258280
/// <summary>
@@ -311,6 +333,12 @@ public class DynamoDBPersistenceStoreBuilder
311333
/// Expiry attribute
312334
/// </summary>
313335
private string _expiryAttr = "expiration";
336+
337+
/// <summary>
338+
/// In progress expiry attribute
339+
/// </summary>
340+
private string _inProgressExpiryAttr = "in_progress_expiration";
341+
314342
/// <summary>
315343
/// Status attribute
316344
/// </summary>
@@ -346,7 +374,8 @@ public DynamoDBPersistenceStore Build()
346374
_keyAttr,
347375
_staticPkValue,
348376
_sortKeyAttr,
349-
_expiryAttr,
377+
_expiryAttr,
378+
_inProgressExpiryAttr,
350379
_statusAttr,
351380
_dataAttr,
352381
_validationAttr,
@@ -408,6 +437,16 @@ public DynamoDBPersistenceStoreBuilder WithExpiryAttr(string expiryAttr)
408437
_expiryAttr = expiryAttr;
409438
return this;
410439
}
440+
441+
/// <summary>
442+
/// DynamoDB attribute name for in progress expiry timestamp (optional), by default "in_progress_expiration"
443+
/// </summary>
444+
/// <param name="inProgressExpiryAttr">name of the attribute in the table</param>
445+
/// <returns>the builder instance (to chain operations)</returns>
446+
public DynamoDBPersistenceStoreBuilder WithInProgressExpiryAttr(string inProgressExpiryAttr) {
447+
_inProgressExpiryAttr = inProgressExpiryAttr;
448+
return this;
449+
}
411450

412451
/// <summary>
413452
/// DynamoDB attribute name for status (optional), by default "status"

libraries/src/AWS.Lambda.Powertools.Idempotency/Persistence/IPersistenceStore.cs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,13 @@ public interface IPersistenceStore
3535
Task<DataRecord> GetRecord(string idempotencyKey);
3636

3737
/// <summary>
38-
/// Add a DataRecord to persistence store if it does not already exist with that key
38+
/// Add a DataRecord to persistence store if it does not already exist with that key.
39+
/// Stores the given idempotency record in the DDB store. If there
40+
/// is an existing record that has expired - either due to the
41+
/// cache expiry or due to the in_progress_expiry - the record
42+
/// will be overwritten and the idempotent operation can continue.
43+
/// Note: This method writes only expiry and status information - not
44+
/// the results of the operation itself.
3945
/// </summary>
4046
/// <param name="record">record DataRecord instance</param>
4147
/// <param name="now"></param>

0 commit comments

Comments
 (0)