Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion examples/2016-10-31/stream_processor/template.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,33 @@ Resources:
Handler: index.handler
Runtime: nodejs10.x
CodeUri: src/
Policies:
- SNSPublishMessagePolicy:
TopicName: !GetAtt MySnsTopic.TopicName
Events:
Stream:
Type: Kinesis
Properties:
Stream: !GetAtt Stream.Arn
MaximumBatchingWindowInSeconds: 20
ParallelizationFactor: 8
MaximumRetryAttempts: 100
BisectBatchOnFunctionError: true
MaximumRecordAgeInSeconds: 604800
StartingPosition: TRIM_HORIZON
DestinationConfig:
OnFailure:
Destination: !Ref MySnsTopic

Stream:
Type: AWS::Kinesis::Stream
Properties:
ShardCount: 1

Outputs:
MySnsTopic:
Type: AWS::SNS::Topic

Outputs:
KinesisStream:
Description: "Kinesis Stream that will trigger Lambda function upon new records"
Value: !GetAtt Stream.Arn
2 changes: 1 addition & 1 deletion samtranslator/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = '1.15.1'
__version__ = '1.16.0'
13 changes: 12 additions & 1 deletion samtranslator/model/eventsources/pull.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,12 @@ class PullEventSource(ResourceMacro):
'BatchSize': PropertyType(False, is_type(int)),
'StartingPosition': PropertyType(False, is_str()),
'Enabled': PropertyType(False, is_type(bool)),
'MaximumBatchingWindowInSeconds': PropertyType(False, is_type(int))
'MaximumBatchingWindowInSeconds': PropertyType(False, is_type(int)),
'MaximumRetryAttempts': PropertyType(False, is_type(int)),
'BisectBatchOnFunctionError': PropertyType(False, is_type(bool)),
'MaximumRecordAgeInSeconds': PropertyType(False, is_type(int)),
'DestinationConfig': PropertyType(False, is_type(dict)),
'ParallelizationFactor': PropertyType(False, is_type(int))
}

def get_policy_arn(self):
Expand Down Expand Up @@ -66,6 +71,12 @@ def to_cloudformation(self, **kwargs):
lambda_eventsourcemapping.BatchSize = self.BatchSize
lambda_eventsourcemapping.Enabled = self.Enabled
lambda_eventsourcemapping.MaximumBatchingWindowInSeconds = self.MaximumBatchingWindowInSeconds
lambda_eventsourcemapping.MaximumRetryAttempts = self.MaximumRetryAttempts
lambda_eventsourcemapping.BisectBatchOnFunctionError = self.BisectBatchOnFunctionError
lambda_eventsourcemapping.MaximumRecordAgeInSeconds = self.MaximumRecordAgeInSeconds
lambda_eventsourcemapping.DestinationConfig = self.DestinationConfig
lambda_eventsourcemapping.ParallelizationFactor = self.ParallelizationFactor

if 'Condition' in function.resource_attributes:
lambda_eventsourcemapping.set_resource_attribute('Condition', function.resource_attributes['Condition'])

Expand Down
5 changes: 5 additions & 0 deletions samtranslator/model/lambda_.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,11 @@ class LambdaEventSourceMapping(Resource):
'EventSourceArn': PropertyType(True, is_str()),
'FunctionName': PropertyType(True, is_str()),
'MaximumBatchingWindowInSeconds': PropertyType(False, is_type(int)),
'MaximumRetryAttempts': PropertyType(False, is_type(int)),
'BisectBatchOnFunctionError': PropertyType(False, is_type(bool)),
'MaximumRecordAgeInSeconds': PropertyType(False, is_type(int)),
'DestinationConfig': PropertyType(False, is_type(dict)),
'ParallelizationFactor': PropertyType(False, is_type(int)),
'StartingPosition': PropertyType(False, is_str())
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: EventSourceMapping example with MaximumBatchingWindowInSeconds property

Parameters:
MyBatchingWindowParam:
Expand All @@ -23,6 +22,9 @@ Resources:
}
}
Runtime: nodejs8.10
Policies:
- SQSSendMessagePolicy:
QueueName: !GetAtt MySqsQueue.QueueName
Events:
Stream:
Type: Kinesis
Expand All @@ -42,7 +44,14 @@ Resources:
Stream: !GetAtt DynamoDBTable.StreamArn
BatchSize: 100
MaximumBatchingWindowInSeconds: !Ref MyBatchingWindowParam
ParallelizationFactor: 8
MaximumRetryAttempts: 100
BisectBatchOnFunctionError: true
MaximumRecordAgeInSeconds: 86400
StartingPosition: TRIM_HORIZON
DestinationConfig:
OnFailure:
Destination: !GetAtt MySqsQueue.Arn

KinesisStream:
Type: AWS::Kinesis::Stream
Expand All @@ -66,4 +75,7 @@ Resources:
ReadCapacityUnits: 5
WriteCapacityUnits: 5
StreamSpecification:
StreamViewType: NEW_IMAGE
StreamViewType: NEW_IMAGE

MySqsQueue:
Type: AWS::SQS::Queue
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@
}
},
"Resources": {
"MySqsQueue": {
"Type": "AWS::SQS::Queue"
},
"MyFunctionForBatchingExampleStream": {
"Type": "AWS::Lambda::EventSourceMapping",
"Properties": {
Expand All @@ -24,6 +27,30 @@
"StartingPosition": "LATEST"
}
},
"DynamoDBTable": {
"Type": "AWS::DynamoDB::Table",
"Properties": {
"KeySchema": [
{
"KeyType": "HASH",
"AttributeName": "id"
}
],
"StreamSpecification": {
"StreamViewType": "NEW_IMAGE"
},
"AttributeDefinitions": [
{
"AttributeName": "id",
"AttributeType": "S"
}
],
"ProvisionedThroughput": {
"WriteCapacityUnits": 5,
"ReadCapacityUnits": 5
}
}
},
"MyFunctionForBatchingExampleRole": {
"Type": "AWS::IAM::Role",
"Properties": {
Expand All @@ -32,6 +59,34 @@
"arn:aws-cn:iam::aws:policy/service-role/AWSLambdaDynamoDBExecutionRole",
"arn:aws-cn:iam::aws:policy/service-role/AWSLambdaKinesisExecutionRole"
],
"Policies": [
{
"PolicyName": "MyFunctionForBatchingExampleRolePolicy0",
"PolicyDocument": {
"Statement": [
{
"Action": [
"sqs:SendMessage*"
],
"Resource": {
"Fn::Sub": [
"arn:${AWS::Partition}:sqs:${AWS::Region}:${AWS::AccountId}:${queueName}",
{
"queueName": {
"Fn::GetAtt": [
"MySqsQueue",
"QueueName"
]
}
}
]
},
"Effect": "Allow"
}
]
}
}
],
"AssumeRolePolicyDocument": {
"Version": "2012-10-17",
"Statement": [
Expand Down Expand Up @@ -62,17 +117,31 @@
"MaximumBatchingWindowInSeconds": {
"Ref": "MyBatchingWindowParam"
},
"BatchSize": 100,
"FunctionName": {
"Ref": "MyFunctionForBatchingExample"
},
"StartingPosition": "TRIM_HORIZON",
"MaximumRecordAgeInSeconds": 86400,
"BatchSize": 100,
"DestinationConfig": {
"OnFailure": {
"Destination": {
"Fn::GetAtt": [
"MySqsQueue",
"Arn"
]
}
}
},
"EventSourceArn": {
"Fn::GetAtt": [
"DynamoDBTable",
"StreamArn"
]
}
},
"StartingPosition": "TRIM_HORIZON",
"ParallelizationFactor": 8,
"MaximumRetryAttempts": 100,
"BisectBatchOnFunctionError": true
}
},
"MyFunctionForBatchingExample": {
Expand All @@ -97,30 +166,6 @@
]
}
},
"DynamoDBTable": {
"Type": "AWS::DynamoDB::Table",
"Properties": {
"KeySchema": [
{
"KeyType": "HASH",
"AttributeName": "id"
}
],
"StreamSpecification": {
"StreamViewType": "NEW_IMAGE"
},
"AttributeDefinitions": [
{
"AttributeName": "id",
"AttributeType": "S"
}
],
"ProvisionedThroughput": {
"WriteCapacityUnits": 5,
"ReadCapacityUnits": 5
}
}
},
"MyFunctionForBatchingExampleStreamEvent": {
"Type": "AWS::Lambda::EventSourceMapping",
"Properties": {
Expand All @@ -145,13 +190,5 @@
"ShardCount": 1
}
}
},
"Parameters": {
"MyBatchingWindowParam": {
"Default": 45,
"Type": "Number",
"Description": "parameter for batching window in seconds"
}
},
"Description": "EventSourceMapping example with MaximumBatchingWindowInSeconds property"
}
}
Loading