Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 42 additions & 30 deletions packages/parser/src/schemas/dynamodb.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import type { KinesisEnvelope } from '../envelopes/kinesis.js';
import type { DynamoDBMarshalled } from '../helpers/dynamodb.js';
import type { DynamoDBStreamEvent } from '../types/schema.js';

const DynamoDBStreamChangeRecordBase = z.object({
const DynamoDBStreamChangeRecordBaseSchema = z.object({
ApproximateCreationDateTime: z.number().optional(),
Keys: z.record(z.string(), z.record(z.string(), z.any())),
NewImage: z.record(z.string(), z.any()).optional(),
Expand All @@ -19,19 +19,28 @@ const DynamoDBStreamChangeRecordBase = z.object({
]),
});

const DynamoDBStreamToKinesisChangeRecord = DynamoDBStreamChangeRecordBase.omit(
{
type DynamoDBStreamChangeRecordBase = z.infer<
typeof DynamoDBStreamChangeRecordBaseSchema
>;

const DynamoDBStreamToKinesisChangeRecordSchema =
DynamoDBStreamChangeRecordBaseSchema.omit({
SequenceNumber: true,
StreamViewType: true,
}
);
});

type DynamoDBStreamToKinesisChangeRecord = z.infer<
typeof DynamoDBStreamToKinesisChangeRecordSchema
>;

const unmarshallDynamoDBTransform = (
object:
| z.infer<typeof DynamoDBStreamChangeRecordBase>
| z.infer<typeof DynamoDBStreamToKinesisChangeRecord>,
const unmarshallDynamoDBTransform = <
T extends
| DynamoDBStreamChangeRecordBase
| DynamoDBStreamToKinesisChangeRecord,
>(
object: T,
ctx: z.RefinementCtx
) => {
): T => {
const result = { ...object };

const unmarshallAttributeValue = (
Expand Down Expand Up @@ -73,24 +82,27 @@ const unmarshallDynamoDBTransform = (
return result;
};

const DynamoDBStreamChangeRecord = DynamoDBStreamChangeRecordBase.transform(
unmarshallDynamoDBTransform
);
const DynamoDBStreamChangeRecordSchema =
DynamoDBStreamChangeRecordBaseSchema.transform(
unmarshallDynamoDBTransform<
DynamoDBStreamChangeRecordBase | DynamoDBStreamToKinesisChangeRecord
>
);

const UserIdentity = z.object({
const UserIdentitySchema = z.object({
type: z.enum(['Service']),
principalId: z.literal('dynamodb.amazonaws.com'),
});

const DynamoDBStreamRecord = z.object({
const DynamoDBStreamRecordSchema = z.object({
eventID: z.string(),
eventName: z.enum(['INSERT', 'MODIFY', 'REMOVE']),
eventVersion: z.string(),
eventSource: z.literal('aws:dynamodb'),
awsRegion: z.string(),
eventSourceARN: z.string(),
dynamodb: DynamoDBStreamChangeRecord,
userIdentity: UserIdentity.optional(),
dynamodb: DynamoDBStreamChangeRecordSchema,
userIdentity: UserIdentitySchema.optional(),
});

/**
Expand Down Expand Up @@ -135,12 +147,12 @@ const DynamoDBStreamRecord = z.object({
* type CustomEvent = z.infer<typeof CustomSchema>;
* ```
*/
const DynamoDBStreamToKinesisRecord = DynamoDBStreamRecord.extend({
const DynamoDBStreamToKinesisRecordSchema = DynamoDBStreamRecordSchema.extend({
recordFormat: z.literal('application/json'),
tableName: z.string(),
userIdentity: UserIdentity.nullish(),
dynamodb: DynamoDBStreamToKinesisChangeRecord.transform(
unmarshallDynamoDBTransform
userIdentity: UserIdentitySchema.nullish(),
dynamodb: DynamoDBStreamToKinesisChangeRecordSchema.transform(
unmarshallDynamoDBTransform<DynamoDBStreamToKinesisChangeRecord>
),
}).omit({
eventVersion: true,
Expand Down Expand Up @@ -256,8 +268,8 @@ const DynamoDBStreamToKinesisRecord = DynamoDBStreamRecord.extend({
* @see {@link DynamoDBStreamEvent | DynamoDBStreamEvent}
* @see {@link https://docs.aws.amazon.com/lambda/latest/dg/with-ddb.html}
*/
const DynamoDBStreamSchema = z.object({
Records: z.array(DynamoDBStreamRecord).nonempty(),
const DynamoDBStreamSchemaSchema = z.object({
Records: z.array(DynamoDBStreamRecordSchema).nonempty(),
window: z
.object({
start: z.iso.datetime(),
Expand All @@ -272,11 +284,11 @@ const DynamoDBStreamSchema = z.object({
});

export {
DynamoDBStreamToKinesisRecord,
DynamoDBStreamToKinesisChangeRecord,
DynamoDBStreamSchema,
DynamoDBStreamRecord,
DynamoDBStreamChangeRecord,
DynamoDBStreamChangeRecordBase,
UserIdentity,
DynamoDBStreamToKinesisRecordSchema as DynamoDBStreamToKinesisRecord,
DynamoDBStreamToKinesisChangeRecordSchema as DynamoDBStreamToKinesisChangeRecord,
DynamoDBStreamSchemaSchema as DynamoDBStreamSchema,
DynamoDBStreamRecordSchema as DynamoDBStreamRecord,
DynamoDBStreamChangeRecordSchema as DynamoDBStreamChangeRecord,
DynamoDBStreamChangeRecordBaseSchema as DynamoDBStreamChangeRecordBase,
UserIdentitySchema as UserIdentity,
};
Loading