Skip to content

Commit 3ad0f5c

Browse files
authored
improv(idempotency): consolidate internal implementation (#1642)
* improv(idempotency): consolidate internal implementation * chore(idempotency): removed unused code
1 parent 5248809 commit 3ad0f5c

File tree

3 files changed

+304
-391
lines changed

3 files changed

+304
-391
lines changed

packages/idempotency/src/IdempotencyHandler.ts

Lines changed: 211 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
1-
import type { JSONValue } from '@aws-lambda-powertools/commons';
1+
import type {
2+
JSONValue,
3+
MiddyLikeRequest,
4+
} from '@aws-lambda-powertools/commons';
25
import type { AnyFunction, IdempotencyHandlerOptions } from './types';
36
import { IdempotencyRecordStatus } from './types';
47
import {
@@ -34,7 +37,7 @@ export class IdempotencyHandler<Func extends AnyFunction> {
3437
*
3538
* This is the argument that is used for the idempotency.
3639
*/
37-
readonly #functionPayloadToBeHashed: JSONValue;
40+
#functionPayloadToBeHashed: JSONValue;
3841
/**
3942
* Reference to the function to be made idempotent.
4043
*/
@@ -68,9 +71,17 @@ export class IdempotencyHandler<Func extends AnyFunction> {
6871
});
6972
}
7073

74+
/**
75+
* Takes an idempotency key and returns the idempotency record from the persistence layer.
76+
*
77+
* If the idempotency record is not COMPLETE, then it will throw an error based on the status of the record.
78+
*
79+
* @param idempotencyRecord The idempotency record stored in the persistence layer
80+
* @returns The result of the function if the idempotency record is in a terminal state
81+
*/
7182
public static determineResultFromIdempotencyRecord(
7283
idempotencyRecord: IdempotencyRecord
73-
): Promise<unknown> | unknown {
84+
): JSONValue {
7485
if (idempotencyRecord.getStatus() === IdempotencyRecordStatus.EXPIRED) {
7586
throw new IdempotencyInconsistentStateError(
7687
'Item has expired during processing and may not longer be valid.'
@@ -96,50 +107,55 @@ export class IdempotencyHandler<Func extends AnyFunction> {
96107
return idempotencyRecord.getResponse();
97108
}
98109

110+
/**
111+
* Execute the handler and return the result.
112+
*
113+
* If the handler fails, the idempotency record will be deleted.
114+
* If it succeeds, the idempotency record will be updated with the result.
115+
*
116+
* @returns The result of the function execution
117+
*/
99118
public async getFunctionResult(): Promise<ReturnType<Func>> {
100119
let result;
101120
try {
102121
result = await this.#functionToMakeIdempotent(...this.#functionArguments);
103-
} catch (e) {
104-
try {
105-
await this.#persistenceStore.deleteRecord(
106-
this.#functionPayloadToBeHashed
107-
);
108-
} catch (e) {
109-
throw new IdempotencyPersistenceLayerError(
110-
'Failed to delete record from idempotency store',
111-
e as Error
112-
);
113-
}
114-
throw e;
115-
}
116-
try {
117-
await this.#persistenceStore.saveSuccess(
118-
this.#functionPayloadToBeHashed,
119-
result
120-
);
121-
} catch (e) {
122-
throw new IdempotencyPersistenceLayerError(
123-
'Failed to update success record to idempotency store',
124-
e as Error
125-
);
122+
} catch (error) {
123+
await this.#deleteInProgressRecord();
124+
throw error;
126125
}
126+
await this.#saveSuccessfullResult(result);
127127

128128
return result;
129129
}
130130

131131
/**
132-
* Main entry point for the handler
132+
* Entry point to handle the idempotency logic.
133+
*
134+
* Before the handler is executed, we need to check if there is already an
135+
* execution in progress for the given idempotency key. If there is, we
136+
* need to determine its status and return the appropriate response or
137+
* throw an error.
138+
*
139+
* If there is no execution in progress, we need to save a record to the
140+
* idempotency store to indicate that an execution is in progress.
133141
*
134142
* In some rare cases, when the persistent state changes in small time
135143
* window, we might get an `IdempotencyInconsistentStateError`. In such
136144
* cases we can safely retry the handling a few times.
137145
*/
138146
public async handle(): Promise<ReturnType<Func>> {
147+
// early return if we should skip idempotency completely
148+
if (this.shouldSkipIdempotency()) {
149+
return await this.#functionToMakeIdempotent(...this.#functionArguments);
150+
}
151+
139152
let e;
140153
for (let retryNo = 0; retryNo <= MAX_RETRIES; retryNo++) {
141154
try {
142-
return await this.processIdempotency();
155+
const result = await this.#saveInProgressOrReturnExistingResult();
156+
if (result) return result as ReturnType<Func>;
157+
158+
return await this.getFunctionResult();
143159
} catch (error) {
144160
if (
145161
error instanceof IdempotencyInconsistentStateError &&
@@ -156,60 +172,183 @@ export class IdempotencyHandler<Func extends AnyFunction> {
156172
throw e;
157173
}
158174

159-
public async processIdempotency(): Promise<ReturnType<Func>> {
160-
// early return if we should skip idempotency completely
175+
/**
176+
* Handle the idempotency operations needed after the handler has returned.
177+
*
178+
* When the handler returns successfully, we need to update the record in the
179+
* idempotency store to indicate that the execution has completed and
180+
* store its result.
181+
*
182+
* To avoid duplication of code, we expose this method so that it can be
183+
* called from the `after` phase of the Middy middleware.
184+
*
185+
* @param response The response returned by the handler.
186+
*/
187+
public async handleMiddyAfter(response: unknown): Promise<void> {
188+
await this.#saveSuccessfullResult(response as ReturnType<Func>);
189+
}
190+
191+
/**
192+
* Handle the idempotency operations needed after the handler has returned.
193+
*
194+
* Before the handler is executed, we need to check if there is already an
195+
* execution in progress for the given idempotency key. If there is, we
196+
* need to determine its status and return the appropriate response or
197+
* throw an error.
198+
*
199+
* If there is no execution in progress, we need to save a record to the
200+
* idempotency store to indicate that an execution is in progress.
201+
*
202+
* In some rare cases, when the persistent state changes in small time
203+
* window, we might get an `IdempotencyInconsistentStateError`. In such
204+
* cases we can safely retry the handling a few times.
205+
*
206+
* @param request The request object passed to the handler.
207+
* @param callback Callback function to cleanup pending middlewares when returning early.
208+
*/
209+
public async handleMiddyBefore(
210+
request: MiddyLikeRequest,
211+
callback: (request: MiddyLikeRequest) => Promise<void>
212+
): Promise<ReturnType<Func> | void> {
213+
for (let retryNo = 0; retryNo <= MAX_RETRIES; retryNo++) {
214+
try {
215+
const result = await this.#saveInProgressOrReturnExistingResult();
216+
if (result) {
217+
await callback(request);
218+
219+
return result as ReturnType<Func>;
220+
}
221+
break;
222+
} catch (error) {
223+
if (
224+
error instanceof IdempotencyInconsistentStateError &&
225+
retryNo < MAX_RETRIES
226+
) {
227+
// Retry
228+
continue;
229+
}
230+
// Retries exhausted or other error
231+
throw error;
232+
}
233+
}
234+
}
235+
236+
/**
237+
* Handle the idempotency operations needed when an error is thrown in the handler.
238+
*
239+
* When an error is thrown in the handler, we need to delete the record from the
240+
* idempotency store.
241+
*
242+
* To avoid duplication of code, we expose this method so that it can be
243+
* called from the `onError` phase of the Middy middleware.
244+
*/
245+
public async handleMiddyOnError(): Promise<void> {
246+
await this.#deleteInProgressRecord();
247+
}
248+
249+
/**
250+
* Setter for the payload to be hashed to generate the idempotency key.
251+
*
252+
* This is useful if you want to use a different payload than the one
253+
* used to instantiate the `IdempotencyHandler`, for example when using
254+
* it within a Middy middleware.
255+
*
256+
* @param functionPayloadToBeHashed The payload to be hashed to generate the idempotency key
257+
*/
258+
public setFunctionPayloadToBeHashed(
259+
functionPayloadToBeHashed: JSONValue
260+
): void {
261+
this.#functionPayloadToBeHashed = functionPayloadToBeHashed;
262+
}
263+
264+
/**
265+
* Avoid idempotency if the eventKeyJmesPath is not present in the payload and throwOnNoIdempotencyKey is false
266+
*/
267+
public shouldSkipIdempotency(): boolean {
268+
if (!this.#idempotencyConfig.isEnabled()) return true;
269+
161270
if (
162-
IdempotencyHandler.shouldSkipIdempotency(
163-
this.#idempotencyConfig.eventKeyJmesPath,
164-
this.#idempotencyConfig.throwOnNoIdempotencyKey,
165-
this.#functionPayloadToBeHashed
166-
)
271+
this.#idempotencyConfig.eventKeyJmesPath !== '' &&
272+
!this.#idempotencyConfig.throwOnNoIdempotencyKey
167273
) {
168-
return await this.#functionToMakeIdempotent(...this.#functionArguments);
274+
const selection = search(
275+
this.#functionPayloadToBeHashed,
276+
this.#idempotencyConfig.eventKeyJmesPath
277+
);
278+
279+
return selection === undefined || selection === null;
280+
} else {
281+
return false;
169282
}
283+
}
170284

285+
/**
286+
* Delete an in progress record from the idempotency store.
287+
*
288+
* This is called when the handler throws an error.
289+
*/
290+
#deleteInProgressRecord = async (): Promise<void> => {
171291
try {
172-
await this.#persistenceStore.saveInProgress(
173-
this.#functionPayloadToBeHashed,
174-
this.#idempotencyConfig.lambdaContext?.getRemainingTimeInMillis()
292+
await this.#persistenceStore.deleteRecord(
293+
this.#functionPayloadToBeHashed
175294
);
176295
} catch (e) {
177-
if (e instanceof IdempotencyItemAlreadyExistsError) {
178-
const idempotencyRecord: IdempotencyRecord =
179-
await this.#persistenceStore.getRecord(
180-
this.#functionPayloadToBeHashed
181-
);
296+
throw new IdempotencyPersistenceLayerError(
297+
'Failed to delete record from idempotency store',
298+
e as Error
299+
);
300+
}
301+
};
182302

183-
return IdempotencyHandler.determineResultFromIdempotencyRecord(
184-
idempotencyRecord
185-
) as ReturnType<Func>;
186-
} else {
187-
throw new IdempotencyPersistenceLayerError(
188-
'Failed to save in progress record to idempotency store',
189-
e as Error
303+
/**
304+
* Save an in progress record to the idempotency store or return an existing result.
305+
*
306+
* If the record already exists, return the result from the record.
307+
*/
308+
#saveInProgressOrReturnExistingResult =
309+
async (): Promise<JSONValue | void> => {
310+
try {
311+
await this.#persistenceStore.saveInProgress(
312+
this.#functionPayloadToBeHashed,
313+
this.#idempotencyConfig.lambdaContext?.getRemainingTimeInMillis()
190314
);
191-
}
192-
}
315+
} catch (e) {
316+
if (e instanceof IdempotencyItemAlreadyExistsError) {
317+
const idempotencyRecord: IdempotencyRecord =
318+
await this.#persistenceStore.getRecord(
319+
this.#functionPayloadToBeHashed
320+
);
193321

194-
return this.getFunctionResult();
195-
}
322+
return IdempotencyHandler.determineResultFromIdempotencyRecord(
323+
idempotencyRecord
324+
);
325+
} else {
326+
throw new IdempotencyPersistenceLayerError(
327+
'Failed to save in progress record to idempotency store',
328+
e as Error
329+
);
330+
}
331+
}
332+
};
196333

197334
/**
198-
* avoid idempotency if the eventKeyJmesPath is not present in the payload and throwOnNoIdempotencyKey is false
199-
* static so {@link makeHandlerIdempotent} middleware can use it
200-
* TOOD: refactor so middy uses IdempotencyHandler internally wihtout reimplementing the logic
201-
* @param eventKeyJmesPath
202-
* @param throwOnNoIdempotencyKey
203-
* @param fullFunctionPayload
204-
* @private
205-
*/
206-
public static shouldSkipIdempotency(
207-
eventKeyJmesPath: string,
208-
throwOnNoIdempotencyKey: boolean,
209-
fullFunctionPayload: JSONValue
210-
): boolean {
211-
return (eventKeyJmesPath &&
212-
!throwOnNoIdempotencyKey &&
213-
!search(fullFunctionPayload, eventKeyJmesPath)) as boolean;
214-
}
335+
* Save a successful result to the idempotency store.
336+
*
337+
* This is called when the handler returns successfully.
338+
*
339+
* @param result The result returned by the handler.
340+
*/
341+
#saveSuccessfullResult = async (result: ReturnType<Func>): Promise<void> => {
342+
try {
343+
await this.#persistenceStore.saveSuccess(
344+
this.#functionPayloadToBeHashed,
345+
result
346+
);
347+
} catch (e) {
348+
throw new IdempotencyPersistenceLayerError(
349+
'Failed to update success record to idempotency store',
350+
e as Error
351+
);
352+
}
353+
};
215354
}

0 commit comments

Comments
 (0)