Skip to content

Commit b39df65

Browse files
author
Shreya
authored
feat: add sqs and sns policies based on destination config (#1299)
1 parent 5d794c0 commit b39df65

File tree

8 files changed

+223
-22
lines changed

8 files changed

+223
-22
lines changed

examples/2016-10-31/stream_processor/template.yaml

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,6 @@ Resources:
88
Handler: index.handler
99
Runtime: nodejs10.x
1010
CodeUri: src/
11-
Policies:
12-
- SNSPublishMessagePolicy:
13-
TopicName: !GetAtt MySnsTopic.TopicName
1411
Events:
1512
Stream:
1613
Type: Kinesis
@@ -24,6 +21,7 @@ Resources:
2421
StartingPosition: TRIM_HORIZON
2522
DestinationConfig:
2623
OnFailure:
24+
Type: SNS
2725
Destination: !Ref MySnsTopic
2826

2927
Stream:

samtranslator/model/eventsources/pull.py

Lines changed: 37 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
from samtranslator.model.lambda_ import LambdaEventSourceMapping
55
from samtranslator.translator.arn_generator import ArnGenerator
66
from samtranslator.model.exceptions import InvalidEventException
7+
from samtranslator.model.iam import IAMRolePolicies
78

89

910
class PullEventSource(ResourceMacro):
@@ -74,26 +75,58 @@ def to_cloudformation(self, **kwargs):
7475
lambda_eventsourcemapping.MaximumRetryAttempts = self.MaximumRetryAttempts
7576
lambda_eventsourcemapping.BisectBatchOnFunctionError = self.BisectBatchOnFunctionError
7677
lambda_eventsourcemapping.MaximumRecordAgeInSeconds = self.MaximumRecordAgeInSeconds
77-
lambda_eventsourcemapping.DestinationConfig = self.DestinationConfig
7878
lambda_eventsourcemapping.ParallelizationFactor = self.ParallelizationFactor
7979

80+
destination_config_policy = None
81+
if self.DestinationConfig:
82+
# `Type` property is for sam to attach the right policies
83+
destination_type = self.DestinationConfig.get('OnFailure').get('Type')
84+
85+
# SAM attaches the policies for SQS or SNS only if 'Type' is given
86+
if destination_type:
87+
# the values 'SQS' and 'SNS' are allowed. No intrinsics are allowed
88+
if destination_type not in ['SQS', 'SNS']:
89+
raise InvalidEventException(self.logical_id, "The only valid values for 'Type' are 'SQS' and 'SNS'")
90+
if self.DestinationConfig.get('OnFailure') is None:
91+
raise InvalidEventException(self.logical_id, "'OnFailure' is a required field for "
92+
"'DestinationConfig'")
93+
if destination_type == 'SQS':
94+
queue_arn = self.DestinationConfig.get('OnFailure').get('Destination')
95+
destination_config_policy = IAMRolePolicies().sqs_send_message_role_policy(queue_arn,
96+
self.logical_id)
97+
elif destination_type == "SNS":
98+
sns_topic_arn = self.DestinationConfig.get('OnFailure').get('Destination')
99+
destination_config_policy = IAMRolePolicies(). sns_publish_role_policy(sns_topic_arn,
100+
self.logical_id)
101+
102+
lambda_eventsourcemapping.DestinationConfig = self.DestinationConfig
103+
80104
if 'Condition' in function.resource_attributes:
81105
lambda_eventsourcemapping.set_resource_attribute('Condition', function.resource_attributes['Condition'])
82106

83107
if 'role' in kwargs:
84-
self._link_policy(kwargs['role'])
108+
self._link_policy(kwargs['role'], destination_config_policy)
85109

86110
return resources
87111

88-
def _link_policy(self, role):
112+
def _link_policy(self, role, destination_config_policy=None):
89113
"""If this source triggers a Lambda function whose execution role is auto-generated by SAM, add the
90114
appropriate managed policy to this Role.
91115
92-
:param model.iam.IAMROle role: the execution role generated for the function
116+
:param model.iam.IAMRole role: the execution role generated for the function
93117
"""
94118
policy_arn = self.get_policy_arn()
95119
if role is not None and policy_arn not in role.ManagedPolicyArns:
96120
role.ManagedPolicyArns.append(policy_arn)
121+
# add SQS or SNS policy only if role is present in kwargs
122+
if role is not None and destination_config_policy is not None and destination_config_policy:
123+
if role.Policies is None:
124+
role.Policies = []
125+
role.Policies.append(destination_config_policy)
126+
if role.Policies and destination_config_policy not in role.Policies:
127+
# do not add the policy if the same policy document is already present
128+
if not destination_config_policy.get('PolicyDocument') in [d['PolicyDocument'] for d in role.Policies]:
129+
role.Policies.append(destination_config_policy)
97130

98131

99132
class Kinesis(PullEventSource):

samtranslator/model/iam.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,3 +63,31 @@ def dead_letter_queue_policy(cls, action, resource):
6363
}]
6464
}
6565
}
66+
67+
@classmethod
68+
def sqs_send_message_role_policy(cls, queue_arn, logical_id):
69+
document = {
70+
'PolicyName': logical_id + 'SQSPolicy',
71+
'PolicyDocument': {
72+
'Statement': [{
73+
'Action': 'sqs:SendMessage',
74+
'Effect': 'Allow',
75+
'Resource': queue_arn
76+
}]
77+
}
78+
}
79+
return document
80+
81+
@classmethod
82+
def sns_publish_role_policy(cls, topic_arn, logical_id):
83+
document = {
84+
'PolicyName': logical_id + 'SNSPolicy',
85+
'PolicyDocument': {
86+
'Statement': [{
87+
'Action': 'sns:publish',
88+
'Effect': 'Allow',
89+
'Resource': topic_arn
90+
}]
91+
}
92+
}
93+
return document

tests/translator/input/function_with_event_source_mapping.yaml

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ Resources:
1313
Properties:
1414
Handler: index.handler
1515
InlineCode: |
16-
1716
exports.handler = async (event) => {
1817
return {
1918
statusCode: 200,
@@ -38,6 +37,10 @@ Resources:
3837
Stream: !GetAtt KinesisStream1.Arn
3938
MaximumBatchingWindowInSeconds: !Ref MyBatchingWindowParam
4039
StartingPosition: LATEST
40+
DestinationConfig:
41+
OnFailure:
42+
Type: SNS
43+
Destination: !Ref MySnsTopic
4144
DynamoDBStreamEvent:
4245
Type: DynamoDB
4346
Properties:
@@ -51,6 +54,7 @@ Resources:
5154
StartingPosition: TRIM_HORIZON
5255
DestinationConfig:
5356
OnFailure:
57+
Type: SQS
5458
Destination: !GetAtt MySqsQueue.Arn
5559

5660
KinesisStream:
@@ -78,4 +82,6 @@ Resources:
7882
StreamViewType: NEW_IMAGE
7983

8084
MySqsQueue:
81-
Type: AWS::SQS::Queue
85+
Type: AWS::SQS::Queue
86+
MySnsTopic:
87+
Type: AWS::SNS::Topic

tests/translator/output/aws-cn/function_with_event_source_mapping.json

Lines changed: 46 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,9 @@
1111
"MySqsQueue": {
1212
"Type": "AWS::SQS::Queue"
1313
},
14+
"MySnsTopic": {
15+
"Type": "AWS::SNS::Topic"
16+
},
1417
"MyFunctionForBatchingExampleStream": {
1518
"Type": "AWS::Lambda::EventSourceMapping",
1619
"Properties": {
@@ -85,6 +88,37 @@
8588
}
8689
]
8790
}
91+
},
92+
{
93+
"PolicyName": "MyFunctionForBatchingExampleDynamoDBStreamEventSQSPolicy",
94+
"PolicyDocument": {
95+
"Statement": [
96+
{
97+
"Action": "sqs:SendMessage",
98+
"Effect": "Allow",
99+
"Resource": {
100+
"Fn::GetAtt": [
101+
"MySqsQueue",
102+
"Arn"
103+
]
104+
}
105+
}
106+
]
107+
}
108+
},
109+
{
110+
"PolicyDocument": {
111+
"Statement": [
112+
{
113+
"Action": "sns:publish",
114+
"Effect": "Allow",
115+
"Resource": {
116+
"Ref": "MySnsTopic"
117+
}
118+
}
119+
]
120+
},
121+
"PolicyName": "MyFunctionForBatchingExampleStreamEventSNSPolicy"
88122
}
89123
],
90124
"Tags": [
@@ -135,7 +169,8 @@
135169
"MySqsQueue",
136170
"Arn"
137171
]
138-
}
172+
},
173+
"Type": "SQS"
139174
}
140175
},
141176
"EventSourceArn": {
@@ -155,7 +190,7 @@
155190
"Properties": {
156191
"Handler": "index.handler",
157192
"Code": {
158-
"ZipFile": "\nexports.handler = async (event) => {\n return {\n statusCode: 200,\n body: JSON.stringify(event),\n headers: {}\n }\n}\n"
193+
"ZipFile": "exports.handler = async (event) => {\n return {\n statusCode: 200,\n body: JSON.stringify(event),\n headers: {}\n }\n}\n"
159194
},
160195
"Role": {
161196
"Fn::GetAtt": [
@@ -187,7 +222,15 @@
187222
"FunctionName": {
188223
"Ref": "MyFunctionForBatchingExample"
189224
},
190-
"StartingPosition": "LATEST"
225+
"StartingPosition": "LATEST",
226+
"DestinationConfig": {
227+
"OnFailure": {
228+
"Destination": {
229+
"Ref": "MySnsTopic"
230+
},
231+
"Type": "SNS"
232+
}
233+
}
191234
}
192235
},
193236
"KinesisStream": {

tests/translator/output/aws-us-gov/function_with_event_source_mapping.json

Lines changed: 46 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,9 @@
1111
"MySqsQueue": {
1212
"Type": "AWS::SQS::Queue"
1313
},
14+
"MySnsTopic": {
15+
"Type": "AWS::SNS::Topic"
16+
},
1417
"MyFunctionForBatchingExampleStream": {
1518
"Type": "AWS::Lambda::EventSourceMapping",
1619
"Properties": {
@@ -85,6 +88,37 @@
8588
}
8689
]
8790
}
91+
},
92+
{
93+
"PolicyName": "MyFunctionForBatchingExampleDynamoDBStreamEventSQSPolicy",
94+
"PolicyDocument": {
95+
"Statement": [
96+
{
97+
"Action": "sqs:SendMessage",
98+
"Effect": "Allow",
99+
"Resource": {
100+
"Fn::GetAtt": [
101+
"MySqsQueue",
102+
"Arn"
103+
]
104+
}
105+
}
106+
]
107+
}
108+
},
109+
{
110+
"PolicyDocument": {
111+
"Statement": [
112+
{
113+
"Action": "sns:publish",
114+
"Effect": "Allow",
115+
"Resource": {
116+
"Ref": "MySnsTopic"
117+
}
118+
}
119+
]
120+
},
121+
"PolicyName": "MyFunctionForBatchingExampleStreamEventSNSPolicy"
88122
}
89123
],
90124
"Tags": [
@@ -135,7 +169,8 @@
135169
"MySqsQueue",
136170
"Arn"
137171
]
138-
}
172+
},
173+
"Type": "SQS"
139174
}
140175
},
141176
"EventSourceArn": {
@@ -155,7 +190,7 @@
155190
"Properties": {
156191
"Handler": "index.handler",
157192
"Code": {
158-
"ZipFile": "\nexports.handler = async (event) => {\n return {\n statusCode: 200,\n body: JSON.stringify(event),\n headers: {}\n }\n}\n"
193+
"ZipFile": "exports.handler = async (event) => {\n return {\n statusCode: 200,\n body: JSON.stringify(event),\n headers: {}\n }\n}\n"
159194
},
160195
"Role": {
161196
"Fn::GetAtt": [
@@ -187,7 +222,15 @@
187222
"FunctionName": {
188223
"Ref": "MyFunctionForBatchingExample"
189224
},
190-
"StartingPosition": "LATEST"
225+
"StartingPosition": "LATEST",
226+
"DestinationConfig": {
227+
"OnFailure": {
228+
"Destination": {
229+
"Ref": "MySnsTopic"
230+
},
231+
"Type": "SNS"
232+
}
233+
}
191234
}
192235
},
193236
"KinesisStream": {

0 commit comments

Comments
 (0)