Skip to content

Commit d7ab475

Browse files
committed
fixes based on pull request review feedback
1 parent 73b363f commit d7ab475

File tree

3 files changed

+49
-25
lines changed

3 files changed

+49
-25
lines changed

confluent_kafka/src/confluent_kafka.c

Lines changed: 25 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -749,7 +749,7 @@ PyObject *c_parts_to_py (const rd_kafka_topic_partition_list_t *c_parts) {
749749

750750
parts = PyList_New(c_parts->cnt);
751751

752-
for (i = 0 ; i < (size_t)c_parts->cnt ; i++) {
752+
for (i = 0 ; i < c_parts->cnt ; i++) {
753753
const rd_kafka_topic_partition_t *rktpar = &c_parts->elems[i];
754754
PyList_SET_ITEM(parts, i,
755755
TopicPartition_new0(
@@ -778,7 +778,7 @@ rd_kafka_topic_partition_list_t *py_to_c_parts (PyObject *plist) {
778778

779779
c_parts = rd_kafka_topic_partition_list_new((int)PyList_Size(plist));
780780

781-
for (i = 0 ; i < (size_t)PyList_Size(plist) ; i++) {
781+
for (i = 0 ; i < PyList_Size(plist) ; i++) {
782782
TopicPartition *tp = (TopicPartition *)
783783
PyList_GetItem(plist, i);
784784

@@ -879,10 +879,10 @@ static int stats_cb(rd_kafka_t *rk, char *json, size_t json_len, void *opaque) {
879879
* Clear Python object references in Handle
880880
*/
881881
void Handle_clear (Handle *h) {
882-
if (h->error_cb)
882+
if (h->error_cb)
883883
Py_DECREF(h->error_cb);
884884

885-
if (h->stats_cb)
885+
if (h->stats_cb)
886886
Py_DECREF(h->stats_cb);
887887

888888
PyThread_delete_key(h->tlskey);
@@ -1143,7 +1143,16 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,
11431143
Py_DECREF(ks);
11441144
continue;
11451145

1146-
} else if (!strcmp(k, "error_cb") && PyCallable_Check(vo)) {
1146+
} else if (!strcmp(k, "error_cb")) {
1147+
if (!PyCallable_Check(vo)) {
1148+
PyErr_SetString(PyExc_TypeError,
1149+
"expected error_cb property "
1150+
"as a callable function");
1151+
rd_kafka_topic_conf_destroy(tconf);
1152+
rd_kafka_conf_destroy(conf);
1153+
Py_DECREF(ks);
1154+
return NULL;
1155+
}
11471156
if (h->error_cb) {
11481157
Py_DECREF(h->error_cb);
11491158
h->error_cb = NULL;
@@ -1154,7 +1163,17 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,
11541163
}
11551164
Py_DECREF(ks);
11561165
continue;
1157-
} else if (!strcmp(k, "stats_cb") && PyCallable_Check(vo)) {
1166+
} else if (!strcmp(k, "stats_cb")) {
1167+
if (!PyCallable_Check(vo)) {
1168+
PyErr_SetString(PyExc_TypeError,
1169+
"expected stats_cb property "
1170+
"as a callable function");
1171+
rd_kafka_topic_conf_destroy(tconf);
1172+
rd_kafka_conf_destroy(conf);
1173+
Py_DECREF(ks);
1174+
return NULL;
1175+
}
1176+
11581177
if (h->stats_cb) {
11591178
Py_DECREF(h->stats_cb);
11601179
h->stats_cb = NULL;

examples/integration_test.py

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,8 @@
3636
bootstrap_servers = 'localhost'
3737

3838

39-
39+
# global variable to be set by stats_cb call back function
40+
good_stats_cb_result = False
4041

4142
def error_cb (err):
4243
print('Error: %s' % err)
@@ -359,9 +360,13 @@ def verify_stats_cb():
359360
""" Verify stats_cb """
360361

361362
def stats_cb(stats_json_str):
363+
global good_stats_cb_result
362364
stats_json = json.loads(stats_json_str)
363365
if 'test' in stats_json['topics']:
364-
print("# app_offset stats for topic test partition 0: %d" % stats_json['topics']['test']['partitions']['0']['app_offset'])
366+
app_offset = stats_json['topics']['test']['partitions']['0']['app_offset']
367+
if app_offset > 0:
368+
print("# app_offset stats for topic test partition 0: %d" % app_offset)
369+
good_stats_cb_result = True
365370

366371
conf = {'bootstrap.servers': bootstrap_servers,
367372
'group.id': uuid.uuid1(),
@@ -388,7 +393,7 @@ def stats_cb(stats_json_str):
388393
else:
389394
bar = None
390395

391-
while True:
396+
while not good_stats_cb_result:
392397
# Consume until EOF or error
393398

394399
msg = c.poll(timeout=20.0)

tests/test_misc.py

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,15 @@ def test_version():
1616
assert len(sver) > 0
1717
assert iver > 0
1818

19+
# global variable for error_cb call back function
20+
seen_error_cb = False
21+
1922
def test_error_cb():
2023
""" Tests error_cb. """
2124

2225
def error_cb(error_msg):
23-
print('OK: error_cb() called')
26+
global seen_error_cb
27+
seen_error_cb = True
2428
assert error_msg.code() in (confluent_kafka.KafkaError._TRANSPORT, confluent_kafka.KafkaError._ALL_BROKERS_DOWN)
2529

2630
conf = {'bootstrap.servers': 'localhost:9093', # Purposely cause connection refused error
@@ -31,26 +35,23 @@ def error_cb(error_msg):
3135
}
3236

3337
kc = confluent_kafka.Consumer(**conf)
34-
3538
kc.subscribe(["test"])
36-
kc.poll(timeout=0.001)
37-
time.sleep(1)
38-
kc.unsubscribe()
39+
while not seen_error_cb:
40+
kc.poll(timeout=1)
3941

4042
kc.close()
4143

44+
# global variable for stats_cb call back function
45+
seen_stats_cb = False
46+
4247
def test_stats_cb():
4348
""" Tests stats_cb. """
4449

4550
def stats_cb(stats_json_str):
46-
# print(stats_json_str)
47-
try:
48-
stats_json = json.loads(stats_json_str)
49-
if 'type' in stats_json:
50-
print("stats_cb: type=%s" % stats_json['type'])
51-
print('OK: stats_cb() called')
52-
except Exception as e:
53-
assert False
51+
global seen_stats_cb
52+
seen_stats_cb = True
53+
stats_json = json.loads(stats_json_str)
54+
assert len(stats_json['name']) > 0
5455

5556
conf = {'group.id':'test',
5657
'socket.timeout.ms':'100',
@@ -62,8 +63,7 @@ def stats_cb(stats_json_str):
6263
kc = confluent_kafka.Consumer(**conf)
6364

6465
kc.subscribe(["test"])
65-
kc.poll(timeout=0.001)
66-
time.sleep(1)
67-
66+
while not seen_stats_cb:
67+
kc.poll(timeout=1)
6868
kc.close()
6969

0 commit comments

Comments
 (0)