Skip to content

[KIP-848 EA] Admin API for listing consumer groups now has #1830

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Oct 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,16 @@
# Confluent's Python client for Apache Kafka

## v2.6.0

v2.6.0 is a feature release with the following features, fixes and enhancements:

- [KIP-848 EA](https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol): Admin API for listing consumer groups now has an optional filter to return only groups of given types (#1830).

confluent-kafka-python is based on librdkafka v2.6.0, see the
[librdkafka release notes](https://github.com/confluentinc/librdkafka/releases/tag/v2.6.0)
for a complete list of changes, enhancements, fixes and upgrade considerations.


## v2.5.3

v2.5.3 is a maintenance release with the following fixes and enhancements:
Expand Down
46 changes: 39 additions & 7 deletions examples/adminapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@
# Example use of AdminClient operations.

from confluent_kafka import (KafkaException, ConsumerGroupTopicPartitions,
TopicPartition, ConsumerGroupState, TopicCollection,
IsolationLevel, ElectionType)
TopicPartition, ConsumerGroupState,
TopicCollection, IsolationLevel,
ConsumerGroupType, ElectionType)
from confluent_kafka.admin import (AdminClient, NewTopic, NewPartitions, ConfigResource,
ConfigEntry, ConfigSource, AclBinding,
AclBindingFilter, ResourceType, ResourcePatternType,
Expand All @@ -30,6 +31,7 @@
import sys
import threading
import logging
import argparse

logging.basicConfig()

Expand Down Expand Up @@ -471,18 +473,47 @@ def example_list(a, args):
print("id {} client_id: {} client_host: {}".format(m.id, m.client_id, m.client_host))


def parse_list_consumer_groups_args(args, states, types):
parser = argparse.ArgumentParser(prog='list_consumer_groups')
parser.add_argument('-states')
parser.add_argument('-types')
parsed_args = parser.parse_args(args)

def usage(message):
print(message)
parser.print_usage()
sys.exit(1)

if parsed_args.states:
for arg in parsed_args.states.split(","):
try:
states.add(ConsumerGroupState[arg])
except KeyError:
usage(f"Invalid state: {arg}")
if parsed_args.types:
for arg in parsed_args.types.split(","):
try:
types.add(ConsumerGroupType[arg])
except KeyError:
usage(f"Invalid type: {arg}")


def example_list_consumer_groups(a, args):
"""
List Consumer Groups
"""
states = {ConsumerGroupState[state] for state in args}
future = a.list_consumer_groups(request_timeout=10, states=states)

states = set()
types = set()
parse_list_consumer_groups_args(args, states, types)

future = a.list_consumer_groups(request_timeout=10, states=states, types=types)
try:
list_consumer_groups_result = future.result()
print("{} consumer groups".format(len(list_consumer_groups_result.valid)))
for valid in list_consumer_groups_result.valid:
print(" id: {} is_simple: {} state: {}".format(
valid.group_id, valid.is_simple_consumer_group, valid.state))
print(" id: {} is_simple: {} state: {} type: {}".format(
valid.group_id, valid.is_simple_consumer_group, valid.state, valid.type))
print("{} errors".format(len(list_consumer_groups_result.errors)))
for error in list_consumer_groups_result.errors:
print(" error: {}".format(error))
Expand Down Expand Up @@ -937,7 +968,8 @@ def example_elect_leaders(a, args):
sys.stderr.write(' delete_acls <resource_type1> <resource_name1> <resource_patter_type1> ' +
'<principal1> <host1> <operation1> <permission_type1> ..\n')
sys.stderr.write(' list [<all|topics|brokers|groups>]\n')
sys.stderr.write(' list_consumer_groups [<state1> <state2> ..]\n')
sys.stderr.write(' list_consumer_groups [-states <state1>,<state2>,..] ' +
'[-types <type1>,<type2>,..]\n')
sys.stderr.write(' describe_consumer_groups <include_authorized_operations> <group1> <group2> ..\n')
sys.stderr.write(' describe_topics <include_authorized_operations> <topic1> <topic2> ..\n')
sys.stderr.write(' describe_cluster <include_authorized_operations>\n')
Expand Down
4 changes: 3 additions & 1 deletion src/confluent_kafka/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from ._model import (Node, # noqa: F401
ConsumerGroupTopicPartitions,
ConsumerGroupState,
ConsumerGroupType,
TopicCollection,
TopicPartitionInfo,
IsolationLevel,
Expand Down Expand Up @@ -49,7 +50,8 @@
'Producer', 'DeserializingConsumer',
'SerializingProducer', 'TIMESTAMP_CREATE_TIME', 'TIMESTAMP_LOG_APPEND_TIME',
'TIMESTAMP_NOT_AVAILABLE', 'TopicPartition', 'Node',
'ConsumerGroupTopicPartitions', 'ConsumerGroupState', 'Uuid',
'ConsumerGroupTopicPartitions', 'ConsumerGroupState',
'ConsumerGroupType', 'Uuid',
'IsolationLevel', 'TopicCollection', 'TopicPartitionInfo']

__version__ = version()[0]
Expand Down
20 changes: 20 additions & 0 deletions src/confluent_kafka/_model/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,26 @@ def __lt__(self, other):
return self.value < other.value


class ConsumerGroupType(Enum):
"""
Enumerates the different types of Consumer Group Type.

Values:
-------
"""
#: Type is not known or not set
UNKNOWN = cimpl.CONSUMER_GROUP_TYPE_UNKNOWN
#: Consumer Type
CONSUMER = cimpl.CONSUMER_GROUP_TYPE_CONSUMER
#: Classic Type
CLASSIC = cimpl.CONSUMER_GROUP_TYPE_CLASSIC

def __lt__(self, other):
if self.__class__ != other.__class__:
return NotImplemented
return self.value < other.value


class TopicCollection:
"""
Represents collection of topics in the form of different identifiers
Expand Down
13 changes: 13 additions & 0 deletions src/confluent_kafka/admin/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
from ._records import DeletedRecords # noqa: F401

from .._model import (TopicCollection as _TopicCollection,
ConsumerGroupType as _ConsumerGroupType,
ElectionType as _ElectionType)

from ..cimpl import (KafkaException, # noqa: F401
Expand Down Expand Up @@ -898,6 +899,8 @@ def list_consumer_groups(self, **kwargs):
on broker, and response. Default: `socket.timeout.ms/1000.0`
:param set(ConsumerGroupState) states: only list consumer groups which are currently in
these states.
:param set(ConsumerGroupType) types: only list consumer groups of
these types.

:returns: a future. Result method of the future returns :class:`ListConsumerGroupsResult`.

Expand All @@ -917,6 +920,16 @@ def list_consumer_groups(self, **kwargs):
raise TypeError("All elements of states must be of type ConsumerGroupState")
kwargs["states_int"] = [state.value for state in states]
kwargs.pop("states")
if "types" in kwargs:
types = kwargs["types"]
if types is not None:
if not isinstance(types, set):
raise TypeError("'types' must be a set")
for type in types:
if not isinstance(type, _ConsumerGroupType):
raise TypeError("All elements of types must be of type ConsumerGroupType")
kwargs["types_int"] = [type.value for type in types]
kwargs.pop("types")

f, _ = AdminClient._make_futures([], None, AdminClient._make_list_consumer_groups_result)

Expand Down
8 changes: 6 additions & 2 deletions src/confluent_kafka/admin/_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@


from .._util import ConversionUtil
from .._model import ConsumerGroupState
from .._model import ConsumerGroupState, ConsumerGroupType
from ._acl import AclOperation


Expand All @@ -31,13 +31,17 @@ class ConsumerGroupListing:
Whether a consumer group is simple or not.
state : ConsumerGroupState
Current state of the consumer group.
type : ConsumerGroupType
Type of the consumer group.
"""

def __init__(self, group_id, is_simple_consumer_group, state=None):
def __init__(self, group_id, is_simple_consumer_group, state=None, type=None):
self.group_id = group_id
self.is_simple_consumer_group = is_simple_consumer_group
if state is not None:
self.state = ConversionUtil.convert_to_enum(state, ConsumerGroupState)
if type is not None:
self.type = ConversionUtil.convert_to_enum(type, ConsumerGroupType)


class ListConsumerGroupsResult:
Expand Down
59 changes: 54 additions & 5 deletions src/confluent_kafka/src/Admin.c
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ struct Admin_options {
rd_kafka_IsolationLevel_t isolation_level;
rd_kafka_consumer_group_state_t* states;
int states_cnt;
rd_kafka_consumer_group_type_t* types;
int types_cnt;
};

/**@brief "unset" value initializers for Admin_options
Expand All @@ -96,6 +98,8 @@ struct Admin_options {
Admin_options_def_int, \
Admin_options_def_ptr, \
Admin_options_def_cnt, \
Admin_options_def_ptr, \
Admin_options_def_cnt, \
}

#define Admin_options_is_set_int(v) ((v) != Admin_options_def_int)
Expand Down Expand Up @@ -185,6 +189,13 @@ Admin_options_to_c (Handle *self, rd_kafka_admin_op_t for_api,
goto err;
}

if (Admin_options_is_set_ptr(options->types) &&
(err_obj = rd_kafka_AdminOptions_set_match_consumer_group_types(
c_options, options->types, options->types_cnt))) {
snprintf(errstr, sizeof(errstr), "%s", rd_kafka_error_string(err_obj));
goto err;
}

return c_options;

err:
Expand Down Expand Up @@ -1698,24 +1709,28 @@ static const char Admin_delete_acls_doc[] = PyDoc_STR(
* @brief List consumer groups
*/
PyObject *Admin_list_consumer_groups (Handle *self, PyObject *args, PyObject *kwargs) {
PyObject *future, *states_int = NULL;
PyObject *future, *states_int = NULL, *types_int = NULL;
struct Admin_options options = Admin_options_INITIALIZER;
rd_kafka_AdminOptions_t *c_options = NULL;
CallState cs;
rd_kafka_queue_t *rkqu;
rd_kafka_consumer_group_state_t *c_states = NULL;
rd_kafka_consumer_group_type_t *c_types = NULL;
int states_cnt = 0;
int types_cnt = 0;
int i = 0;

static char *kws[] = {"future",
/* options */
"states_int",
"types_int",
"request_timeout",
NULL};

if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O|Of", kws,
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O|OOf", kws,
&future,
&states_int,
&types_int,
&options.request_timeout)) {
goto err;
}
Expand All @@ -1736,7 +1751,7 @@ PyObject *Admin_list_consumer_groups (Handle *self, PyObject *args, PyObject *kw
PyObject *state = PyList_GET_ITEM(states_int, i);
if(!cfl_PyInt_Check(state)) {
PyErr_SetString(PyExc_ValueError,
"Element of states must be a valid state");
"Element of states must be valid states");
goto err;
}
c_states[i] = (rd_kafka_consumer_group_state_t) cfl_PyInt_AsInt(state);
Expand All @@ -1746,6 +1761,33 @@ PyObject *Admin_list_consumer_groups (Handle *self, PyObject *args, PyObject *kw
}
}

if(types_int != NULL && types_int != Py_None) {
if(!PyList_Check(types_int)) {
PyErr_SetString(PyExc_ValueError,
"types must of type list");
goto err;
}

types_cnt = (int)PyList_Size(types_int);

if(types_cnt > 0) {
c_types = (rd_kafka_consumer_group_type_t *)
malloc(types_cnt *
sizeof(rd_kafka_consumer_group_type_t));
for(i = 0 ; i < types_cnt ; i++) {
PyObject *type = PyList_GET_ITEM(types_int, i);
if(!cfl_PyInt_Check(type)) {
PyErr_SetString(PyExc_ValueError,
"Element of types must be valid group types");
goto err;
}
c_types[i] = (rd_kafka_consumer_group_type_t) cfl_PyInt_AsInt(type);
}
options.types = c_types;
options.types_cnt = types_cnt;
}
}

c_options = Admin_options_to_c(self, RD_KAFKA_ADMIN_OP_LISTCONSUMERGROUPS,
&options, future);
if (!c_options) {
Expand Down Expand Up @@ -1774,22 +1816,27 @@ PyObject *Admin_list_consumer_groups (Handle *self, PyObject *args, PyObject *kw
if(c_states) {
free(c_states);
}
if(c_types) {
free(c_types);
}
rd_kafka_queue_destroy(rkqu); /* drop reference from get_background */
rd_kafka_AdminOptions_destroy(c_options);

Py_RETURN_NONE;
err:
if(c_states) {
free(c_states);
}
if(c_types) {
free(c_types);
}
if (c_options) {
rd_kafka_AdminOptions_destroy(c_options);
Py_DECREF(future);
}
return NULL;
}
const char Admin_list_consumer_groups_doc[] = PyDoc_STR(
".. py:function:: list_consumer_groups(future, [states_int], [request_timeout])\n"
".. py:function:: list_consumer_groups(future, [states_int], [types_int], [request_timeout])\n"
"\n"
" List all the consumer groups.\n"
"\n"
Expand Down Expand Up @@ -3711,6 +3758,8 @@ static PyObject *Admin_c_ListConsumerGroupsResults_to_py(

cfl_PyDict_SetInt(kwargs, "state", rd_kafka_ConsumerGroupListing_state(c_valid_responses[i]));

cfl_PyDict_SetInt(kwargs, "type", rd_kafka_ConsumerGroupListing_type(c_valid_responses[i]));

args = PyTuple_New(0);

valid_result = PyObject_Call(ConsumerGroupListing_type, args, kwargs);
Expand Down
9 changes: 8 additions & 1 deletion src/confluent_kafka/src/AdminTypes.c
Original file line number Diff line number Diff line change
Expand Up @@ -570,8 +570,14 @@ static void AdminTypes_AddObjectsConsumerGroupStates (PyObject *m) {
PyModule_AddIntConstant(m, "CONSUMER_GROUP_STATE_EMPTY", RD_KAFKA_CONSUMER_GROUP_STATE_EMPTY);
}

static void AdminTypes_AddObjectsConsumerGroupTypes (PyObject *m) {
/* rd_kafka_consumer_group_type_t */
PyModule_AddIntConstant(m, "CONSUMER_GROUP_TYPE_UNKNOWN", RD_KAFKA_CONSUMER_GROUP_TYPE_UNKNOWN);
PyModule_AddIntConstant(m, "CONSUMER_GROUP_TYPE_CONSUMER", RD_KAFKA_CONSUMER_GROUP_TYPE_CONSUMER);
PyModule_AddIntConstant(m, "CONSUMER_GROUP_TYPE_CLASSIC", RD_KAFKA_CONSUMER_GROUP_TYPE_CLASSIC);
}

static void AdminTypes_AddObjectsAlterConfigOpType (PyObject *m) {
/* rd_kafka_consumer_group_state_t */
PyModule_AddIntConstant(m, "ALTER_CONFIG_OP_TYPE_SET", RD_KAFKA_ALTER_CONFIG_OP_TYPE_SET);
PyModule_AddIntConstant(m, "ALTER_CONFIG_OP_TYPE_DELETE", RD_KAFKA_ALTER_CONFIG_OP_TYPE_DELETE);
PyModule_AddIntConstant(m, "ALTER_CONFIG_OP_TYPE_APPEND", RD_KAFKA_ALTER_CONFIG_OP_TYPE_APPEND);
Expand Down Expand Up @@ -620,6 +626,7 @@ void AdminTypes_AddObjects (PyObject *m) {
AdminTypes_AddObjectsAclOperation(m);
AdminTypes_AddObjectsAclPermissionType(m);
AdminTypes_AddObjectsConsumerGroupStates(m);
AdminTypes_AddObjectsConsumerGroupTypes(m);
AdminTypes_AddObjectsAlterConfigOpType(m);
AdminTypes_AddObjectsScramMechanismType(m);
AdminTypes_AddObjectsIsolationLevel(m);
Expand Down