Skip to content

Commit 73b363f

Browse files
committed
fixes based on pull request review feedback
1 parent 8f88eb1 commit 73b363f

File tree

5 files changed

+148
-51
lines changed

5 files changed

+148
-51
lines changed

confluent_kafka/src/confluent_kafka.c

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -749,7 +749,7 @@ PyObject *c_parts_to_py (const rd_kafka_topic_partition_list_t *c_parts) {
749749

750750
parts = PyList_New(c_parts->cnt);
751751

752-
for (i = 0 ; i < c_parts->cnt ; i++) {
752+
for (i = 0 ; i < (size_t)c_parts->cnt ; i++) {
753753
const rd_kafka_topic_partition_t *rktpar = &c_parts->elems[i];
754754
PyList_SET_ITEM(parts, i,
755755
TopicPartition_new0(
@@ -778,7 +778,7 @@ rd_kafka_topic_partition_list_t *py_to_c_parts (PyObject *plist) {
778778

779779
c_parts = rd_kafka_topic_partition_list_new((int)PyList_Size(plist));
780780

781-
for (i = 0 ; i < PyList_Size(plist) ; i++) {
781+
for (i = 0 ; i < (size_t)PyList_Size(plist) ; i++) {
782782
TopicPartition *tp = (TopicPartition *)
783783
PyList_GetItem(plist, i);
784784

@@ -825,9 +825,9 @@ static void error_cb (rd_kafka_t *rk, int err, const char *reason, void *opaque)
825825
result = PyObject_CallFunctionObjArgs(h->error_cb, eo, NULL);
826826
Py_DECREF(eo);
827827

828-
if (result) {
828+
if (result)
829829
Py_DECREF(result);
830-
} else {
830+
else {
831831
CallState_crash(cs);
832832
rd_kafka_yield(h->rk);
833833
}
@@ -842,18 +842,18 @@ static int stats_cb(rd_kafka_t *rk, char *json, size_t json_len, void *opaque) {
842842
CallState *cs = NULL;
843843

844844
cs = CallState_get(h);
845-
if (json_len == 0 || !h->stats_cb) {
846-
/* Neither data nor call back defined. */
845+
if (json_len == 0) {
846+
/* No data returned*/
847847
goto done;
848848
}
849849

850850
eo = Py_BuildValue("s", json);
851851
result = PyObject_CallFunctionObjArgs(h->stats_cb, eo, NULL);
852852
Py_DECREF(eo);
853853

854-
if (result) {
854+
if (result)
855855
Py_DECREF(result);
856-
} else {
856+
else {
857857
CallState_crash(cs);
858858
rd_kafka_yield(h->rk);
859859
}
@@ -1143,7 +1143,7 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,
11431143
Py_DECREF(ks);
11441144
continue;
11451145

1146-
} else if (!strcmp(k, "error_cb")) {
1146+
} else if (!strcmp(k, "error_cb") && PyCallable_Check(vo)) {
11471147
if (h->error_cb) {
11481148
Py_DECREF(h->error_cb);
11491149
h->error_cb = NULL;
@@ -1154,7 +1154,7 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,
11541154
}
11551155
Py_DECREF(ks);
11561156
continue;
1157-
} else if (!strcmp(k, "stats_cb")) {
1157+
} else if (!strcmp(k, "stats_cb") && PyCallable_Check(vo)) {
11581158
if (h->stats_cb) {
11591159
Py_DECREF(h->stats_cb);
11601160
h->stats_cb = NULL;

examples/consumer.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ def print_usage_and_exit(program_name):
3232
sys.stderr.write('Usage: %s [options..] <bootstrap-brokers> <group> <topic1> <topic2> ..\n' % program_name)
3333
options='''
3434
Options:
35-
-T <intvl> Enable statistics from Kafka at specified interval (ms)
35+
-T <intvl> Enable client statistics at specified interval (ms)
3636
'''
3737
sys.stderr.write(options)
3838
sys.exit(1)

examples/integration_test.py

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import time
2525
import uuid
2626
import sys
27+
import json
2728

2829
try:
2930
from progress.bar import Bar
@@ -39,6 +40,7 @@
3940

4041
def error_cb (err):
4142
print('Error: %s' % err)
43+
4244

4345
class MyTestDr(object):
4446
""" Producer: Delivery report callback """
@@ -353,6 +355,78 @@ def my_on_revoke (consumer, partitions):
353355
c.close()
354356

355357

358+
def verify_stats_cb():
359+
""" Verify stats_cb """
360+
361+
def stats_cb(stats_json_str):
362+
stats_json = json.loads(stats_json_str)
363+
if 'test' in stats_json['topics']:
364+
print("# app_offset stats for topic test partition 0: %d" % stats_json['topics']['test']['partitions']['0']['app_offset'])
365+
366+
conf = {'bootstrap.servers': bootstrap_servers,
367+
'group.id': uuid.uuid1(),
368+
'session.timeout.ms': 6000,
369+
'error_cb': error_cb,
370+
'stats_cb': stats_cb,
371+
'statistics.interval.ms': 200,
372+
'default.topic.config': {
373+
'auto.offset.reset': 'earliest'
374+
}}
375+
376+
c = confluent_kafka.Consumer(**conf)
377+
c.subscribe(["test"])
378+
379+
max_msgcnt = 1000000
380+
bytecnt = 0
381+
msgcnt = 0
382+
383+
print('Will now consume %d messages' % max_msgcnt)
384+
385+
if with_progress:
386+
bar = Bar('Consuming', max=max_msgcnt,
387+
suffix='%(index)d/%(max)d [%(eta_td)s]')
388+
else:
389+
bar = None
390+
391+
while True:
392+
# Consume until EOF or error
393+
394+
msg = c.poll(timeout=20.0)
395+
if msg is None:
396+
raise Exception('Stalled at %d/%d message, no new messages for 20s' %
397+
(msgcnt, max_msgcnt))
398+
399+
if msg.error():
400+
if msg.error().code() == confluent_kafka.KafkaError._PARTITION_EOF:
401+
# Reached EOF for a partition, ignore.
402+
continue
403+
else:
404+
raise confluent_kafka.KafkaException(msg.error())
405+
406+
407+
bytecnt += len(msg)
408+
msgcnt += 1
409+
410+
if bar is not None and (msgcnt % 10000) == 0:
411+
bar.next(n=10000)
412+
413+
if msgcnt == 1:
414+
t_first_msg = time.time()
415+
if msgcnt >= max_msgcnt:
416+
break
417+
418+
if bar is not None:
419+
bar.finish()
420+
421+
if msgcnt > 0:
422+
t_spent = time.time() - t_first_msg
423+
print('%d messages (%.2fMb) consumed in %.3fs: %d msgs/s, %.2f Mb/s' % \
424+
(msgcnt, bytecnt / (1024*1024), t_spent, msgcnt / t_spent,
425+
(bytecnt / t_spent) / (1024*1024)))
426+
427+
print('closing consumer')
428+
c.close()
429+
356430

357431
if __name__ == '__main__':
358432

@@ -377,6 +451,9 @@ def my_on_revoke (consumer, partitions):
377451
print('=' * 30, 'Verifying Consumer performance', '=' * 30)
378452
verify_consumer_performance()
379453

454+
print('=' * 30, 'Verifying stats_cb', '=' * 30)
455+
verify_stats_cb()
456+
380457
print('=' * 30, 'Done', '=' * 30)
381458

382459

examples/producer.py

Lines changed: 6 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -19,55 +19,22 @@
1919
# Example Kafka Producer.
2020
# Reads lines from stdin and sends to Kafka.
2121
#
22+
2223
from confluent_kafka import Producer
2324
import sys
24-
import getopt
25-
import json
26-
from pprint import pformat
27-
28-
def stats_cb(stats_json_str):
29-
stats_json = json.loads(stats_json_str)
30-
print('\nKAFKA Stats: {}\n'.format(pformat(stats_json)))
31-
32-
def print_usage_and_exit(program_name):
33-
sys.stderr.write('Usage: %s [options..] <bootstrap-brokers> <topic>\n' % program_name)
34-
options='''
35-
Options:
36-
-T <intvl> Enable statistics from Kafka at specified interval (ms)
37-
'''
38-
sys.stderr.write(options)
39-
sys.exit(1)
40-
4125

4226
if __name__ == '__main__':
43-
optlist, argv = getopt.getopt(sys.argv[1:], 'T:')
44-
if len(argv) != 2:
45-
print_usage_and_exit(sys.argv[0])
27+
if len(sys.argv) != 3:
28+
sys.stderr.write('Usage: %s <bootstrap-brokers> <topic>\n' % sys.argv[0])
29+
sys.exit(1)
4630

47-
broker = argv[0]
48-
topic = argv[1]
31+
broker = sys.argv[1]
32+
topic = sys.argv[2]
4933

5034
# Producer configuration
5135
# See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
5236
conf = {'bootstrap.servers': broker}
5337

54-
# Check to see if -T option exists
55-
for opt in optlist:
56-
if opt[0] != '-T':
57-
continue
58-
try:
59-
intval = int(opt[1])
60-
except:
61-
sys.stderr.write("Invalid option value for -T: %s\n" % opt[1])
62-
sys.exit(1)
63-
64-
if intval <= 0:
65-
sys.stderr.write("-T option value needs to be larger than zero: %s\n" % opt[1])
66-
sys.exit(1)
67-
68-
conf['stats_cb'] = stats_cb
69-
conf['statistics.interval.ms'] = int(opt[1])
70-
7138
# Create Producer instance
7239
p = Producer(**conf)
7340

tests/test_misc.py

Lines changed: 54 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
#!/usr/bin/env python
22

33
import confluent_kafka
4-
4+
import json
5+
import time
6+
from pprint import pprint
57

68
def test_version():
79
print('Using confluent_kafka module version %s (0x%x)' % confluent_kafka.version())
@@ -14,3 +16,54 @@ def test_version():
1416
assert len(sver) > 0
1517
assert iver > 0
1618

19+
def test_error_cb():
20+
""" Tests error_cb. """
21+
22+
def error_cb(error_msg):
23+
print('OK: error_cb() called')
24+
assert error_msg.code() in (confluent_kafka.KafkaError._TRANSPORT, confluent_kafka.KafkaError._ALL_BROKERS_DOWN)
25+
26+
conf = {'bootstrap.servers': 'localhost:9093', # Purposely cause connection refused error
27+
'group.id':'test',
28+
'socket.timeout.ms':'100',
29+
'session.timeout.ms': 1000, # Avoid close() blocking too long
30+
'error_cb': error_cb
31+
}
32+
33+
kc = confluent_kafka.Consumer(**conf)
34+
35+
kc.subscribe(["test"])
36+
kc.poll(timeout=0.001)
37+
time.sleep(1)
38+
kc.unsubscribe()
39+
40+
kc.close()
41+
42+
def test_stats_cb():
43+
""" Tests stats_cb. """
44+
45+
def stats_cb(stats_json_str):
46+
# print(stats_json_str)
47+
try:
48+
stats_json = json.loads(stats_json_str)
49+
if 'type' in stats_json:
50+
print("stats_cb: type=%s" % stats_json['type'])
51+
print('OK: stats_cb() called')
52+
except Exception as e:
53+
assert False
54+
55+
conf = {'group.id':'test',
56+
'socket.timeout.ms':'100',
57+
'session.timeout.ms': 1000, # Avoid close() blocking too long
58+
'statistics.interval.ms': 200,
59+
'stats_cb': stats_cb
60+
}
61+
62+
kc = confluent_kafka.Consumer(**conf)
63+
64+
kc.subscribe(["test"])
65+
kc.poll(timeout=0.001)
66+
time.sleep(1)
67+
68+
kc.close()
69+

0 commit comments

Comments
 (0)