diff --git a/CHANGELOG.md b/CHANGELOG.md
index c9857786d..cef8da6fc 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,16 @@
 # Confluent's Python client for Apache Kafka
 
+## v2.6.0
+
+v2.6.0 is a feature release with the following features, fixes and enhancements:
+
+- [KIP-848 EA](https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol): Admin API for listing consumer groups now has an optional filter to return only groups of given types (#1830).
+
+confluent-kafka-python is based on librdkafka v2.6.0, see the
+[librdkafka release notes](https://github.com/confluentinc/librdkafka/releases/tag/v2.6.0)
+for a complete list of changes, enhancements, fixes and upgrade considerations.
+
+
 ## v2.5.3
 
 v2.5.3 is a maintenance release with the following fixes and enhancements:
diff --git a/examples/adminapi.py b/examples/adminapi.py
index 2ce36e1e1..54f119e02 100755
--- a/examples/adminapi.py
+++ b/examples/adminapi.py
@@ -18,8 +18,9 @@
 # Example use of AdminClient operations.
 
 from confluent_kafka import (KafkaException, ConsumerGroupTopicPartitions,
-                             TopicPartition, ConsumerGroupState, TopicCollection,
-                             IsolationLevel, ElectionType)
+                             TopicPartition, ConsumerGroupState,
+                             TopicCollection, IsolationLevel,
+                             ConsumerGroupType, ElectionType)
 from confluent_kafka.admin import (AdminClient, NewTopic, NewPartitions, ConfigResource,
                                    ConfigEntry, ConfigSource, AclBinding,
                                    AclBindingFilter, ResourceType, ResourcePatternType,
@@ -30,6 +31,7 @@
 import sys
 import threading
 import logging
+import argparse
 
 logging.basicConfig()
 
@@ -471,18 +473,47 @@ def example_list(a, args):
             print("id {} client_id: {} client_host: {}".format(m.id, m.client_id, m.client_host))
 
 
+def parse_list_consumer_groups_args(args, states, types):
+    parser = argparse.ArgumentParser(prog='list_consumer_groups')
+    parser.add_argument('-states')
+    parser.add_argument('-types')
+    parsed_args = parser.parse_args(args)
+
+    def usage(message):
+        print(message)
+        parser.print_usage()
+        sys.exit(1)
+
+    if parsed_args.states:
+        for arg in parsed_args.states.split(","):
+            try:
+                states.add(ConsumerGroupState[arg])
+            except KeyError:
+                usage(f"Invalid state: {arg}")
+    if parsed_args.types:
+        for arg in parsed_args.types.split(","):
+            try:
+                types.add(ConsumerGroupType[arg])
+            except KeyError:
+                usage(f"Invalid type: {arg}")
+
+
 def example_list_consumer_groups(a, args):
     """
     List Consumer Groups
     """
-    states = {ConsumerGroupState[state] for state in args}
-    future = a.list_consumer_groups(request_timeout=10, states=states)
+
+    states = set()
+    types = set()
+    parse_list_consumer_groups_args(args, states, types)
+
+    future = a.list_consumer_groups(request_timeout=10, states=states, types=types)
     try:
         list_consumer_groups_result = future.result()
         print("{} consumer groups".format(len(list_consumer_groups_result.valid)))
         for valid in list_consumer_groups_result.valid:
-            print("    id: {} is_simple: {} state: {}".format(
-                valid.group_id, valid.is_simple_consumer_group, valid.state))
+            print("    id: {} is_simple: {} state: {} type: {}".format(
+                valid.group_id, valid.is_simple_consumer_group, valid.state, valid.type))
         print("{} errors".format(len(list_consumer_groups_result.errors)))
         for error in list_consumer_groups_result.errors:
             print("    error: {}".format(error))
@@ -937,7 +968,8 @@ def example_elect_leaders(a, args):
     sys.stderr.write(' delete_acls <resource_type1> <resource_name1> <resource_pattern_type1> ' +
                      '<principal1> <host1> <operation1> <permission_type1> ..\n')
     sys.stderr.write(' list [<all|topics|brokers|groups>]\n')
-    sys.stderr.write(' list_consumer_groups [<state1> <state2> ..]\n')
+    sys.stderr.write(' list_consumer_groups [-states <state1>,<state2>,..] ' +
+                     '[-types <type1>,<type2>,..]\n')
     sys.stderr.write(' describe_consumer_groups <include_authorized_operations> <group1> <group2> ..\n')
     sys.stderr.write(' describe_topics <include_authorized_operations> <topic1> <topic2> ..\n')
     sys.stderr.write(' describe_cluster <include_authorized_operations>\n')
diff --git a/src/confluent_kafka/__init__.py b/src/confluent_kafka/__init__.py
index f05a0450b..05c813795 100644
--- a/src/confluent_kafka/__init__.py
+++ b/src/confluent_kafka/__init__.py
@@ -22,6 +22,7 @@
 from ._model import (Node,  # noqa: F401
                      ConsumerGroupTopicPartitions,
                      ConsumerGroupState,
+                     ConsumerGroupType,
                      TopicCollection,
                      TopicPartitionInfo,
                      IsolationLevel,
@@ -49,7 +50,8 @@
            'Producer', 'DeserializingConsumer',
            'SerializingProducer', 'TIMESTAMP_CREATE_TIME', 'TIMESTAMP_LOG_APPEND_TIME',
            'TIMESTAMP_NOT_AVAILABLE', 'TopicPartition', 'Node',
-           'ConsumerGroupTopicPartitions', 'ConsumerGroupState', 'Uuid',
+           'ConsumerGroupTopicPartitions', 'ConsumerGroupState',
+           'ConsumerGroupType', 'Uuid',
            'IsolationLevel', 'TopicCollection', 'TopicPartitionInfo']
 
 __version__ = version()[0]
diff --git a/src/confluent_kafka/_model/__init__.py b/src/confluent_kafka/_model/__init__.py
index 77c9d536a..55e624c22 100644
--- a/src/confluent_kafka/_model/__init__.py
+++ b/src/confluent_kafka/_model/__init__.py
@@ -95,6 +95,26 @@ def __lt__(self, other):
         return self.value < other.value
 
 
+class ConsumerGroupType(Enum):
+    """
+    Enumerates the different types of consumer groups.
+
+    Values:
+    -------
+    """
+    #: Type is not known or not set
+    UNKNOWN = cimpl.CONSUMER_GROUP_TYPE_UNKNOWN
+    #: Consumer type: the group uses the new KIP-848 consumer protocol
+    CONSUMER = cimpl.CONSUMER_GROUP_TYPE_CONSUMER
+    #: Classic type: the group uses the classic consumer protocol
+    CLASSIC = cimpl.CONSUMER_GROUP_TYPE_CLASSIC
+
+    def __lt__(self, other):
+        if self.__class__ != other.__class__:
+            return NotImplemented
+        return self.value < other.value
+
+
 class TopicCollection:
     """
     Represents collection of topics in the form of different identifiers
diff --git a/src/confluent_kafka/admin/__init__.py b/src/confluent_kafka/admin/__init__.py
index 14c1e701a..35b3ea719 100644
--- a/src/confluent_kafka/admin/__init__.py
+++ b/src/confluent_kafka/admin/__init__.py
@@ -57,6 +57,7 @@
 from ._records import DeletedRecords  # noqa: F401
 
 from .._model import (TopicCollection as _TopicCollection,
+                      ConsumerGroupType as _ConsumerGroupType,
                       ElectionType as _ElectionType)
 
 from ..cimpl import (KafkaException,  # noqa: F401
@@ -898,6 +899,8 @@ def list_consumer_groups(self, **kwargs):
                  on broker, and response. Default: `socket.timeout.ms/1000.0`
         :param set(ConsumerGroupState) states: only list consumer groups which are currently in
                  these states.
+        :param set(ConsumerGroupType) types: only list consumer groups of
+                 these types.
 
         :returns: a future. Result method of the future returns
                   :class:`ListConsumerGroupsResult`.
@@ -917,6 +920,16 @@ def list_consumer_groups(self, **kwargs):
                         raise TypeError("All elements of states must be of type ConsumerGroupState")
                 kwargs["states_int"] = [state.value for state in states]
             kwargs.pop("states")
+        if "types" in kwargs:
+            types = kwargs["types"]
+            if types is not None:
+                if not isinstance(types, set):
+                    raise TypeError("'types' must be a set")
+                for type in types:
+                    if not isinstance(type, _ConsumerGroupType):
+                        raise TypeError("All elements of types must be of type ConsumerGroupType")
+                kwargs["types_int"] = [type.value for type in types]
+            kwargs.pop("types")
 
         f, _ = AdminClient._make_futures([], None,
                                          AdminClient._make_list_consumer_groups_result)
diff --git a/src/confluent_kafka/admin/_group.py b/src/confluent_kafka/admin/_group.py
index 82ab98f1d..964d62b2f 100644
--- a/src/confluent_kafka/admin/_group.py
+++ b/src/confluent_kafka/admin/_group.py
@@ -14,7 +14,7 @@
 
 
 from .._util import ConversionUtil
-from .._model import ConsumerGroupState
+from .._model import ConsumerGroupState, ConsumerGroupType
 from ._acl import AclOperation
 
 
@@ -31,13 +31,17 @@ class ConsumerGroupListing:
         Whether a consumer group is simple or not.
     state : ConsumerGroupState
         Current state of the consumer group.
+    type : ConsumerGroupType
+        Type of the consumer group.
     """
 
-    def __init__(self, group_id, is_simple_consumer_group, state=None):
+    def __init__(self, group_id, is_simple_consumer_group, state=None, type=None):
         self.group_id = group_id
         self.is_simple_consumer_group = is_simple_consumer_group
         if state is not None:
             self.state = ConversionUtil.convert_to_enum(state, ConsumerGroupState)
+        if type is not None:
+            self.type = ConversionUtil.convert_to_enum(type, ConsumerGroupType)
 
 
 class ListConsumerGroupsResult:
diff --git a/src/confluent_kafka/src/Admin.c b/src/confluent_kafka/src/Admin.c
index 1a0980460..5eeb5c4cd 100644
--- a/src/confluent_kafka/src/Admin.c
+++ b/src/confluent_kafka/src/Admin.c
@@ -82,6 +82,8 @@ struct Admin_options {
         rd_kafka_IsolationLevel_t isolation_level;
         rd_kafka_consumer_group_state_t* states;
         int states_cnt;
+        rd_kafka_consumer_group_type_t* types;
+        int types_cnt;
 };
 
 /**@brief "unset" value initializers for Admin_options
@@ -96,6 +98,8 @@ struct Admin_options {
                 Admin_options_def_int,          \
                 Admin_options_def_ptr,          \
                 Admin_options_def_cnt,          \
+                Admin_options_def_ptr,          \
+                Admin_options_def_cnt,          \
         }
 
 #define Admin_options_is_set_int(v) ((v) != Admin_options_def_int)
@@ -185,6 +189,13 @@ Admin_options_to_c (Handle *self, rd_kafka_admin_op_t for_api,
                 goto err;
         }
 
+        if (Admin_options_is_set_ptr(options->types) &&
+            (err_obj = rd_kafka_AdminOptions_set_match_consumer_group_types(
+                c_options, options->types, options->types_cnt))) {
+                snprintf(errstr, sizeof(errstr), "%s", rd_kafka_error_string(err_obj));
+                goto err;
+        }
+
         return c_options;
 
 err:
@@ -1698,24 +1709,28 @@ static const char Admin_delete_acls_doc[] = PyDoc_STR(
  * @brief List consumer groups
  */
 PyObject *Admin_list_consumer_groups (Handle *self, PyObject *args, PyObject *kwargs) {
-        PyObject *future, *states_int = NULL;
+        PyObject *future, *states_int = NULL, *types_int = NULL;
         struct Admin_options options = Admin_options_INITIALIZER;
         rd_kafka_AdminOptions_t *c_options = NULL;
         CallState cs;
         rd_kafka_queue_t *rkqu;
         rd_kafka_consumer_group_state_t *c_states = NULL;
+        rd_kafka_consumer_group_type_t *c_types = NULL;
         int states_cnt = 0;
+        int types_cnt = 0;
         int i = 0;
 
         static char *kws[] = {"future",
                               /* options */
                               "states_int",
+                              "types_int",
                               "request_timeout",
                               NULL};
 
-        if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O|Of", kws,
+        if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O|OOf", kws,
                                          &future,
                                          &states_int,
+                                         &types_int,
                                          &options.request_timeout)) {
                 goto err;
         }
@@ -1736,7 +1751,7 @@ PyObject *Admin_list_consumer_groups (Handle *self, PyObject *args, PyObject *kw
                                 PyObject *state = PyList_GET_ITEM(states_int, i);
                                 if(!cfl_PyInt_Check(state)) {
                                         PyErr_SetString(PyExc_ValueError,
-                                                "Element of states must be a valid state");
+                                                "Elements of states must be valid states");
                                         goto err;
                                 }
                                 c_states[i] = (rd_kafka_consumer_group_state_t) cfl_PyInt_AsInt(state);
@@ -1746,6 +1761,33 @@ PyObject *Admin_list_consumer_groups (Handle *self, PyObject *args, PyObject *kw
                 }
         }
 
+        if(types_int != NULL && types_int != Py_None) {
+                if(!PyList_Check(types_int)) {
+                        PyErr_SetString(PyExc_ValueError,
+                                "types must be of type list");
+                        goto err;
+                }
+
+                types_cnt = (int)PyList_Size(types_int);
+
+                if(types_cnt > 0) {
+                        c_types = (rd_kafka_consumer_group_type_t *)
+                                  malloc(types_cnt *
+                                         sizeof(rd_kafka_consumer_group_type_t));
+                        for(i = 0 ; i < types_cnt ; i++) {
+                                PyObject *type = PyList_GET_ITEM(types_int, i);
+                                if(!cfl_PyInt_Check(type)) {
+                                        PyErr_SetString(PyExc_ValueError,
+                                                "Elements of types must be valid group types");
+                                        goto err;
+                                }
+                                c_types[i] = (rd_kafka_consumer_group_type_t) cfl_PyInt_AsInt(type);
+                        }
+                        options.types = c_types;
+                        options.types_cnt = types_cnt;
+                }
+        }
+
         c_options = Admin_options_to_c(self, RD_KAFKA_ADMIN_OP_LISTCONSUMERGROUPS,
                                        &options, future);
         if (!c_options) {
@@ -1774,14 +1816,19 @@ PyObject *Admin_list_consumer_groups (Handle *self, PyObject *args, PyObject *kw
         if(c_states) {
                 free(c_states);
         }
+        if(c_types) {
+                free(c_types);
+        }
         rd_kafka_queue_destroy(rkqu); /* drop reference from get_background */
         rd_kafka_AdminOptions_destroy(c_options);
-
         Py_RETURN_NONE;
 err:
         if(c_states) {
                 free(c_states);
         }
+        if(c_types) {
+                free(c_types);
+        }
         if (c_options) {
                 rd_kafka_AdminOptions_destroy(c_options);
                 Py_DECREF(future);
@@ -1789,7 +1836,7 @@ PyObject *Admin_list_consumer_groups (Handle *self, PyObject *args, PyObject *kw
         return NULL;
 }
 const char Admin_list_consumer_groups_doc[] = PyDoc_STR(
-        ".. py:function:: list_consumer_groups(future, [states_int], [request_timeout])\n"
+        ".. py:function:: list_consumer_groups(future, [states_int], [types_int], [request_timeout])\n"
         "\n"
         "  List all the consumer groups.\n"
         "\n"
@@ -3711,6 +3758,8 @@ static PyObject *Admin_c_ListConsumerGroupsResults_to_py(
 
                         cfl_PyDict_SetInt(kwargs, "state", rd_kafka_ConsumerGroupListing_state(c_valid_responses[i]));
 
+                        cfl_PyDict_SetInt(kwargs, "type", rd_kafka_ConsumerGroupListing_type(c_valid_responses[i]));
+
                         args = PyTuple_New(0);
                         valid_result = PyObject_Call(ConsumerGroupListing_type, args, kwargs);
 
diff --git a/src/confluent_kafka/src/AdminTypes.c b/src/confluent_kafka/src/AdminTypes.c
index 4a9d37c1e..bcb75926f 100644
--- a/src/confluent_kafka/src/AdminTypes.c
+++ b/src/confluent_kafka/src/AdminTypes.c
@@ -570,8 +570,14 @@ static void AdminTypes_AddObjectsConsumerGroupStates (PyObject *m) {
         PyModule_AddIntConstant(m, "CONSUMER_GROUP_STATE_EMPTY", RD_KAFKA_CONSUMER_GROUP_STATE_EMPTY);
 }
 
+static void AdminTypes_AddObjectsConsumerGroupTypes (PyObject *m) {
+        /* rd_kafka_consumer_group_type_t */
+        PyModule_AddIntConstant(m, "CONSUMER_GROUP_TYPE_UNKNOWN", RD_KAFKA_CONSUMER_GROUP_TYPE_UNKNOWN);
+        PyModule_AddIntConstant(m, "CONSUMER_GROUP_TYPE_CONSUMER", RD_KAFKA_CONSUMER_GROUP_TYPE_CONSUMER);
+        PyModule_AddIntConstant(m, "CONSUMER_GROUP_TYPE_CLASSIC", RD_KAFKA_CONSUMER_GROUP_TYPE_CLASSIC);
+}
+
 static void AdminTypes_AddObjectsAlterConfigOpType (PyObject *m) {
-        /* rd_kafka_consumer_group_state_t */
         PyModule_AddIntConstant(m, "ALTER_CONFIG_OP_TYPE_SET", RD_KAFKA_ALTER_CONFIG_OP_TYPE_SET);
         PyModule_AddIntConstant(m, "ALTER_CONFIG_OP_TYPE_DELETE", RD_KAFKA_ALTER_CONFIG_OP_TYPE_DELETE);
         PyModule_AddIntConstant(m, "ALTER_CONFIG_OP_TYPE_APPEND", RD_KAFKA_ALTER_CONFIG_OP_TYPE_APPEND);
@@ -620,6 +626,7 @@ void AdminTypes_AddObjects (PyObject *m) {
         AdminTypes_AddObjectsAclOperation(m);
         AdminTypes_AddObjectsAclPermissionType(m);
         AdminTypes_AddObjectsConsumerGroupStates(m);
+        AdminTypes_AddObjectsConsumerGroupTypes(m);
         AdminTypes_AddObjectsAlterConfigOpType(m);
         AdminTypes_AddObjectsScramMechanismType(m);
         AdminTypes_AddObjectsIsolationLevel(m);
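
For reviewers, a minimal usage sketch of the new filter based on the API added in this patch. The bootstrap server address is a placeholder and the printed groups depend on the cluster; only the `types` keyword and `ConsumerGroupType` enum come from this change.

    from confluent_kafka import ConsumerGroupState, ConsumerGroupType
    from confluent_kafka.admin import AdminClient

    # Placeholder broker address; point this at a real cluster.
    admin = AdminClient({"bootstrap.servers": "localhost:9092"})

    # `types` works like the existing `states` filter: a set of enum values.
    # Here we ask only for groups using the new KIP-848 consumer protocol.
    future = admin.list_consumer_groups(
        request_timeout=10,
        states={ConsumerGroupState.STABLE, ConsumerGroupState.EMPTY},
        types={ConsumerGroupType.CONSUMER})

    result = future.result()
    for group in result.valid:
        print(group.group_id, group.state, group.type)

Omitting `types` keeps the previous behaviour and returns groups of every type.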