@@ -21,6 +21,9 @@ class PullEventSource(ResourceMacro):
21
21
:cvar str policy_arn: The ARN of the AWS managed role policy corresponding to this pull event source
22
22
"""
23
23
24
+ # Event types that support `FilterCriteria`, stored as a list to keep the alphabetical order
25
+ RESOURCE_TYPES_WITH_EVENT_FILTERING = ["DynamoDB" , "Kinesis" , "SQS" ]
26
+
24
27
resource_type = None
25
28
requires_stream_queue_broker = True
26
29
property_types = {
@@ -43,6 +46,7 @@ class PullEventSource(ResourceMacro):
43
46
"TumblingWindowInSeconds" : PropertyType (False , is_type (int )),
44
47
"FunctionResponseTypes" : PropertyType (False , is_type (list )),
45
48
"KafkaBootstrapServers" : PropertyType (False , is_type (list )),
49
+ "FilterCriteria" : PropertyType (False , is_type (dict )),
46
50
}
47
51
48
52
def get_policy_arn (self ):
@@ -102,6 +106,8 @@ def to_cloudformation(self, **kwargs):
102
106
lambda_eventsourcemapping .SourceAccessConfigurations = self .SourceAccessConfigurations
103
107
lambda_eventsourcemapping .TumblingWindowInSeconds = self .TumblingWindowInSeconds
104
108
lambda_eventsourcemapping .FunctionResponseTypes = self .FunctionResponseTypes
109
+ lambda_eventsourcemapping .FilterCriteria = self .FilterCriteria
110
+ self ._validate_filter_criteria ()
105
111
106
112
if self .KafkaBootstrapServers :
107
113
lambda_eventsourcemapping .SelfManagedEventSource = {
@@ -169,6 +175,20 @@ def _link_policy(self, role, destination_config_policy=None):
169
175
if not destination_config_policy .get ("PolicyDocument" ) in [d ["PolicyDocument" ] for d in role .Policies ]:
170
176
role .Policies .append (destination_config_policy )
171
177
178
+ def _validate_filter_criteria (self ):
179
+ if not self .FilterCriteria or is_intrinsic (self .FilterCriteria ):
180
+ return
181
+ if self .resource_type not in self .RESOURCE_TYPES_WITH_EVENT_FILTERING :
182
+ raise InvalidEventException (
183
+ self .relative_id ,
184
+ "FilterCriteria is only available for {} events." .format (
185
+ ", " .join (self .RESOURCE_TYPES_WITH_EVENT_FILTERING )
186
+ ),
187
+ )
188
+ # FilterCriteria is either empty or only has "Filters"
189
+ if list (self .FilterCriteria .keys ()) not in [[], ["Filters" ]]:
190
+ raise InvalidEventException (self .relative_id , "FilterCriteria field has a wrong format" )
191
+
172
192
173
193
class Kinesis (PullEventSource ):
174
194
"""Kinesis event source."""
0 commit comments