Skip to content

Commit 4944479

Browse files
committed
Added Consumer.get_watermark_offsets() (#31)
1 parent 96e8c23 commit 4944479

File tree

5 files changed

+107
-12
lines changed

5 files changed

+107
-12
lines changed

confluent_kafka/src/Consumer.c

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -381,6 +381,54 @@ static PyObject *Consumer_position (Handle *self, PyObject *args,
381381
}
382382

383383

384+
static PyObject *Consumer_get_watermark_offsets (Handle *self, PyObject *args,
385+
PyObject *kwargs) {
386+
387+
TopicPartition *tp;
388+
rd_kafka_resp_err_t err;
389+
double tmout = -1.0f;
390+
int cached = 0;
391+
int64_t low = RD_KAFKA_OFFSET_INVALID, high = RD_KAFKA_OFFSET_INVALID;
392+
static char *kws[] = { "partition", "timeout", "cached", NULL };
393+
PyObject *rlist;
394+
395+
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O|db", kws,
396+
(PyObject **)&tp, &tmout, &cached))
397+
return NULL;
398+
399+
400+
if (PyObject_Type((PyObject *)tp) != (PyObject *)&TopicPartitionType) {
401+
PyErr_Format(PyExc_TypeError,
402+
"expected %s", TopicPartitionType.tp_name);
403+
return NULL;
404+
}
405+
406+
if (cached) {
407+
err = rd_kafka_get_watermark_offsets(self->rk,
408+
tp->topic, tp->partition,
409+
&low, &high);
410+
} else {
411+
err = rd_kafka_query_watermark_offsets(self->rk,
412+
tp->topic, tp->partition,
413+
&low, &high,
414+
tmout >= 0 ? (int)(tmout * 1000.0f) : -1);
415+
}
416+
417+
if (err) {
418+
cfl_PyErr_Format(err,
419+
"Failed to get watermark offsets: %s",
420+
rd_kafka_err2str(err));
421+
return NULL;
422+
}
423+
424+
rlist = PyList_New(2);
425+
PyList_SetItem(rlist, 0, PyLong_FromLongLong(low));
426+
PyList_SetItem(rlist, 1, PyLong_FromLongLong(high));
427+
428+
return rlist;
429+
}
430+
431+
384432

385433
static PyObject *Consumer_poll (Handle *self, PyObject *args,
386434
PyObject *kwargs) {
@@ -541,6 +589,22 @@ static PyMethodDef Consumer_methods[] = {
541589
" :raises: KafkaException\n"
542590
"\n"
543591
},
592+
{ "get_watermark_offsets", (PyCFunction)Consumer_get_watermark_offsets,
593+
METH_VARARGS|METH_KEYWORDS,
594+
".. py:function:: get_watermark_offsets(partition, [timeout=None], [cached=False])\n"
595+
"\n"
596+
" Retrieve low and high offsets for partition.\n"
597+
"\n"
598+
" :param TopicPartition partition: Topic+partition to return offsets for."
599+
" :param float timeout: Request timeout (when cached=False).\n"
600+
" :param bool cached: Instead of querying the broker used cached information. "
601+
"The low offset is updated periodically (if statistics.interval.ms is set) while "
602+
"the high offset is updated on each message fetched from the broker for this partition."
603+
" :returns: List of [low,high] on success or None on timeout.\n"
604+
" :rtype: list(int)\n"
605+
" :raises: KafkaException\n"
606+
"\n"
607+
},
544608
{ "close", (PyCFunction)Consumer_close, METH_NOARGS,
545609
"\n"
546610
" Close down and terminate the Kafka Consumer.\n"

confluent_kafka/src/confluent_kafka.c

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -510,15 +510,6 @@ PyObject *Message_new0 (const rd_kafka_message_t *rkm) {
510510
*
511511
*
512512
****************************************************************************/
513-
typedef struct {
514-
PyObject_HEAD
515-
char *topic;
516-
int partition;
517-
int64_t offset;
518-
PyObject *error;
519-
} TopicPartition;
520-
521-
522513
static int TopicPartition_clear (TopicPartition *self) {
523514
if (self->topic) {
524515
free(self->topic);
@@ -600,8 +591,6 @@ static PyObject *TopicPartition_str0 (TopicPartition *self) {
600591
}
601592

602593

603-
static PyTypeObject TopicPartitionType;
604-
605594
static PyObject *
606595
TopicPartition_richcompare (TopicPartition *self, PyObject *o2,
607596
int op) {
@@ -658,7 +647,7 @@ static long TopicPartition_hash (TopicPartition *self) {
658647
}
659648

660649

661-
static PyTypeObject TopicPartitionType = {
650+
PyTypeObject TopicPartitionType = {
662651
PyVarObject_HEAD_INIT(NULL, 0)
663652
"cimpl.TopicPartition", /*tp_name*/
664653
sizeof(TopicPartition), /*tp_basicsize*/

confluent_kafka/src/confluent_kafka.h

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,26 @@ void CallState_resume (CallState *cs);
181181
void CallState_crash (CallState *cs);
182182

183183

184+
/****************************************************************************
185+
*
186+
*
187+
* TopicPartition
188+
*
189+
*
190+
*
191+
*
192+
****************************************************************************/
193+
typedef struct {
194+
PyObject_HEAD
195+
char *topic;
196+
int partition;
197+
int64_t offset;
198+
PyObject *error;
199+
} TopicPartition;
200+
201+
extern PyTypeObject TopicPartitionType;
202+
203+
184204
/****************************************************************************
185205
*
186206
*

examples/integration_test.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -258,6 +258,18 @@ def verify_consumer():
258258
print('max_msgcnt %d reached' % msgcnt)
259259
break
260260

261+
# Get current assignment
262+
assignment = c.assignment()
263+
264+
# Get cached watermark offsets
265+
# Since we're not making use of statistics the low offset is not known so ignore it.
266+
lo,hi = c.get_watermark_offsets(assignment[0], cached=True)
267+
print('Cached offsets for %s: %d - %d' % (assignment[0], lo, hi))
268+
269+
# Query broker for offsets
270+
lo,hi = c.get_watermark_offsets(assignment[0], timeout=1.0)
271+
print('Queried offsets for %s: %d - %d' % (assignment[0], lo, hi))
272+
261273

262274
# Close consumer
263275
c.close()

tests/test_Consumer.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,16 @@ def dummy_assign_revoke (consumer, partitions):
4343
assignment = kc.assignment()
4444
assert partitions == assignment
4545

46+
# Get cached watermarks, should all be invalid.
47+
lo, hi = kc.get_watermark_offsets(partitions[0], cached=True)
48+
assert lo == -1001 and hi == -1001
49+
50+
# Query broker for watermarks, should raise an exception.
51+
try:
52+
lo, hi = kc.get_watermark_offsets(partitions[0], timeout=0.5, cached=False)
53+
except KafkaException as e:
54+
assert e.args[0].code() in (KafkaError._TIMED_OUT, KafkaError._WAIT_COORD)
55+
4656
kc.unassign()
4757

4858
kc.commit(async=True)

0 commit comments

Comments
 (0)