From 201b6b333f91b2abe545e9262688233b81617836 Mon Sep 17 00:00:00 2001
From: Huajun Qin
Date: Wed, 26 Oct 2016 12:01:05 -0700
Subject: [PATCH 1/7] expose stats_cb

---
 confluent_kafka/src/confluent_kafka.c | 50 +++++++++++++++++++++++++++
 confluent_kafka/src/confluent_kafka.h |  1 +
 examples/consumer.py                  | 47 ++++++++++++++++++++-----
 examples/producer.py                  | 45 ++++++++++++++++++++----
 4 files changed, 129 insertions(+), 14 deletions(-)

diff --git a/confluent_kafka/src/confluent_kafka.c b/confluent_kafka/src/confluent_kafka.c
index 3539f16f7..6c5530512 100644
--- a/confluent_kafka/src/confluent_kafka.c
+++ b/confluent_kafka/src/confluent_kafka.c
@@ -836,6 +836,33 @@ static void error_cb (rd_kafka_t *rk, int err, const char *reason, void *opaque)
 CallState_resume(cs);
 }
+static int stats_cb(rd_kafka_t *rk, char *json, size_t json_len, void *opaque)
+{
+ Handle *h = opaque;
+ PyObject *eo=NULL, *result=NULL;
+ CallState *cs=NULL;
+
+ cs = CallState_get(h);
+ if (json_len== 0 || !h->stats_cb) {
+ /* Neither data nor callback defined. */
+ goto done;
+ }
+
+ eo = Py_BuildValue("s", json);
+ result = PyObject_CallFunctionObjArgs(h->stats_cb, eo, NULL);
+ Py_DECREF(eo);
+
+ if (result) {
+ Py_DECREF(result);
+ } else {
+ CallState_crash(cs);
+ rd_kafka_yield(h->rk);
+ }
+
+ done:
+ CallState_resume(cs);
+ return 0;
+}

/****************************************************************************
 *
@@ -857,6 +884,10 @@ void Handle_clear (Handle *h) {
 Py_DECREF(h->error_cb);
 }
+ if (h->stats_cb) {
+ Py_DECREF(h->stats_cb);
+ }
+
 PyThread_delete_key(h->tlskey);
 }
@@ -867,6 +898,9 @@ int Handle_traverse (Handle *h, visitproc visit, void *arg) {
 if (h->error_cb)
 Py_VISIT(h->error_cb);
+ if (h->stats_cb)
+ Py_VISIT(h->stats_cb);
+
 return 0;
 }
@@ -1123,6 +1157,17 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,
 }
 Py_DECREF(ks);
 continue;
+ } else if (!strcmp(k, "stats_cb")) {
+ if (h->stats_cb) {
+ Py_DECREF(h->stats_cb);
+ h->stats_cb = NULL;
+ }
+ if (vo != Py_None) {
+ h->stats_cb = vo;
+ Py_INCREF(h->stats_cb);
+ }
+ Py_DECREF(ks);
+ continue;
 }

 /* Special handling for certain config keys. */
@@ -1174,6 +1219,11 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,

 if (h->error_cb)
 rd_kafka_conf_set_error_cb(conf, error_cb);
+
+ if (h->stats_cb) {
+ rd_kafka_conf_set_stats_cb(conf, stats_cb);
+ }
+
 rd_kafka_topic_conf_set_opaque(tconf, h);
 rd_kafka_conf_set_default_topic_conf(conf, tconf);

diff --git a/confluent_kafka/src/confluent_kafka.h b/confluent_kafka/src/confluent_kafka.h
index 5e8eb7989..ad5bd7eb9 100644
--- a/confluent_kafka/src/confluent_kafka.h
+++ b/confluent_kafka/src/confluent_kafka.h
@@ -114,6 +114,7 @@ typedef struct {
 PyObject_HEAD
 rd_kafka_t *rk;
 PyObject *error_cb;
+ PyObject *stats_cb;
 int tlskey;  /* Thread-Local-Storage key */

 union {

diff --git a/examples/consumer.py b/examples/consumer.py
index d9bda0b4e..096c8763f 100755
--- a/examples/consumer.py
+++ b/examples/consumer.py
@@ -18,24 +18,55 @@
 #
 # Example high-level Kafka 0.9 balanced Consumer
 #
-
 from confluent_kafka import Consumer, KafkaException, KafkaError
 import sys
+import getopt
+import json
+from pprint import pformat

-if __name__ == '__main__':
- if len(sys.argv) < 4:
- sys.stderr.write('Usage: %s <broker> <group> <topic1> <topic2> ..\n' % sys.argv[0])
- sys.exit(1)
+def stats_cb(stats_json_str):
+ stats_json = json.loads(stats_json_str)
+ print('\nKAFKA Stats: {}\n'.format(pformat(stats_json)))

- broker = sys.argv[1]
- group = sys.argv[2]
- topics = sys.argv[3:]
+def print_usage_and_exit(program_name):
+ sys.stderr.write('Usage: %s [options..] <broker> <group> <topic1> <topic2> ..\n' % program_name)
+ options='''
+ Options:
+  -T <intvl>   Enable statistics from Kafka at specified interval (ms)
+'''
+ sys.stderr.write(options)
+ sys.exit(1)
+
+if __name__ == '__main__':
+ optlist, argv = getopt.getopt(sys.argv[1:], 'T:')
+ if len(argv) < 3:
+ print_usage_and_exit(sys.argv[0])
+
+ broker = argv[0]
+ group = argv[1]
+ topics = argv[2:]

 # Consumer configuration
 # See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
 conf = {'bootstrap.servers': broker, 'group.id': group, 'session.timeout.ms': 6000,
 'default.topic.config': {'auto.offset.reset': 'smallest'}}

+ # Check to see if -T option exists
+ for opt in optlist:
+ if opt[0] != '-T':
+ continue
+ try:
+ intval = int(opt[1])
+ except:
+ sys.stderr.write("Invalid option value for -T: %s\n" % opt[1])
+ sys.exit(1)
+
+ if intval <= 0:
+ sys.stderr.write("-T option value needs to be larger than zero: %s\n" % opt[1])
+ sys.exit(1)
+
+ conf['stats_cb'] = stats_cb
+ conf['statistics.interval.ms'] = int(opt[1])

 # Create Consumer instance
 c = Consumer(**conf)

diff --git a/examples/producer.py b/examples/producer.py
index 903ecb4af..8d7c1611f 100755
--- a/examples/producer.py
+++ b/examples/producer.py
@@ -19,22 +19,55 @@
 # Example Kafka Producer.
 # Reads lines from stdin and sends to Kafka.
 #
-
 from confluent_kafka import Producer
 import sys
+import getopt
+import json
+from pprint import pformat
+
+def stats_cb(stats_json_str):
+ stats_json = json.loads(stats_json_str)
+ print('\nKAFKA Stats: {}\n'.format(pformat(stats_json)))
+
+def print_usage_and_exit(program_name):
+ sys.stderr.write('Usage: %s [options..] <broker> <topic>\n' % program_name)
+ options='''
+ Options:
+  -T <intvl>   Enable statistics from Kafka at specified interval (ms)
+'''
+ sys.stderr.write(options)
+ sys.exit(1)
+
 if __name__ == '__main__':
- if len(sys.argv) != 3:
- sys.stderr.write('Usage: %s <broker> <topic>\n' % sys.argv[0])
- sys.exit(1)
+ optlist, argv = getopt.getopt(sys.argv[1:], 'T:')
+ if len(argv) != 2:
+ print_usage_and_exit(sys.argv[0])

- broker = sys.argv[1]
- topic = sys.argv[2]
+ broker = argv[0]
+ topic = argv[1]

 # Producer configuration
 # See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
 conf = {'bootstrap.servers': broker}

+ # Check to see if -T option exists
+ for opt in optlist:
+ if opt[0] != '-T':
+ continue
+ try:
+ intval = int(opt[1])
+ except:
+ sys.stderr.write("Invalid option value for -T: %s\n" % opt[1])
+ sys.exit(1)
+
+ if intval <= 0:
+ sys.stderr.write("-T option value needs to be larger than zero: %s\n" % opt[1])
+ sys.exit(1)
+
+ conf['stats_cb'] = stats_cb
+ conf['statistics.interval.ms'] = int(opt[1])
+
 # Create Producer instance
 p = Producer(**conf)

From 8f88eb1aaf99c91f1f147897db3efe8a7d44d814 Mon Sep 17 00:00:00 2001
From: Huajun Qin
Date: Wed, 26 Oct 2016 14:49:14 -0700
Subject: [PATCH 2/7] work in progress

---
 confluent_kafka/src/confluent_kafka.c | 18 +++++++-----------
 1 file changed, 7 insertions(+), 11 deletions(-)

diff --git a/confluent_kafka/src/confluent_kafka.c b/confluent_kafka/src/confluent_kafka.c
index 6c5530512..3f4a935c2 100644
--- a/confluent_kafka/src/confluent_kafka.c
+++ b/confluent_kafka/src/confluent_kafka.c
@@ -836,14 +836,13 @@ static void error_cb (rd_kafka_t *rk, int err, const char *reason, void *opaque)
 CallState_resume(cs);
 }
-static int stats_cb(rd_kafka_t *rk, char *json, size_t json_len, void *opaque)
-{
+static int stats_cb(rd_kafka_t *rk, char *json, size_t json_len, void *opaque) {
 Handle *h = opaque;
- PyObject *eo=NULL, *result=NULL;
- CallState *cs=NULL;
+ PyObject *eo = NULL, *result = NULL;
+ CallState *cs = NULL;

 cs = CallState_get(h);
- if (json_len== 0 || !h->stats_cb) {
+ if (json_len == 0 || !h->stats_cb) {
 /* Neither data nor callback defined. */
 goto done;
 }
@@ -880,13 +879,11 @@ static int stats_cb(rd_kafka_t *rk, char *json, size_t json_len, void *opaque)
 * Clear Python object references in Handle
 */
 void Handle_clear (Handle *h) {
- if (h->error_cb) {
+ if (h->error_cb)
 Py_DECREF(h->error_cb);
- }

- if (h->stats_cb) {
+ if (h->stats_cb)
 Py_DECREF(h->stats_cb);
- }

 PyThread_delete_key(h->tlskey);
 }
@@ -1220,9 +1217,8 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,
 if (h->error_cb)
 rd_kafka_conf_set_error_cb(conf, error_cb);

- if (h->stats_cb) {
+ if (h->stats_cb)
 rd_kafka_conf_set_stats_cb(conf, stats_cb);
- }

 rd_kafka_topic_conf_set_opaque(tconf, h);
 rd_kafka_conf_set_default_topic_conf(conf, tconf);

From 73b363f3ef3e100c5e55aeef43817de16c05239e Mon Sep 17 00:00:00 2001
From: Huajun Qin
Date: Wed, 26 Oct 2016 17:40:13 -0700
Subject: [PATCH 3/7] fixes based on pull request review feedback

---
 confluent_kafka/src/confluent_kafka.c | 20 +++----
 examples/consumer.py                  |  2 +-
 examples/integration_test.py          | 77 +++++++++++++++++++++++++++
 examples/producer.py                  | 45 +++------------
 tests/test_misc.py                    | 55 ++++++++++++++++++-
 5 files changed, 148 insertions(+), 51 deletions(-)

diff --git a/confluent_kafka/src/confluent_kafka.c b/confluent_kafka/src/confluent_kafka.c
index 3f4a935c2..9e13de4b6 100644
--- a/confluent_kafka/src/confluent_kafka.c
+++ b/confluent_kafka/src/confluent_kafka.c
@@ -749,7 +749,7 @@ PyObject *c_parts_to_py (const rd_kafka_topic_partition_list_t *c_parts) {

 parts = PyList_New(c_parts->cnt);

- for (i = 0 ; i < c_parts->cnt ; i++) {
+ for (i = 0 ; i < (size_t)c_parts->cnt ; i++) {
 const rd_kafka_topic_partition_t *rktpar = &c_parts->elems[i];
 PyList_SET_ITEM(parts, i,
 TopicPartition_new0(
@@ -778,7 +778,7 @@ rd_kafka_topic_partition_list_t *py_to_c_parts (PyObject *plist) {

 c_parts = rd_kafka_topic_partition_list_new((int)PyList_Size(plist));

- for (i = 0 ; i < PyList_Size(plist) ; i++) {
+ for (i = 0 ; i < (size_t)PyList_Size(plist) ; i++) {
 TopicPartition *tp = (TopicPartition *)
 PyList_GetItem(plist, i);
@@ -825,9 +825,9 @@ static void error_cb (rd_kafka_t *rk, int err, const char *reason, void *opaque)
 result = PyObject_CallFunctionObjArgs(h->error_cb, eo, NULL);
 Py_DECREF(eo);

- if (result) {
+ if (result)
 Py_DECREF(result);
- } else {
+ else {
 CallState_crash(cs);
 rd_kafka_yield(h->rk);
 }
@@ -842,8 +842,8 @@ static int stats_cb(rd_kafka_t *rk, char *json, size_t json_len, void *opaque) {
 CallState *cs = NULL;

 cs = CallState_get(h);
- if (json_len == 0 || !h->stats_cb) {
- /* Neither data nor callback defined. */
+ if (json_len == 0) {
+ /* No data returned */
 goto done;
 }
@@ -851,9 +851,9 @@ static int stats_cb(rd_kafka_t *rk, char *json, size_t json_len, void *opaque) {
 result = PyObject_CallFunctionObjArgs(h->stats_cb, eo, NULL);
 Py_DECREF(eo);

- if (result) {
+ if (result)
 Py_DECREF(result);
- } else {
+ else {
 CallState_crash(cs);
 rd_kafka_yield(h->rk);
 }
@@ -1143,7 +1143,7 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,
 Py_DECREF(ks);
 continue;
- } else if (!strcmp(k, "error_cb")) {
+ } else if (!strcmp(k, "error_cb") && PyCallable_Check(vo)) {
 if (h->error_cb) {
 Py_DECREF(h->error_cb);
 h->error_cb = NULL;
 }
@@ -1154,7 +1154,7 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,
 Py_DECREF(ks);
 continue;
- } else if (!strcmp(k, "stats_cb")) {
+ } else if (!strcmp(k, "stats_cb") && PyCallable_Check(vo)) {
 if (h->stats_cb) {
 Py_DECREF(h->stats_cb);
 h->stats_cb = NULL;
 }

diff --git a/examples/consumer.py b/examples/consumer.py
index 096c8763f..40b4b2967 100755
--- a/examples/consumer.py
+++ b/examples/consumer.py
@@ -32,7 +32,7 @@ def print_usage_and_exit(program_name):
 sys.stderr.write('Usage: %s [options..] <broker> <group> <topic1> <topic2> ..\n' % program_name)
 options='''
 Options:
-  -T <intvl>   Enable statistics from Kafka at specified interval (ms)
+  -T <intvl>   Enable client statistics at specified interval (ms)
'''
 sys.stderr.write(options)
 sys.exit(1)

diff --git a/examples/integration_test.py b/examples/integration_test.py
index f1f9a6009..9c48aa856 100755
--- a/examples/integration_test.py
+++ b/examples/integration_test.py
@@ -24,6 +24,7 @@
 import time
 import uuid
 import sys
+import json

 try:
 from progress.bar import Bar
@@ -39,6 +40,7 @@
 def error_cb (err):
 print('Error: %s' % err)
+
 class MyTestDr(object):
 """ Producer: Delivery report callback """
@@ -353,6 +355,78 @@ def my_on_revoke (consumer, partitions):
 c.close()
+def verify_stats_cb():
+ """ Verify stats_cb """
+
+ def stats_cb(stats_json_str):
+ stats_json = json.loads(stats_json_str)
+ if 'test' in stats_json['topics']:
+ print("# app_offset stats for topic test partition 0: %d" % stats_json['topics']['test']['partitions']['0']['app_offset'])
+
+ conf = {'bootstrap.servers': bootstrap_servers,
+ 'group.id': uuid.uuid1(),
+ 'session.timeout.ms': 6000,
+ 'error_cb': error_cb,
+ 'stats_cb': stats_cb,
+ 'statistics.interval.ms': 200,
+ 'default.topic.config': {
+ 'auto.offset.reset': 'earliest'
+ }}
+
+ c = confluent_kafka.Consumer(**conf)
+ c.subscribe(["test"])
+
+ max_msgcnt = 1000000
+ bytecnt = 0
+ msgcnt = 0
+
+ print('Will now consume %d messages' % max_msgcnt)
+
+ if with_progress:
+ bar = Bar('Consuming', max=max_msgcnt,
+ suffix='%(index)d/%(max)d [%(eta_td)s]')
+ else:
+ bar = None
+
+ while True:
+ # Consume until EOF or error
+
+ msg = c.poll(timeout=20.0)
+ if msg is None:
+ raise Exception('Stalled at %d/%d message, no new messages for 20s' %
+ (msgcnt, max_msgcnt))
+
+ if msg.error():
+ if msg.error().code() == confluent_kafka.KafkaError._PARTITION_EOF:
+ # Reached EOF for a partition, ignore.
+ continue
+ else:
+ raise confluent_kafka.KafkaException(msg.error())
+
+ bytecnt += len(msg)
+ msgcnt += 1
+
+ if bar is not None and (msgcnt % 10000) == 0:
+ bar.next(n=10000)
+
+ if msgcnt == 1:
+ t_first_msg = time.time()
+ if msgcnt >= max_msgcnt:
+ break
+
+ if bar is not None:
+ bar.finish()
+
+ if msgcnt > 0:
+ t_spent = time.time() - t_first_msg
+ print('%d messages (%.2fMb) consumed in %.3fs: %d msgs/s, %.2f Mb/s' % \
+ (msgcnt, bytecnt / (1024*1024), t_spent, msgcnt / t_spent,
+ (bytecnt / t_spent) / (1024*1024)))
+
+ print('closing consumer')
+ c.close()
+
 if __name__ == '__main__':
@@ -377,6 +451,9 @@ def my_on_revoke (consumer, partitions):
 print('=' * 30, 'Verifying Consumer performance', '=' * 30)
 verify_consumer_performance()
+ print('=' * 30, 'Verifying stats_cb', '=' * 30)
+ verify_stats_cb()
+
 print('=' * 30, 'Done', '=' * 30)

diff --git a/examples/producer.py b/examples/producer.py
index 8d7c1611f..903ecb4af 100755
--- a/examples/producer.py
+++ b/examples/producer.py
@@ -19,55 +19,22 @@
 # Example Kafka Producer.
 # Reads lines from stdin and sends to Kafka.
 #
+
 from confluent_kafka import Producer
 import sys
-import getopt
-import json
-from pprint import pformat
-
-def stats_cb(stats_json_str):
- stats_json = json.loads(stats_json_str)
- print('\nKAFKA Stats: {}\n'.format(pformat(stats_json)))
-
-def print_usage_and_exit(program_name):
- sys.stderr.write('Usage: %s [options..] <broker> <topic>\n' % program_name)
- options='''
- Options:
-  -T <intvl>   Enable statistics from Kafka at specified interval (ms)
-'''
- sys.stderr.write(options)
- sys.exit(1)
-
 if __name__ == '__main__':
- optlist, argv = getopt.getopt(sys.argv[1:], 'T:')
- if len(argv) != 2:
- print_usage_and_exit(sys.argv[0])
+ if len(sys.argv) != 3:
+ sys.stderr.write('Usage: %s <broker> <topic>\n' % sys.argv[0])
+ sys.exit(1)

- broker = argv[0]
- topic = argv[1]
+ broker = sys.argv[1]
+ topic = sys.argv[2]

 # Producer configuration
 # See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
 conf = {'bootstrap.servers': broker}

- # Check to see if -T option exists
- for opt in optlist:
- if opt[0] != '-T':
- continue
- try:
- intval = int(opt[1])
- except:
- sys.stderr.write("Invalid option value for -T: %s\n" % opt[1])
- sys.exit(1)
-
- if intval <= 0:
- sys.stderr.write("-T option value needs to be larger than zero: %s\n" % opt[1])
- sys.exit(1)
-
- conf['stats_cb'] = stats_cb
- conf['statistics.interval.ms'] = int(opt[1])
-
 # Create Producer instance
 p = Producer(**conf)

diff --git a/tests/test_misc.py b/tests/test_misc.py
index f29eb4853..7388e538f 100644
--- a/tests/test_misc.py
+++ b/tests/test_misc.py
@@ -1,7 +1,9 @@
 #!/usr/bin/env python

 import confluent_kafka
-
+import json
+import time
+from pprint import pprint

 def test_version():
 print('Using confluent_kafka module version %s (0x%x)' % confluent_kafka.version())
@@ -14,3 +16,54 @@ def test_version():
 assert len(sver) > 0
 assert iver > 0
+def test_error_cb():
+ """ Tests error_cb.
""" + + def error_cb(error_msg): + print('OK: error_cb() called') + assert error_msg.code() in (confluent_kafka.KafkaError._TRANSPORT, confluent_kafka.KafkaError._ALL_BROKERS_DOWN) + + conf = {'bootstrap.servers': 'localhost:9093', # Purposely cause connection refused error + 'group.id':'test', + 'socket.timeout.ms':'100', + 'session.timeout.ms': 1000, # Avoid close() blocking too long + 'error_cb': error_cb + } + + kc = confluent_kafka.Consumer(**conf) + + kc.subscribe(["test"]) + kc.poll(timeout=0.001) + time.sleep(1) + kc.unsubscribe() + + kc.close() + +def test_stats_cb(): + """ Tests stats_cb. """ + + def stats_cb(stats_json_str): + # print(stats_json_str) + try: + stats_json = json.loads(stats_json_str) + if 'type' in stats_json: + print("stats_cb: type=%s" % stats_json['type']) + print('OK: stats_cb() called') + except Exception as e: + assert False + + conf = {'group.id':'test', + 'socket.timeout.ms':'100', + 'session.timeout.ms': 1000, # Avoid close() blocking too long + 'statistics.interval.ms': 200, + 'stats_cb': stats_cb + } + + kc = confluent_kafka.Consumer(**conf) + + kc.subscribe(["test"]) + kc.poll(timeout=0.001) + time.sleep(1) + + kc.close() + From d7ab475d434de28d1f57ee6955fa6891f65a69f4 Mon Sep 17 00:00:00 2001 From: Huajun Qin Date: Fri, 28 Oct 2016 11:53:46 -0700 Subject: [PATCH 4/7] fixes based on pull request review feedback --- confluent_kafka/src/confluent_kafka.c | 31 +++++++++++++++++++++----- examples/integration_test.py | 11 ++++++--- tests/test_misc.py | 32 +++++++++++++-------------- 3 files changed, 49 insertions(+), 25 deletions(-) diff --git a/confluent_kafka/src/confluent_kafka.c b/confluent_kafka/src/confluent_kafka.c index 9e13de4b6..657505198 100644 --- a/confluent_kafka/src/confluent_kafka.c +++ b/confluent_kafka/src/confluent_kafka.c @@ -749,7 +749,7 @@ PyObject *c_parts_to_py (const rd_kafka_topic_partition_list_t *c_parts) { parts = PyList_New(c_parts->cnt); - for (i = 0 ; i < (size_t)c_parts->cnt ; i++) { + for (i = 0 ; i < c_parts->cnt ; i++) { const rd_kafka_topic_partition_t *rktpar = &c_parts->elems[i]; PyList_SET_ITEM(parts, i, TopicPartition_new0( @@ -778,7 +778,7 @@ rd_kafka_topic_partition_list_t *py_to_c_parts (PyObject *plist) { c_parts = rd_kafka_topic_partition_list_new((int)PyList_Size(plist)); - for (i = 0 ; i < (size_t)PyList_Size(plist) ; i++) { + for (i = 0 ; i < PyList_Size(plist) ; i++) { TopicPartition *tp = (TopicPartition *) PyList_GetItem(plist, i); @@ -879,10 +879,10 @@ static int stats_cb(rd_kafka_t *rk, char *json, size_t json_len, void *opaque) { * Clear Python object references in Handle */ void Handle_clear (Handle *h) { - if (h->error_cb) + if (h->error_cb) Py_DECREF(h->error_cb); - if (h->stats_cb) + if (h->stats_cb) Py_DECREF(h->stats_cb); PyThread_delete_key(h->tlskey); @@ -1143,7 +1143,16 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype, Py_DECREF(ks); continue; - } else if (!strcmp(k, "error_cb") && PyCallable_Check(vo)) { + } else if (!strcmp(k, "error_cb")) { + if (!PyCallable_Check(vo)) { + PyErr_SetString(PyExc_TypeError, + "expected error_cb property " + "as a callable function"); + rd_kafka_topic_conf_destroy(tconf); + rd_kafka_conf_destroy(conf); + Py_DECREF(ks); + return NULL; + } if (h->error_cb) { Py_DECREF(h->error_cb); h->error_cb = NULL; @@ -1154,7 +1163,17 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype, } Py_DECREF(ks); continue; - } else if (!strcmp(k, "stats_cb") && PyCallable_Check(vo)) { + } else if (!strcmp(k, "stats_cb")) { + if (!PyCallable_Check(vo)) { + 
+ PyErr_SetString(PyExc_TypeError,
+ "expected stats_cb property "
+ "as a callable function");
+ rd_kafka_topic_conf_destroy(tconf);
+ rd_kafka_conf_destroy(conf);
+ Py_DECREF(ks);
+ return NULL;
+ }
+
 if (h->stats_cb) {
 Py_DECREF(h->stats_cb);
 h->stats_cb = NULL;

diff --git a/examples/integration_test.py b/examples/integration_test.py
index 9c48aa856..49369d899 100755
--- a/examples/integration_test.py
+++ b/examples/integration_test.py
@@ -36,7 +36,8 @@

 bootstrap_servers = 'localhost'
-
+# global variable to be set by stats_cb callback function
+good_stats_cb_result = False

 def error_cb (err):
 print('Error: %s' % err)
@@ -359,9 +360,13 @@ def verify_stats_cb():
 """ Verify stats_cb """

 def stats_cb(stats_json_str):
+ global good_stats_cb_result
 stats_json = json.loads(stats_json_str)
 if 'test' in stats_json['topics']:
- print("# app_offset stats for topic test partition 0: %d" % stats_json['topics']['test']['partitions']['0']['app_offset'])
+ app_offset = stats_json['topics']['test']['partitions']['0']['app_offset']
+ if app_offset > 0:
+ print("# app_offset stats for topic test partition 0: %d" % app_offset)
+ good_stats_cb_result = True

 conf = {'bootstrap.servers': bootstrap_servers,
 'group.id': uuid.uuid1(),
@@ -388,7 +393,7 @@ def stats_cb(stats_json_str):
 else:
 bar = None

- while True:
+ while not good_stats_cb_result:
 # Consume until EOF or error

 msg = c.poll(timeout=20.0)

diff --git a/tests/test_misc.py b/tests/test_misc.py
index 7388e538f..6a937e3ca 100644
--- a/tests/test_misc.py
+++ b/tests/test_misc.py
@@ -16,11 +16,15 @@ def test_version():
 assert len(sver) > 0
 assert iver > 0

+# global variable for error_cb callback function
+seen_error_cb = False
+
 def test_error_cb():
 """ Tests error_cb. """

 def error_cb(error_msg):
- print('OK: error_cb() called')
+ global seen_error_cb
+ seen_error_cb = True
 assert error_msg.code() in (confluent_kafka.KafkaError._TRANSPORT, confluent_kafka.KafkaError._ALL_BROKERS_DOWN)

 conf = {'bootstrap.servers': 'localhost:9093', # Purposely cause connection refused error
 'group.id':'test',
 'socket.timeout.ms':'100',
 'session.timeout.ms': 1000, # Avoid close() blocking too long
 'error_cb': error_cb
 }

 kc = confluent_kafka.Consumer(**conf)
-
 kc.subscribe(["test"])
- kc.poll(timeout=0.001)
- time.sleep(1)
- kc.unsubscribe()
+ while not seen_error_cb:
+ kc.poll(timeout=1)

 kc.close()

+# global variable for stats_cb callback function
+seen_stats_cb = False
+
 def test_stats_cb():
 """ Tests stats_cb. """
""" def stats_cb(stats_json_str): - # print(stats_json_str) - try: - stats_json = json.loads(stats_json_str) - if 'type' in stats_json: - print("stats_cb: type=%s" % stats_json['type']) - print('OK: stats_cb() called') - except Exception as e: - assert False + global seen_stats_cb + seen_stats_cb = True + stats_json = json.loads(stats_json_str) + assert len(stats_json['name']) > 0 conf = {'group.id':'test', 'socket.timeout.ms':'100', @@ -62,8 +63,7 @@ def stats_cb(stats_json_str): kc = confluent_kafka.Consumer(**conf) kc.subscribe(["test"]) - kc.poll(timeout=0.001) - time.sleep(1) - + while not seen_stats_cb: + kc.poll(timeout=1) kc.close() From eb31ace8779fe3829080f2abf266d794a08d5ad7 Mon Sep 17 00:00:00 2001 From: Huajun Qin Date: Fri, 28 Oct 2016 13:49:40 -0700 Subject: [PATCH 5/7] removed extra space --- tests/test_misc.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_misc.py b/tests/test_misc.py index 6a937e3ca..c7a4f4107 100644 --- a/tests/test_misc.py +++ b/tests/test_misc.py @@ -63,7 +63,7 @@ def stats_cb(stats_json_str): kc = confluent_kafka.Consumer(**conf) kc.subscribe(["test"]) - while not seen_stats_cb: + while not seen_stats_cb: kc.poll(timeout=1) kc.close() From 90d11df2525fc528dc4aa7a65f72c93b784fa60f Mon Sep 17 00:00:00 2001 From: Huajun Qin Date: Wed, 2 Nov 2016 10:49:44 -0700 Subject: [PATCH 6/7] add documentation for stats_cb --- docs/index.rst | 2 ++ 1 file changed, 2 insertions(+) diff --git a/docs/index.rst b/docs/index.rst index 229fe9ef3..1a85bbc67 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -81,6 +81,8 @@ The Python bindings also provide some additional configuration properties: * ``error_cb(kafka.KafkaError)``: Callback for generic/global error events. This callback is served by poll(). +* ``stats_cb(json_str)``: Callback for statistics data. This callback is triggered by poll() every ``statistics.interval.ms`` (needs to be configured separately). Function argument ``json_str`` is a str instance of a JSON document containing statistics data. + * ``on_delivery(kafka.KafkaError, kafka.Message)`` (**Producer**): value is a Python function reference that is called once for each produced message to indicate the final delivery result (success or failure). From 495a836ed2eca9998f635e406e0aef55b6f73f33 Mon Sep 17 00:00:00 2001 From: Huajun Qin Date: Wed, 2 Nov 2016 11:14:58 -0700 Subject: [PATCH 7/7] reformatted to be more readable in terminal --- docs/index.rst | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/docs/index.rst b/docs/index.rst index 1a85bbc67..3ed82a4af 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -81,7 +81,9 @@ The Python bindings also provide some additional configuration properties: * ``error_cb(kafka.KafkaError)``: Callback for generic/global error events. This callback is served by poll(). -* ``stats_cb(json_str)``: Callback for statistics data. This callback is triggered by poll() every ``statistics.interval.ms`` (needs to be configured separately). Function argument ``json_str`` is a str instance of a JSON document containing statistics data. +* ``stats_cb(json_str)``: Callback for statistics data. This callback is triggered by poll() + every ``statistics.interval.ms`` (needs to be configured separately). + Function argument ``json_str`` is a str instance of a JSON document containing statistics data. * ``on_delivery(kafka.KafkaError, kafka.Message)`` (**Producer**): value is a Python function reference that is called once for each produced message to indicate the final