Skip to content

Commit 0205a87

Browse files
kiitosudreamorosi
andauthored
feat(parser): add support for tumbling windows in Kinesis and DynamoDB events (#3931)
Co-authored-by: Andrea Amorosi <[email protected]>
1 parent a2cb47d commit 0205a87

File tree

4 files changed

+246
-78
lines changed

4 files changed

+246
-78
lines changed

packages/parser/src/schemas/dynamodb.ts

Lines changed: 101 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -151,69 +151,103 @@ const DynamoDBStreamToKinesisRecord = DynamoDBStreamRecord.extend({
151151
* @example
152152
* ```json
153153
* {
154-
* "Records": [
155-
* {
156-
* "eventID": "1",
157-
* "eventVersion": "1.0",
158-
* "dynamodb": {
159-
* "ApproximateCreationDateTime": 1693997155.0,
160-
* "Keys": {
161-
* "Id": {
162-
* "N": "101"
163-
* }
164-
* },
165-
* "NewImage": {
166-
* "Message": {
167-
* "S": "New item!"
168-
* },
169-
* "Id": {
170-
* "N": "101"
171-
* }
154+
* "Records":[{
155+
* "eventID":"1",
156+
* "eventName":"INSERT",
157+
* "eventVersion":"1.0",
158+
* "eventSource":"aws:dynamodb",
159+
* "awsRegion":"us-east-1",
160+
* "dynamodb":{
161+
* "Keys":{
162+
* "Id":{
163+
* "N":"101"
164+
* }
165+
* },
166+
* "NewImage":{
167+
* "Message":{
168+
* "S":"New item!"
172169
* },
173-
* "StreamViewType": "NEW_AND_OLD_IMAGES",
174-
* "SequenceNumber": "111",
175-
* "SizeBytes": 26
170+
* "Id":{
171+
* "N":"101"
172+
* }
176173
* },
177-
* "awsRegion": "us-west-2",
178-
* "eventName": "INSERT",
179-
* "eventSourceARN": "eventsource_arn",
180-
* "eventSource": "aws:dynamodb"
174+
* "SequenceNumber":"111",
175+
* "SizeBytes":26,
176+
* "StreamViewType":"NEW_AND_OLD_IMAGES"
181177
* },
182-
* {
183-
* "eventID": "2",
184-
* "eventVersion": "1.0",
185-
* "dynamodb": {
186-
* "OldImage": {
187-
* "Message": {
188-
* "S": "New item!"
189-
* },
190-
* "Id": {
191-
* "N": "101"
192-
* }
178+
* "eventSourceARN":"stream-ARN"
179+
* },
180+
* {
181+
* "eventID":"2",
182+
* "eventName":"MODIFY",
183+
* "eventVersion":"1.0",
184+
* "eventSource":"aws:dynamodb",
185+
* "awsRegion":"us-east-1",
186+
* "dynamodb":{
187+
* "Keys":{
188+
* "Id":{
189+
* "N":"101"
190+
* }
191+
* },
192+
* "NewImage":{
193+
* "Message":{
194+
* "S":"This item has changed"
193195
* },
194-
* "SequenceNumber": "222",
195-
* "Keys": {
196-
* "Id": {
197-
* "N": "101"
198-
* }
196+
* "Id":{
197+
* "N":"101"
198+
* }
199+
* },
200+
* "OldImage":{
201+
* "Message":{
202+
* "S":"New item!"
199203
* },
200-
* "SizeBytes": 59,
201-
* "NewImage": {
202-
* "Message": {
203-
* "S": "This item has changed"
204-
* },
205-
* "Id": {
206-
* "N": "101"
207-
* }
204+
* "Id":{
205+
* "N":"101"
206+
* }
207+
* },
208+
* "SequenceNumber":"222",
209+
* "SizeBytes":59,
210+
* "StreamViewType":"NEW_AND_OLD_IMAGES"
211+
* },
212+
* "eventSourceARN":"stream-ARN"
213+
* },
214+
* {
215+
* "eventID":"3",
216+
* "eventName":"REMOVE",
217+
* "eventVersion":"1.0",
218+
* "eventSource":"aws:dynamodb",
219+
* "awsRegion":"us-east-1",
220+
* "dynamodb":{
221+
* "Keys":{
222+
* "Id":{
223+
* "N":"101"
224+
* }
225+
* },
226+
* "OldImage":{
227+
* "Message":{
228+
* "S":"This item has changed"
208229
* },
209-
* "StreamViewType": "NEW_AND_OLD_IMAGES"
230+
* "Id":{
231+
* "N":"101"
232+
* }
210233
* },
211-
* "awsRegion": "us-west-2",
212-
* "eventName": "MODIFY",
213-
* "eventSourceARN": "source_arn",
214-
* "eventSource": "aws:dynamodb"
215-
* }
216-
* ]
234+
* "SequenceNumber":"333",
235+
* "SizeBytes":38,
236+
* "StreamViewType":"NEW_AND_OLD_IMAGES"
237+
* },
238+
* "eventSourceARN":"stream-ARN"
239+
* }],
240+
* "window": {
241+
* "start": "2020-07-30T17:00:00Z",
242+
* "end": "2020-07-30T17:05:00Z"
243+
* },
244+
* "state": {
245+
* "1": "state1"
246+
* },
247+
* "shardId": "shard123456789",
248+
* "eventSourceARN": "stream-ARN",
249+
* "isFinalInvokeForWindow": false,
250+
* "isWindowTerminatedEarly": false
217251
* }
218252
* ```
219253
*
@@ -222,6 +256,17 @@ const DynamoDBStreamToKinesisRecord = DynamoDBStreamRecord.extend({
222256
*/
223257
const DynamoDBStreamSchema = z.object({
224258
Records: z.array(DynamoDBStreamRecord).min(1),
259+
window: z
260+
.object({
261+
start: z.string().datetime(),
262+
end: z.string().datetime(),
263+
})
264+
.optional(),
265+
state: z.record(z.string(), z.string()).optional(),
266+
shardId: z.string().optional(),
267+
eventSourceARN: z.string().optional(),
268+
isFinalInvokeForWindow: z.boolean().optional(),
269+
isWindowTerminatedEarly: z.boolean().optional(),
225270
});
226271

227272
export {

packages/parser/src/schemas/kinesis.ts

Lines changed: 29 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -70,41 +70,48 @@ const KinesisDynamoDBStreamSchema = z.object({
7070
* "partitionKey": "1",
7171
* "sequenceNumber": "49590338271490256608559692538361571095921575989136588898",
7272
* "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==",
73-
* "approximateArrivalTimestamp": 1545084650.987
73+
* "approximateArrivalTimestamp": 1607497475.000
7474
* },
7575
* "eventSource": "aws:kinesis",
7676
* "eventVersion": "1.0",
7777
* "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898",
7878
* "eventName": "aws:kinesis:record",
79-
* "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role",
80-
* "awsRegion": "us-east-2",
81-
* "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream"
82-
* },
83-
* {
84-
* "kinesis": {
85-
* "kinesisSchemaVersion": "1.0",
86-
* "partitionKey": "1",
87-
* "sequenceNumber": "49590338271490256608559692540925702759324208523137515618",
88-
* "data": "VGhpcyBpcyBvbmx5IGEgdGVzdC4=",
89-
* "approximateArrivalTimestamp": 1545084711.166
90-
* },
91-
* "eventSource": "aws:kinesis",
92-
* "eventVersion": "1.0",
93-
* "eventID": "shardId-000000000006:49590338271490256608559692540925702759324208523137515618",
94-
* "eventName": "aws:kinesis:record",
95-
* "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role",
96-
* "awsRegion": "us-east-2",
97-
* "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream"
79+
* "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-kinesis-role",
80+
* "awsRegion": "us-east-1",
81+
* "eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/lambda-stream"
9882
* }
99-
* ]
83+
* ],
84+
* "window": {
85+
* "start": "2020-12-09T07:04:00Z",
86+
* "end": "2020-12-09T07:06:00Z"
87+
* },
88+
* "state": {
89+
* "1": 282,
90+
* "2": 715
91+
* },
92+
* "shardId": "shardId-000000000006",
93+
* "eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/lambda-stream",
94+
* "isFinalInvokeForWindow": false,
95+
* "isWindowTerminatedEarly": false
10096
* }
10197
*```
10298
* @see {@link types.KinesisDataStreamEvent | KinesisDataStreamEvent}
103-
* @see {@link https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis.html#services-kinesis-event-example}
99+
* @see {@link https://docs.aws.amazon.com/lambda/latest/dg/services-kinesis-windows.html#streams-tumbling-processing}
104100
*
105101
*/
106102
const KinesisDataStreamSchema = z.object({
107103
Records: z.array(KinesisDataStreamRecord).min(1),
104+
window: z
105+
.object({
106+
start: z.string().datetime(),
107+
end: z.string().datetime(),
108+
})
109+
.optional(),
110+
state: z.record(z.string(), z.unknown()).optional(),
111+
shardId: z.string().optional(),
112+
eventSourceARN: z.string().optional(),
113+
isFinalInvokeForWindow: z.boolean().optional(),
114+
isWindowTerminatedEarly: z.boolean().optional(),
108115
});
109116

110117
export {

packages/parser/tests/unit/schema/dynamodb.test.ts

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,4 +112,85 @@ describe('Schema: DynamoDB', () => {
112112
// Act & Assess
113113
expect(() => DynamoDBStreamSchema.parse(event)).toThrow();
114114
});
115+
116+
it('parses a DynamoDB Stream with tumbling window event', () => {
117+
// Prepare
118+
const event = structuredClone(baseEvent);
119+
event.window = {
120+
start: '2020-07-30T17:00:00Z',
121+
end: '2020-07-30T17:05:00Z',
122+
};
123+
event.state = {
124+
'1': 'state1',
125+
};
126+
event.shardId = 'shard123456789';
127+
event.eventSourceARN = 'stream-ARN';
128+
event.isFinalInvokeForWindow = false;
129+
event.isWindowTerminatedEarly = false;
130+
131+
// Act
132+
const result = DynamoDBStreamSchema.parse(event);
133+
134+
// Assess
135+
expect(result).toStrictEqual({
136+
Records: [
137+
{
138+
eventID: '1',
139+
eventVersion: '1.0',
140+
dynamodb: {
141+
ApproximateCreationDateTime: 1693997155.0,
142+
Keys: {
143+
Id: 101,
144+
},
145+
NewImage: {
146+
Message: 'New item!',
147+
Id: 101,
148+
},
149+
StreamViewType: 'NEW_IMAGE',
150+
SequenceNumber: '111',
151+
SizeBytes: 26,
152+
},
153+
awsRegion: 'us-west-2',
154+
eventName: 'INSERT',
155+
eventSourceARN: 'eventsource_arn',
156+
eventSource: 'aws:dynamodb',
157+
},
158+
{
159+
eventID: '2',
160+
eventVersion: '1.0',
161+
dynamodb: {
162+
OldImage: {
163+
Message: 'New item!',
164+
Id: 101,
165+
},
166+
SequenceNumber: '222',
167+
Keys: {
168+
Id: 101,
169+
},
170+
SizeBytes: 59,
171+
NewImage: {
172+
Message: 'This item has changed',
173+
Id: 101,
174+
},
175+
StreamViewType: 'NEW_AND_OLD_IMAGES',
176+
},
177+
awsRegion: 'us-west-2',
178+
eventName: 'MODIFY',
179+
eventSourceARN: 'source_arn',
180+
eventSource: 'aws:dynamodb',
181+
},
182+
],
183+
window: {
184+
start: '2020-07-30T17:00:00Z',
185+
end: '2020-07-30T17:05:00Z',
186+
},
187+
state: {
188+
'1': 'state1',
189+
},
190+
shardId: 'shard123456789',
191+
eventSourceARN: 'stream-ARN',
192+
isFinalInvokeForWindow: false,
193+
isWindowTerminatedEarly: false,
194+
});
195+
});
115196
});

packages/parser/tests/unit/schema/kinesis.test.ts

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,41 @@ describe('Schema: Kinesis', () => {
167167
expect(parsed).toStrictEqual(transformedInput);
168168
});
169169

170+
it('parses Kinesis event with tumbling window', () => {
171+
// Prepare
172+
const testEvent = structuredClone(kinesisStreamEvent);
173+
testEvent.window = {
174+
start: '2020-07-30T17:00:00Z',
175+
end: '2020-07-30T17:05:00Z',
176+
};
177+
testEvent.state = {
178+
'1': 'state1',
179+
};
180+
testEvent.shardId = 'shard123456789';
181+
testEvent.eventSourceARN = 'stream-ARN';
182+
testEvent.isFinalInvokeForWindow = false;
183+
testEvent.isWindowTerminatedEarly = false;
184+
185+
// Act
186+
const parsed = KinesisDataStreamSchema.parse(testEvent);
187+
188+
const transformedInput = {
189+
...testEvent,
190+
Records: testEvent.Records.map((record, index) => {
191+
return {
192+
...record,
193+
kinesis: {
194+
...record.kinesis,
195+
data: Buffer.from(record.kinesis.data, 'base64').toString(),
196+
},
197+
};
198+
}),
199+
};
200+
201+
// Assess
202+
expect(parsed).toStrictEqual(transformedInput);
203+
});
204+
170205
it('throws if cannot parse SQS record of KinesisFirehoseSqsRecord', () => {
171206
// Prepare
172207
const testEvent = getTestEvent<KinesisFireHoseSqsEvent>({

0 commit comments

Comments
 (0)