Skip to content

Commit b558c13

Browse files
committed
[KIP-848] Added support for testing with new 'consumer' group protocol.
* [KIP-848] Added support for testing with new 'consumer' group protocol. * Build fixes * updated trivup * Updated trivup install path * Fixed failing test * Style fix * Added more tests to be run with the new protocol * Fixed failing tests * Added Test common for common functionalities * Enabling SR again * Style fixes * Some refactoring * Added consumer protocol integration tests in semaphore * Ignoring failing admin tests * Fix typo * Fixed failing test case * Added new fixture for single broker and using this fixture for test_serializer tests * Build fixes * Fixed transient test failures for proto * Fixed another test * Added Test*Consumer classes instead of functions * Build issue * Added common TestUtils * Using specific commit for trivup * Removed trivup 0.12.5 * PR comments * Style check * Skipping one list offsets assert for Zookeeper * 1) Moved sleep after result and assert. 2) Added a function to create a topic and wait for propogation. * Using create_topic_and_wait_propogation instead of create_topic function * Internally using create_topic in create_topic_and_wait_propogation * Removed kafka single broker cluster fixture * Removed unnecessary import time * Using broker version 3.8.0 for classic protocol and enabled test which was failing in 3.7.0 * Changed fixture scope to session * Style fixes
1 parent c83d3da commit b558c13

31 files changed

+368
-196
lines changed

.semaphore/semaphore.yml

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -138,12 +138,20 @@ blocks:
138138
commands:
139139
- '[[ -z $DOCKERHUB_APIKEY ]] || docker login --username $DOCKERHUB_USER --password $DOCKERHUB_APIKEY'
140140
jobs:
141-
- name: Build
141+
- name: Build and Tests with 'classic' group protocol
142+
commands:
143+
- sem-version python 3.8
144+
# use a virtualenv
145+
- python3 -m venv _venv && source _venv/bin/activate
146+
- chmod u+r+x tools/source-package-verification.sh
147+
- tools/source-package-verification.sh
148+
- name: Build and Tests with 'consumer' group protocol
142149
commands:
143150
- sem-version python 3.8
144151
# use a virtualenv
145152
- python3 -m venv _venv && source _venv/bin/activate
146153
- chmod u+r+x tools/source-package-verification.sh
154+
- export TEST_CONSUMER_GROUP_PROTOCOL=consumer
147155
- tools/source-package-verification.sh
148156
- name: "Source package verification with Python 3 (Linux arm64)"
149157
dependencies: []

tests/common/__init__.py

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
#!/usr/bin/env python
2+
# -*- coding: utf-8 -*-
3+
#
4+
# Copyright 2024 Confluent Inc.
5+
#
6+
# Licensed under the Apache License, Version 2.0 (the "License");
7+
# you may not use this file except in compliance with the License.
8+
# You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing, software
13+
# distributed under the License is distributed on an "AS IS" BASIS,
14+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
# See the License for the specific language governing permissions and
16+
# limitations under the License.
17+
#
18+
19+
import os
20+
from confluent_kafka import Consumer, DeserializingConsumer
21+
from confluent_kafka.avro import AvroConsumer
22+
23+
_GROUP_PROTOCOL_ENV = 'TEST_CONSUMER_GROUP_PROTOCOL'
24+
_TRIVUP_CLUSTER_TYPE_ENV = 'TEST_TRIVUP_CLUSTER_TYPE'
25+
26+
27+
def _update_conf_group_protocol(conf=None):
28+
if conf is not None and 'group.id' in conf and TestUtils.use_group_protocol_consumer():
29+
conf['group.protocol'] = 'consumer'
30+
31+
32+
def _trivup_cluster_type_kraft():
33+
return _TRIVUP_CLUSTER_TYPE_ENV in os.environ and os.environ[_TRIVUP_CLUSTER_TYPE_ENV] == 'kraft'
34+
35+
36+
class TestUtils:
37+
@staticmethod
38+
def use_kraft():
39+
return TestUtils.use_group_protocol_consumer() or _trivup_cluster_type_kraft()
40+
41+
@staticmethod
42+
def use_group_protocol_consumer():
43+
return _GROUP_PROTOCOL_ENV in os.environ and os.environ[_GROUP_PROTOCOL_ENV] == 'consumer'
44+
45+
46+
class TestConsumer(Consumer):
    """``Consumer`` wrapper that opts into the KIP-848 'consumer' group
    protocol whenever the test environment requests it."""

    def __init__(self, conf=None, **kwargs):
        # Mutates conf in place before handing it to the real client.
        _update_conf_group_protocol(conf)
        super().__init__(conf, **kwargs)
50+
51+
52+
class TestDeserializingConsumer(DeserializingConsumer):
    """``DeserializingConsumer`` wrapper that opts into the KIP-848
    'consumer' group protocol whenever the test environment requests it."""

    def __init__(self, conf=None, **kwargs):
        # Mutates conf in place before handing it to the real client.
        _update_conf_group_protocol(conf)
        super().__init__(conf, **kwargs)
56+
57+
58+
class TestAvroConsumer(AvroConsumer):
    """``AvroConsumer`` wrapper that opts into the KIP-848 'consumer'
    group protocol whenever the test environment requests it."""

    def __init__(self, conf=None, **kwargs):
        # Mutates conf in place before handing it to the real client.
        _update_conf_group_protocol(conf)
        super().__init__(conf, **kwargs)

tests/integration/admin/test_basic_operations.py

Lines changed: 20 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,10 @@
1313
# See the License for the specific language governing permissions and
1414
# limitations under the License.
1515

16-
import confluent_kafka
1716
import struct
1817
import time
19-
from confluent_kafka import ConsumerGroupTopicPartitions, TopicPartition, ConsumerGroupState
18+
19+
from confluent_kafka import ConsumerGroupTopicPartitions, TopicPartition, ConsumerGroupState, KafkaError
2020
from confluent_kafka.admin import (NewPartitions, ConfigResource,
2121
AclBinding, AclBindingFilter, ResourceType,
2222
ResourcePatternType, AclOperation, AclPermissionType)
@@ -58,6 +58,8 @@ def verify_admin_acls(admin_client,
5858
for acl_binding, f in fs.items():
5959
f.result() # trigger exception if there was an error
6060

61+
time.sleep(1)
62+
6163
acl_binding_filter1 = AclBindingFilter(ResourceType.ANY, None, ResourcePatternType.ANY,
6264
None, None, AclOperation.ANY, AclPermissionType.ANY)
6365
acl_binding_filter2 = AclBindingFilter(ResourceType.ANY, None, ResourcePatternType.PREFIXED,
@@ -83,6 +85,8 @@ def verify_admin_acls(admin_client,
8385
"Deleted ACL bindings don't match, actual {} expected {}".format(deleted_acl_bindings,
8486
expected_acl_bindings)
8587

88+
time.sleep(1)
89+
8690
#
8791
# Delete the ACLs with TOPIC and GROUP
8892
#
@@ -94,6 +98,9 @@ def verify_admin_acls(admin_client,
9498
assert deleted_acl_bindings == expected, \
9599
"Deleted ACL bindings don't match, actual {} expected {}".format(deleted_acl_bindings,
96100
expected)
101+
102+
time.sleep(1)
103+
97104
#
98105
# All the ACLs should have been deleted
99106
#
@@ -201,14 +208,14 @@ def test_basic_operations(kafka_cluster):
201208
# Second iteration: create topic.
202209
#
203210
for validate in (True, False):
204-
our_topic = kafka_cluster.create_topic(topic_prefix,
205-
{
206-
"num_partitions": num_partitions,
207-
"config": topic_config,
208-
"replication_factor": 1,
209-
},
210-
validate_only=validate
211-
)
211+
our_topic = kafka_cluster.create_topic_and_wait_propogation(topic_prefix,
212+
{
213+
"num_partitions": num_partitions,
214+
"config": topic_config,
215+
"replication_factor": 1,
216+
},
217+
validate_only=validate
218+
)
212219

213220
admin_client = kafka_cluster.admin()
214221

@@ -270,7 +277,7 @@ def consume_messages(group_id, num_messages=None):
270277
print('Read all the required messages: exiting')
271278
break
272279
except ConsumeError as e:
273-
if msg is not None and e.code == confluent_kafka.KafkaError._PARTITION_EOF:
280+
if msg is not None and e.code == KafkaError._PARTITION_EOF:
274281
print('Reached end of %s [%d] at offset %d' % (
275282
msg.topic(), msg.partition(), msg.offset()))
276283
eof_reached[(msg.topic(), msg.partition())] = True
@@ -345,6 +352,8 @@ def verify_config(expconfig, configs):
345352
fs = admin_client.alter_configs([resource])
346353
fs[resource].result() # will raise exception on failure
347354

355+
time.sleep(1)
356+
348357
#
349358
# Read the config back again and verify.
350359
#

tests/integration/admin/test_delete_records.py

Lines changed: 16 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,11 @@ def test_delete_records(kafka_cluster):
2525
admin_client = kafka_cluster.admin()
2626

2727
# Create a topic with a single partition
28-
topic = kafka_cluster.create_topic("test-del-records",
29-
{
30-
"num_partitions": 1,
31-
"replication_factor": 1,
32-
})
28+
topic = kafka_cluster.create_topic_and_wait_propogation("test-del-records",
29+
{
30+
"num_partitions": 1,
31+
"replication_factor": 1,
32+
})
3333

3434
# Create Producer instance
3535
p = kafka_cluster.producer()
@@ -73,16 +73,17 @@ def test_delete_records_multiple_topics_and_partitions(kafka_cluster):
7373
admin_client = kafka_cluster.admin()
7474
num_partitions = 3
7575
# Create two topics with a single partition
76-
topic = kafka_cluster.create_topic("test-del-records",
77-
{
78-
"num_partitions": num_partitions,
79-
"replication_factor": 1,
80-
})
81-
topic2 = kafka_cluster.create_topic("test-del-records2",
82-
{
83-
"num_partitions": num_partitions,
84-
"replication_factor": 1,
85-
})
76+
topic = kafka_cluster.create_topic_and_wait_propogation("test-del-records",
77+
{
78+
"num_partitions": num_partitions,
79+
"replication_factor": 1,
80+
})
81+
topic2 = kafka_cluster.create_topic_and_wait_propogation("test-del-records2",
82+
{
83+
"num_partitions": num_partitions,
84+
"replication_factor": 1,
85+
})
86+
8687
topics = [topic, topic2]
8788
partitions = list(range(num_partitions))
8889
# Create Producer instance

tests/integration/admin/test_describe_operations.py

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,16 @@
1313
# See the License for the specific language governing permissions and
1414
# limitations under the License.
1515

16+
import time
1617
import pytest
18+
1719
from confluent_kafka.admin import (AclBinding, AclBindingFilter, ResourceType,
1820
ResourcePatternType, AclOperation, AclPermissionType)
1921
from confluent_kafka.error import ConsumeError
2022
from confluent_kafka import ConsumerGroupState, TopicCollection
2123

24+
from tests.common import TestUtils
25+
2226
topic_prefix = "test-topic"
2327

2428

@@ -82,10 +86,12 @@ def perform_admin_operation_sync(operation, *arg, **kwargs):
8286

8387
def create_acls(admin_client, acl_bindings):
8488
perform_admin_operation_sync(admin_client.create_acls, acl_bindings)
89+
time.sleep(1)
8590

8691

8792
def delete_acls(admin_client, acl_binding_filters):
8893
perform_admin_operation_sync(admin_client.delete_acls, acl_binding_filters)
94+
time.sleep(1)
8995

9096

9197
def verify_provided_describe_for_authorized_operations(
@@ -115,6 +121,7 @@ def verify_provided_describe_for_authorized_operations(
115121
acl_binding = AclBinding(restype, resname, ResourcePatternType.LITERAL,
116122
"User:sasl_user", "*", operation_to_allow, AclPermissionType.ALLOW)
117123
create_acls(admin_client, [acl_binding])
124+
time.sleep(1)
118125

119126
# Check with updated authorized operations
120127
desc = perform_admin_operation_sync(describe_fn, *arg, **kwargs)
@@ -126,6 +133,7 @@ def verify_provided_describe_for_authorized_operations(
126133
acl_binding_filter = AclBindingFilter(restype, resname, ResourcePatternType.ANY,
127134
None, None, AclOperation.ANY, AclPermissionType.ANY)
128135
delete_acls(admin_client, [acl_binding_filter])
136+
time.sleep(1)
129137
return desc
130138

131139

@@ -196,20 +204,24 @@ def test_describe_operations(sasl_cluster):
196204

197205
# Create Topic
198206
topic_config = {"compression.type": "gzip"}
199-
our_topic = sasl_cluster.create_topic(topic_prefix,
200-
{
201-
"num_partitions": 1,
202-
"config": topic_config,
203-
"replication_factor": 1,
204-
},
205-
validate_only=False
206-
)
207+
our_topic = sasl_cluster.create_topic_and_wait_propogation(topic_prefix,
208+
{
209+
"num_partitions": 1,
210+
"config": topic_config,
211+
"replication_factor": 1,
212+
},
213+
validate_only=False
214+
)
207215

208216
# Verify Authorized Operations in Describe Topics
209217
verify_describe_topics(admin_client, our_topic)
210218

211219
# Verify Authorized Operations in Describe Groups
212-
verify_describe_groups(sasl_cluster, admin_client, our_topic)
220+
# Skip this test if using group protocol `consumer`
221+
# as there is new RPC for describe_groups() in
222+
# group protocol `consumer` case.
223+
if not TestUtils.use_group_protocol_consumer():
224+
verify_describe_groups(sasl_cluster, admin_client, our_topic)
213225

214226
# Delete Topic
215227
perform_admin_operation_sync(admin_client.delete_topics, [our_topic], operation_timeout=0, request_timeout=10)

tests/integration/admin/test_incremental_alter_configs.py

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313
# See the License for the specific language governing permissions and
1414
# limitations under the License.
1515

16+
import time
17+
1618
from confluent_kafka.admin import ConfigResource, \
1719
ConfigEntry, ResourceType, \
1820
AlterConfigOpType
@@ -52,18 +54,18 @@ def test_incremental_alter_configs(kafka_cluster):
5254
num_partitions = 2
5355
topic_config = {"compression.type": "gzip"}
5456

55-
our_topic = kafka_cluster.create_topic(topic_prefix,
56-
{
57-
"num_partitions": num_partitions,
58-
"config": topic_config,
59-
"replication_factor": 1,
60-
})
61-
our_topic2 = kafka_cluster.create_topic(topic_prefix2,
62-
{
63-
"num_partitions": num_partitions,
64-
"config": topic_config,
65-
"replication_factor": 1,
66-
})
57+
our_topic = kafka_cluster.create_topic_and_wait_propogation(topic_prefix,
58+
{
59+
"num_partitions": num_partitions,
60+
"config": topic_config,
61+
"replication_factor": 1,
62+
})
63+
our_topic2 = kafka_cluster.create_topic_and_wait_propogation(topic_prefix2,
64+
{
65+
"num_partitions": num_partitions,
66+
"config": topic_config,
67+
"replication_factor": 1,
68+
})
6769

6870
admin_client = kafka_cluster.admin()
6971

@@ -103,6 +105,8 @@ def test_incremental_alter_configs(kafka_cluster):
103105

104106
assert_operation_succeeded(fs, 2)
105107

108+
time.sleep(1)
109+
106110
#
107111
# Get current topic config
108112
#
@@ -134,6 +138,8 @@ def test_incremental_alter_configs(kafka_cluster):
134138

135139
assert_operation_succeeded(fs, 1)
136140

141+
time.sleep(1)
142+
137143
#
138144
# Get current topic config
139145
#

tests/integration/admin/test_list_offsets.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,8 @@
1313
# See the License for the specific language governing permissions and
1414
# limitations under the License.
1515

16-
from confluent_kafka.admin import ListOffsetsResultInfo, OffsetSpec
1716
from confluent_kafka import TopicPartition, IsolationLevel
17+
from confluent_kafka.admin import ListOffsetsResultInfo, OffsetSpec
1818

1919

2020
def test_list_offsets(kafka_cluster):
@@ -27,11 +27,11 @@ def test_list_offsets(kafka_cluster):
2727
admin_client = kafka_cluster.admin()
2828

2929
# Create a topic with a single partition
30-
topic = kafka_cluster.create_topic("test-topic-verify-list-offsets",
31-
{
32-
"num_partitions": 1,
33-
"replication_factor": 1,
34-
})
30+
topic = kafka_cluster.create_topic_and_wait_propogation("test-topic-verify-list-offsets",
31+
{
32+
"num_partitions": 1,
33+
"replication_factor": 1,
34+
})
3535

3636
# Create Producer instance
3737
p = kafka_cluster.producer()

0 commit comments

Comments
 (0)