@@ -46,6 +46,7 @@ class PullEventSource(ResourceMacro):
46
46
"FunctionResponseTypes" : PropertyType (False , is_type (list )),
47
47
"KafkaBootstrapServers" : PropertyType (False , is_type (list )),
48
48
"FilterCriteria" : PropertyType (False , is_type (dict )),
49
+ "ConsumerGroupId" : PropertyType (False , is_str ()),
49
50
}
50
51
51
52
def get_policy_arn (self ):
@@ -112,6 +113,17 @@ def to_cloudformation(self, **kwargs):
112
113
lambda_eventsourcemapping .SelfManagedEventSource = {
113
114
"Endpoints" : {"KafkaBootstrapServers" : self .KafkaBootstrapServers }
114
115
}
116
+ if self .ConsumerGroupId :
117
+ consumer_group_id_structure = {"ConsumerGroupId" : self .ConsumerGroupId }
118
+ if self .resource_type == "MSK" :
119
+ lambda_eventsourcemapping .AmazonManagedKafkaConfig = consumer_group_id_structure
120
+ elif self .resource_type == "SelfManagedKafka" :
121
+ lambda_eventsourcemapping .SelfManagedKafkaConfig = consumer_group_id_structure
122
+ else :
123
+ raise InvalidEventException (
124
+ self .logical_id ,
125
+ "Property ConsumerGroupId not defined for resource of type {}." .format (self .resource_type ),
126
+ )
115
127
116
128
destination_config_policy = None
117
129
if self .DestinationConfig :
0 commit comments