Skip to content

Commit 1551f01

Browse files
rebase commit
1 parent a6d2e1e commit 1551f01

31 files changed

+478
-172
lines changed

.semaphore/semaphore.yml

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -138,12 +138,20 @@ blocks:
138138
commands:
139139
- '[[ -z $DOCKERHUB_APIKEY ]] || docker login --username $DOCKERHUB_USER --password $DOCKERHUB_APIKEY'
140140
jobs:
141-
- name: Build
141+
- name: Build and Tests with 'classic' group protocol
142+
commands:
143+
- sem-version python 3.8
144+
# use a virtualenv
145+
- python3 -m venv _venv && source _venv/bin/activate
146+
- chmod u+r+x tools/source-package-verification.sh
147+
- tools/source-package-verification.sh
148+
- name: Build and Tests with 'consumer' group protocol
142149
commands:
143150
- sem-version python 3.8
144151
# use a virtualenv
145152
- python3 -m venv _venv && source _venv/bin/activate
146153
- chmod u+r+x tools/source-package-verification.sh
154+
- export TEST_CONSUMER_GROUP_PROTOCOL=consumer
147155
- tools/source-package-verification.sh
148156
- name: "Source package verification with Python 3 (Linux arm64)"
149157
dependencies: []

examples/adminapi.py

Lines changed: 56 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,17 @@
1818
# Example use of AdminClient operations.
1919

2020
from confluent_kafka import (KafkaException, ConsumerGroupTopicPartitions,
21-
TopicPartition, ConsumerGroupState, TopicCollection,
22-
IsolationLevel)
21+
TopicPartition, ConsumerGroupState,
22+
TopicCollection, IsolationLevel)
2323
from confluent_kafka.admin import (AdminClient, NewTopic, NewPartitions, ConfigResource,
2424
ConfigEntry, ConfigSource, AclBinding,
2525
AclBindingFilter, ResourceType, ResourcePatternType,
2626
AclOperation, AclPermissionType, AlterConfigOpType,
2727
ScramMechanism, ScramCredentialInfo,
2828
UserScramCredentialUpsertion, UserScramCredentialDeletion,
2929
OffsetSpec)
30+
from confluent_kafka._model import ConsumerGroupType
31+
3032
import sys
3133
import threading
3234
import logging
@@ -471,18 +473,64 @@ def example_list(a, args):
471473
print("id {} client_id: {} client_host: {}".format(m.id, m.client_id, m.client_host))
472474

473475

476+
def getConsumerGroupState(state_string):
477+
if state_string == "STABLE":
478+
return ConsumerGroupState.STABLE
479+
elif state_string == "DEAD":
480+
return ConsumerGroupState.DEAD
481+
elif state_string == "PREPARING_REBALANCING":
482+
return ConsumerGroupState.PREPARING_REBALANCING
483+
elif state_string == "COMPLETING_REBALANCING":
484+
return ConsumerGroupState.COMPLETING_REBALANCING
485+
elif state_string == "EMPTY":
486+
return ConsumerGroupState.EMPTY
487+
return ConsumerGroupState.UNKNOWN
488+
489+
490+
def getConsumerGroupType(type_string):
491+
if type_string == "CONSUMER":
492+
return ConsumerGroupType.CONSUMER
493+
elif type_string == "CLASSIC":
494+
return ConsumerGroupType.CLASSIC
495+
return ConsumerGroupType.UNKNOWN
496+
497+
474498
def example_list_consumer_groups(a, args):
475499
"""
476500
List Consumer Groups
477501
"""
478-
states = {ConsumerGroupState[state] for state in args}
479-
future = a.list_consumer_groups(request_timeout=10, states=states)
502+
states = set()
503+
group_types = set()
504+
if len(args) > 0:
505+
isType = False
506+
isState = False
507+
for i in range(0, len(args)):
508+
if (args[i] == "-states"):
509+
if (isState):
510+
raise Exception("Invalid Arguments\n Usage: list_consumer_groups [-states <state1> <state2> ..] " +
511+
"[-types <grouptype1> <grouptype2> ..]")
512+
isState = True
513+
elif (args[i] == "-types"):
514+
if (isType):
515+
raise Exception("Invalid Arguments\n Usage: list_consumer_groups [-states <state1> <state2> ..] " +
516+
"[-types <grouptype1> <grouptype2> ..]")
517+
isType = True
518+
else:
519+
if (isType):
520+
group_types.add(getConsumerGroupType(args[i]))
521+
elif (isState):
522+
states.add(getConsumerGroupState(args[i]))
523+
else:
524+
raise Exception("Invalid Arguments\n Usage: list_consumer_groups [-states <state1> <state2> ..] " +
525+
"[-types <grouptype1> <grouptype2> ..]")
526+
527+
future = a.list_consumer_groups(request_timeout=10, states=states, group_types=group_types)
480528
try:
481529
list_consumer_groups_result = future.result()
482530
print("{} consumer groups".format(len(list_consumer_groups_result.valid)))
483531
for valid in list_consumer_groups_result.valid:
484-
print(" id: {} is_simple: {} state: {}".format(
485-
valid.group_id, valid.is_simple_consumer_group, valid.state))
532+
print(" id: {} is_simple: {} state: {} group_type: {}".format(
533+
valid.group_id, valid.is_simple_consumer_group, valid.state, valid.group_type))
486534
print("{} errors".format(len(list_consumer_groups_result.errors)))
487535
for error in list_consumer_groups_result.errors:
488536
print(" error: {}".format(error))
@@ -867,7 +915,8 @@ def example_list_offsets(a, args):
867915
sys.stderr.write(' delete_acls <resource_type1> <resource_name1> <resource_patter_type1> ' +
868916
'<principal1> <host1> <operation1> <permission_type1> ..\n')
869917
sys.stderr.write(' list [<all|topics|brokers|groups>]\n')
870-
sys.stderr.write(' list_consumer_groups [<state1> <state2> ..]\n')
918+
sys.stderr.write(' list_consumer_groups [-states <state1> <state2> ..] ' +
919+
'[-types <grouptype1> <grouptype2> ..]\n')
871920
sys.stderr.write(' describe_consumer_groups <include_authorized_operations> <group1> <group2> ..\n')
872921
sys.stderr.write(' describe_topics <include_authorized_operations> <topic1> <topic2> ..\n')
873922
sys.stderr.write(' describe_cluster <include_authorized_operations>\n')

src/confluent_kafka/_model/__init__.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,24 @@ def __lt__(self, other):
9595
return self.value < other.value
9696

9797

98+
class ConsumerGroupType(Enum):
99+
"""
100+
Enumerates the different types of Consumer Group Type.
101+
102+
"""
103+
#: Type is not known or not set
104+
UNKNOWN = cimpl.CONSUMER_GROUP_TYPE_UNKNOWN
105+
#: Consumer Type
106+
CONSUMER = cimpl.CONSUMER_GROUP_TYPE_CONSUMER
107+
#: Classic Type
108+
CLASSIC = cimpl.CONSUMER_GROUP_TYPE_CLASSIC
109+
110+
def __lt__(self, other):
111+
if self.__class__ != other.__class__:
112+
return NotImplemented
113+
return self.value < other.value
114+
115+
98116
class TopicCollection:
99117
"""
100118
Represents collection of topics in the form of different identifiers

src/confluent_kafka/admin/__init__.py

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@
5454
from ._listoffsets import (OffsetSpec, # noqa: F401
5555
ListOffsetsResultInfo)
5656

57-
from .._model import TopicCollection as _TopicCollection
57+
from .._model import TopicCollection as _TopicCollection, ConsumerGroupType as _ConsumerGroupType
5858

5959
from ..cimpl import (KafkaException, # noqa: F401
6060
KafkaError,
@@ -864,6 +864,8 @@ def list_consumer_groups(self, **kwargs):
864864
on broker, and response. Default: `socket.timeout.ms*1000.0`
865865
:param set(ConsumerGroupState) states: only list consumer groups which are currently in
866866
these states.
867+
:param set(ConsumerGroupType) group_types: only list consumer groups which are currently of
868+
these types.
867869
868870
:returns: a future. Result method of the future returns :class:`ListConsumerGroupsResult`.
869871
@@ -883,6 +885,16 @@ def list_consumer_groups(self, **kwargs):
883885
raise TypeError("All elements of states must be of type ConsumerGroupState")
884886
kwargs["states_int"] = [state.value for state in states]
885887
kwargs.pop("states")
888+
if "group_types" in kwargs:
889+
group_types = kwargs["group_types"]
890+
if group_types is not None:
891+
if not isinstance(group_types, set):
892+
raise TypeError("'group_types' must be a set")
893+
for group_type in group_types:
894+
if not isinstance(group_type, _ConsumerGroupType):
895+
raise TypeError("All elements of group_types must be of type ConsumerGroupType")
896+
kwargs["group_types_int"] = [group_type.value for group_type in group_types]
897+
kwargs.pop("group_types")
886898

887899
f, _ = AdminClient._make_futures([], None, AdminClient._make_list_consumer_groups_result)
888900

src/confluent_kafka/admin/_group.py

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414

1515

1616
from .._util import ConversionUtil
17-
from .._model import ConsumerGroupState
17+
from .._model import ConsumerGroupState, ConsumerGroupType
1818
from ._acl import AclOperation
1919

2020

@@ -31,13 +31,17 @@ class ConsumerGroupListing:
3131
Whether a consumer group is simple or not.
3232
state : ConsumerGroupState
3333
Current state of the consumer group.
34+
group_type : ConsumerGroupType
35+
Current type of the consumer group.
3436
"""
3537

36-
def __init__(self, group_id, is_simple_consumer_group, state=None):
38+
def __init__(self, group_id, is_simple_consumer_group, state=None, group_type=None):
3739
self.group_id = group_id
3840
self.is_simple_consumer_group = is_simple_consumer_group
3941
if state is not None:
4042
self.state = ConversionUtil.convert_to_enum(state, ConsumerGroupState)
43+
if group_type is not None:
44+
self.group_type = ConversionUtil.convert_to_enum(group_type, ConsumerGroupType)
4145

4246

4347
class ListConsumerGroupsResult:
@@ -119,14 +123,16 @@ class ConsumerGroupDescription:
119123
Partition assignor.
120124
state : ConsumerGroupState
121125
Current state of the consumer group.
126+
group_type : ConsumerGroupType
127+
Current type of the consumer group.
122128
coordinator: Node
123129
Consumer group coordinator.
124130
authorized_operations: list(AclOperation)
125131
AclOperations allowed for the consumer group.
126132
"""
127133

128134
def __init__(self, group_id, is_simple_consumer_group, members, partition_assignor, state,
129-
coordinator, authorized_operations=None):
135+
coordinator, authorized_operations=None, group_type=None):
130136
self.group_id = group_id
131137
self.is_simple_consumer_group = is_simple_consumer_group
132138
self.members = members
@@ -139,4 +145,7 @@ def __init__(self, group_id, is_simple_consumer_group, members, partition_assign
139145
self.partition_assignor = partition_assignor
140146
if state is not None:
141147
self.state = ConversionUtil.convert_to_enum(state, ConsumerGroupState)
148+
if group_type is not None:
149+
self.group_type = ConversionUtil.convert_to_enum(group_type, ConsumerGroupType)
150+
142151
self.coordinator = coordinator

src/confluent_kafka/src/Admin.c

Lines changed: 50 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,8 @@ struct Admin_options {
8282
rd_kafka_IsolationLevel_t isolation_level;
8383
rd_kafka_consumer_group_state_t* states;
8484
int states_cnt;
85+
rd_kafka_consumer_group_type_t* group_types;
86+
int group_types_cnt;
8587
};
8688

8789
/**@brief "unset" value initializers for Admin_options
@@ -185,6 +187,13 @@ Admin_options_to_c (Handle *self, rd_kafka_admin_op_t for_api,
185187
goto err;
186188
}
187189

190+
if (Admin_options_is_set_ptr(options->group_types) &&
191+
(err_obj = rd_kafka_AdminOptions_set_match_consumer_group_types(
192+
c_options, options->group_types, options->group_types_cnt))) {
193+
snprintf(errstr, sizeof(errstr), "%s", rd_kafka_error_string(err_obj));
194+
goto err;
195+
}
196+
188197
return c_options;
189198

190199
err:
@@ -1698,24 +1707,28 @@ static const char Admin_delete_acls_doc[] = PyDoc_STR(
16981707
* @brief List consumer groups
16991708
*/
17001709
PyObject *Admin_list_consumer_groups (Handle *self, PyObject *args, PyObject *kwargs) {
1701-
PyObject *future, *states_int = NULL;
1710+
PyObject *future, *states_int, *group_types_int = NULL;
17021711
struct Admin_options options = Admin_options_INITIALIZER;
17031712
rd_kafka_AdminOptions_t *c_options = NULL;
17041713
CallState cs;
17051714
rd_kafka_queue_t *rkqu;
17061715
rd_kafka_consumer_group_state_t *c_states = NULL;
1716+
rd_kafka_consumer_group_type_t *c_group_types = NULL;
17071717
int states_cnt = 0;
1718+
int group_types_cnt = 0;
17081719
int i = 0;
17091720

17101721
static char *kws[] = {"future",
17111722
/* options */
17121723
"states_int",
1724+
"group_types_int",
17131725
"request_timeout",
17141726
NULL};
17151727

1716-
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O|Of", kws,
1728+
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O|OOf", kws,
17171729
&future,
17181730
&states_int,
1731+
&group_types_int,
17191732
&options.request_timeout)) {
17201733
goto err;
17211734
}
@@ -1746,6 +1759,32 @@ PyObject *Admin_list_consumer_groups (Handle *self, PyObject *args, PyObject *kw
17461759
}
17471760
}
17481761

1762+
if(group_types_int != NULL && group_types_int != Py_None) {
1763+
if(!PyList_Check(group_types_int)) {
1764+
PyErr_SetString(PyExc_ValueError,
1765+
"group_types must of type list");
1766+
goto err;
1767+
}
1768+
1769+
group_types_cnt = (int)PyList_Size(group_types_int);
1770+
1771+
if(group_types_cnt > 0) {
1772+
c_group_types = (rd_kafka_consumer_group_type_t *)
1773+
malloc(group_types_cnt*sizeof(rd_kafka_consumer_group_type_t));
1774+
for(i = 0 ; i < group_types_cnt ; i++) {
1775+
PyObject *group_type = PyList_GET_ITEM(group_types_int, i);
1776+
if(!cfl_PyInt_Check(group_type)) {
1777+
PyErr_SetString(PyExc_ValueError,
1778+
"Element of group_types must be a valid group type");
1779+
goto err;
1780+
}
1781+
c_group_types[i] = (rd_kafka_consumer_group_type_t) cfl_PyInt_AsInt(group_type);
1782+
}
1783+
options.group_types = c_group_types;
1784+
options.group_types_cnt = group_types_cnt;
1785+
}
1786+
}
1787+
17491788
c_options = Admin_options_to_c(self, RD_KAFKA_ADMIN_OP_LISTCONSUMERGROUPS,
17501789
&options, future);
17511790
if (!c_options) {
@@ -1760,7 +1799,6 @@ PyObject *Admin_list_consumer_groups (Handle *self, PyObject *args, PyObject *kw
17601799
/* Use librdkafka's background thread queue to automatically dispatch
17611800
* Admin_background_event_cb() when the admin operation is finished. */
17621801
rkqu = rd_kafka_queue_get_background(self->rk);
1763-
17641802
/*
17651803
* Call ListConsumerGroupOffsets
17661804
*
@@ -1774,22 +1812,27 @@ PyObject *Admin_list_consumer_groups (Handle *self, PyObject *args, PyObject *kw
17741812
if(c_states) {
17751813
free(c_states);
17761814
}
1815+
if(c_group_types) {
1816+
free(c_group_types);
1817+
}
17771818
rd_kafka_queue_destroy(rkqu); /* drop reference from get_background */
17781819
rd_kafka_AdminOptions_destroy(c_options);
1779-
17801820
Py_RETURN_NONE;
17811821
err:
17821822
if(c_states) {
17831823
free(c_states);
17841824
}
1825+
if(c_group_types) {
1826+
free(c_group_types);
1827+
}
17851828
if (c_options) {
17861829
rd_kafka_AdminOptions_destroy(c_options);
17871830
Py_DECREF(future);
17881831
}
17891832
return NULL;
17901833
}
17911834
const char Admin_list_consumer_groups_doc[] = PyDoc_STR(
1792-
".. py:function:: list_consumer_groups(future, [states_int], [request_timeout])\n"
1835+
".. py:function:: list_consumer_groups(future, [states_int], [group_types_int], [request_timeout])\n"
17931836
"\n"
17941837
" List all the consumer groups.\n"
17951838
"\n"
@@ -3466,7 +3509,6 @@ static PyObject *Admin_c_ListConsumerGroupsResults_to_py(
34663509
size_t valid_cnt,
34673510
const rd_kafka_error_t **c_errors_responses,
34683511
size_t errors_cnt) {
3469-
34703512
PyObject *result = NULL;
34713513
PyObject *ListConsumerGroupsResult_type = NULL;
34723514
PyObject *ConsumerGroupListing_type = NULL;
@@ -3509,6 +3551,8 @@ static PyObject *Admin_c_ListConsumerGroupsResults_to_py(
35093551

35103552
cfl_PyDict_SetInt(kwargs, "state", rd_kafka_ConsumerGroupListing_state(c_valid_responses[i]));
35113553

3554+
cfl_PyDict_SetInt(kwargs, "group_type", rd_kafka_ConsumerGroupListing_type(c_valid_responses[i]));
3555+
35123556
args = PyTuple_New(0);
35133557

35143558
valid_result = PyObject_Call(ConsumerGroupListing_type, args, kwargs);

0 commit comments

Comments
 (0)