10
10
class PullEventSource (ResourceMacro ):
11
11
"""Base class for pull event sources for SAM Functions.
12
12
13
- The pull events are Kinesis Streams, DynamoDB Streams, Kafka Streams and SQS Queues. All of these correspond to an
13
+ The pull events are Kinesis Streams, DynamoDB Streams, Kafka Topics, ActiveMQ Queues and SQS Queues. All of these correspond to an
14
14
EventSourceMapping in Lambda, and require that the execution role be given to Kinesis Streams, DynamoDB
15
15
Streams, or SQS Queues, respectively.
16
16
@@ -31,6 +31,9 @@ class PullEventSource(ResourceMacro):
31
31
"DestinationConfig" : PropertyType (False , is_type (dict )),
32
32
"ParallelizationFactor" : PropertyType (False , is_type (int )),
33
33
"Topics" : PropertyType (False , is_type (list )),
34
+ "Broker" : PropertyType (False , is_str ()),
35
+ "Queues" : PropertyType (False , is_type (list )),
36
+ "SourceAccessConfigurations" : PropertyType (False , is_type (list )),
34
37
}
35
38
36
39
def get_policy_arn (self ):
@@ -60,16 +63,17 @@ def to_cloudformation(self, **kwargs):
60
63
except NotImplementedError :
61
64
function_name_or_arn = function .get_runtime_attr ("arn" )
62
65
63
- if not self .Stream and not self .Queue :
66
+ if not self .Stream and not self .Queue and not self . Broker :
64
67
raise InvalidEventException (
65
- self .relative_id , "No Queue (for SQS) or Stream (for Kinesis, DynamoDB or MSK) provided."
68
+ self .relative_id ,
69
+ "No Queue (for SQS) or Stream (for Kinesis, DynamoDB or MSK) or Broker (for ActiveMQ) provided." ,
66
70
)
67
71
68
72
if self .Stream and not self .StartingPosition :
69
73
raise InvalidEventException (self .relative_id , "StartingPosition is required for Kinesis, DynamoDB and MSK." )
70
74
71
75
lambda_eventsourcemapping .FunctionName = function_name_or_arn
72
- lambda_eventsourcemapping .EventSourceArn = self .Stream or self .Queue
76
+ lambda_eventsourcemapping .EventSourceArn = self .Stream or self .Queue or self . Broker
73
77
lambda_eventsourcemapping .StartingPosition = self .StartingPosition
74
78
lambda_eventsourcemapping .BatchSize = self .BatchSize
75
79
lambda_eventsourcemapping .Enabled = self .Enabled
@@ -79,6 +83,8 @@ def to_cloudformation(self, **kwargs):
79
83
lambda_eventsourcemapping .MaximumRecordAgeInSeconds = self .MaximumRecordAgeInSeconds
80
84
lambda_eventsourcemapping .ParallelizationFactor = self .ParallelizationFactor
81
85
lambda_eventsourcemapping .Topics = self .Topics
86
+ lambda_eventsourcemapping .Queues = self .Queues
87
+ lambda_eventsourcemapping .SourceAccessConfigurations = self .SourceAccessConfigurations
82
88
83
89
destination_config_policy = None
84
90
if self .DestinationConfig :
@@ -170,3 +176,12 @@ class MSK(PullEventSource):
170
176
171
177
def get_policy_arn (self ):
172
178
return ArnGenerator .generate_aws_managed_policy_arn ("service-role/AWSLambdaMSKExecutionRole" )
179
+
180
+
181
+ class MQ (PullEventSource ):
182
+ """MQ event source."""
183
+
184
+ resource_type = "MQ"
185
+
186
+ def get_policy_arn (self ):
187
+ return ArnGenerator .generate_aws_managed_policy_arn ("service-role/AWSLambdaAMQExecutionRole" )
0 commit comments