diff --git a/examples/2016-10-31/stream_processor/template.yaml b/examples/2016-10-31/stream_processor/template.yaml index 8222fea94b..5e7333fe6a 100644 --- a/examples/2016-10-31/stream_processor/template.yaml +++ b/examples/2016-10-31/stream_processor/template.yaml @@ -8,21 +8,33 @@ Resources: Handler: index.handler Runtime: nodejs10.x CodeUri: src/ + Policies: + - SNSPublishMessagePolicy: + TopicName: !GetAtt MySnsTopic.TopicName Events: Stream: Type: Kinesis Properties: Stream: !GetAtt Stream.Arn MaximumBatchingWindowInSeconds: 20 + ParallelizationFactor: 8 + MaximumRetryAttempts: 100 + BisectBatchOnFunctionError: true + MaximumRecordAgeInSeconds: 604800 StartingPosition: TRIM_HORIZON + DestinationConfig: + OnFailure: + Destination: !Ref MySnsTopic Stream: Type: AWS::Kinesis::Stream Properties: ShardCount: 1 -Outputs: + MySnsTopic: + Type: AWS::SNS::Topic +Outputs: KinesisStream: Description: "Kinesis Stream that will trigger Lambda function upon new records" Value: !GetAtt Stream.Arn diff --git a/samtranslator/__init__.py b/samtranslator/__init__.py index 2ed81080a1..20f9c7ea59 100644 --- a/samtranslator/__init__.py +++ b/samtranslator/__init__.py @@ -1 +1 @@ -__version__ = '1.15.1' +__version__ = '1.16.0' diff --git a/samtranslator/model/eventsources/pull.py b/samtranslator/model/eventsources/pull.py index f3138b6e24..da53084924 100644 --- a/samtranslator/model/eventsources/pull.py +++ b/samtranslator/model/eventsources/pull.py @@ -22,7 +22,12 @@ class PullEventSource(ResourceMacro): 'BatchSize': PropertyType(False, is_type(int)), 'StartingPosition': PropertyType(False, is_str()), 'Enabled': PropertyType(False, is_type(bool)), - 'MaximumBatchingWindowInSeconds': PropertyType(False, is_type(int)) + 'MaximumBatchingWindowInSeconds': PropertyType(False, is_type(int)), + 'MaximumRetryAttempts': PropertyType(False, is_type(int)), + 'BisectBatchOnFunctionError': PropertyType(False, is_type(bool)), + 'MaximumRecordAgeInSeconds': PropertyType(False, is_type(int)), + 'DestinationConfig': PropertyType(False, is_type(dict)), + 'ParallelizationFactor': PropertyType(False, is_type(int)) } def get_policy_arn(self): @@ -66,6 +71,12 @@ def to_cloudformation(self, **kwargs): lambda_eventsourcemapping.BatchSize = self.BatchSize lambda_eventsourcemapping.Enabled = self.Enabled lambda_eventsourcemapping.MaximumBatchingWindowInSeconds = self.MaximumBatchingWindowInSeconds + lambda_eventsourcemapping.MaximumRetryAttempts = self.MaximumRetryAttempts + lambda_eventsourcemapping.BisectBatchOnFunctionError = self.BisectBatchOnFunctionError + lambda_eventsourcemapping.MaximumRecordAgeInSeconds = self.MaximumRecordAgeInSeconds + lambda_eventsourcemapping.DestinationConfig = self.DestinationConfig + lambda_eventsourcemapping.ParallelizationFactor = self.ParallelizationFactor + if 'Condition' in function.resource_attributes: lambda_eventsourcemapping.set_resource_attribute('Condition', function.resource_attributes['Condition']) diff --git a/samtranslator/model/lambda_.py b/samtranslator/model/lambda_.py index 1d2ea489a9..c4e7de5148 100644 --- a/samtranslator/model/lambda_.py +++ b/samtranslator/model/lambda_.py @@ -66,6 +66,11 @@ class LambdaEventSourceMapping(Resource): 'EventSourceArn': PropertyType(True, is_str()), 'FunctionName': PropertyType(True, is_str()), 'MaximumBatchingWindowInSeconds': PropertyType(False, is_type(int)), + 'MaximumRetryAttempts': PropertyType(False, is_type(int)), + 'BisectBatchOnFunctionError': PropertyType(False, is_type(bool)), + 'MaximumRecordAgeInSeconds': PropertyType(False, is_type(int)), + 'DestinationConfig': PropertyType(False, is_type(dict)), + 'ParallelizationFactor': PropertyType(False, is_type(int)), 'StartingPosition': PropertyType(False, is_str()) } diff --git a/tests/translator/input/function_with_batch_window.yaml b/tests/translator/input/function_with_event_source_mapping.yaml similarity index 79% rename from tests/translator/input/function_with_batch_window.yaml rename to tests/translator/input/function_with_event_source_mapping.yaml index 606734458b..7f743f4f4c 100644 --- a/tests/translator/input/function_with_batch_window.yaml +++ b/tests/translator/input/function_with_event_source_mapping.yaml @@ -1,6 +1,5 @@ AWSTemplateFormatVersion: '2010-09-09' Transform: AWS::Serverless-2016-10-31 -Description: EventSourceMapping example with MaximumBatchingWindowInSeconds property Parameters: MyBatchingWindowParam: @@ -23,6 +22,9 @@ Resources: } } Runtime: nodejs8.10 + Policies: + - SQSSendMessagePolicy: + QueueName: !GetAtt MySqsQueue.QueueName Events: Stream: Type: Kinesis @@ -42,7 +44,14 @@ Resources: Stream: !GetAtt DynamoDBTable.StreamArn BatchSize: 100 MaximumBatchingWindowInSeconds: !Ref MyBatchingWindowParam + ParallelizationFactor: 8 + MaximumRetryAttempts: 100 + BisectBatchOnFunctionError: true + MaximumRecordAgeInSeconds: 86400 StartingPosition: TRIM_HORIZON + DestinationConfig: + OnFailure: + Destination: !GetAtt MySqsQueue.Arn KinesisStream: Type: AWS::Kinesis::Stream @@ -66,4 +75,7 @@ Resources: ReadCapacityUnits: 5 WriteCapacityUnits: 5 StreamSpecification: - StreamViewType: NEW_IMAGE \ No newline at end of file + StreamViewType: NEW_IMAGE + + MySqsQueue: + Type: AWS::SQS::Queue \ No newline at end of file diff --git a/tests/translator/output/aws-cn/function_with_batch_window.json b/tests/translator/output/aws-cn/function_with_event_source_mapping.json similarity index 74% rename from tests/translator/output/aws-cn/function_with_batch_window.json rename to tests/translator/output/aws-cn/function_with_event_source_mapping.json index ae15765cd8..0940f87207 100644 --- a/tests/translator/output/aws-cn/function_with_batch_window.json +++ b/tests/translator/output/aws-cn/function_with_event_source_mapping.json @@ -8,6 +8,9 @@ } }, "Resources": { + "MySqsQueue": { + "Type": "AWS::SQS::Queue" + }, "MyFunctionForBatchingExampleStream": { "Type": "AWS::Lambda::EventSourceMapping", "Properties": { @@ -24,6 +27,30 @@ "StartingPosition": "LATEST" } }, + "DynamoDBTable": { + "Type": "AWS::DynamoDB::Table", + "Properties": { + "KeySchema": [ + { + "KeyType": "HASH", + "AttributeName": "id" + } + ], + "StreamSpecification": { + "StreamViewType": "NEW_IMAGE" + }, + "AttributeDefinitions": [ + { + "AttributeName": "id", + "AttributeType": "S" + } + ], + "ProvisionedThroughput": { + "WriteCapacityUnits": 5, + "ReadCapacityUnits": 5 + } + } + }, "MyFunctionForBatchingExampleRole": { "Type": "AWS::IAM::Role", "Properties": { @@ -32,6 +59,34 @@ "arn:aws-cn:iam::aws:policy/service-role/AWSLambdaDynamoDBExecutionRole", "arn:aws-cn:iam::aws:policy/service-role/AWSLambdaKinesisExecutionRole" ], + "Policies": [ + { + "PolicyName": "MyFunctionForBatchingExampleRolePolicy0", + "PolicyDocument": { + "Statement": [ + { + "Action": [ + "sqs:SendMessage*" + ], + "Resource": { + "Fn::Sub": [ + "arn:${AWS::Partition}:sqs:${AWS::Region}:${AWS::AccountId}:${queueName}", + { + "queueName": { + "Fn::GetAtt": [ + "MySqsQueue", + "QueueName" + ] + } + } + ] + }, + "Effect": "Allow" + } + ] + } + } + ], "AssumeRolePolicyDocument": { "Version": "2012-10-17", "Statement": [ @@ -62,17 +117,31 @@ "MaximumBatchingWindowInSeconds": { "Ref": "MyBatchingWindowParam" }, - "BatchSize": 100, "FunctionName": { "Ref": "MyFunctionForBatchingExample" }, - "StartingPosition": "TRIM_HORIZON", + "MaximumRecordAgeInSeconds": 86400, + "BatchSize": 100, + "DestinationConfig": { + "OnFailure": { + "Destination": { + "Fn::GetAtt": [ + "MySqsQueue", + "Arn" + ] + } + } + }, "EventSourceArn": { "Fn::GetAtt": [ "DynamoDBTable", "StreamArn" ] - } + }, + "StartingPosition": "TRIM_HORIZON", + "ParallelizationFactor": 8, + "MaximumRetryAttempts": 100, + "BisectBatchOnFunctionError": true } }, "MyFunctionForBatchingExample": { @@ -97,30 +166,6 @@ ] } }, - "DynamoDBTable": { - "Type": "AWS::DynamoDB::Table", - "Properties": { - "KeySchema": [ - { - "KeyType": "HASH", - "AttributeName": "id" - } - ], - "StreamSpecification": { - "StreamViewType": "NEW_IMAGE" - }, - "AttributeDefinitions": [ - { - "AttributeName": "id", - "AttributeType": "S" - } - ], - "ProvisionedThroughput": { - "WriteCapacityUnits": 5, - "ReadCapacityUnits": 5 - } - } - }, "MyFunctionForBatchingExampleStreamEvent": { "Type": "AWS::Lambda::EventSourceMapping", "Properties": { @@ -145,13 +190,5 @@ "ShardCount": 1 } } - }, - "Parameters": { - "MyBatchingWindowParam": { - "Default": 45, - "Type": "Number", - "Description": "parameter for batching window in seconds" - } - }, - "Description": "EventSourceMapping example with MaximumBatchingWindowInSeconds property" + } } \ No newline at end of file diff --git a/tests/translator/output/aws-us-gov/function_with_batch_window.json b/tests/translator/output/aws-us-gov/function_with_event_source_mapping.json similarity index 74% rename from tests/translator/output/aws-us-gov/function_with_batch_window.json rename to tests/translator/output/aws-us-gov/function_with_event_source_mapping.json index 5dda34c670..bbf124ed6e 100644 --- a/tests/translator/output/aws-us-gov/function_with_batch_window.json +++ b/tests/translator/output/aws-us-gov/function_with_event_source_mapping.json @@ -8,6 +8,9 @@ } }, "Resources": { + "MySqsQueue": { + "Type": "AWS::SQS::Queue" + }, "MyFunctionForBatchingExampleStream": { "Type": "AWS::Lambda::EventSourceMapping", "Properties": { @@ -24,6 +27,30 @@ "StartingPosition": "LATEST" } }, + "DynamoDBTable": { + "Type": "AWS::DynamoDB::Table", + "Properties": { + "KeySchema": [ + { + "KeyType": "HASH", + "AttributeName": "id" + } + ], + "StreamSpecification": { + "StreamViewType": "NEW_IMAGE" + }, + "AttributeDefinitions": [ + { + "AttributeName": "id", + "AttributeType": "S" + } + ], + "ProvisionedThroughput": { + "WriteCapacityUnits": 5, + "ReadCapacityUnits": 5 + } + } + }, "MyFunctionForBatchingExampleRole": { "Type": "AWS::IAM::Role", "Properties": { @@ -32,6 +59,34 @@ "arn:aws-us-gov:iam::aws:policy/service-role/AWSLambdaDynamoDBExecutionRole", "arn:aws-us-gov:iam::aws:policy/service-role/AWSLambdaKinesisExecutionRole" ], + "Policies": [ + { + "PolicyName": "MyFunctionForBatchingExampleRolePolicy0", + "PolicyDocument": { + "Statement": [ + { + "Action": [ + "sqs:SendMessage*" + ], + "Resource": { + "Fn::Sub": [ + "arn:${AWS::Partition}:sqs:${AWS::Region}:${AWS::AccountId}:${queueName}", + { + "queueName": { + "Fn::GetAtt": [ + "MySqsQueue", + "QueueName" + ] + } + } + ] + }, + "Effect": "Allow" + } + ] + } + } + ], "AssumeRolePolicyDocument": { "Version": "2012-10-17", "Statement": [ @@ -62,17 +117,31 @@ "MaximumBatchingWindowInSeconds": { "Ref": "MyBatchingWindowParam" }, - "BatchSize": 100, "FunctionName": { "Ref": "MyFunctionForBatchingExample" }, - "StartingPosition": "TRIM_HORIZON", + "MaximumRecordAgeInSeconds": 86400, + "BatchSize": 100, + "DestinationConfig": { + "OnFailure": { + "Destination": { + "Fn::GetAtt": [ + "MySqsQueue", + "Arn" + ] + } + } + }, "EventSourceArn": { "Fn::GetAtt": [ "DynamoDBTable", "StreamArn" ] - } + }, + "StartingPosition": "TRIM_HORIZON", + "ParallelizationFactor": 8, + "MaximumRetryAttempts": 100, + "BisectBatchOnFunctionError": true } }, "MyFunctionForBatchingExample": { @@ -97,30 +166,6 @@ ] } }, - "DynamoDBTable": { - "Type": "AWS::DynamoDB::Table", - "Properties": { - "KeySchema": [ - { - "KeyType": "HASH", - "AttributeName": "id" - } - ], - "StreamSpecification": { - "StreamViewType": "NEW_IMAGE" - }, - "AttributeDefinitions": [ - { - "AttributeName": "id", - "AttributeType": "S" - } - ], - "ProvisionedThroughput": { - "WriteCapacityUnits": 5, - "ReadCapacityUnits": 5 - } - } - }, "MyFunctionForBatchingExampleStreamEvent": { "Type": "AWS::Lambda::EventSourceMapping", "Properties": { @@ -145,13 +190,5 @@ "ShardCount": 1 } } - }, - "Parameters": { - "MyBatchingWindowParam": { - "Default": 45, - "Type": "Number", - "Description": "parameter for batching window in seconds" - } - }, - "Description": "EventSourceMapping example with MaximumBatchingWindowInSeconds property" + } } \ No newline at end of file diff --git a/tests/translator/output/function_with_batch_window.json b/tests/translator/output/function_with_event_source_mapping.json similarity index 74% rename from tests/translator/output/function_with_batch_window.json rename to tests/translator/output/function_with_event_source_mapping.json index 1e6e928f01..bdc6cb199d 100644 --- a/tests/translator/output/function_with_batch_window.json +++ b/tests/translator/output/function_with_event_source_mapping.json @@ -8,6 +8,9 @@ } }, "Resources": { + "MySqsQueue": { + "Type": "AWS::SQS::Queue" + }, "MyFunctionForBatchingExampleStream": { "Type": "AWS::Lambda::EventSourceMapping", "Properties": { @@ -24,6 +27,30 @@ "StartingPosition": "LATEST" } }, + "DynamoDBTable": { + "Type": "AWS::DynamoDB::Table", + "Properties": { + "KeySchema": [ + { + "KeyType": "HASH", + "AttributeName": "id" + } + ], + "StreamSpecification": { + "StreamViewType": "NEW_IMAGE" + }, + "AttributeDefinitions": [ + { + "AttributeName": "id", + "AttributeType": "S" + } + ], + "ProvisionedThroughput": { + "WriteCapacityUnits": 5, + "ReadCapacityUnits": 5 + } + } + }, "MyFunctionForBatchingExampleRole": { "Type": "AWS::IAM::Role", "Properties": { @@ -32,6 +59,34 @@ "arn:aws:iam::aws:policy/service-role/AWSLambdaDynamoDBExecutionRole", "arn:aws:iam::aws:policy/service-role/AWSLambdaKinesisExecutionRole" ], + "Policies": [ + { + "PolicyName": "MyFunctionForBatchingExampleRolePolicy0", + "PolicyDocument": { + "Statement": [ + { + "Action": [ + "sqs:SendMessage*" + ], + "Resource": { + "Fn::Sub": [ + "arn:${AWS::Partition}:sqs:${AWS::Region}:${AWS::AccountId}:${queueName}", + { + "queueName": { + "Fn::GetAtt": [ + "MySqsQueue", + "QueueName" + ] + } + } + ] + }, + "Effect": "Allow" + } + ] + } + } + ], "AssumeRolePolicyDocument": { "Version": "2012-10-17", "Statement": [ @@ -62,17 +117,31 @@ "MaximumBatchingWindowInSeconds": { "Ref": "MyBatchingWindowParam" }, - "BatchSize": 100, "FunctionName": { "Ref": "MyFunctionForBatchingExample" }, - "StartingPosition": "TRIM_HORIZON", + "MaximumRecordAgeInSeconds": 86400, + "BatchSize": 100, + "DestinationConfig": { + "OnFailure": { + "Destination": { + "Fn::GetAtt": [ + "MySqsQueue", + "Arn" + ] + } + } + }, "EventSourceArn": { "Fn::GetAtt": [ "DynamoDBTable", "StreamArn" ] - } + }, + "StartingPosition": "TRIM_HORIZON", + "ParallelizationFactor": 8, + "MaximumRetryAttempts": 100, + "BisectBatchOnFunctionError": true } }, "MyFunctionForBatchingExample": { @@ -97,30 +166,6 @@ ] } }, - "DynamoDBTable": { - "Type": "AWS::DynamoDB::Table", - "Properties": { - "KeySchema": [ - { - "KeyType": "HASH", - "AttributeName": "id" - } - ], - "StreamSpecification": { - "StreamViewType": "NEW_IMAGE" - }, - "AttributeDefinitions": [ - { - "AttributeName": "id", - "AttributeType": "S" - } - ], - "ProvisionedThroughput": { - "WriteCapacityUnits": 5, - "ReadCapacityUnits": 5 - } - } - }, "MyFunctionForBatchingExampleStreamEvent": { "Type": "AWS::Lambda::EventSourceMapping", "Properties": { @@ -145,13 +190,5 @@ "ShardCount": 1 } } - }, - "Parameters": { - "MyBatchingWindowParam": { - "Default": 45, - "Type": "Number", - "Description": "parameter for batching window in seconds" - } - }, - "Description": "EventSourceMapping example with MaximumBatchingWindowInSeconds property" + } } \ No newline at end of file diff --git a/tests/translator/test_translator.py b/tests/translator/test_translator.py index b29eb843b4..ea6b87d95c 100644 --- a/tests/translator/test_translator.py +++ b/tests/translator/test_translator.py @@ -257,7 +257,7 @@ class TestTranslatorEndToEnd(TestCase): 'api_with_apikey_default_override', 'api_with_apikey_required', 'api_with_path_parameters', - 'function_with_batch_window' + 'function_with_event_source_mapping' ], [ ("aws", "ap-southeast-1"), diff --git a/versions/2016-10-31.md b/versions/2016-10-31.md index 0070e18b77..4cdf8a1e44 100644 --- a/versions/2016-10-31.md +++ b/versions/2016-10-31.md @@ -461,6 +461,14 @@ StartingPosition | `string` | **Required.** One of `TRIM_HORIZON` or `LATEST`. BatchSize | `integer` | Maximum number of stream records to process per function invocation. Enabled | `boolean` | Indicates whether Lambda begins polling the event source. MaximumBatchingWindowInSeconds | `integer` | The maximum amount of time to gather records before invoking the function. +MaximumRetryAttempts | `integer` | The number of times to retry a record before it is bypassed. If an `OnFailure` destination is set, metadata describing the records will be sent to the destination. If no destination is set, the records will be bypassed +BisectBatchOnFunctionError | `boolean` | A boolean flag which determines whether a failed batch will be split in two after a failed invoke. +MaximumRecordAgeInSeconds | `integer` | The maximum age of a record that will be invoked by Lambda. If an `OnFailure` destination is set, metadata describing the records will be sent to the destination. If no destination is set, the records will be bypassed +DestinationConfig | [Destination Config Object](#destination-config-object) | Expired record metadata/retries and exhausted metadata is sent to this destination after they have passed the defined limits. +ParallelizationFactor | `integer` | Allocates multiple virtual shards, increasing the Lambda invokes by the given factor and speeding up the stream processing. + +**NOTE:** `SQSSendMessagePolicy` or `SNSPublishMessagePolicy` needs to be added in `Policies` for publishing messages to the `SQS` or `SNS` resource mentioned in `OnFailure` property + ##### Example: Kinesis event source object @@ -471,7 +479,14 @@ Properties: StartingPosition: TRIM_HORIZON BatchSize: 10 MaximumBatchingWindowInSeconds: 10 - Enabled: false + Enabled: true + ParallelizationFactor: 8 + MaximumRetryAttempts: 100 + BisectBatchOnFunctionError: true + MaximumRecordAgeInSeconds: 604800 + DestinationConfig: + OnFailure: + Destination: !GetAtt MySqsQueue.Arn ``` #### DynamoDB @@ -487,7 +502,13 @@ StartingPosition | `string` | **Required.** One of `TRIM_HORIZON` or `LATEST`. BatchSize | `integer` | Maximum number of stream records to process per function invocation. Enabled | `boolean` | Indicates whether Lambda begins polling the event source. MaximumBatchingWindowInSeconds | `integer` | The maximum amount of time to gather records before invoking the function. +MaximumRetryAttempts | `integer` | The number of times to retry a record before it is bypassed. If an `OnFailure` destination is set, metadata describing the records will be sent to the destination. If no destination is set, the records will be bypassed +BisectBatchOnFunctionError | `boolean` | A boolean flag which determines whether a failed batch will be split in two after a failed invoke. +MaximumRecordAgeInSeconds | `integer` | The maximum age of a record that will be invoked by Lambda. If an `OnFailure` destination is set, metadata describing the records will be sent to the destination. If no destination is set, the records will be bypassed +DestinationConfig | [DestinationConfig Object](destination-config-object) | Expired record metadata/retries and exhausted metadata is sent to this destination after they have passed the defined limits. +ParallelizationFactor | `integer` | Allocates multiple virtual shards, increasing the Lambda invokes by the given factor and speeding up the stream processing. +**NOTE:** `SQSSendMessagePolicy` or `SNSPublishMessagePolicy` needs to be added in `Policies` for publishing messages to the `SQS` or `SNS` resource mentioned in `OnFailure` property ##### Example: DynamoDB event source object ```yaml @@ -498,6 +519,13 @@ Properties: BatchSize: 10 MaximumBatchingWindowInSeconds: 10 Enabled: false + ParallelizationFactor: 8 + MaximumRetryAttempts: 100 + BisectBatchOnFunctionError: true + MaximumRecordAgeInSeconds: 86400 + DestinationConfig: + OnFailure: + Destination: !GetAtt MySqsQueue.Arn ``` #### SQS @@ -522,6 +550,23 @@ Properties: Enabled: false ``` +### Destination Config Object + +Expired record metatadata/retries exhausted metadata is sent to this destination after they have passed the defined limits. + +##### Properties +Property Name | Type | Description +---|:---:|--- +OnFailure | Map of `string` to `string` | On failure all the messages get redirected to the given destination arn. + +**NOTE:** `OnFailure` only supports `Destination` property. + +##### Example +```yaml + DestinationConfig: + OnFailure: + Destination: arn:aws:sqs:us-west-2:012345678901:my-queue +``` #### Api The object describing an event source with type `Api`.