Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions examples/2016-10-31/stream_processor/template.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ Resources:
StartingPosition: TRIM_HORIZON
DestinationConfig:
OnFailure:
Type: SNS
Destination: !Ref MySnsTopic

Stream:
Expand Down
41 changes: 37 additions & 4 deletions samtranslator/model/eventsources/pull.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from samtranslator.model.lambda_ import LambdaEventSourceMapping
from samtranslator.translator.arn_generator import ArnGenerator
from samtranslator.model.exceptions import InvalidEventException
from samtranslator.model.iam import IAMRolePolicies


class PullEventSource(ResourceMacro):
Expand Down Expand Up @@ -74,26 +75,58 @@ def to_cloudformation(self, **kwargs):
lambda_eventsourcemapping.MaximumRetryAttempts = self.MaximumRetryAttempts
lambda_eventsourcemapping.BisectBatchOnFunctionError = self.BisectBatchOnFunctionError
lambda_eventsourcemapping.MaximumRecordAgeInSeconds = self.MaximumRecordAgeInSeconds
lambda_eventsourcemapping.DestinationConfig = self.DestinationConfig
lambda_eventsourcemapping.ParallelizationFactor = self.ParallelizationFactor

destination_config_policy = None
if self.DestinationConfig:
# `Type` property is for sam to attach the right policies
destination_type = self.DestinationConfig.get('OnFailure').get('Type')

# SAM attaches the policies for SQS or SNS only if 'Type' is given
if destination_type:
# the values 'SQS' and 'SNS' are allowed. No intrinsics are allowed
if destination_type not in ['SQS', 'SNS']:
raise InvalidEventException(self.logical_id, "The only valid values for 'Type' are 'SQS' and 'SNS'")
if self.DestinationConfig.get('OnFailure') is None:
raise InvalidEventException(self.logical_id, "'OnFailure' is a required field for "
"'DestinationConfig'")
if destination_type == 'SQS':
queue_arn = self.DestinationConfig.get('OnFailure').get('Destination')
destination_config_policy = IAMRolePolicies().sqs_send_message_role_policy(queue_arn,
self.logical_id)
elif destination_type == "SNS":
sns_topic_arn = self.DestinationConfig.get('OnFailure').get('Destination')
destination_config_policy = IAMRolePolicies(). sns_publish_role_policy(sns_topic_arn,
self.logical_id)

lambda_eventsourcemapping.DestinationConfig = self.DestinationConfig

if 'Condition' in function.resource_attributes:
lambda_eventsourcemapping.set_resource_attribute('Condition', function.resource_attributes['Condition'])

if 'role' in kwargs:
self._link_policy(kwargs['role'])
self._link_policy(kwargs['role'], destination_config_policy)

return resources

def _link_policy(self, role):
def _link_policy(self, role, destination_config_policy=None):
"""If this source triggers a Lambda function whose execution role is auto-generated by SAM, add the
appropriate managed policy to this Role.

:param model.iam.IAMROle role: the execution role generated for the function
:param model.iam.IAMRole role: the execution role generated for the function
"""
policy_arn = self.get_policy_arn()
if role is not None and policy_arn not in role.ManagedPolicyArns:
role.ManagedPolicyArns.append(policy_arn)
# add SQS or SNS policy only if role is present in kwargs
if role is not None and destination_config_policy is not None and destination_config_policy:
if role.Policies is None:
role.Policies = []
role.Policies.append(destination_config_policy)
if role.Policies and destination_config_policy not in role.Policies:
# do not add the policy if the same policy document is already present
if not destination_config_policy.get('PolicyDocument') in [d['PolicyDocument'] for d in role.Policies]:
role.Policies.append(destination_config_policy)


class Kinesis(PullEventSource):
Expand Down
28 changes: 28 additions & 0 deletions samtranslator/model/iam.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,3 +63,31 @@ def dead_letter_queue_policy(cls, action, resource):
}]
}
}

@classmethod
def sqs_send_message_role_policy(cls, queue_arn, logical_id):
document = {
'PolicyName': logical_id + 'SQSPolicy',
'PolicyDocument': {
'Statement': [{
'Action': 'sqs:SendMessage',
'Effect': 'Allow',
'Resource': queue_arn
}]
}
}
return document

@classmethod
def sns_publish_role_policy(cls, topic_arn, logical_id):
document = {
'PolicyName': logical_id + 'SNSPolicy',
'PolicyDocument': {
'Statement': [{
'Action': 'sns:publish',
'Effect': 'Allow',
'Resource': topic_arn
}]
}
}
return document
10 changes: 8 additions & 2 deletions tests/translator/input/function_with_event_source_mapping.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ Resources:
Properties:
Handler: index.handler
InlineCode: |

exports.handler = async (event) => {
return {
statusCode: 200,
Expand All @@ -38,6 +37,10 @@ Resources:
Stream: !GetAtt KinesisStream1.Arn
MaximumBatchingWindowInSeconds: !Ref MyBatchingWindowParam
StartingPosition: LATEST
DestinationConfig:
OnFailure:
Type: SNS
Destination: !Ref MySnsTopic
DynamoDBStreamEvent:
Type: DynamoDB
Properties:
Expand All @@ -51,6 +54,7 @@ Resources:
StartingPosition: TRIM_HORIZON
DestinationConfig:
OnFailure:
Type: SQS
Destination: !GetAtt MySqsQueue.Arn

KinesisStream:
Expand Down Expand Up @@ -78,4 +82,6 @@ Resources:
StreamViewType: NEW_IMAGE

MySqsQueue:
Type: AWS::SQS::Queue
Type: AWS::SQS::Queue
MySnsTopic:
Type: AWS::SNS::Topic
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
"MySqsQueue": {
"Type": "AWS::SQS::Queue"
},
"MySnsTopic": {
"Type": "AWS::SNS::Topic"
},
"MyFunctionForBatchingExampleStream": {
"Type": "AWS::Lambda::EventSourceMapping",
"Properties": {
Expand Down Expand Up @@ -85,6 +88,37 @@
}
]
}
},
{
"PolicyName": "MyFunctionForBatchingExampleDynamoDBStreamEventSQSPolicy",
"PolicyDocument": {
"Statement": [
{
"Action": "sqs:SendMessage",
"Effect": "Allow",
"Resource": {
"Fn::GetAtt": [
"MySqsQueue",
"Arn"
]
}
}
]
}
},
{
"PolicyDocument": {
"Statement": [
{
"Action": "sns:publish",
"Effect": "Allow",
"Resource": {
"Ref": "MySnsTopic"
}
}
]
},
"PolicyName": "MyFunctionForBatchingExampleStreamEventSNSPolicy"
}
],
"Tags": [
Expand Down Expand Up @@ -135,7 +169,8 @@
"MySqsQueue",
"Arn"
]
}
},
"Type": "SQS"
}
},
"EventSourceArn": {
Expand All @@ -155,7 +190,7 @@
"Properties": {
"Handler": "index.handler",
"Code": {
"ZipFile": "\nexports.handler = async (event) => {\n return {\n statusCode: 200,\n body: JSON.stringify(event),\n headers: {}\n }\n}\n"
"ZipFile": "exports.handler = async (event) => {\n return {\n statusCode: 200,\n body: JSON.stringify(event),\n headers: {}\n }\n}\n"
},
"Role": {
"Fn::GetAtt": [
Expand Down Expand Up @@ -187,7 +222,15 @@
"FunctionName": {
"Ref": "MyFunctionForBatchingExample"
},
"StartingPosition": "LATEST"
"StartingPosition": "LATEST",
"DestinationConfig": {
"OnFailure": {
"Destination": {
"Ref": "MySnsTopic"
},
"Type": "SNS"
}
}
}
},
"KinesisStream": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
"MySqsQueue": {
"Type": "AWS::SQS::Queue"
},
"MySnsTopic": {
"Type": "AWS::SNS::Topic"
},
"MyFunctionForBatchingExampleStream": {
"Type": "AWS::Lambda::EventSourceMapping",
"Properties": {
Expand Down Expand Up @@ -85,6 +88,37 @@
}
]
}
},
{
"PolicyName": "MyFunctionForBatchingExampleDynamoDBStreamEventSQSPolicy",
"PolicyDocument": {
"Statement": [
{
"Action": "sqs:SendMessage",
"Effect": "Allow",
"Resource": {
"Fn::GetAtt": [
"MySqsQueue",
"Arn"
]
}
}
]
}
},
{
"PolicyDocument": {
"Statement": [
{
"Action": "sns:publish",
"Effect": "Allow",
"Resource": {
"Ref": "MySnsTopic"
}
}
]
},
"PolicyName": "MyFunctionForBatchingExampleStreamEventSNSPolicy"
}
],
"Tags": [
Expand Down Expand Up @@ -135,7 +169,8 @@
"MySqsQueue",
"Arn"
]
}
},
"Type": "SQS"
}
},
"EventSourceArn": {
Expand All @@ -155,7 +190,7 @@
"Properties": {
"Handler": "index.handler",
"Code": {
"ZipFile": "\nexports.handler = async (event) => {\n return {\n statusCode: 200,\n body: JSON.stringify(event),\n headers: {}\n }\n}\n"
"ZipFile": "exports.handler = async (event) => {\n return {\n statusCode: 200,\n body: JSON.stringify(event),\n headers: {}\n }\n}\n"
},
"Role": {
"Fn::GetAtt": [
Expand Down Expand Up @@ -187,7 +222,15 @@
"FunctionName": {
"Ref": "MyFunctionForBatchingExample"
},
"StartingPosition": "LATEST"
"StartingPosition": "LATEST",
"DestinationConfig": {
"OnFailure": {
"Destination": {
"Ref": "MySnsTopic"
},
"Type": "SNS"
}
}
}
},
"KinesisStream": {
Expand Down
49 changes: 46 additions & 3 deletions tests/translator/output/function_with_event_source_mapping.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
"MySqsQueue": {
"Type": "AWS::SQS::Queue"
},
"MySnsTopic": {
"Type": "AWS::SNS::Topic"
},
"MyFunctionForBatchingExampleStream": {
"Type": "AWS::Lambda::EventSourceMapping",
"Properties": {
Expand Down Expand Up @@ -85,6 +88,37 @@
}
]
}
},
{
"PolicyName": "MyFunctionForBatchingExampleDynamoDBStreamEventSQSPolicy",
"PolicyDocument": {
"Statement": [
{
"Action": "sqs:SendMessage",
"Effect": "Allow",
"Resource": {
"Fn::GetAtt": [
"MySqsQueue",
"Arn"
]
}
}
]
}
},
{
"PolicyDocument": {
"Statement": [
{
"Action": "sns:publish",
"Effect": "Allow",
"Resource": {
"Ref": "MySnsTopic"
}
}
]
},
"PolicyName": "MyFunctionForBatchingExampleStreamEventSNSPolicy"
}
],
"Tags": [
Expand Down Expand Up @@ -135,7 +169,8 @@
"MySqsQueue",
"Arn"
]
}
},
"Type": "SQS"
}
},
"EventSourceArn": {
Expand All @@ -155,7 +190,7 @@
"Properties": {
"Handler": "index.handler",
"Code": {
"ZipFile": "\nexports.handler = async (event) => {\n return {\n statusCode: 200,\n body: JSON.stringify(event),\n headers: {}\n }\n}\n"
"ZipFile": "exports.handler = async (event) => {\n return {\n statusCode: 200,\n body: JSON.stringify(event),\n headers: {}\n }\n}\n"
},
"Role": {
"Fn::GetAtt": [
Expand Down Expand Up @@ -187,7 +222,15 @@
"FunctionName": {
"Ref": "MyFunctionForBatchingExample"
},
"StartingPosition": "LATEST"
"StartingPosition": "LATEST",
"DestinationConfig": {
"OnFailure": {
"Destination": {
"Ref": "MySnsTopic"
},
"Type": "SNS"
}
}
}
},
"KinesisStream": {
Expand Down
Loading