Skip to content

Commit bf00d5b

Browse files
authored
improv(batch): improve errors (#1648)
* improv(batch): improve errors * tests: updated tests with assertions
1 parent 3ad0f5c commit bf00d5b

13 files changed

+92
-98
lines changed

packages/batch/src/AsyncBatchProcessor.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import { BasePartialBatchProcessor } from './BasePartialBatchProcessor';
2+
import { BatchProcessingError } from './errors';
23
import type { BaseRecord, FailureResponse, SuccessResponse } from './types';
34

45
/**
@@ -24,7 +25,9 @@ class AsyncBatchProcessor extends BasePartialBatchProcessor {
2425
* @returns response of success or failure
2526
*/
2627
public processRecord(_record: BaseRecord): SuccessResponse | FailureResponse {
27-
throw new Error('Not implemented. Use asyncProcess() instead.');
28+
throw new BatchProcessingError(
29+
'Not implemented. Use asyncProcess() instead.'
30+
);
2831
}
2932
}
3033

packages/batch/src/BasePartialBatchProcessor.ts

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import type {
55
} from 'aws-lambda';
66
import { BasePartialProcessor } from './BasePartialProcessor';
77
import { DATA_CLASS_MAPPING, DEFAULT_RESPONSE, EventType } from './constants';
8-
import { BatchProcessingError } from './errors';
8+
import { FullBatchFailureError } from './errors';
99
import type {
1010
EventSourceDataClassTypes,
1111
PartialItemFailureResponse,
@@ -46,12 +46,7 @@ abstract class BasePartialBatchProcessor extends BasePartialProcessor {
4646
}
4747

4848
if (this.entireBatchFailed()) {
49-
throw new BatchProcessingError(
50-
'All records failed processing. ' +
51-
this.exceptions.length +
52-
' individual errors logged separately below.',
53-
this.exceptions
54-
);
49+
throw new FullBatchFailureError(this.errors);
5550
}
5651

5752
const messages: PartialItemFailures[] = this.getMessagesToReport();
@@ -110,7 +105,7 @@ abstract class BasePartialBatchProcessor extends BasePartialProcessor {
110105
* @returns true if all records resulted in exception results
111106
*/
112107
public entireBatchFailed(): boolean {
113-
return this.exceptions.length == this.records.length;
108+
return this.errors.length == this.records.length;
114109
}
115110

116111
/**
@@ -135,7 +130,7 @@ abstract class BasePartialBatchProcessor extends BasePartialProcessor {
135130
public prepare(): void {
136131
this.successMessages.length = 0;
137132
this.failureMessages.length = 0;
138-
this.exceptions.length = 0;
133+
this.errors.length = 0;
139134
this.batchResponse = DEFAULT_RESPONSE;
140135
}
141136

packages/batch/src/BasePartialProcessor.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import type {
1111
* Abstract class for batch processors.
1212
*/
1313
abstract class BasePartialProcessor {
14-
public exceptions: Error[];
14+
public errors: Error[];
1515

1616
public failureMessages: EventSourceDataClassTypes[];
1717

@@ -29,7 +29,7 @@ abstract class BasePartialProcessor {
2929
public constructor() {
3030
this.successMessages = [];
3131
this.failureMessages = [];
32-
this.exceptions = [];
32+
this.errors = [];
3333
this.records = [];
3434
this.handler = new Function();
3535
}
@@ -84,7 +84,7 @@ abstract class BasePartialProcessor {
8484
exception: Error
8585
): FailureResponse {
8686
const entry: FailureResponse = ['fail', exception.message, record];
87-
this.exceptions.push(exception);
87+
this.errors.push(exception);
8888
this.failureMessages.push(record);
8989

9090
return entry;

packages/batch/src/BatchProcessor.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import { BasePartialBatchProcessor } from './BasePartialBatchProcessor';
2+
import { BatchProcessingError } from './errors';
23
import type { BaseRecord, FailureResponse, SuccessResponse } from './types';
34

45
/**
@@ -8,7 +9,7 @@ class BatchProcessor extends BasePartialBatchProcessor {
89
public async asyncProcessRecord(
910
_record: BaseRecord
1011
): Promise<SuccessResponse | FailureResponse> {
11-
throw new Error('Not implemented. Use process() instead.');
12+
throw new BatchProcessingError('Not implemented. Use process() instead.');
1213
}
1314

1415
/**

packages/batch/src/SqsFifoPartialProcessor.ts

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import { BatchProcessor } from './BatchProcessor';
22
import { EventType } from './constants';
3+
import { SqsFifoShortCircuitError } from './errors';
34
import type { FailureResponse, SuccessResponse } from './types';
45

56
/**
@@ -52,10 +53,7 @@ class SqsFifoPartialProcessor extends BatchProcessor {
5253
for (const record of remainingRecords) {
5354
const data = this.toBatchType(record, this.eventType);
5455
processedRecords.push(
55-
this.failureHandler(
56-
data,
57-
new Error('A previous record failed processing')
58-
)
56+
this.failureHandler(data, new SqsFifoShortCircuitError())
5957
);
6058
}
6159

packages/batch/src/asyncProcessPartialResponse.ts

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { BasePartialBatchProcessor } from './BasePartialBatchProcessor';
2-
import { EventType } from './constants';
2+
import { UnexpectedBatchTypeError } from './errors';
33
import type {
44
BaseRecord,
55
BatchProcessingOptions,
@@ -19,13 +19,8 @@ const asyncProcessPartialResponse = async (
1919
processor: BasePartialBatchProcessor,
2020
options?: BatchProcessingOptions
2121
): Promise<PartialItemFailureResponse> => {
22-
if (!event.Records) {
23-
const eventTypes: string = Object.values(EventType).toString();
24-
throw new Error(
25-
'Failed to convert event to record batch for processing.\nPlease ensure batch event is a valid ' +
26-
eventTypes +
27-
' event.'
28-
);
22+
if (!event.Records || !Array.isArray(event.Records)) {
23+
throw new UnexpectedBatchTypeError();
2924
}
3025

3126
processor.register(event.Records, recordHandler, options);

packages/batch/src/errors.ts

Lines changed: 46 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -1,49 +1,59 @@
1+
import { EventType } from './constants';
2+
13
/**
2-
* Base error type for batch processing
3-
* All errors thrown by major failures extend this base class
4+
* Base error thrown by the Batch Processing utility
45
*/
5-
class BaseBatchProcessingError extends Error {
6-
public childErrors: Error[];
7-
8-
public msg: string;
9-
10-
public constructor(msg: string, childErrors: Error[]) {
11-
super(msg);
12-
this.msg = msg;
13-
this.childErrors = childErrors;
6+
class BatchProcessingError extends Error {
7+
public constructor(message: string) {
8+
super(message);
9+
this.name = 'BatchProcessingError';
1410
}
11+
}
1512

16-
/**
17-
* Generates a list of errors that were generated by the major failure
18-
* @returns Formatted string listing all the errors that occurred
19-
*
20-
* @example
21-
* When all batch records fail to be processed, this will generate a string like:
22-
* All records failed processing. 3 individual errors logged separately below.
23-
* ,Failed to process record.
24-
* ,Failed to process record.
25-
* ,Failed to process record.
26-
*/
27-
public formatErrors(parentErrorString: string): string {
28-
const errorList: string[] = [parentErrorString + '\n'];
29-
30-
for (const error of this.childErrors) {
31-
errorList.push(error.message + '\n');
32-
}
13+
/**
14+
* Error thrown by the Batch Processing utility when all batch records failed to be processed
15+
*/
16+
class FullBatchFailureError extends BatchProcessingError {
17+
public recordErrors: Error[];
3318

34-
return '\n' + errorList;
19+
public constructor(childErrors: Error[]) {
20+
super('All records failed processing. See individual errors below.');
21+
this.recordErrors = childErrors;
22+
this.name = 'FullBatchFailureError';
3523
}
3624
}
3725

3826
/**
39-
* When all batch records failed to be processed
27+
* Error thrown by the Batch Processing utility when a SQS FIFO queue is short-circuited.
28+
* This happens when a record fails processing and the remaining records are not processed
29+
* to avoid out-of-order delivery.
4030
*/
41-
class BatchProcessingError extends BaseBatchProcessingError {
42-
public constructor(msg: string, childErrors: Error[]) {
43-
super(msg, childErrors);
44-
const parentErrorString: string = this.message;
45-
this.message = this.formatErrors(parentErrorString);
31+
class SqsFifoShortCircuitError extends BatchProcessingError {
32+
public constructor() {
33+
super(
34+
'A previous record failed processing. The remaining records were not processed to avoid out-of-order delivery.'
35+
);
36+
this.name = 'SqsFifoShortCircuitError';
4637
}
4738
}
4839

49-
export { BaseBatchProcessingError, BatchProcessingError };
40+
/**
41+
* Error thrown by the Batch Processing utility when a partial processor receives an unexpected
42+
* batch type.
43+
*/
44+
class UnexpectedBatchTypeError extends BatchProcessingError {
45+
public constructor() {
46+
super(
47+
`Unexpected batch type. Possible values are: ${Object.values(
48+
EventType
49+
).join(', ')}`
50+
);
51+
this.name = 'UnexpectedBatchTypeError';
52+
}
53+
}
54+
export {
55+
BatchProcessingError,
56+
FullBatchFailureError,
57+
SqsFifoShortCircuitError,
58+
UnexpectedBatchTypeError,
59+
};

packages/batch/src/processPartialResponse.ts

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { BasePartialBatchProcessor } from './BasePartialBatchProcessor';
2-
import { EventType } from './constants';
2+
import { UnexpectedBatchTypeError } from './errors';
33
import type {
44
BaseRecord,
55
BatchProcessingOptions,
@@ -19,13 +19,8 @@ const processPartialResponse = (
1919
processor: BasePartialBatchProcessor,
2020
options?: BatchProcessingOptions
2121
): PartialItemFailureResponse => {
22-
if (!event.Records) {
23-
const eventTypes: string = Object.values(EventType).toString();
24-
throw new Error(
25-
'Failed to convert event to record batch for processing.\nPlease ensure batch event is a valid ' +
26-
eventTypes +
27-
' event.'
28-
);
22+
if (!event.Records || !Array.isArray(event.Records)) {
23+
throw new UnexpectedBatchTypeError();
2924
}
3025

3126
processor.register(event.Records, recordHandler, options);

packages/batch/tests/unit/AsyncBatchProcessor.test.ts

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import type { Context } from 'aws-lambda';
77
import { helloworldContext as dummyContext } from '../../../commons/src/samples/resources/contexts';
88
import { AsyncBatchProcessor } from '../../src/AsyncBatchProcessor';
99
import { EventType } from '../../src/constants';
10-
import { BatchProcessingError } from '../../src/errors';
10+
import { BatchProcessingError, FullBatchFailureError } from '../../src/errors';
1111
import type { BatchProcessingOptions } from '../../src/types';
1212
import {
1313
dynamodbRecordFactory,
@@ -95,7 +95,7 @@ describe('Class: AsyncBatchProcessor', () => {
9595

9696
// Assess
9797
await expect(processor.asyncProcess()).rejects.toThrowError(
98-
BatchProcessingError
98+
FullBatchFailureError
9999
);
100100
});
101101
});
@@ -160,7 +160,7 @@ describe('Class: AsyncBatchProcessor', () => {
160160

161161
// Assess
162162
await expect(processor.asyncProcess()).rejects.toThrowError(
163-
BatchProcessingError
163+
FullBatchFailureError
164164
);
165165
});
166166
});
@@ -225,7 +225,7 @@ describe('Class: AsyncBatchProcessor', () => {
225225

226226
// Assess
227227
await expect(processor.asyncProcess()).rejects.toThrowError(
228-
BatchProcessingError
228+
FullBatchFailureError
229229
);
230230
});
231231
});
@@ -279,7 +279,7 @@ describe('Class: AsyncBatchProcessor', () => {
279279
// Act
280280
processor.register(records, asyncHandlerWithContext, badOptions);
281281
await expect(() => processor.asyncProcess()).rejects.toThrowError(
282-
BatchProcessingError
282+
FullBatchFailureError
283283
);
284284
});
285285
});
@@ -289,8 +289,6 @@ describe('Class: AsyncBatchProcessor', () => {
289289
const processor = new AsyncBatchProcessor(EventType.SQS);
290290

291291
// Act & Assess
292-
expect(() => processor.process()).toThrowError(
293-
'Not implemented. Use asyncProcess() instead.'
294-
);
292+
expect(() => processor.process()).toThrowError(BatchProcessingError);
295293
});
296294
});

packages/batch/tests/unit/BatchProcessor.test.ts

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import type { Context } from 'aws-lambda';
77
import { helloworldContext as dummyContext } from '../../../commons/src/samples/resources/contexts';
88
import { BatchProcessor } from '../../src/BatchProcessor';
99
import { EventType } from '../../src/constants';
10-
import { BatchProcessingError } from '../../src/errors';
10+
import { BatchProcessingError, FullBatchFailureError } from '../../src/errors';
1111
import type { BatchProcessingOptions } from '../../src/types';
1212
import {
1313
dynamodbRecordFactory,
@@ -92,7 +92,7 @@ describe('Class: BatchProcessor', () => {
9292

9393
// Act & Assess
9494
processor.register(records, sqsRecordHandler);
95-
expect(() => processor.process()).toThrowError(BatchProcessingError);
95+
expect(() => processor.process()).toThrowError(FullBatchFailureError);
9696
});
9797
});
9898

@@ -154,7 +154,7 @@ describe('Class: BatchProcessor', () => {
154154
processor.register(records, kinesisRecordHandler);
155155

156156
// Assess
157-
expect(() => processor.process()).toThrowError(BatchProcessingError);
157+
expect(() => processor.process()).toThrowError(FullBatchFailureError);
158158
});
159159
});
160160

@@ -217,7 +217,7 @@ describe('Class: BatchProcessor', () => {
217217
processor.register(records, dynamodbRecordHandler);
218218

219219
// Assess
220-
expect(() => processor.process()).toThrowError(BatchProcessingError);
220+
expect(() => processor.process()).toThrowError(FullBatchFailureError);
221221
});
222222
});
223223

@@ -269,7 +269,7 @@ describe('Class: BatchProcessor', () => {
269269

270270
// Act
271271
processor.register(records, handlerWithContext, badOptions);
272-
expect(() => processor.process()).toThrowError(BatchProcessingError);
272+
expect(() => processor.process()).toThrowError(FullBatchFailureError);
273273
});
274274
});
275275

@@ -278,8 +278,8 @@ describe('Class: BatchProcessor', () => {
278278
const processor = new BatchProcessor(EventType.SQS);
279279

280280
// Act & Assess
281-
await expect(() => processor.asyncProcess()).rejects.toThrow(
282-
'Not implemented. Use process() instead.'
281+
await expect(() => processor.asyncProcess()).rejects.toThrowError(
282+
BatchProcessingError
283283
);
284284
});
285285
});

packages/batch/tests/unit/SqsFifoPartialProcessor.test.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,11 @@
33
*
44
* @group unit/batch/class/sqsfifobatchprocessor
55
*/
6-
import { SqsFifoPartialProcessor, processPartialResponse } from '../../src';
6+
import {
7+
SqsFifoPartialProcessor,
8+
processPartialResponse,
9+
SqsFifoShortCircuitError,
10+
} from '../../src';
711
import { sqsRecordFactory } from '../helpers/factories';
812
import { sqsRecordHandler } from '../helpers/handlers';
913

@@ -54,6 +58,7 @@ describe('Class: SqsFifoBatchProcessor', () => {
5458
expect(result['batchItemFailures'][1]['itemIdentifier']).toBe(
5559
thirdRecord.messageId
5660
);
61+
expect(processor.errors[1]).toBeInstanceOf(SqsFifoShortCircuitError);
5762
});
5863
});
5964
});

0 commit comments

Comments
 (0)