Skip to content

Commit 00c362d

Browse files
feature/ListGroupsAPI KIP 848 (confluentinc#2)
* rebase commit * cherry-picked
1 parent 7c8abdf commit 00c362d

File tree

10 files changed

+185
-22
lines changed

10 files changed

+185
-22
lines changed

examples/adminapi.py

+56-7
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,17 @@
1818
# Example use of AdminClient operations.
1919

2020
from confluent_kafka import (KafkaException, ConsumerGroupTopicPartitions,
21-
TopicPartition, ConsumerGroupState, TopicCollection,
22-
IsolationLevel)
21+
TopicPartition, ConsumerGroupState,
22+
TopicCollection, IsolationLevel)
2323
from confluent_kafka.admin import (AdminClient, NewTopic, NewPartitions, ConfigResource,
2424
ConfigEntry, ConfigSource, AclBinding,
2525
AclBindingFilter, ResourceType, ResourcePatternType,
2626
AclOperation, AclPermissionType, AlterConfigOpType,
2727
ScramMechanism, ScramCredentialInfo,
2828
UserScramCredentialUpsertion, UserScramCredentialDeletion,
2929
OffsetSpec)
30+
from confluent_kafka._model import ConsumerGroupType
31+
3032
import sys
3133
import threading
3234
import logging
@@ -471,18 +473,64 @@ def example_list(a, args):
471473
print("id {} client_id: {} client_host: {}".format(m.id, m.client_id, m.client_host))
472474

473475

476+
def getConsumerGroupState(state_string):
477+
if state_string == "STABLE":
478+
return ConsumerGroupState.STABLE
479+
elif state_string == "DEAD":
480+
return ConsumerGroupState.DEAD
481+
elif state_string == "PREPARING_REBALANCING":
482+
return ConsumerGroupState.PREPARING_REBALANCING
483+
elif state_string == "COMPLETING_REBALANCING":
484+
return ConsumerGroupState.COMPLETING_REBALANCING
485+
elif state_string == "EMPTY":
486+
return ConsumerGroupState.EMPTY
487+
return ConsumerGroupState.UNKNOWN
488+
489+
490+
def getConsumerGroupType(type_string):
491+
if type_string == "CONSUMER":
492+
return ConsumerGroupType.CONSUMER
493+
elif type_string == "CLASSIC":
494+
return ConsumerGroupType.CLASSIC
495+
return ConsumerGroupType.UNKNOWN
496+
497+
474498
def example_list_consumer_groups(a, args):
475499
"""
476500
List Consumer Groups
477501
"""
478-
states = {ConsumerGroupState[state] for state in args}
479-
future = a.list_consumer_groups(request_timeout=10, states=states)
502+
states = set()
503+
group_types = set()
504+
if len(args) > 0:
505+
isType = False
506+
isState = False
507+
for i in range(0, len(args)):
508+
if (args[i] == "-states"):
509+
if (isState):
510+
raise Exception("Invalid Arguments\n Usage: list_consumer_groups [-states <state1> <state2> ..] " +
511+
"[-types <grouptype1> <grouptype2> ..]")
512+
isState = True
513+
elif (args[i] == "-types"):
514+
if (isType):
515+
raise Exception("Invalid Arguments\n Usage: list_consumer_groups [-states <state1> <state2> ..] " +
516+
"[-types <grouptype1> <grouptype2> ..]")
517+
isType = True
518+
else:
519+
if (isType):
520+
group_types.add(getConsumerGroupType(args[i]))
521+
elif (isState):
522+
states.add(getConsumerGroupState(args[i]))
523+
else:
524+
raise Exception("Invalid Arguments\n Usage: list_consumer_groups [-states <state1> <state2> ..] " +
525+
"[-types <grouptype1> <grouptype2> ..]")
526+
527+
future = a.list_consumer_groups(request_timeout=10, states=states, group_types=group_types)
480528
try:
481529
list_consumer_groups_result = future.result()
482530
print("{} consumer groups".format(len(list_consumer_groups_result.valid)))
483531
for valid in list_consumer_groups_result.valid:
484-
print(" id: {} is_simple: {} state: {}".format(
485-
valid.group_id, valid.is_simple_consumer_group, valid.state))
532+
print(" id: {} is_simple: {} state: {} group_type: {}".format(
533+
valid.group_id, valid.is_simple_consumer_group, valid.state, valid.group_type))
486534
print("{} errors".format(len(list_consumer_groups_result.errors)))
487535
for error in list_consumer_groups_result.errors:
488536
print(" error: {}".format(error))
@@ -900,7 +948,8 @@ def example_delete_records(a, args):
900948
sys.stderr.write(' delete_acls <resource_type1> <resource_name1> <resource_patter_type1> ' +
901949
'<principal1> <host1> <operation1> <permission_type1> ..\n')
902950
sys.stderr.write(' list [<all|topics|brokers|groups>]\n')
903-
sys.stderr.write(' list_consumer_groups [<state1> <state2> ..]\n')
951+
sys.stderr.write(' list_consumer_groups [-states <state1> <state2> ..] ' +
952+
'[-types <grouptype1> <grouptype2> ..]\n')
904953
sys.stderr.write(' describe_consumer_groups <include_authorized_operations> <group1> <group2> ..\n')
905954
sys.stderr.write(' describe_topics <include_authorized_operations> <topic1> <topic2> ..\n')
906955
sys.stderr.write(' describe_cluster <include_authorized_operations>\n')

src/confluent_kafka/__init__.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@
4848
'Producer', 'DeserializingConsumer',
4949
'SerializingProducer', 'TIMESTAMP_CREATE_TIME', 'TIMESTAMP_LOG_APPEND_TIME',
5050
'TIMESTAMP_NOT_AVAILABLE', 'TopicPartition', 'Node',
51-
'ConsumerGroupTopicPartitions', 'ConsumerGroupState', 'Uuid',
51+
'ConsumerGroupTopicPartitions', 'ConsumerGroupState', 'ConsumerGroupType', 'Uuid',
5252
'IsolationLevel', 'TopicCollection', 'TopicPartitionInfo']
5353

5454
__version__ = version()[0]

src/confluent_kafka/_model/__init__.py

+18
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,24 @@ def __lt__(self, other):
9595
return self.value < other.value
9696

9797

98+
class ConsumerGroupType(Enum):
99+
"""
100+
Enumerates the different types of Consumer Group Type.
101+
102+
"""
103+
#: Type is not known or not set
104+
UNKNOWN = cimpl.CONSUMER_GROUP_TYPE_UNKNOWN
105+
#: Consumer Type
106+
CONSUMER = cimpl.CONSUMER_GROUP_TYPE_CONSUMER
107+
#: Classic Type
108+
CLASSIC = cimpl.CONSUMER_GROUP_TYPE_CLASSIC
109+
110+
def __lt__(self, other):
111+
if self.__class__ != other.__class__:
112+
return NotImplemented
113+
return self.value < other.value
114+
115+
98116
class TopicCollection:
99117
"""
100118
Represents collection of topics in the form of different identifiers

src/confluent_kafka/admin/__init__.py

+13-1
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@
5656

5757
from ._records import DeletedRecords # noqa: F401
5858

59-
from .._model import TopicCollection as _TopicCollection
59+
from .._model import TopicCollection as _TopicCollection, ConsumerGroupType as _ConsumerGroupType
6060

6161
from ..cimpl import (KafkaException, # noqa: F401
6262
KafkaError,
@@ -881,6 +881,8 @@ def list_consumer_groups(self, **kwargs):
881881
on broker, and response. Default: `socket.timeout.ms/1000.0`
882882
:param set(ConsumerGroupState) states: only list consumer groups which are currently in
883883
these states.
884+
:param set(ConsumerGroupType) group_types: only list consumer groups which are currently of
885+
these types.
884886
885887
:returns: a future. Result method of the future returns :class:`ListConsumerGroupsResult`.
886888
@@ -900,6 +902,16 @@ def list_consumer_groups(self, **kwargs):
900902
raise TypeError("All elements of states must be of type ConsumerGroupState")
901903
kwargs["states_int"] = [state.value for state in states]
902904
kwargs.pop("states")
905+
if "group_types" in kwargs:
906+
group_types = kwargs["group_types"]
907+
if group_types is not None:
908+
if not isinstance(group_types, set):
909+
raise TypeError("'group_types' must be a set")
910+
for group_type in group_types:
911+
if not isinstance(group_type, _ConsumerGroupType):
912+
raise TypeError("All elements of group_types must be of type ConsumerGroupType")
913+
kwargs["group_types_int"] = [group_type.value for group_type in group_types]
914+
kwargs.pop("group_types")
903915

904916
f, _ = AdminClient._make_futures([], None, AdminClient._make_list_consumer_groups_result)
905917

src/confluent_kafka/admin/_group.py

+12-3
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414

1515

1616
from .._util import ConversionUtil
17-
from .._model import ConsumerGroupState
17+
from .._model import ConsumerGroupState, ConsumerGroupType
1818
from ._acl import AclOperation
1919

2020

@@ -31,13 +31,17 @@ class ConsumerGroupListing:
3131
Whether a consumer group is simple or not.
3232
state : ConsumerGroupState
3333
Current state of the consumer group.
34+
group_type : ConsumerGroupType
35+
Current type of the consumer group.
3436
"""
3537

36-
def __init__(self, group_id, is_simple_consumer_group, state=None):
38+
def __init__(self, group_id, is_simple_consumer_group, state=None, group_type=None):
3739
self.group_id = group_id
3840
self.is_simple_consumer_group = is_simple_consumer_group
3941
if state is not None:
4042
self.state = ConversionUtil.convert_to_enum(state, ConsumerGroupState)
43+
if group_type is not None:
44+
self.group_type = ConversionUtil.convert_to_enum(group_type, ConsumerGroupType)
4145

4246

4347
class ListConsumerGroupsResult:
@@ -119,14 +123,16 @@ class ConsumerGroupDescription:
119123
Partition assignor.
120124
state : ConsumerGroupState
121125
Current state of the consumer group.
126+
group_type : ConsumerGroupType
127+
Current type of the consumer group.
122128
coordinator: Node
123129
Consumer group coordinator.
124130
authorized_operations: list(AclOperation)
125131
AclOperations allowed for the consumer group.
126132
"""
127133

128134
def __init__(self, group_id, is_simple_consumer_group, members, partition_assignor, state,
129-
coordinator, authorized_operations=None):
135+
coordinator, authorized_operations=None, group_type=None):
130136
self.group_id = group_id
131137
self.is_simple_consumer_group = is_simple_consumer_group
132138
self.members = members
@@ -139,4 +145,7 @@ def __init__(self, group_id, is_simple_consumer_group, members, partition_assign
139145
self.partition_assignor = partition_assignor
140146
if state is not None:
141147
self.state = ConversionUtil.convert_to_enum(state, ConsumerGroupState)
148+
if group_type is not None:
149+
self.group_type = ConversionUtil.convert_to_enum(group_type, ConsumerGroupType)
150+
142151
self.coordinator = coordinator

src/confluent_kafka/src/Admin.c

+50-6
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,8 @@ struct Admin_options {
8282
rd_kafka_IsolationLevel_t isolation_level;
8383
rd_kafka_consumer_group_state_t* states;
8484
int states_cnt;
85+
rd_kafka_consumer_group_type_t* group_types;
86+
int group_types_cnt;
8587
};
8688

8789
/**@brief "unset" value initializers for Admin_options
@@ -185,6 +187,13 @@ Admin_options_to_c (Handle *self, rd_kafka_admin_op_t for_api,
185187
goto err;
186188
}
187189

190+
if (Admin_options_is_set_ptr(options->group_types) &&
191+
(err_obj = rd_kafka_AdminOptions_set_match_consumer_group_types(
192+
c_options, options->group_types, options->group_types_cnt))) {
193+
snprintf(errstr, sizeof(errstr), "%s", rd_kafka_error_string(err_obj));
194+
goto err;
195+
}
196+
188197
return c_options;
189198

190199
err:
@@ -1698,24 +1707,28 @@ static const char Admin_delete_acls_doc[] = PyDoc_STR(
16981707
* @brief List consumer groups
16991708
*/
17001709
PyObject *Admin_list_consumer_groups (Handle *self, PyObject *args, PyObject *kwargs) {
1701-
PyObject *future, *states_int = NULL;
1710+
PyObject *future, *states_int, *group_types_int = NULL;
17021711
struct Admin_options options = Admin_options_INITIALIZER;
17031712
rd_kafka_AdminOptions_t *c_options = NULL;
17041713
CallState cs;
17051714
rd_kafka_queue_t *rkqu;
17061715
rd_kafka_consumer_group_state_t *c_states = NULL;
1716+
rd_kafka_consumer_group_type_t *c_group_types = NULL;
17071717
int states_cnt = 0;
1718+
int group_types_cnt = 0;
17081719
int i = 0;
17091720

17101721
static char *kws[] = {"future",
17111722
/* options */
17121723
"states_int",
1724+
"group_types_int",
17131725
"request_timeout",
17141726
NULL};
17151727

1716-
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O|Of", kws,
1728+
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O|OOf", kws,
17171729
&future,
17181730
&states_int,
1731+
&group_types_int,
17191732
&options.request_timeout)) {
17201733
goto err;
17211734
}
@@ -1746,6 +1759,32 @@ PyObject *Admin_list_consumer_groups (Handle *self, PyObject *args, PyObject *kw
17461759
}
17471760
}
17481761

1762+
if(group_types_int != NULL && group_types_int != Py_None) {
1763+
if(!PyList_Check(group_types_int)) {
1764+
PyErr_SetString(PyExc_ValueError,
1765+
"group_types must of type list");
1766+
goto err;
1767+
}
1768+
1769+
group_types_cnt = (int)PyList_Size(group_types_int);
1770+
1771+
if(group_types_cnt > 0) {
1772+
c_group_types = (rd_kafka_consumer_group_type_t *)
1773+
malloc(group_types_cnt*sizeof(rd_kafka_consumer_group_type_t));
1774+
for(i = 0 ; i < group_types_cnt ; i++) {
1775+
PyObject *group_type = PyList_GET_ITEM(group_types_int, i);
1776+
if(!cfl_PyInt_Check(group_type)) {
1777+
PyErr_SetString(PyExc_ValueError,
1778+
"Element of group_types must be a valid group type");
1779+
goto err;
1780+
}
1781+
c_group_types[i] = (rd_kafka_consumer_group_type_t) cfl_PyInt_AsInt(group_type);
1782+
}
1783+
options.group_types = c_group_types;
1784+
options.group_types_cnt = group_types_cnt;
1785+
}
1786+
}
1787+
17491788
c_options = Admin_options_to_c(self, RD_KAFKA_ADMIN_OP_LISTCONSUMERGROUPS,
17501789
&options, future);
17511790
if (!c_options) {
@@ -1760,7 +1799,6 @@ PyObject *Admin_list_consumer_groups (Handle *self, PyObject *args, PyObject *kw
17601799
/* Use librdkafka's background thread queue to automatically dispatch
17611800
* Admin_background_event_cb() when the admin operation is finished. */
17621801
rkqu = rd_kafka_queue_get_background(self->rk);
1763-
17641802
/*
17651803
* Call ListConsumerGroupOffsets
17661804
*
@@ -1774,22 +1812,27 @@ PyObject *Admin_list_consumer_groups (Handle *self, PyObject *args, PyObject *kw
17741812
if(c_states) {
17751813
free(c_states);
17761814
}
1815+
if(c_group_types) {
1816+
free(c_group_types);
1817+
}
17771818
rd_kafka_queue_destroy(rkqu); /* drop reference from get_background */
17781819
rd_kafka_AdminOptions_destroy(c_options);
1779-
17801820
Py_RETURN_NONE;
17811821
err:
17821822
if(c_states) {
17831823
free(c_states);
17841824
}
1825+
if(c_group_types) {
1826+
free(c_group_types);
1827+
}
17851828
if (c_options) {
17861829
rd_kafka_AdminOptions_destroy(c_options);
17871830
Py_DECREF(future);
17881831
}
17891832
return NULL;
17901833
}
17911834
const char Admin_list_consumer_groups_doc[] = PyDoc_STR(
1792-
".. py:function:: list_consumer_groups(future, [states_int], [request_timeout])\n"
1835+
".. py:function:: list_consumer_groups(future, [states_int], [group_types_int], [request_timeout])\n"
17931836
"\n"
17941837
" List all the consumer groups.\n"
17951838
"\n"
@@ -3566,7 +3609,6 @@ static PyObject *Admin_c_ListConsumerGroupsResults_to_py(
35663609
size_t valid_cnt,
35673610
const rd_kafka_error_t **c_errors_responses,
35683611
size_t errors_cnt) {
3569-
35703612
PyObject *result = NULL;
35713613
PyObject *ListConsumerGroupsResult_type = NULL;
35723614
PyObject *ConsumerGroupListing_type = NULL;
@@ -3609,6 +3651,8 @@ static PyObject *Admin_c_ListConsumerGroupsResults_to_py(
36093651

36103652
cfl_PyDict_SetInt(kwargs, "state", rd_kafka_ConsumerGroupListing_state(c_valid_responses[i]));
36113653

3654+
cfl_PyDict_SetInt(kwargs, "group_type", rd_kafka_ConsumerGroupListing_type(c_valid_responses[i]));
3655+
36123656
args = PyTuple_New(0);
36133657

36143658
valid_result = PyObject_Call(ConsumerGroupListing_type, args, kwargs);

src/confluent_kafka/src/AdminTypes.c

+8-1
Original file line numberDiff line numberDiff line change
@@ -570,8 +570,14 @@ static void AdminTypes_AddObjectsConsumerGroupStates (PyObject *m) {
570570
PyModule_AddIntConstant(m, "CONSUMER_GROUP_STATE_EMPTY", RD_KAFKA_CONSUMER_GROUP_STATE_EMPTY);
571571
}
572572

573+
static void AdminTypes_AddObjectsConsumerGroupTypes (PyObject *m) {
574+
/* rd_kafka_consumer_group_type_t */
575+
PyModule_AddIntConstant(m, "CONSUMER_GROUP_TYPE_UNKNOWN", RD_KAFKA_CONSUMER_GROUP_TYPE_UNKNOWN);
576+
PyModule_AddIntConstant(m, "CONSUMER_GROUP_TYPE_CONSUMER", RD_KAFKA_CONSUMER_GROUP_TYPE_CONSUMER);
577+
PyModule_AddIntConstant(m, "CONSUMER_GROUP_TYPE_CLASSIC", RD_KAFKA_CONSUMER_GROUP_TYPE_CLASSIC);
578+
}
579+
573580
static void AdminTypes_AddObjectsAlterConfigOpType (PyObject *m) {
574-
/* rd_kafka_consumer_group_state_t */
575581
PyModule_AddIntConstant(m, "ALTER_CONFIG_OP_TYPE_SET", RD_KAFKA_ALTER_CONFIG_OP_TYPE_SET);
576582
PyModule_AddIntConstant(m, "ALTER_CONFIG_OP_TYPE_DELETE", RD_KAFKA_ALTER_CONFIG_OP_TYPE_DELETE);
577583
PyModule_AddIntConstant(m, "ALTER_CONFIG_OP_TYPE_APPEND", RD_KAFKA_ALTER_CONFIG_OP_TYPE_APPEND);
@@ -612,6 +618,7 @@ void AdminTypes_AddObjects (PyObject *m) {
612618
AdminTypes_AddObjectsAclOperation(m);
613619
AdminTypes_AddObjectsAclPermissionType(m);
614620
AdminTypes_AddObjectsConsumerGroupStates(m);
621+
AdminTypes_AddObjectsConsumerGroupTypes(m);
615622
AdminTypes_AddObjectsAlterConfigOpType(m);
616623
AdminTypes_AddObjectsScramMechanismType(m);
617624
AdminTypes_AddObjectsIsolationLevel(m);

0 commit comments

Comments
 (0)