Skip to content

Add DynamoDB and Kinesis Time Window events and response models #441

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion events/dynamodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,20 @@ type DynamoDBEvent struct {
Records []DynamoDBEventRecord `json:"Records"`
}

// DynamoDbEventRecord stores information about each record of a DynamoDb stream event
// DynamoDBTimeWindowEvent represents an Amazon Dynamodb event when using time windows
// ref. https://docs.aws.amazon.com/lambda/latest/dg/with-ddb.html#services-ddb-windows
type DynamoDBTimeWindowEvent struct {
DynamoDBEvent
TimeWindowProperties
}

// DynamoDBTimeWindowEventResponse is the outer structure to report batch item failures for DynamoDBTimeWindowEvent.
type DynamoDBTimeWindowEventResponse struct {
TimeWindowEventResponseProperties
BatchItemFailures []DynamoDBBatchItemFailure `json:"batchItemFailures"`
}

// DynamoDBEventRecord stores information about each record of a DynamoDB stream event
type DynamoDBEventRecord struct {
// The region in which the GetRecords request was received.
AWSRegion string `json:"awsRegion"`
Expand Down
24 changes: 24 additions & 0 deletions events/dynamodb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,27 @@ func TestDynamoDBEventMarshaling(t *testing.T) {
func TestDynamoDBEventMarshalingMalformedJson(t *testing.T) {
test.TestMalformedJson(t, DynamoDBEvent{})
}

func TestDynamoDBTimeWindowEventMarshaling(t *testing.T) {
// 1. read JSON from file
inputJSON := test.ReadJSONFromFile(t, "./testdata/dynamodb-time-window-event.json")

// 2. de-serialize into Go object
var inputEvent DynamoDBTimeWindowEvent
if err := json.Unmarshal(inputJSON, &inputEvent); err != nil {
t.Errorf("could not unmarshal event. details: %v", err)
}

// 3. serialize to JSON
outputJSON, err := json.Marshal(inputEvent)
if err != nil {
t.Errorf("could not marshal event. details: %v", err)
}

// 4. check result
assert.JSONEq(t, string(inputJSON), string(outputJSON))
}

func TestDynamoDBTimeWindowEventMarshalingMalformedJson(t *testing.T) {
test.TestMalformedJson(t, DynamoDBTimeWindowEvent{})
}
26 changes: 26 additions & 0 deletions events/epoch_time.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@ import (
"time"
)

// RFC3339EpochTime serializes a time.Time in JSON as an ISO 8601 string.
type RFC3339EpochTime struct {
time.Time
}

// SecondsEpochTime serializes a time.Time in JSON as a UNIX epoch time in seconds
type SecondsEpochTime struct {
time.Time
Expand Down Expand Up @@ -57,3 +62,24 @@ func (e *MilliSecondsEpochTime) UnmarshalJSON(b []byte) error {
*e = MilliSecondsEpochTime{time.Unix(epoch/1000, (epoch%1000)*1000000)}
return nil
}

func (e RFC3339EpochTime) MarshalJSON() ([]byte, error) {
isoTimestampStr := e.Format(time.RFC3339)
return json.Marshal(isoTimestampStr)
}

func (e *RFC3339EpochTime) UnmarshalJSON(b []byte) error {
var isoTimestampStr string
err := json.Unmarshal(b, &isoTimestampStr)
if err != nil {
return err
}

parsed, err := time.Parse(time.RFC3339, isoTimestampStr)
if err != nil {
return err
}

*e = RFC3339EpochTime{parsed}
return nil
}
13 changes: 13 additions & 0 deletions events/kinesis.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,19 @@ type KinesisEvent struct {
Records []KinesisEventRecord `json:"Records"`
}

// KinesisTimeWindowEvent represents an Amazon Dynamodb event when using time windows
// ref. https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis.html#services-kinesis-windows
type KinesisTimeWindowEvent struct {
KinesisEvent
TimeWindowProperties
}

// KinesisTimeWindowEventResponse is the outer structure to report batch item failures for KinesisTimeWindowEvent.
type KinesisTimeWindowEventResponse struct {
TimeWindowEventResponseProperties
BatchItemFailures []KinesisBatchItemFailure `json:"batchItemFailures"`
}

type KinesisEventRecord struct {
AwsRegion string `json:"awsRegion"` //nolint: stylecheck
EventID string `json:"eventID"`
Expand Down
24 changes: 24 additions & 0 deletions events/kinesis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,27 @@ func TestKinesisEventMarshaling(t *testing.T) {
func TestKinesisMarshalingMalformedJson(t *testing.T) {
test.TestMalformedJson(t, KinesisEvent{})
}

func TestKinesisTimeWindowEventMarshaling(t *testing.T) {
// 1. read JSON from file
inputJSON := test.ReadJSONFromFile(t, "./testdata/kinesis-time-window-event.json")

// 2. de-serialize into Go object
var inputEvent KinesisTimeWindowEvent
if err := json.Unmarshal(inputJSON, &inputEvent); err != nil {
t.Errorf("could not unmarshal event. details: %v", err)
}

// 3. serialize to JSON
outputJSON, err := json.Marshal(inputEvent)
if err != nil {
t.Errorf("could not marshal event. details: %v", err)
}

// 4. check result
assert.JSONEq(t, string(inputJSON), string(outputJSON))
}

func TestKinesisTimeWindowEventMarshalingMalformedJson(t *testing.T) {
test.TestMalformedJson(t, KinesisTimeWindowEvent{})
}
104 changes: 104 additions & 0 deletions events/testdata/dynamodb-time-window-event.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
{
"Records":[
{
"eventID":"1",
"eventName":"INSERT",
"eventVersion":"1.0",
"eventSource":"aws:dynamodb",
"awsRegion":"us-east-1",
"dynamodb":{
"ApproximateCreationDateTime": 1480642020,
"Keys":{
"Id":{
"N":"101"
}
},
"NewImage":{
"Message":{
"S":"New item!"
},
"Id":{
"N":"101"
}
},
"SequenceNumber":"111",
"SizeBytes":26,
"StreamViewType":"NEW_AND_OLD_IMAGES"
},
"eventSourceARN":"stream-ARN"
},
{
"eventID":"2",
"eventName":"MODIFY",
"eventVersion":"1.0",
"eventSource":"aws:dynamodb",
"awsRegion":"us-east-1",
"dynamodb":{
"ApproximateCreationDateTime": 1480642020,
"Keys":{
"Id":{
"N":"101"
}
},
"NewImage":{
"Message":{
"S":"This item has changed"
},
"Id":{
"N":"101"
}
},
"OldImage":{
"Message":{
"S":"New item!"
},
"Id":{
"N":"101"
}
},
"SequenceNumber":"222",
"SizeBytes":59,
"StreamViewType":"NEW_AND_OLD_IMAGES"
},
"eventSourceARN":"stream-ARN"
},
{
"eventID":"3",
"eventName":"REMOVE",
"eventVersion":"1.0",
"eventSource":"aws:dynamodb",
"awsRegion":"us-east-1",
"dynamodb":{
"ApproximateCreationDateTime": 1480642020,
"Keys":{
"Id":{
"N":"101"
}
},
"OldImage":{
"Message":{
"S":"This item has changed"
},
"Id":{
"N":"101"
}
},
"SequenceNumber":"333",
"SizeBytes":38,
"StreamViewType":"NEW_AND_OLD_IMAGES"
},
"eventSourceARN":"stream-ARN"
}
],
"window": {
"start": "2020-07-30T17:00:00Z",
"end": "2020-07-30T17:05:00Z"
},
"state": {
"1": "state1"
},
"shardId": "shard123456789",
"eventSourceARN": "stream-ARN",
"isFinalInvokeForWindow": false,
"isWindowTerminatedEarly": false
}
33 changes: 33 additions & 0 deletions events/testdata/kinesis-time-window-event.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@

{
"Records": [
{
"kinesis": {
"kinesisSchemaVersion": "1.0",
"partitionKey": "1",
"sequenceNumber": "49590338271490256608559692538361571095921575989136588898",
"data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==",
"approximateArrivalTimestamp": 1607497475.000
},
"eventSource": "aws:kinesis",
"eventVersion": "1.0",
"eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898",
"eventName": "aws:kinesis:record",
"invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-kinesis-role",
"awsRegion": "us-east-1",
"eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/lambda-stream"
}
],
"window": {
"start": "2020-12-09T07:04:00Z",
"end": "2020-12-09T07:06:00Z"
},
"state": {
"1": "state 1",
"2": "state 2"
},
"shardId": "shardId-000000000006",
"eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/lambda-stream",
"isFinalInvokeForWindow": false,
"isWindowTerminatedEarly": false
}
42 changes: 42 additions & 0 deletions events/time_window.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package events

// Window is the object that captures the time window for the records in the event when using the tumbling windows feature
// Kinesis: https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis.html#services-kinesis-windows
// DDB: https://docs.aws.amazon.com/lambda/latest/dg/with-ddb.html#services-ddb-windows
type Window struct {
Start RFC3339EpochTime `json:"start"`
End RFC3339EpochTime `json:"end"`
}

// TimeWindowProperties is the object that captures properties that relate to the tumbling windows feature
// Kinesis: https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis.html#services-kinesis-windows
// DDB: https://docs.aws.amazon.com/lambda/latest/dg/with-ddb.html#services-ddb-windows
type TimeWindowProperties struct {
// Time window for the records in the event.
Window Window `json:"window"`

// State being built up to this invoke in the time window.
State map[string]string `json:"state"`

// Shard id of the records
ShardID string `json:"shardId"`

// The event source ARN of the service that generated the event (eg. DynamoDB or Kinesis)
EventSourceARN string `json:"eventSourceARN"`

// Set to true for the last invoke of the time window.
// Subsequent invoke will start a new time window along with a fresh state.
IsFinalInvokeForWindow bool `json:"isFinalInvokeForWindow"`

// Set to true if window is terminated prematurely.
// Subsequent invoke will continue the same window with a fresh state.
IsWindowTerminatedEarly bool `json:"isWindowTerminatedEarly"`
}

// TimeWindowEventResponseProperties is the object that captures response properties that relate to the tumbling windows feature
// Kinesis: https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis.html#services-kinesis-windows
// DDB: https://docs.aws.amazon.com/lambda/latest/dg/with-ddb.html#services-ddb-windows
type TimeWindowEventResponseProperties struct {
// State being built up to this invoke in the time window.
State map[string]string `json:"state"`
}