Skip to content

Commit ff1f073

Browse files
authored
Added statistics callback support (#43, @hqin)
2 parents 0b2d16d + 495a836 commit ff1f073

File tree

6 files changed

+250
-14
lines changed

6 files changed

+250
-14
lines changed

confluent_kafka/src/confluent_kafka.c

Lines changed: 69 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -825,9 +825,9 @@ static void error_cb (rd_kafka_t *rk, int err, const char *reason, void *opaque)
825825
result = PyObject_CallFunctionObjArgs(h->error_cb, eo, NULL);
826826
Py_DECREF(eo);
827827

828-
if (result) {
828+
if (result)
829829
Py_DECREF(result);
830-
} else {
830+
else {
831831
CallState_crash(cs);
832832
rd_kafka_yield(h->rk);
833833
}
@@ -836,6 +836,32 @@ static void error_cb (rd_kafka_t *rk, int err, const char *reason, void *opaque)
836836
CallState_resume(cs);
837837
}
838838

839+
/**
 * @brief librdkafka statistics callback: forwards the statistics JSON
 *        document to the application's configured stats_cb as a Python str.
 *
 * Served from poll() via the CallState mechanism, same pattern as error_cb.
 *
 * @returns 0 to tell librdkafka it may free \p json immediately
 *          (we do not keep a reference to it).
 */
static int stats_cb(rd_kafka_t *rk, char *json, size_t json_len, void *opaque) {
	Handle *h = opaque;
	PyObject *eo = NULL, *result = NULL;
	CallState *cs = NULL;

	cs = CallState_get(h);
	if (json_len == 0) {
		/* No data returned */
		goto done;
	}

	eo = Py_BuildValue("s", json);
	if (!eo) {
		/* Conversion to a Python str failed (e.g. invalid UTF-8 or
		 * OOM): propagate the pending Python exception instead of
		 * calling the callback with no argument and crashing on
		 * Py_DECREF(NULL) below. */
		CallState_crash(cs);
		rd_kafka_yield(h->rk);
		goto done;
	}

	result = PyObject_CallFunctionObjArgs(h->stats_cb, eo, NULL);
	Py_DECREF(eo);

	if (result)
		Py_DECREF(result);
	else {
		/* Callback raised: crash the CallState so poll() re-raises. */
		CallState_crash(cs);
		rd_kafka_yield(h->rk);
	}

 done:
	CallState_resume(cs);
	return 0;
}
839865

840866
/****************************************************************************
841867
*
@@ -853,9 +879,11 @@ static void error_cb (rd_kafka_t *rk, int err, const char *reason, void *opaque)
853879
* Clear Python object references in Handle
854880
*/
855881
/**
 * @brief Clear Python object references in Handle.
 *
 * Py_CLEAR() decrefs and NULLs each pointer in one step, so a second
 * clear pass (e.g. GC tp_clear followed by dealloc) cannot double-decref,
 * and no dangling callback pointer is left behind. Py_CLEAR is a no-op
 * on NULL, so the previous `if (...)` guards are unnecessary.
 */
void Handle_clear (Handle *h) {
	Py_CLEAR(h->error_cb);
	Py_CLEAR(h->stats_cb);

	PyThread_delete_key(h->tlskey);
}
@@ -867,6 +895,9 @@ int Handle_traverse (Handle *h, visitproc visit, void *arg) {
867895
if (h->error_cb)
868896
Py_VISIT(h->error_cb);
869897

898+
if (h->stats_cb)
899+
Py_VISIT(h->stats_cb);
900+
870901
return 0;
871902
}
872903

@@ -1113,6 +1144,15 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,
11131144
continue;
11141145

11151146
} else if (!strcmp(k, "error_cb")) {
1147+
if (!PyCallable_Check(vo)) {
1148+
PyErr_SetString(PyExc_TypeError,
1149+
"expected error_cb property "
1150+
"as a callable function");
1151+
rd_kafka_topic_conf_destroy(tconf);
1152+
rd_kafka_conf_destroy(conf);
1153+
Py_DECREF(ks);
1154+
return NULL;
1155+
}
11161156
if (h->error_cb) {
11171157
Py_DECREF(h->error_cb);
11181158
h->error_cb = NULL;
@@ -1123,6 +1163,27 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,
11231163
}
11241164
Py_DECREF(ks);
11251165
continue;
1166+
} else if (!strcmp(k, "stats_cb")) {
1167+
if (!PyCallable_Check(vo)) {
1168+
PyErr_SetString(PyExc_TypeError,
1169+
"expected stats_cb property "
1170+
"as a callable function");
1171+
rd_kafka_topic_conf_destroy(tconf);
1172+
rd_kafka_conf_destroy(conf);
1173+
Py_DECREF(ks);
1174+
return NULL;
1175+
}
1176+
1177+
if (h->stats_cb) {
1178+
Py_DECREF(h->stats_cb);
1179+
h->stats_cb = NULL;
1180+
}
1181+
if (vo != Py_None) {
1182+
h->stats_cb = vo;
1183+
Py_INCREF(h->stats_cb);
1184+
}
1185+
Py_DECREF(ks);
1186+
continue;
11261187
}
11271188

11281189
/* Special handling for certain config keys. */
@@ -1174,6 +1235,10 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,
11741235

11751236
if (h->error_cb)
11761237
rd_kafka_conf_set_error_cb(conf, error_cb);
1238+
1239+
if (h->stats_cb)
1240+
rd_kafka_conf_set_stats_cb(conf, stats_cb);
1241+
11771242
rd_kafka_topic_conf_set_opaque(tconf, h);
11781243
rd_kafka_conf_set_default_topic_conf(conf, tconf);
11791244

confluent_kafka/src/confluent_kafka.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,7 @@ typedef struct {
114114
PyObject_HEAD
115115
rd_kafka_t *rk;
116116
PyObject *error_cb;
117+
PyObject *stats_cb;
117118
int tlskey; /* Thread-Local-Storage key */
118119

119120
union {

docs/index.rst

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,10 @@ The Python bindings also provide some additional configuration properties:
8181
* ``error_cb(kafka.KafkaError)``: Callback for generic/global error events. This callback is served by
8282
poll().
8383

84+
* ``stats_cb(json_str)``: Callback for statistics data. This callback is triggered by poll()
85+
every ``statistics.interval.ms`` (needs to be configured separately).
86+
Function argument ``json_str`` is a str instance of a JSON document containing statistics data.
87+
8488
* ``on_delivery(kafka.KafkaError, kafka.Message)`` (**Producer**): value is a Python function reference
8589
that is called once for each produced message to indicate the final
8690
delivery result (success or failure).

examples/consumer.py

Lines changed: 39 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,24 +18,55 @@
1818
#
1919
# Example high-level Kafka 0.9 balanced Consumer
2020
#
21-
2221
from confluent_kafka import Consumer, KafkaException, KafkaError
2322
import sys
23+
import getopt
24+
import json
25+
from pprint import pformat
2426

25-
if __name__ == '__main__':
26-
if len(sys.argv) < 4:
27-
sys.stderr.write('Usage: %s <bootstrap-brokers> <group> <topic1> <topic2> ..\n' % sys.argv[0])
28-
sys.exit(1)
27+
def stats_cb(stats_json_str):
    """Deserialize the statistics JSON document and pretty-print it."""
    parsed = json.loads(stats_json_str)
    print('\nKAFKA Stats: {}\n'.format(pformat(parsed)))
2930

30-
broker = sys.argv[1]
31-
group = sys.argv[2]
32-
topics = sys.argv[3:]
31+
def print_usage_and_exit(program_name):
    """Write usage help to stderr and terminate with exit status 1."""
    usage = 'Usage: %s [options..] <bootstrap-brokers> <group> <topic1> <topic2> ..\n' % program_name
    sys.stderr.write(usage)
    options = '''
 Options:
  -T <intvl>   Enable client statistics at specified interval (ms)
'''
    sys.stderr.write(options)
    sys.exit(1)
3339

40+
41+
if __name__ == '__main__':
42+
optlist, argv = getopt.getopt(sys.argv[1:], 'T:')
43+
if len(argv) < 3:
44+
print_usage_and_exit(sys.argv[0])
45+
46+
broker = argv[0]
47+
group = argv[1]
48+
topics = argv[2:]
3449
# Consumer configuration
3550
# See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
3651
conf = {'bootstrap.servers': broker, 'group.id': group, 'session.timeout.ms': 6000,
3752
'default.topic.config': {'auto.offset.reset': 'smallest'}}
3853

54+
# Check to see if -T option exists
55+
for opt in optlist:
56+
if opt[0] != '-T':
57+
continue
58+
try:
59+
intval = int(opt[1])
60+
except:
61+
sys.stderr.write("Invalid option value for -T: %s\n" % opt[1])
62+
sys.exit(1)
63+
64+
if intval <= 0:
65+
sys.stderr.write("-T option value needs to be larger than zero: %s\n" % opt[1])
66+
sys.exit(1)
67+
68+
conf['stats_cb'] = stats_cb
69+
conf['statistics.interval.ms'] = int(opt[1])
3970

4071
# Create Consumer instance
4172
c = Consumer(**conf)

examples/integration_test.py

Lines changed: 83 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import time
2525
import uuid
2626
import sys
27+
import json
2728

2829
try:
2930
from progress.bar import Bar
@@ -35,10 +36,12 @@
3536
bootstrap_servers = 'localhost'
3637

3738

38-
39+
# global variable to be set by stats_cb call back function
40+
good_stats_cb_result = False
3941

4042
def error_cb (err):
4143
print('Error: %s' % err)
44+
4245

4346
class MyTestDr(object):
4447
""" Producer: Delivery report callback """
@@ -353,6 +356,82 @@ def my_on_revoke (consumer, partitions):
353356
c.close()
354357

355358

359+
def verify_stats_cb():
    """ Verify stats_cb """

    def stats_cb(stats_json_str):
        # Flag success once a positive app_offset appears for topic
        # "test" partition 0 in the emitted statistics document.
        global good_stats_cb_result
        parsed = json.loads(stats_json_str)
        if 'test' in parsed['topics']:
            app_offset = parsed['topics']['test']['partitions']['0']['app_offset']
            if app_offset > 0:
                print("# app_offset stats for topic test partition 0: %d" % app_offset)
                good_stats_cb_result = True

    conf = {'bootstrap.servers': bootstrap_servers,
            'group.id': uuid.uuid1(),
            'session.timeout.ms': 6000,
            'error_cb': error_cb,
            'stats_cb': stats_cb,
            'statistics.interval.ms': 200,
            'default.topic.config': {
                'auto.offset.reset': 'earliest'
            }}

    consumer = confluent_kafka.Consumer(**conf)
    consumer.subscribe(["test"])

    max_msgcnt = 1000000
    total_bytes = 0
    consumed = 0

    print('Will now consume %d messages' % max_msgcnt)

    bar = None
    if with_progress:
        bar = Bar('Consuming', max=max_msgcnt,
                  suffix='%(index)d/%(max)d [%(eta_td)s]')

    # Consume until the stats callback reports success (or the message
    # cap is reached / the broker stalls).
    while not good_stats_cb_result:
        msg = consumer.poll(timeout=20.0)
        if msg is None:
            raise Exception('Stalled at %d/%d message, no new messages for 20s' %
                            (consumed, max_msgcnt))

        if msg.error():
            if msg.error().code() == confluent_kafka.KafkaError._PARTITION_EOF:
                # Reached EOF for a partition, ignore.
                continue
            raise confluent_kafka.KafkaException(msg.error())

        total_bytes += len(msg)
        consumed += 1

        if bar is not None and (consumed % 10000) == 0:
            bar.next(n=10000)

        if consumed == 1:
            t_first_msg = time.time()
        if consumed >= max_msgcnt:
            break

    if bar is not None:
        bar.finish()

    if consumed > 0:
        t_spent = time.time() - t_first_msg
        print('%d messages (%.2fMb) consumed in %.3fs: %d msgs/s, %.2f Mb/s' %
              (consumed, total_bytes / (1024*1024), t_spent, consumed / t_spent,
               (total_bytes / t_spent) / (1024*1024)))

    print('closing consumer')
    consumer.close()
434+
356435

357436
if __name__ == '__main__':
358437

@@ -377,6 +456,9 @@ def my_on_revoke (consumer, partitions):
377456
print('=' * 30, 'Verifying Consumer performance', '=' * 30)
378457
verify_consumer_performance()
379458

459+
print('=' * 30, 'Verifying stats_cb', '=' * 30)
460+
verify_stats_cb()
461+
380462
print('=' * 30, 'Done', '=' * 30)
381463

382464

tests/test_misc.py

Lines changed: 54 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
#!/usr/bin/env python
22

33
import confluent_kafka
4-
4+
import json
5+
import time
6+
from pprint import pprint
57

68
def test_version():
79
print('Using confluent_kafka module version %s (0x%x)' % confluent_kafka.version())
@@ -14,3 +16,54 @@ def test_version():
1416
assert len(sver) > 0
1517
assert iver > 0
1618

19+
# global variable for error_cb call back function
20+
seen_error_cb = False
21+
22+
def test_error_cb():
    """ Tests error_cb.

    Points the consumer at a port with no broker so that connection
    failures are guaranteed, then polls until error_cb is served.
    """

    def error_cb(error_msg):
        global seen_error_cb
        seen_error_cb = True
        # Connection refused surfaces as _TRANSPORT / _ALL_BROKERS_DOWN.
        assert error_msg.code() in (confluent_kafka.KafkaError._TRANSPORT, confluent_kafka.KafkaError._ALL_BROKERS_DOWN)

    conf = {'bootstrap.servers': 'localhost:9093', # Purposely cause connection refused error
            'group.id':'test',
            'socket.timeout.ms':'100',
            'session.timeout.ms': 1000, # Avoid close() blocking too long
            'error_cb': error_cb
            }

    kc = confluent_kafka.Consumer(**conf)
    kc.subscribe(["test"])

    # Bound the wait so the test cannot hang indefinitely if the
    # callback is never served.
    deadline = time.time() + 30.0
    while not seen_error_cb and time.time() < deadline:
        kc.poll(timeout=1)
    assert seen_error_cb, 'error_cb was not served within 30s'

    kc.close()
43+
44+
# global variable for stats_cb call back function
45+
seen_stats_cb = False
46+
47+
def test_stats_cb():
    """ Tests stats_cb.

    No broker is required: librdkafka emits statistics locally every
    statistics.interval.ms, which poll() serves to stats_cb.
    """

    def stats_cb(stats_json_str):
        global seen_stats_cb
        seen_stats_cb = True
        stats_json = json.loads(stats_json_str)
        # Every stats document carries the client instance name.
        assert len(stats_json['name']) > 0

    conf = {'group.id':'test',
            'socket.timeout.ms':'100',
            'session.timeout.ms': 1000, # Avoid close() blocking too long
            'statistics.interval.ms': 200,
            'stats_cb': stats_cb
            }

    kc = confluent_kafka.Consumer(**conf)

    kc.subscribe(["test"])

    # Bound the wait so the test cannot hang indefinitely if the
    # callback is never served.
    deadline = time.time() + 30.0
    while not seen_stats_cb and time.time() < deadline:
        kc.poll(timeout=1)
    assert seen_stats_cb, 'stats_cb was not served within 30s'
    kc.close()
69+

0 commit comments

Comments
 (0)