From fdaa0d4aa109639924bc44e6e0e0ce0ac75369d2 Mon Sep 17 00:00:00 2001 From: Magnus Edenhill Date: Wed, 6 Jul 2016 07:57:54 +0200 Subject: [PATCH 1/5] Refactor Consumer and Producer into common Handle for code reuse. --- confluent_kafka/src/Consumer.c | 111 ++++++++++++++------------ confluent_kafka/src/Producer.c | 78 ++++++++++-------- confluent_kafka/src/confluent_kafka.c | 32 ++++---- confluent_kafka/src/confluent_kafka.h | 90 ++++++++++++--------- 4 files changed, 172 insertions(+), 139 deletions(-) diff --git a/confluent_kafka/src/Consumer.c b/confluent_kafka/src/Consumer.c index 80e136a9e..88a586e7f 100644 --- a/confluent_kafka/src/Consumer.c +++ b/confluent_kafka/src/Consumer.c @@ -28,23 +28,26 @@ ****************************************************************************/ -static int Consumer_clear (Consumer *self) { - if (self->on_assign) { - Py_DECREF(self->on_assign); - self->on_assign = NULL; +static int Consumer_clear (Handle *self) { + if (self->u.Consumer.on_assign) { + Py_DECREF(self->u.Consumer.on_assign); + self->u.Consumer.on_assign = NULL; } - if (self->on_revoke) { - Py_DECREF(self->on_revoke); - self->on_revoke = NULL; + if (self->u.Consumer.on_revoke) { + Py_DECREF(self->u.Consumer.on_revoke); + self->u.Consumer.on_revoke = NULL; } - if (self->on_commit) { - Py_DECREF(self->on_commit); - self->on_commit = NULL; + if (self->u.Consumer.on_commit) { + Py_DECREF(self->u.Consumer.on_commit); + self->u.Consumer.on_commit = NULL; } + + Handle_clear(self); + return 0; } -static void Consumer_dealloc (Consumer *self) { +static void Consumer_dealloc (Handle *self) { PyObject_GC_UnTrack(self); Consumer_clear(self); @@ -55,12 +58,15 @@ static void Consumer_dealloc (Consumer *self) { Py_TYPE(self)->tp_free((PyObject *)self); } -static int Consumer_traverse (Consumer *self, - visitproc visit, void *arg) { - if (self->on_assign) - Py_VISIT(self->on_assign); - if (self->on_revoke) - Py_VISIT(self->on_revoke); +static int Consumer_traverse (Handle *self, + visitproc visit, void *arg) { + if (self->u.Consumer.on_assign) + Py_VISIT(self->u.Consumer.on_assign); + if (self->u.Consumer.on_revoke) + Py_VISIT(self->u.Consumer.on_revoke); + + Handle_traverse(self, visit, arg); + return 0; } @@ -69,7 +75,7 @@ static int Consumer_traverse (Consumer *self, -static PyObject *Consumer_subscribe (Consumer *self, PyObject *args, +static PyObject *Consumer_subscribe (Handle *self, PyObject *args, PyObject *kwargs) { rd_kafka_topic_partition_list_t *topics; @@ -130,29 +136,29 @@ static PyObject *Consumer_subscribe (Consumer *self, PyObject *args, /* * Update rebalance callbacks */ - if (self->on_assign) { - Py_DECREF(self->on_assign); - self->on_assign = NULL; + if (self->u.Consumer.on_assign) { + Py_DECREF(self->u.Consumer.on_assign); + self->u.Consumer.on_assign = NULL; } if (on_assign) { - self->on_assign = on_assign; - Py_INCREF(self->on_assign); + self->u.Consumer.on_assign = on_assign; + Py_INCREF(self->u.Consumer.on_assign); } - if (self->on_revoke) { - Py_DECREF(self->on_revoke); - self->on_revoke = NULL; + if (self->u.Consumer.on_revoke) { + Py_DECREF(self->u.Consumer.on_revoke); + self->u.Consumer.on_revoke = NULL; } if (on_revoke) { - self->on_revoke = on_revoke; - Py_INCREF(self->on_revoke); + self->u.Consumer.on_revoke = on_revoke; + Py_INCREF(self->u.Consumer.on_revoke); } Py_RETURN_NONE; } -static PyObject *Consumer_unsubscribe (Consumer *self, +static PyObject *Consumer_unsubscribe (Handle *self, PyObject *ignore) { rd_kafka_resp_err_t err; @@ -169,7 +175,7 @@ static PyObject 
*Consumer_unsubscribe (Consumer *self, } -static PyObject *Consumer_assign (Consumer *self, PyObject *tlist) { +static PyObject *Consumer_assign (Handle *self, PyObject *tlist) { rd_kafka_topic_partition_list_t *c_parts; rd_kafka_resp_err_t err; @@ -177,7 +183,7 @@ static PyObject *Consumer_assign (Consumer *self, PyObject *tlist) { if (!(c_parts = py_to_c_parts(tlist))) return NULL; - self->rebalance_assigned++; + self->u.Consumer.rebalance_assigned++; err = rd_kafka_assign(self->rk, c_parts); @@ -194,11 +200,11 @@ static PyObject *Consumer_assign (Consumer *self, PyObject *tlist) { } -static PyObject *Consumer_unassign (Consumer *self, PyObject *ignore) { +static PyObject *Consumer_unassign (Handle *self, PyObject *ignore) { rd_kafka_resp_err_t err; - self->rebalance_assigned++; + self->u.Consumer.rebalance_assigned++; err = rd_kafka_assign(self->rk, NULL); if (err) { @@ -213,7 +219,7 @@ static PyObject *Consumer_unassign (Consumer *self, PyObject *ignore) { -static PyObject *Consumer_commit (Consumer *self, PyObject *args, +static PyObject *Consumer_commit (Handle *self, PyObject *args, PyObject *kwargs) { rd_kafka_resp_err_t err; @@ -281,7 +287,7 @@ static PyObject *Consumer_commit (Consumer *self, PyObject *args, -static PyObject *Consumer_committed (Consumer *self, PyObject *args, +static PyObject *Consumer_committed (Handle *self, PyObject *args, PyObject *kwargs) { PyObject *plist; @@ -317,7 +323,7 @@ static PyObject *Consumer_committed (Consumer *self, PyObject *args, } -static PyObject *Consumer_position (Consumer *self, PyObject *args, +static PyObject *Consumer_position (Handle *self, PyObject *args, PyObject *kwargs) { PyObject *plist; @@ -352,7 +358,7 @@ static PyObject *Consumer_position (Consumer *self, PyObject *args, -static PyObject *Consumer_poll (Consumer *self, PyObject *args, +static PyObject *Consumer_poll (Handle *self, PyObject *args, PyObject *kwargs) { double tmout = -1.0f; static char *kws[] = { "timeout", NULL }; @@ -384,7 +390,7 @@ static PyObject *Consumer_poll (Consumer *self, PyObject *args, } -static PyObject *Consumer_close (Consumer *self, PyObject *ignore) { +static PyObject *Consumer_close (Handle *self, PyObject *ignore) { self->thread_state = PyEval_SaveThread(); rd_kafka_consumer_close(self->rk); PyEval_RestoreThread(self->thread_state); @@ -523,7 +529,7 @@ static PyObject *Consumer_new (PyTypeObject *type, PyObject *args, PyTypeObject ConsumerType = { PyVarObject_HEAD_INIT(NULL, 0) "cimpl.Consumer", /*tp_name*/ - sizeof(Consumer), /*tp_basicsize*/ + sizeof(Handle), /*tp_basicsize*/ 0, /*tp_itemsize*/ (destructor)Consumer_dealloc, /*tp_dealloc*/ 0, /*tp_print*/ @@ -584,14 +590,16 @@ PyTypeObject ConsumerType = { static void Consumer_rebalance_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err, rd_kafka_topic_partition_list_t *c_parts, void *opaque) { - Consumer *self = opaque; + Handle *self = opaque; PyEval_RestoreThread(self->thread_state); - self->rebalance_assigned = 0; + self->u.Consumer.rebalance_assigned = 0; - if ((err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS && self->on_assign) || - (err == RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS && self->on_revoke)) { + if ((err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS && + self->u.Consumer.on_assign) || + (err == RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS && + self->u.Consumer.on_revoke)) { PyObject *parts; PyObject *args, *result; @@ -612,7 +620,8 @@ static void Consumer_rebalance_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err, result = PyObject_CallObject( err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS ? 
- self->on_assign : self->on_revoke, args); + self->u.Consumer.on_assign : + self->u.Consumer.on_revoke, args); Py_DECREF(args); @@ -628,7 +637,7 @@ static void Consumer_rebalance_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err, * to synchronize state, if the user did not do this from callback, * or there was no callback, or the callback failed, then we perform * that assign() call here instead. */ - if (!self->rebalance_assigned) { + if (!self->u.Consumer.rebalance_assigned) { if (err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS) rd_kafka_assign(rk, c_parts); else @@ -642,10 +651,10 @@ static void Consumer_rebalance_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err, static void Consumer_offset_commit_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err, rd_kafka_topic_partition_list_t *c_parts, void *opaque) { - Consumer *self = opaque; + Handle *self = opaque; PyObject *parts, *k_err, *args, *result; - if (!self->on_commit) + if (!self->u.Consumer.on_commit) return; PyEval_RestoreThread(self->thread_state); @@ -669,7 +678,7 @@ static void Consumer_offset_commit_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err, return; } - result = PyObject_CallObject(self->on_commit, args); + result = PyObject_CallObject(self->u.Consumer.on_commit, args); Py_DECREF(args); @@ -687,16 +696,16 @@ static void Consumer_offset_commit_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err, static PyObject *Consumer_new (PyTypeObject *type, PyObject *args, PyObject *kwargs) { - Consumer *self; + Handle *self; char errstr[256]; rd_kafka_conf_t *conf; - self = (Consumer *)ConsumerType.tp_alloc(&ConsumerType, 0); + self = (Handle *)ConsumerType.tp_alloc(&ConsumerType, 0); if (!self) return NULL; if (!(conf = common_conf_setup(RD_KAFKA_CONSUMER, self, - args, kwargs))) { + args, kwargs))) { Py_DECREF(self); return NULL; } diff --git a/confluent_kafka/src/Producer.c b/confluent_kafka/src/Producer.c index c69ca07dc..bca70568f 100644 --- a/confluent_kafka/src/Producer.c +++ b/confluent_kafka/src/Producer.c @@ -46,7 +46,7 @@ * Per-message state. */ struct Producer_msgstate { - Producer *self; + Handle *self; PyObject *dr_cb; PyObject *partitioner_cb; }; @@ -57,8 +57,8 @@ struct Producer_msgstate { * Returns NULL if neither dr_cb or partitioner_cb is set. 
*/ static __inline struct Producer_msgstate * -Producer_msgstate_new (Producer *self, - PyObject *dr_cb, PyObject *partitioner_cb) { +Producer_msgstate_new (Handle *self, + PyObject *dr_cb, PyObject *partitioner_cb) { struct Producer_msgstate *msgstate; if (!dr_cb && !partitioner_cb) @@ -88,19 +88,22 @@ Producer_msgstate_destroy (struct Producer_msgstate *msgstate) { } -static int Producer_clear (Producer *self) { - if (self->default_dr_cb) { - Py_DECREF(self->default_dr_cb); - self->default_dr_cb = NULL; +static int Producer_clear (Handle *self) { + if (self->u.Producer.default_dr_cb) { + Py_DECREF(self->u.Producer.default_dr_cb); + self->u.Producer.default_dr_cb = NULL; } - if (self->partitioner_cb) { - Py_DECREF(self->partitioner_cb); - self->partitioner_cb = NULL; + if (self->u.Producer.partitioner_cb) { + Py_DECREF(self->u.Producer.partitioner_cb); + self->u.Producer.partitioner_cb = NULL; } + + Handle_clear(self); + return 0; } -static void Producer_dealloc (Producer *self) { +static void Producer_dealloc (Handle *self) { PyObject_GC_UnTrack(self); Producer_clear(self); @@ -111,12 +114,15 @@ static void Producer_dealloc (Producer *self) { Py_TYPE(self)->tp_free((PyObject *)self); } -static int Producer_traverse (Producer *self, - visitproc visit, void *arg) { - if (self->default_dr_cb) - Py_VISIT(self->default_dr_cb); - if (self->partitioner_cb) - Py_VISIT(self->partitioner_cb); +static int Producer_traverse (Handle *self, + visitproc visit, void *arg) { + if (self->u.Producer.default_dr_cb) + Py_VISIT(self->u.Producer.default_dr_cb); + if (self->u.Producer.partitioner_cb) + Py_VISIT(self->u.Producer.partitioner_cb); + + Handle_traverse(self, visit, arg); + return 0; } @@ -124,7 +130,7 @@ static int Producer_traverse (Producer *self, static void dr_msg_cb (rd_kafka_t *rk, const rd_kafka_message_t *rkm, void *opaque) { struct Producer_msgstate *msgstate = rkm->_private; - Producer *self = opaque; + Handle *self = opaque; PyObject *args; PyObject *result; PyObject *msgobj; @@ -178,7 +184,7 @@ int32_t Producer_partitioner_cb (const rd_kafka_topic_t *rkt, size_t keylen, int32_t partition_cnt, void *rkt_opaque, void *msg_opaque) { - Producer *self = rkt_opaque; + Handle *self = rkt_opaque; struct Producer_msgstate *msgstate = msg_opaque; PyGILState_STATE gstate; PyObject *result; @@ -188,9 +194,9 @@ int32_t Producer_partitioner_cb (const rd_kafka_topic_t *rkt, if (!msgstate) { /* Fall back on default C partitioner if neither a per-msg * partitioner nor a default Python partitioner is available */ - return self->c_partitioner_cb(rkt, keydata, keylen, - partition_cnt, rkt_opaque, - msg_opaque); + return self->u.Producer.c_partitioner_cb(rkt, keydata, keylen, + partition_cnt, + rkt_opaque, msg_opaque); } gstate = PyGILState_Ensure(); @@ -198,9 +204,11 @@ int32_t Producer_partitioner_cb (const rd_kafka_topic_t *rkt, if (!msgstate->partitioner_cb) { /* Fall back on default C partitioner if neither a per-msg * partitioner nor a default Python partitioner is available */ - r = msgstate->self->c_partitioner_cb(rkt, keydata, keylen, - partition_cnt, rkt_opaque, - msg_opaque); + r = msgstate->self->u.Producer.c_partitioner_cb(rkt, + keydata, keylen, + partition_cnt, + rkt_opaque, + msg_opaque); goto done; } @@ -237,7 +245,7 @@ int32_t Producer_partitioner_cb (const rd_kafka_topic_t *rkt, -static PyObject *Producer_produce (Producer *self, PyObject *args, +static PyObject *Producer_produce (Handle *self, PyObject *args, PyObject *kwargs) { const char *topic, *value = NULL, *key = NULL; int value_len 
= 0, key_len = 0; @@ -272,9 +280,9 @@ static PyObject *Producer_produce (Producer *self, PyObject *args, } if (!dr_cb) - dr_cb = self->default_dr_cb; + dr_cb = self->u.Producer.default_dr_cb; if (!partitioner_cb) - partitioner_cb = self->partitioner_cb; + partitioner_cb = self->u.Producer.partitioner_cb; /* Create msgstate if necessary, may return NULL if no callbacks * are wanted. */ @@ -311,7 +319,7 @@ static PyObject *Producer_produce (Producer *self, PyObject *args, * @returns -1 if callback crashed (or poll() failed), else the number * of events served. */ -static int Producer_poll0 (Producer *self, int tmout) { +static int Producer_poll0 (Handle *self, int tmout) { int r; self->callback_crashed = 0; @@ -332,7 +340,7 @@ static int Producer_poll0 (Producer *self, int tmout) { } -static PyObject *Producer_poll (Producer *self, PyObject *args, +static PyObject *Producer_poll (Handle *self, PyObject *args, PyObject *kwargs) { double tmout; int r; @@ -349,7 +357,7 @@ static PyObject *Producer_poll (Producer *self, PyObject *args, } -static PyObject *Producer_flush (Producer *self, PyObject *ignore) { +static PyObject *Producer_flush (Handle *self, PyObject *ignore) { while (rd_kafka_outq_len(self->rk) > 0) { if (Producer_poll0(self, 500) == -1) return NULL; @@ -415,7 +423,7 @@ static PyMethodDef Producer_methods[] = { }; -static Py_ssize_t Producer__len__ (Producer *self) { +static Py_ssize_t Producer__len__ (Handle *self) { return rd_kafka_outq_len(self->rk); } @@ -431,7 +439,7 @@ static PyObject *Producer_new (PyTypeObject *type, PyObject *args, PyTypeObject ProducerType = { PyVarObject_HEAD_INIT(NULL, 0) "cimpl.Producer", /*tp_name*/ - sizeof(Producer), /*tp_basicsize*/ + sizeof(Handle), /*tp_basicsize*/ 0, /*tp_itemsize*/ (destructor)Producer_dealloc, /*tp_dealloc*/ 0, /*tp_print*/ @@ -484,11 +492,11 @@ PyTypeObject ProducerType = { static PyObject *Producer_new (PyTypeObject *type, PyObject *args, PyObject *kwargs) { - Producer *self; + Handle *self; char errstr[256]; rd_kafka_conf_t *conf; - self = (Producer *)ProducerType.tp_alloc(&ProducerType, 0); + self = (Handle *)ProducerType.tp_alloc(&ProducerType, 0); if (!self) return NULL; diff --git a/confluent_kafka/src/confluent_kafka.c b/confluent_kafka/src/confluent_kafka.c index e85ef2025..4b1d227ce 100644 --- a/confluent_kafka/src/confluent_kafka.c +++ b/confluent_kafka/src/confluent_kafka.c @@ -879,7 +879,7 @@ static int populate_topic_conf (rd_kafka_topic_conf_t *tconf, const char *what, * * @returns 1 if handled, 0 if unknown, or -1 on failure (exception raised). */ -static int producer_conf_set_special (Producer *self, rd_kafka_conf_t *conf, +static int producer_conf_set_special (Handle *self, rd_kafka_conf_t *conf, rd_kafka_topic_conf_t *tconf, const char *name, PyObject *valobj) { PyObject *vs; @@ -894,8 +894,8 @@ static int producer_conf_set_special (Producer *self, rd_kafka_conf_t *conf, return -1; } - self->default_dr_cb = valobj; - Py_INCREF(self->default_dr_cb); + self->u.Producer.default_dr_cb = valobj; + Py_INCREF(self->u.Producer.default_dr_cb); return 1; @@ -947,11 +947,11 @@ static int producer_conf_set_special (Producer *self, rd_kafka_conf_t *conf, return -1; } - if (self->partitioner_cb) - Py_DECREF(self->partitioner_cb); + if (self->u.Producer.partitioner_cb) + Py_DECREF(self->u.Producer.partitioner_cb); - self->partitioner_cb = valobj; - Py_INCREF(self->partitioner_cb); + self->u.Producer.partitioner_cb = valobj; + Py_INCREF(self->u.Producer.partitioner_cb); /* Use trampoline to call Python code. 
*/ rd_kafka_topic_conf_set_partitioner_cb(tconf, @@ -970,7 +970,7 @@ static int producer_conf_set_special (Producer *self, rd_kafka_conf_t *conf, * * @returns 1 if handled, 0 if unknown, or -1 on failure (exception raised). */ -static int consumer_conf_set_special (Consumer *self, rd_kafka_conf_t *conf, +static int consumer_conf_set_special (Handle *self, rd_kafka_conf_t *conf, rd_kafka_topic_conf_t *tconf, const char *name, PyObject *valobj) { @@ -983,8 +983,8 @@ static int consumer_conf_set_special (Consumer *self, rd_kafka_conf_t *conf, return -1; } - self->on_commit = valobj; - Py_INCREF(self->on_commit); + self->u.Consumer.on_commit = valobj; + Py_INCREF(self->u.Consumer.on_commit); return 1; } @@ -1000,7 +1000,7 @@ static int consumer_conf_set_special (Consumer *self, rd_kafka_conf_t *conf, * an exception has been raised. */ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype, - void *self0, + Handle *h, PyObject *args, PyObject *kwargs) { rd_kafka_conf_t *conf; @@ -1057,11 +1057,9 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype, /* Special handling for certain config keys. */ if (ktype == RD_KAFKA_PRODUCER) - r = producer_conf_set_special((Producer *)self0, - conf, tconf, k, vo); + r = producer_conf_set_special(h, conf, tconf, k, vo); else - r = consumer_conf_set_special((Consumer *)self0, - conf, tconf, k, vo); + r = consumer_conf_set_special(h, conf, tconf, k, vo); if (r == -1) { /* Error */ Py_DECREF(ks); @@ -1104,10 +1102,10 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype, Py_DECREF(ks); } - rd_kafka_topic_conf_set_opaque(tconf, self0); + rd_kafka_topic_conf_set_opaque(tconf, h); rd_kafka_conf_set_default_topic_conf(conf, tconf); - rd_kafka_conf_set_opaque(conf, self0); + rd_kafka_conf_set_opaque(conf, h); return conf; } diff --git a/confluent_kafka/src/confluent_kafka.h b/confluent_kafka/src/confluent_kafka.h index b96e88129..b007dc79a 100644 --- a/confluent_kafka/src/confluent_kafka.h +++ b/confluent_kafka/src/confluent_kafka.h @@ -98,6 +98,55 @@ PyObject *KafkaError_new_or_None (rd_kafka_resp_err_t err, const char *str); PyErr_SetObject(KafkaException, _eo); \ } while (0) + + +/**************************************************************************** + * + * + * Common instance handle for both Producer and Consumer + * + * + * + * + ****************************************************************************/ +typedef struct { + PyObject_HEAD + rd_kafka_t *rk; + int callback_crashed; + PyThreadState *thread_state; + PyObject *error_cb; + + union { + /** + * Producer + */ + struct { + PyObject *default_dr_cb; + PyObject *partitioner_cb; /**< Registered Python partitioner */ + int32_t (*c_partitioner_cb) ( + const rd_kafka_topic_t *, + const void *, size_t, int32_t, + void *, void *); /**< Fallback C partitioner*/ + } Producer; + + /** + * Consumer + */ + struct { + int rebalance_assigned; /* Rebalance: Callback performed assign() call.*/ + PyObject *on_assign; /* Rebalance: on_assign callback */ + PyObject *on_revoke; /* Rebalance: on_revoke callback */ + PyObject *on_commit; /* Commit callback */ + + } Consumer; + } u; +} Handle; + + +void Handle_clear (Handle *h); +int Handle_traverse (Handle *h, visitproc visit, void *arg); + + /**************************************************************************** * * @@ -108,9 +157,9 @@ PyObject *KafkaError_new_or_None (rd_kafka_resp_err_t err, const char *str); * ****************************************************************************/ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype, 
- void *self0, - PyObject *args, - PyObject *kwargs); + Handle *h, + PyObject *args, + PyObject *kwargs); PyObject *c_parts_to_py (const rd_kafka_topic_partition_list_t *c_parts); rd_kafka_topic_partition_list_t *py_to_c_parts (PyObject *plist); @@ -154,25 +203,9 @@ PyObject *Message_error (Message *self, PyObject *ignore); * ****************************************************************************/ -/** - * @brief confluent_kafka.Producer object - */ -typedef struct { - PyObject_HEAD - rd_kafka_t *rk; - PyObject *default_dr_cb; - PyObject *partitioner_cb; /**< Registered Python partitioner */ - int32_t (*c_partitioner_cb) ( - const rd_kafka_topic_t *, - const void *, size_t, int32_t, - void *, void *); /**< Fallback C partitioner*/ - int callback_crashed; - PyThreadState *thread_state; -} Producer; - - extern PyTypeObject ProducerType; + int32_t Producer_partitioner_cb (const rd_kafka_topic_t *rkt, const void *keydata, size_t keylen, @@ -190,19 +223,4 @@ int32_t Producer_partitioner_cb (const rd_kafka_topic_t *rkt, * ****************************************************************************/ -/** - * @brief confluent_kafka.Consumer object - */ -typedef struct { - PyObject_HEAD - rd_kafka_t *rk; - int rebalance_assigned; /* Rebalance: Callback performed assign() call.*/ - PyObject *on_assign; /* Rebalance: on_assign callback */ - PyObject *on_revoke; /* Rebalance: on_revoke callback */ - PyObject *on_commit; /* Commit callback */ - int callback_crashed; - PyThreadState *thread_state; -} Consumer; - -extern PyTypeObject ConsumerType ; - +extern PyTypeObject ConsumerType; From 0e7b694b455923610858231f862c81a41b08f566 Mon Sep 17 00:00:00 2001 From: Magnus Edenhill Date: Wed, 6 Jul 2016 07:59:18 +0200 Subject: [PATCH 2/5] Added error_cb for propagating generic errors from librdkafka to app (issue #14) --- confluent_kafka/src/confluent_kafka.c | 65 +++++++++++++++++++++++++++ docs/index.rst | 3 ++ examples/integration_test.py | 8 +++- tests/test_Producer.py | 4 ++ 4 files changed, 79 insertions(+), 1 deletion(-) diff --git a/confluent_kafka/src/confluent_kafka.c b/confluent_kafka/src/confluent_kafka.c index 4b1d227ce..60d6de10b 100644 --- a/confluent_kafka/src/confluent_kafka.c +++ b/confluent_kafka/src/confluent_kafka.c @@ -801,6 +801,39 @@ rd_kafka_topic_partition_list_t *py_to_c_parts (PyObject *plist) { } +/**************************************************************************** + * + * + * Common callbacks + * + * + * + * + ****************************************************************************/ +static void error_cb (rd_kafka_t *rk, int err, const char *reason, void *opaque) { + Handle *h = opaque; + PyObject *eo, *result; + + PyEval_RestoreThread(h->thread_state); + if (!h->error_cb) { + /* No callback defined */ + goto done; + } + + eo = KafkaError_new0(err, "%s", reason); + result = PyObject_CallFunctionObjArgs(h->error_cb, eo, NULL); + Py_DECREF(eo); + + if (result) { + Py_DECREF(result); + } else { + h->callback_crashed++; + rd_kafka_yield(h->rk); + } + + done: + h->thread_state = PyEval_SaveThread(); +} /**************************************************************************** @@ -814,6 +847,28 @@ rd_kafka_topic_partition_list_t *py_to_c_parts (PyObject *plist) { ****************************************************************************/ + +/** + * Clear Python object references in Handle + */ +void Handle_clear (Handle *h) { + if (h->error_cb) { + Py_DECREF(h->error_cb); + } +} + +/** + * GC traversal for Python object references + */ +int Handle_traverse (Handle 
*h, visitproc visit, void *arg) { + if (h->error_cb) + Py_VISIT(h->error_cb); + + return 0; +} + + + /** * Populate topic conf from provided dict. * @@ -1053,6 +1108,14 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype, Py_DECREF(ks); continue; + + } else if (!strcmp(k, "error_cb")) { + if (h->error_cb) + Py_DECREF(h->error_cb); + h->error_cb = vo; + Py_INCREF(h->error_cb); + Py_DECREF(ks); + continue; } /* Special handling for certain config keys. */ @@ -1102,6 +1165,8 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype, Py_DECREF(ks); } + if (h->error_cb) + rd_kafka_conf_set_error_cb(conf, error_cb); rd_kafka_topic_conf_set_opaque(tconf, h); rd_kafka_conf_set_default_topic_conf(conf, tconf); diff --git a/docs/index.rst b/docs/index.rst index 55871a128..1ff778bfc 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -78,6 +78,9 @@ The Python bindings also provide some additional configuration properties: * ``default.topic.config``: value is a dict of topic-level configuration properties that are applied to all used topics for the instance. +* ``error_cb``: Callback for generic/global error events. This callback is served by + poll(). + * ``on_delivery`` (**Producer**): value is a Python function reference that is called once for each produced message to indicate the final delivery result (success or failure). diff --git a/examples/integration_test.py b/examples/integration_test.py index f2c5d43bf..f1f9a6009 100755 --- a/examples/integration_test.py +++ b/examples/integration_test.py @@ -37,6 +37,8 @@ +def error_cb (err): + print('Error: %s' % err) class MyTestDr(object): """ Producer: Delivery report callback """ @@ -77,6 +79,7 @@ def verify_producer(): # Producer config conf = {'bootstrap.servers': bootstrap_servers, + 'error_cb': error_cb, 'default.topic.config':{'produce.offset.report': True}} # Create producer @@ -112,7 +115,8 @@ def verify_producer(): def verify_producer_performance(with_dr_cb=True): """ Time how long it takes to produce and delivery X messages """ - conf = {'bootstrap.servers': bootstrap_servers} + conf = {'bootstrap.servers': bootstrap_servers, + 'error_cb': error_cb} p = confluent_kafka.Producer(**conf) @@ -207,6 +211,7 @@ def verify_consumer(): 'session.timeout.ms': 6000, 'enable.auto.commit': False, 'on_commit': print_commit_result, + 'error_cb': error_cb, 'default.topic.config': { 'auto.offset.reset': 'earliest' }} @@ -275,6 +280,7 @@ def verify_consumer_performance(): conf = {'bootstrap.servers': bootstrap_servers, 'group.id': uuid.uuid1(), 'session.timeout.ms': 6000, + 'error_cb': error_cb, 'default.topic.config': { 'auto.offset.reset': 'earliest' }} diff --git a/tests/test_Producer.py b/tests/test_Producer.py index 68ce03032..6bc176754 100644 --- a/tests/test_Producer.py +++ b/tests/test_Producer.py @@ -13,7 +13,11 @@ def test_basic_api(): assert str(e) == "expected configuration dict" + def error_cb (err): + print('error_cb', err) + p = Producer({'socket.timeout.ms':10, + 'error_cb': error_cb, 'default.topic.config': {'message.timeout.ms': 10}}) p.produce('mytopic') From fa80712a5c046e10073469276041edd8d8608fc6 Mon Sep 17 00:00:00 2001 From: Magnus Edenhill Date: Wed, 13 Jul 2016 07:53:11 +0200 Subject: [PATCH 3/5] Added unit-test for error_cb --- tests/test_KafkaError.py | 30 ++++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) create mode 100644 tests/test_KafkaError.py diff --git a/tests/test_KafkaError.py b/tests/test_KafkaError.py new file mode 100644 index 000000000..b3e9db062 --- /dev/null +++ b/tests/test_KafkaError.py 
@@ -0,0 +1,30 @@ +#!/usr/bin/env python + +from confluent_kafka import Producer, KafkaError, KafkaException +import time + +seen_all_brokers_down = False + +def error_cb (err): + print('error_cb', err) + if err.code() == KafkaError._ALL_BROKERS_DOWN: + global seen_all_brokers_down + seen_all_brokers_down = True + + +def test_error_cb(): + """ Test the error callback. """ + + global seen_all_brokers_down + + # Configure an invalid broker and make sure the ALL_BROKERS_DOWN + # error is seen in the error callback. + p = Producer({'bootstrap.servers': '127.0.0.1:1', 'socket.timeout.ms':10, + 'error_cb': error_cb}) + + t_end = time.time() + 5 + + while not seen_all_brokers_down and time.time() < t_end: + p.poll(1) + + assert seen_all_brokers_down From d7239ee7842f4139b4456288e46d78ade0189d5e Mon Sep 17 00:00:00 2001 From: Magnus Edenhill Date: Wed, 13 Jul 2016 08:00:57 +0200 Subject: [PATCH 4/5] Documented parameters of callbacks --- docs/index.rst | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/index.rst b/docs/index.rst index 1ff778bfc..7dae4c460 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -78,14 +78,14 @@ The Python bindings also provide some additional configuration properties: * ``default.topic.config``: value is a dict of topic-level configuration properties that are applied to all used topics for the instance. -* ``error_cb``: Callback for generic/global error events. This callback is served by +* ``error_cb(kafka.KafkaError)``: Callback for generic/global error events. This callback is served by poll(). -* ``on_delivery`` (**Producer**): value is a Python function reference +* ``on_delivery(kafka.KafkaError, kafka.Message)`` (**Producer**): value is a Python function reference that is called once for each produced message to indicate the final delivery result (success or failure). This property may also be set per-message by passing ``callback=callable`` (or ``on_delivery=callable``) to the confluent_kafka.Producer.produce() function. -* ``on_commit`` (**Consumer**): Callback used to indicate success or failure +* ``on_commit(kafka.KafkaError, list(kafka.TopicPartition))`` (**Consumer**): Callback used to indicate success or failure of commit requests. From 353ac3f4dfac0da2ab5004a3887d60c1bad00b30 Mon Sep 17 00:00:00 2001 From: Magnus Edenhill Date: Thu, 4 Aug 2016 20:33:43 +0200 Subject: [PATCH 5/5] Make sure to GC on_commit callable --- confluent_kafka/src/Consumer.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/confluent_kafka/src/Consumer.c b/confluent_kafka/src/Consumer.c index 88a586e7f..722ca80c2 100644 --- a/confluent_kafka/src/Consumer.c +++ b/confluent_kafka/src/Consumer.c @@ -64,6 +64,8 @@ static int Consumer_traverse (Handle *self, Py_VISIT(self->u.Consumer.on_assign); if (self->u.Consumer.on_revoke) Py_VISIT(self->u.Consumer.on_revoke); + if (self->u.Consumer.on_commit) + Py_VISIT(self->u.Consumer.on_commit); Handle_traverse(self, visit, arg);
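
---

For reference, a minimal sketch of how the error_cb introduced in this series is
wired up from application code, following the pattern in tests/test_KafkaError.py
above. The broker address is a placeholder, and error_cb — like the other
callbacks — is served from poll():

    from confluent_kafka import Producer, KafkaError

    def on_error(err):
        # err is a KafkaError; _ALL_BROKERS_DOWN signals loss of
        # connectivity to every configured broker.
        if err.code() == KafkaError._ALL_BROKERS_DOWN:
            print('all brokers down: %s' % err)
        else:
            print('error: %s' % err)

    # 'localhost:9092' is an assumed placeholder address.
    p = Producer({'bootstrap.servers': 'localhost:9092',
                  'error_cb': on_error})
    p.produce('mytopic', 'hello')
    p.poll(1)   # serves delivery reports and error_cb
    p.flush()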