Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions examples/2016-10-31/stream_processor/template.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,6 @@ Resources:
Handler: index.handler
Runtime: nodejs10.x
CodeUri: src/
Policies:
- SNSPublishMessagePolicy:
TopicName: !GetAtt MySnsTopic.TopicName
Events:
Stream:
Type: Kinesis
Expand All @@ -24,6 +21,7 @@ Resources:
StartingPosition: TRIM_HORIZON
DestinationConfig:
OnFailure:
Type: SNS
Destination: !Ref MySnsTopic

Stream:
Expand Down
41 changes: 37 additions & 4 deletions samtranslator/model/eventsources/pull.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from samtranslator.model.lambda_ import LambdaEventSourceMapping
from samtranslator.translator.arn_generator import ArnGenerator
from samtranslator.model.exceptions import InvalidEventException
from samtranslator.model.iam import IAMRolePolicies


class PullEventSource(ResourceMacro):
Expand Down Expand Up @@ -74,26 +75,58 @@ def to_cloudformation(self, **kwargs):
lambda_eventsourcemapping.MaximumRetryAttempts = self.MaximumRetryAttempts
lambda_eventsourcemapping.BisectBatchOnFunctionError = self.BisectBatchOnFunctionError
lambda_eventsourcemapping.MaximumRecordAgeInSeconds = self.MaximumRecordAgeInSeconds
lambda_eventsourcemapping.DestinationConfig = self.DestinationConfig
lambda_eventsourcemapping.ParallelizationFactor = self.ParallelizationFactor

destination_config_policy = None
if self.DestinationConfig:
# `Type` property is for sam to attach the right policies
destination_type = self.DestinationConfig.get('OnFailure').get('Type')

# SAM attaches the policies for SQS or SNS only if 'Type' is given
if destination_type:
# the values 'SQS' and 'SNS' are allowed. No intrinsics are allowed
if destination_type not in ['SQS', 'SNS']:
raise InvalidEventException(self.logical_id, "The only valid values for 'Type' are 'SQS' and 'SNS'")
if self.DestinationConfig.get('OnFailure') is None:
raise InvalidEventException(self.logical_id, "'OnFailure' is a required field for "
"'DestinationConfig'")
if destination_type == 'SQS':
queue_arn = self.DestinationConfig.get('OnFailure').get('Destination')
destination_config_policy = IAMRolePolicies().sqs_send_message_role_policy(queue_arn,
self.logical_id)
elif destination_type == "SNS":
sns_topic_arn = self.DestinationConfig.get('OnFailure').get('Destination')
destination_config_policy = IAMRolePolicies(). sns_publish_role_policy(sns_topic_arn,
self.logical_id)

lambda_eventsourcemapping.DestinationConfig = self.DestinationConfig

if 'Condition' in function.resource_attributes:
lambda_eventsourcemapping.set_resource_attribute('Condition', function.resource_attributes['Condition'])

if 'role' in kwargs:
self._link_policy(kwargs['role'])
self._link_policy(kwargs['role'], destination_config_policy)

return resources

def _link_policy(self, role):
def _link_policy(self, role, destination_config_policy=None):
"""If this source triggers a Lambda function whose execution role is auto-generated by SAM, add the
appropriate managed policy to this Role.

:param model.iam.IAMROle role: the execution role generated for the function
:param model.iam.IAMRole role: the execution role generated for the function
"""
policy_arn = self.get_policy_arn()
if role is not None and policy_arn not in role.ManagedPolicyArns:
role.ManagedPolicyArns.append(policy_arn)
# add SQS or SNS policy only if role is present in kwargs
if role is not None and destination_config_policy is not None and destination_config_policy:
if role.Policies is None:
role.Policies = []
role.Policies.append(destination_config_policy)
if role.Policies and destination_config_policy not in role.Policies:
# do not add the policy if the same policy document is already present
if not destination_config_policy.get('PolicyDocument') in [d['PolicyDocument'] for d in role.Policies]:
role.Policies.append(destination_config_policy)


class Kinesis(PullEventSource):
Expand Down
28 changes: 28 additions & 0 deletions samtranslator/model/iam.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,3 +63,31 @@ def dead_letter_queue_policy(cls, action, resource):
}]
}
}

@classmethod
def sqs_send_message_role_policy(cls, queue_arn, logical_id):
document = {
'PolicyName': logical_id + 'SQSPolicy',
'PolicyDocument': {
'Statement': [{
'Action': 'sqs:SendMessage',
'Effect': 'Allow',
'Resource': queue_arn
}]
}
}
return document

@classmethod
def sns_publish_role_policy(cls, topic_arn, logical_id):
document = {
'PolicyName': logical_id + 'SNSPolicy',
'PolicyDocument': {
'Statement': [{
'Action': 'sns:publish',
'Effect': 'Allow',
'Resource': topic_arn
}]
}
}
return document
10 changes: 8 additions & 2 deletions tests/translator/input/function_with_event_source_mapping.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ Resources:
Properties:
Handler: index.handler
InlineCode: |

exports.handler = async (event) => {
return {
statusCode: 200,
Expand All @@ -38,6 +37,10 @@ Resources:
Stream: !GetAtt KinesisStream1.Arn
MaximumBatchingWindowInSeconds: !Ref MyBatchingWindowParam
StartingPosition: LATEST
DestinationConfig:
OnFailure:
Type: SNS
Destination: !Ref MySnsTopic
DynamoDBStreamEvent:
Type: DynamoDB
Properties:
Expand All @@ -51,6 +54,7 @@ Resources:
StartingPosition: TRIM_HORIZON
DestinationConfig:
OnFailure:
Type: SQS
Destination: !GetAtt MySqsQueue.Arn

KinesisStream:
Expand Down Expand Up @@ -78,4 +82,6 @@ Resources:
StreamViewType: NEW_IMAGE

MySqsQueue:
Type: AWS::SQS::Queue
Type: AWS::SQS::Queue
MySnsTopic:
Type: AWS::SNS::Topic
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
"MySqsQueue": {
"Type": "AWS::SQS::Queue"
},
"MySnsTopic": {
"Type": "AWS::SNS::Topic"
},
"MyFunctionForBatchingExampleStream": {
"Type": "AWS::Lambda::EventSourceMapping",
"Properties": {
Expand Down Expand Up @@ -85,6 +88,37 @@
}
]
}
},
{
"PolicyName": "MyFunctionForBatchingExampleDynamoDBStreamEventSQSPolicy",
"PolicyDocument": {
"Statement": [
{
"Action": "sqs:SendMessage",
"Effect": "Allow",
"Resource": {
"Fn::GetAtt": [
"MySqsQueue",
"Arn"
]
}
}
]
}
},
{
"PolicyDocument": {
"Statement": [
{
"Action": "sns:publish",
"Effect": "Allow",
"Resource": {
"Ref": "MySnsTopic"
}
}
]
},
"PolicyName": "MyFunctionForBatchingExampleStreamEventSNSPolicy"
}
],
"Tags": [
Expand Down Expand Up @@ -135,7 +169,8 @@
"MySqsQueue",
"Arn"
]
}
},
"Type": "SQS"
}
},
"EventSourceArn": {
Expand All @@ -155,7 +190,7 @@
"Properties": {
"Handler": "index.handler",
"Code": {
"ZipFile": "\nexports.handler = async (event) => {\n return {\n statusCode: 200,\n body: JSON.stringify(event),\n headers: {}\n }\n}\n"
"ZipFile": "exports.handler = async (event) => {\n return {\n statusCode: 200,\n body: JSON.stringify(event),\n headers: {}\n }\n}\n"
},
"Role": {
"Fn::GetAtt": [
Expand Down Expand Up @@ -187,7 +222,15 @@
"FunctionName": {
"Ref": "MyFunctionForBatchingExample"
},
"StartingPosition": "LATEST"
"StartingPosition": "LATEST",
"DestinationConfig": {
"OnFailure": {
"Destination": {
"Ref": "MySnsTopic"
},
"Type": "SNS"
}
}
}
},
"KinesisStream": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
"MySqsQueue": {
"Type": "AWS::SQS::Queue"
},
"MySnsTopic": {
"Type": "AWS::SNS::Topic"
},
"MyFunctionForBatchingExampleStream": {
"Type": "AWS::Lambda::EventSourceMapping",
"Properties": {
Expand Down Expand Up @@ -85,6 +88,37 @@
}
]
}
},
{
"PolicyName": "MyFunctionForBatchingExampleDynamoDBStreamEventSQSPolicy",
"PolicyDocument": {
"Statement": [
{
"Action": "sqs:SendMessage",
"Effect": "Allow",
"Resource": {
"Fn::GetAtt": [
"MySqsQueue",
"Arn"
]
}
}
]
}
},
{
"PolicyDocument": {
"Statement": [
{
"Action": "sns:publish",
"Effect": "Allow",
"Resource": {
"Ref": "MySnsTopic"
}
}
]
},
"PolicyName": "MyFunctionForBatchingExampleStreamEventSNSPolicy"
}
],
"Tags": [
Expand Down Expand Up @@ -135,7 +169,8 @@
"MySqsQueue",
"Arn"
]
}
},
"Type": "SQS"
}
},
"EventSourceArn": {
Expand All @@ -155,7 +190,7 @@
"Properties": {
"Handler": "index.handler",
"Code": {
"ZipFile": "\nexports.handler = async (event) => {\n return {\n statusCode: 200,\n body: JSON.stringify(event),\n headers: {}\n }\n}\n"
"ZipFile": "exports.handler = async (event) => {\n return {\n statusCode: 200,\n body: JSON.stringify(event),\n headers: {}\n }\n}\n"
},
"Role": {
"Fn::GetAtt": [
Expand Down Expand Up @@ -187,7 +222,15 @@
"FunctionName": {
"Ref": "MyFunctionForBatchingExample"
},
"StartingPosition": "LATEST"
"StartingPosition": "LATEST",
"DestinationConfig": {
"OnFailure": {
"Destination": {
"Ref": "MySnsTopic"
},
"Type": "SNS"
}
}
}
},
"KinesisStream": {
Expand Down
Loading