Skip to content

Commit 96e8c23

Browse files
committed
Added Consumer.assignment() API
1 parent ab60427 commit 96e8c23

File tree

2 files changed

+37
-0
lines changed

2 files changed

+37
-0
lines changed

confluent_kafka/src/Consumer.c

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,28 @@ static PyObject *Consumer_unassign (Handle *self, PyObject *ignore) {
219219
Py_RETURN_NONE;
220220
}
221221

222+
static PyObject *Consumer_assignment (Handle *self, PyObject *args,
223+
PyObject *kwargs) {
224+
225+
PyObject *plist;
226+
rd_kafka_topic_partition_list_t *c_parts;
227+
rd_kafka_resp_err_t err;
228+
229+
err = rd_kafka_assignment(self->rk, &c_parts);
230+
if (err) {
231+
cfl_PyErr_Format(err,
232+
"Failed to get assignment: %s",
233+
rd_kafka_err2str(err));
234+
return NULL;
235+
}
236+
237+
238+
plist = c_parts_to_py(c_parts);
239+
rd_kafka_topic_partition_list_destroy(c_parts);
240+
241+
return plist;
242+
}
243+
222244

223245

224246
static PyObject *Consumer_commit (Handle *self, PyObject *args,
@@ -465,6 +487,17 @@ static PyMethodDef Consumer_methods[] = {
465487
" Removes the current partition assignment and stops consuming.\n"
466488
"\n"
467489
},
490+
{ "assignment", (PyCFunction)Consumer_assignment,
491+
METH_VARARGS|METH_KEYWORDS,
492+
".. py:function:: assignment()\n"
493+
"\n"
494+
" Returns the current partition assignment.\n"
495+
"\n"
496+
" :returns: List of assigned topic+partitions.\n"
497+
" :rtype: list(TopicPartition)\n"
498+
" :raises: KafkaException\n"
499+
"\n"
500+
},
468501
{ "commit", (PyCFunction)Consumer_commit, METH_VARARGS|METH_KEYWORDS,
469502
".. py:function:: commit([message=None], [offsets=None], [async=True])\n"
470503
"\n"

tests/test_Consumer.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,10 @@ def dummy_assign_revoke (consumer, partitions):
3939
partitions = list(map(lambda p: TopicPartition("test", p), range(0,100,3)))
4040
kc.assign(partitions)
4141

42+
# Verify assignment
43+
assignment = kc.assignment()
44+
assert partitions == assignment
45+
4246
kc.unassign()
4347

4448
kc.commit(async=True)

0 commit comments

Comments
 (0)