@@ -34,13 +34,17 @@ class PullEventSource(ResourceMacro):
34
34
"Broker" : PropertyType (False , is_str ()),
35
35
"Queues" : PropertyType (False , is_type (list )),
36
36
"SourceAccessConfigurations" : PropertyType (False , is_type (list )),
37
+ "SecretsManagerKmsKeyId" : PropertyType (False , is_str ()),
37
38
"TumblingWindowInSeconds" : PropertyType (False , is_type (int )),
38
39
"FunctionResponseTypes" : PropertyType (False , is_type (list )),
39
40
}
40
41
41
42
def get_policy_arn (self ):
42
43
raise NotImplementedError ("Subclass must implement this method" )
43
44
45
+ def get_policy_statements (self ):
46
+ raise NotImplementedError ("Subclass must implement this method" )
47
+
44
48
def to_cloudformation (self , ** kwargs ):
45
49
"""Returns the Lambda EventSourceMapping to which this pull event corresponds. Adds the appropriate managed
46
50
policy to the function's execution role, if such a role is provided.
@@ -133,8 +137,17 @@ def _link_policy(self, role, destination_config_policy=None):
133
137
:param model.iam.IAMRole role: the execution role generated for the function
134
138
"""
135
139
policy_arn = self .get_policy_arn ()
136
- if role is not None and policy_arn not in role .ManagedPolicyArns :
137
- role .ManagedPolicyArns .append (policy_arn )
140
+ policy_statements = self .get_policy_statements ()
141
+ if role is not None :
142
+ if policy_arn is not None and policy_arn not in role .ManagedPolicyArns :
143
+ role .ManagedPolicyArns .append (policy_arn )
144
+ if policy_statements is not None :
145
+ if role .Policies is None :
146
+ role .Policies = []
147
+ for policy in policy_statements :
148
+ if policy not in role .Policies :
149
+ if not policy .get ("PolicyDocument" ) in [d ["PolicyDocument" ] for d in role .Policies ]:
150
+ role .Policies .append (policy )
138
151
# add SQS or SNS policy only if role is present in kwargs
139
152
if role is not None and destination_config_policy is not None and destination_config_policy :
140
153
if role .Policies is None :
@@ -154,6 +167,9 @@ class Kinesis(PullEventSource):
154
167
def get_policy_arn (self ):
155
168
return ArnGenerator .generate_aws_managed_policy_arn ("service-role/AWSLambdaKinesisExecutionRole" )
156
169
170
+ def get_policy_statements (self ):
171
+ return None
172
+
157
173
158
174
class DynamoDB (PullEventSource ):
159
175
"""DynamoDB Streams event source."""
@@ -163,6 +179,9 @@ class DynamoDB(PullEventSource):
163
179
def get_policy_arn (self ):
164
180
return ArnGenerator .generate_aws_managed_policy_arn ("service-role/AWSLambdaDynamoDBExecutionRole" )
165
181
182
+ def get_policy_statements (self ):
183
+ return None
184
+
166
185
167
186
class SQS (PullEventSource ):
168
187
"""SQS Queue event source."""
@@ -172,6 +191,9 @@ class SQS(PullEventSource):
172
191
def get_policy_arn (self ):
173
192
return ArnGenerator .generate_aws_managed_policy_arn ("service-role/AWSLambdaSQSQueueExecutionRole" )
174
193
194
+ def get_policy_statements (self ):
195
+ return None
196
+
175
197
176
198
class MSK (PullEventSource ):
177
199
"""MSK event source."""
@@ -181,11 +203,69 @@ class MSK(PullEventSource):
181
203
def get_policy_arn (self ):
182
204
return ArnGenerator .generate_aws_managed_policy_arn ("service-role/AWSLambdaMSKExecutionRole" )
183
205
206
+ def get_policy_statements (self ):
207
+ return None
208
+
184
209
185
210
class MQ (PullEventSource ):
186
211
"""MQ event source."""
187
212
188
213
resource_type = "MQ"
189
214
190
215
def get_policy_arn (self ):
191
- return ArnGenerator .generate_aws_managed_policy_arn ("service-role/AWSLambdaAMQExecutionRole" )
216
+ return None
217
+
218
+ def get_policy_statements (self ):
219
+ if not self .SourceAccessConfigurations :
220
+ raise InvalidEventException (
221
+ self .relative_id ,
222
+ "No SourceAccessConfigurations for ActiveMQ provided." ,
223
+ )
224
+ if not type (self .SourceAccessConfigurations ) is list :
225
+ raise InvalidEventException (
226
+ self .relative_id ,
227
+ "Provided SourceAccessConfigurations cannot be parsed into a list." ,
228
+ )
229
+ # MQ only supports SourceAccessConfigurations with list size of 1
230
+ if not (len (self .SourceAccessConfigurations ) == 1 ):
231
+ raise InvalidEventException (
232
+ self .relative_id ,
233
+ "SourceAccessConfigurations for ActiveMQ only supports single configuration entry." ,
234
+ )
235
+ if not self .SourceAccessConfigurations [0 ].get ("URI" ):
236
+ raise InvalidEventException (
237
+ self .relative_id ,
238
+ "No URI property specified in SourceAccessConfigurations for ActiveMQ." ,
239
+ )
240
+ document = {
241
+ "PolicyName" : "SamAutoGeneratedAMQPolicy" ,
242
+ "PolicyDocument" : {
243
+ "Statement" : [
244
+ {
245
+ "Action" : [
246
+ "secretsmanager:GetSecretValue" ,
247
+ ],
248
+ "Effect" : "Allow" ,
249
+ "Resource" : self .SourceAccessConfigurations [0 ].get ("URI" ),
250
+ },
251
+ {
252
+ "Action" : [
253
+ "mq:DescribeBroker" ,
254
+ ],
255
+ "Effect" : "Allow" ,
256
+ "Resource" : self .Broker ,
257
+ },
258
+ ]
259
+ },
260
+ }
261
+ if self .SecretsManagerKmsKeyId :
262
+ kms_policy = {
263
+ "Action" : "kms:Decrypt" ,
264
+ "Effect" : "Allow" ,
265
+ "Resource" : {
266
+ "Fn::Sub" : "arn:${AWS::Partition}:kms:${AWS::Region}:${AWS::AccountId}:key/"
267
+ + self .SecretsManagerKmsKeyId
268
+ },
269
+ }
270
+ document ["PolicyDocument" ]["Statement" ].append (kms_policy )
271
+ return [document ]
0 commit comments