From 864a9358a6464a1a8aa50743859fe13e9d85381d Mon Sep 17 00:00:00 2001 From: Magnus Edenhill Date: Mon, 17 Oct 2016 21:11:00 +0200 Subject: [PATCH] Consumer, Producer, TopicPartition classes are now sub-classable --- confluent_kafka/src/Consumer.c | 179 +++++++++++++------------- confluent_kafka/src/Producer.c | 74 ++++++----- confluent_kafka/src/confluent_kafka.c | 61 +++++---- tests/test_Consumer.py | 11 ++ tests/test_KafkaError.py | 10 +- tests/test_Producer.py | 21 +++ tests/test_TopicPartition.py | 14 ++ 7 files changed, 221 insertions(+), 149 deletions(-) diff --git a/confluent_kafka/src/Consumer.c b/confluent_kafka/src/Consumer.c index cc196c433..b7a21b254 100644 --- a/confluent_kafka/src/Consumer.c +++ b/confluent_kafka/src/Consumer.c @@ -67,7 +67,7 @@ static void Consumer_dealloc (Handle *self) { CallState_end(self, &cs); } - Py_TYPE(self)->tp_free((PyObject *)self); + Py_TYPE(self)->tp_free((PyObject *)self); } static int Consumer_traverse (Handle *self, @@ -550,70 +550,6 @@ static PyMethodDef Consumer_methods[] = { }; -static PyObject *Consumer_new (PyTypeObject *type, PyObject *args, - PyObject *kwargs); - -PyTypeObject ConsumerType = { - PyVarObject_HEAD_INIT(NULL, 0) - "cimpl.Consumer", /*tp_name*/ - sizeof(Handle), /*tp_basicsize*/ - 0, /*tp_itemsize*/ - (destructor)Consumer_dealloc, /*tp_dealloc*/ - 0, /*tp_print*/ - 0, /*tp_getattr*/ - 0, /*tp_setattr*/ - 0, /*tp_compare*/ - 0, /*tp_repr*/ - 0, /*tp_as_number*/ - 0, /*tp_as_sequence*/ - 0, /*tp_as_mapping*/ - 0, /*tp_hash */ - 0, /*tp_call*/ - 0, /*tp_str*/ - 0, /*tp_getattro*/ - 0, /*tp_setattro*/ - 0, /*tp_as_buffer*/ - Py_TPFLAGS_DEFAULT | Py_TPFLAGS_HAVE_GC, /*tp_flags*/ - "High-level Kafka Consumer\n" - "\n" - ".. py:function:: Consumer(**kwargs)\n" - "\n" - " Create new Consumer instance using provided configuration dict.\n" - "\n" - " Special configuration properties:\n" - " ``on_commit``: Optional callback will be called when a commit " - "request has succeeded or failed.\n" - "\n" - "\n" - ".. 
py:function:: on_commit(err, partitions)\n" - "\n" - " :param Consumer consumer: Consumer instance.\n" - " :param KafkaError err: Commit error object, or None on success.\n" - " :param list(TopicPartition) partitions: List of partitions with " - "their committed offsets or per-partition errors.\n" - "\n" - "\n", /*tp_doc*/ - (traverseproc)Consumer_traverse, /* tp_traverse */ - (inquiry)Consumer_clear, /* tp_clear */ - 0, /* tp_richcompare */ - 0, /* tp_weaklistoffset */ - 0, /* tp_iter */ - 0, /* tp_iternext */ - Consumer_methods, /* tp_methods */ - 0, /* tp_members */ - 0, /* tp_getset */ - 0, /* tp_base */ - 0, /* tp_dict */ - 0, /* tp_descr_get */ - 0, /* tp_descr_set */ - 0, /* tp_dictoffset */ - 0, /* tp_init */ - 0, /* tp_alloc */ - Consumer_new /* tp_new */ -}; - - - static void Consumer_rebalance_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err, rd_kafka_topic_partition_list_t *c_parts, void *opaque) { @@ -726,37 +662,100 @@ static void Consumer_offset_commit_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err, -static PyObject *Consumer_new (PyTypeObject *type, PyObject *args, - PyObject *kwargs) { - Handle *self; - char errstr[256]; - rd_kafka_conf_t *conf; +static int Consumer_init (PyObject *selfobj, PyObject *args, PyObject *kwargs) { + Handle *self = (Handle *)selfobj; + char errstr[256]; + rd_kafka_conf_t *conf; - self = (Handle *)ConsumerType.tp_alloc(&ConsumerType, 0); - if (!self) - return NULL; + if (self->rk) { + PyErr_SetString(PyExc_RuntimeError, + "Consumer already __init__:ialized"); + return -1; + } - if (!(conf = common_conf_setup(RD_KAFKA_CONSUMER, self, - args, kwargs))) { - Py_DECREF(self); - return NULL; - } + if (!(conf = common_conf_setup(RD_KAFKA_CONSUMER, self, + args, kwargs))) + return -1; /* Exception raised by ..conf_setup() */ - rd_kafka_conf_set_rebalance_cb(conf, Consumer_rebalance_cb); - rd_kafka_conf_set_offset_commit_cb(conf, Consumer_offset_commit_cb); + rd_kafka_conf_set_rebalance_cb(conf, Consumer_rebalance_cb); + rd_kafka_conf_set_offset_commit_cb(conf, Consumer_offset_commit_cb); - self->rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, - errstr, sizeof(errstr)); - if (!self->rk) { - cfl_PyErr_Format(rd_kafka_last_error(), - "Failed to create consumer: %s", errstr); - Py_DECREF(self); - return NULL; - } + self->rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, + errstr, sizeof(errstr)); + if (!self->rk) { + cfl_PyErr_Format(rd_kafka_last_error(), + "Failed to create consumer: %s", errstr); + rd_kafka_conf_destroy(conf); + return -1; + } + + rd_kafka_poll_set_consumer(self->rk); - rd_kafka_poll_set_consumer(self->rk); + return 0; +} - return (PyObject *)self; +static PyObject *Consumer_new (PyTypeObject *type, PyObject *args, + PyObject *kwargs) { + return type->tp_alloc(type, 0); } +PyTypeObject ConsumerType = { + PyVarObject_HEAD_INIT(NULL, 0) + "cimpl.Consumer", /*tp_name*/ + sizeof(Handle), /*tp_basicsize*/ + 0, /*tp_itemsize*/ + (destructor)Consumer_dealloc, /*tp_dealloc*/ + 0, /*tp_print*/ + 0, /*tp_getattr*/ + 0, /*tp_setattr*/ + 0, /*tp_compare*/ + 0, /*tp_repr*/ + 0, /*tp_as_number*/ + 0, /*tp_as_sequence*/ + 0, /*tp_as_mapping*/ + 0, /*tp_hash */ + 0, /*tp_call*/ + 0, /*tp_str*/ + 0, /*tp_getattro*/ + 0, /*tp_setattro*/ + 0, /*tp_as_buffer*/ + Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE | + Py_TPFLAGS_HAVE_GC, /*tp_flags*/ + "High-level Kafka Consumer\n" + "\n" + ".. 
py:function:: Consumer(**kwargs)\n" + "\n" + " Create new Consumer instance using provided configuration dict.\n" + "\n" + " Special configuration properties:\n" + " ``on_commit``: Optional callback will be called when a commit " + "request has succeeded or failed.\n" + "\n" + "\n" + ".. py:function:: on_commit(err, partitions)\n" + "\n" + " :param Consumer consumer: Consumer instance.\n" + " :param KafkaError err: Commit error object, or None on success.\n" + " :param list(TopicPartition) partitions: List of partitions with " + "their committed offsets or per-partition errors.\n" + "\n" + "\n", /*tp_doc*/ + (traverseproc)Consumer_traverse, /* tp_traverse */ + (inquiry)Consumer_clear, /* tp_clear */ + 0, /* tp_richcompare */ + 0, /* tp_weaklistoffset */ + 0, /* tp_iter */ + 0, /* tp_iternext */ + Consumer_methods, /* tp_methods */ + 0, /* tp_members */ + 0, /* tp_getset */ + 0, /* tp_base */ + 0, /* tp_dict */ + 0, /* tp_descr_get */ + 0, /* tp_descr_set */ + 0, /* tp_dictoffset */ + Consumer_init, /* tp_init */ + 0, /* tp_alloc */ + Consumer_new /* tp_new */ +}; diff --git a/confluent_kafka/src/Producer.c b/confluent_kafka/src/Producer.c index a5b16bd8e..111d14132 100644 --- a/confluent_kafka/src/Producer.c +++ b/confluent_kafka/src/Producer.c @@ -426,10 +426,44 @@ static Py_ssize_t Producer__len__ (Handle *self) { static PySequenceMethods Producer_seq_methods = { (lenfunc)Producer__len__ /* sq_length */ }; - + + +static int Producer_init (PyObject *selfobj, PyObject *args, PyObject *kwargs) { + Handle *self = (Handle *)selfobj; + char errstr[256]; + rd_kafka_conf_t *conf; + + if (self->rk) { + PyErr_SetString(PyExc_RuntimeError, + "Producer already __init__:ialized"); + return -1; + } + + if (!(conf = common_conf_setup(RD_KAFKA_PRODUCER, self, + args, kwargs))) + return -1; + + rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb); + + self->rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, + errstr, sizeof(errstr)); + if (!self->rk) { + cfl_PyErr_Format(rd_kafka_last_error(), + "Failed to create producer: %s", errstr); + rd_kafka_conf_destroy(conf); + return -1; + } + + return 0; +} + static PyObject *Producer_new (PyTypeObject *type, PyObject *args, - PyObject *kwargs); + PyObject *kwargs) { + return type->tp_alloc(type, 0); +} + + PyTypeObject ProducerType = { PyVarObject_HEAD_INIT(NULL, 0) @@ -451,7 +485,8 @@ PyTypeObject ProducerType = { 0, /*tp_getattro*/ 0, /*tp_setattro*/ 0, /*tp_as_buffer*/ - Py_TPFLAGS_DEFAULT | Py_TPFLAGS_HAVE_GC, /*tp_flags*/ + Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE | + Py_TPFLAGS_HAVE_GC, /*tp_flags*/ "Asynchronous Kafka Producer\n" "\n" ".. 
py:function:: Producer(**kwargs)\n" @@ -478,42 +513,11 @@ PyTypeObject ProducerType = { 0, /* tp_descr_get */ 0, /* tp_descr_set */ 0, /* tp_dictoffset */ - 0, /* tp_init */ + Producer_init, /* tp_init */ 0, /* tp_alloc */ Producer_new /* tp_new */ }; -static PyObject *Producer_new (PyTypeObject *type, PyObject *args, - PyObject *kwargs) { - Handle *self; - char errstr[256]; - rd_kafka_conf_t *conf; - - self = (Handle *)ProducerType.tp_alloc(&ProducerType, 0); - if (!self) - return NULL; - - if (!(conf = common_conf_setup(RD_KAFKA_PRODUCER, self, - args, kwargs))) { - Py_DECREF(self); - return NULL; - } - - rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb); - - self->rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, - errstr, sizeof(errstr)); - if (!self->rk) { - cfl_PyErr_Format(rd_kafka_last_error(), - "Failed to create producer: %s", errstr); - Py_DECREF(self); - return NULL; - } - - return (PyObject *)self; -} - - diff --git a/confluent_kafka/src/confluent_kafka.c b/confluent_kafka/src/confluent_kafka.c index 9293f6464..570060cb2 100644 --- a/confluent_kafka/src/confluent_kafka.c +++ b/confluent_kafka/src/confluent_kafka.c @@ -183,10 +183,10 @@ static PyTypeObject KafkaErrorType = { (hashfunc)KafkaError_hash, /*tp_hash */ 0, /*tp_call*/ 0, /*tp_str*/ - 0, /*tp_getattro*/ + PyObject_GenericGetAttr, /*tp_getattro*/ 0, /*tp_setattro*/ 0, /*tp_as_buffer*/ - Py_TPFLAGS_DEFAULT, /*tp_flags*/ + Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE, /*tp_flags*/ "Kafka error and event object\n" "\n" " The KafkaError class serves multiple purposes:\n" @@ -463,7 +463,8 @@ PyTypeObject MessageType = { 0, /*tp_getattro*/ 0, /*tp_setattro*/ 0, /*tp_as_buffer*/ - Py_TPFLAGS_DEFAULT | Py_TPFLAGS_HAVE_GC, /*tp_flags*/ + Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE | + Py_TPFLAGS_HAVE_GC, /*tp_flags*/ "The Message object represents either a single consumed or " "produced message, or an event (:py:func:`error()` is not None).\n" "\n" @@ -562,6 +563,16 @@ static int TopicPartition_clear (TopicPartition *self) { return 0; } +static void TopicPartition_setup (TopicPartition *self, const char *topic, + int partition, long long offset, + rd_kafka_resp_err_t err) { + self->topic = strdup(topic); + self->partition = partition; + self->offset = offset; + self->error = KafkaError_new_or_None(err, NULL); +} + + static void TopicPartition_dealloc (TopicPartition *self) { PyObject_GC_UnTrack(self); @@ -570,13 +581,9 @@ static void TopicPartition_dealloc (TopicPartition *self) { Py_TYPE(self)->tp_free((PyObject *)self); } -static PyObject *TopicPartition_new0 (const char *topic, int partition, - long long offset, - rd_kafka_resp_err_t err); - -static PyObject *TopicPartition_new (PyTypeObject *type, PyObject *args, - PyObject *kwargs) { +static int TopicPartition_init (PyObject *self, PyObject *args, + PyObject *kwargs) { const char *topic; int partition = RD_KAFKA_PARTITION_UA; long long offset = RD_KAFKA_OFFSET_INVALID; @@ -587,9 +594,19 @@ static PyObject *TopicPartition_new (PyTypeObject *type, PyObject *args, if (!PyArg_ParseTupleAndKeywords(args, kwargs, "s|iL", kws, &topic, &partition, &offset)) - return NULL; + return -1; - return TopicPartition_new0(topic, partition, offset, 0); + TopicPartition_setup((TopicPartition *)self, + topic, partition, offset, 0); + + return 0; +} + + +static PyObject *TopicPartition_new (PyTypeObject *type, PyObject *args, + PyObject *kwargs) { + PyObject *self = type->tp_alloc(type, 1); + return self; } @@ -706,10 +723,11 @@ static PyTypeObject TopicPartitionType = { (hashfunc)TopicPartition_hash, /*tp_hash 
*/ 0, /*tp_call*/ 0, /*tp_str*/ - 0, /*tp_getattro*/ + PyObject_GenericGetAttr, /*tp_getattro*/ 0, /*tp_setattro*/ 0, /*tp_as_buffer*/ - Py_TPFLAGS_DEFAULT | Py_TPFLAGS_HAVE_GC, /*tp_flags*/ + Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE | + Py_TPFLAGS_HAVE_GC, /*tp_flags*/ "TopicPartition is a generic type to hold a single partition and " "various information about it.\n" "\n" @@ -740,9 +758,9 @@ static PyTypeObject TopicPartitionType = { 0, /* tp_descr_get */ 0, /* tp_descr_set */ 0, /* tp_dictoffset */ - 0, /* tp_init */ + TopicPartition_init, /* tp_init */ 0, /* tp_alloc */ - TopicPartition_new /* tp_new */ + TopicPartition_new /* tp_new */ }; /** @@ -753,15 +771,10 @@ static PyObject *TopicPartition_new0 (const char *topic, int partition, rd_kafka_resp_err_t err) { TopicPartition *self; - self = (TopicPartition *)TopicPartitionType.tp_alloc( - &TopicPartitionType, 0); - if (!self) - return NULL; + self = (TopicPartition *)TopicPartitionType.tp_new( + &TopicPartitionType, NULL, NULL); - self->topic = strdup(topic); - self->partition = partition; - self->offset = offset; - self->error = KafkaError_new_or_None(err, NULL); + TopicPartition_setup(self, topic, partition, offset, err); return (PyObject *)self; } @@ -1289,6 +1302,7 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype, */ void CallState_begin (Handle *h, CallState *cs) { cs->thread_state = PyEval_SaveThread(); + assert(cs->thread_state != NULL); cs->crashed = 0; PyThread_set_key_value(h->tlskey, cs); } @@ -1315,6 +1329,7 @@ int CallState_end (Handle *h, CallState *cs) { CallState *CallState_get (Handle *h) { CallState *cs = PyThread_get_key_value(h->tlskey); assert(cs != NULL); + assert(cs->thread_state != NULL); PyEval_RestoreThread(cs->thread_state); cs->thread_state = NULL; return cs; diff --git a/tests/test_Consumer.py b/tests/test_Consumer.py index 7125ca7d2..8a2585bd3 100644 --- a/tests/test_Consumer.py +++ b/tests/test_Consumer.py @@ -109,3 +109,14 @@ def commit_cb (cs, err, ps): assert e.args[0].code() == KafkaError._NO_OFFSET c.close() + + +def test_subclassing(): + class SubConsumer(Consumer): + def poll (self, somearg): + assert type(somearg) == str + super(SubConsumer, self).poll(timeout=0.0001) + + sc = SubConsumer({"group.id":"test", "session.timeout.ms": "90"}) + sc.poll("astring") + sc.close() diff --git a/tests/test_KafkaError.py b/tests/test_KafkaError.py index b3e9db062..4da88e6be 100644 --- a/tests/test_KafkaError.py +++ b/tests/test_KafkaError.py @@ -10,7 +10,6 @@ def error_cb (err): if err.code() == KafkaError._ALL_BROKERS_DOWN: global seen_all_brokers_down seen_all_brokers_down = True - def test_error_cb(): """ Test the error callback. 
""" @@ -28,3 +27,12 @@ def test_error_cb(): p.poll(1) assert seen_all_brokers_down + + +def test_subclassing(): + class MyExc(KafkaException): + def a_method (self): + return "yes" + err = MyExc() + assert err.a_method() == "yes" + assert isinstance(err, KafkaException) diff --git a/tests/test_Producer.py b/tests/test_Producer.py index 6bc176754..9e09bc619 100644 --- a/tests/test_Producer.py +++ b/tests/test_Producer.py @@ -36,3 +36,24 @@ def on_delivery(err,msg): p.flush() + +def test_subclassing(): + class SubProducer(Producer): + def __init__ (self, conf, topic): + super(SubProducer, self).__init__(conf) + self.topic = topic + def produce_hi (self): + super(SubProducer, self).produce(self.topic, value='hi') + + sp = SubProducer(dict(), 'atopic') + assert type(sp) == SubProducer + + # Invalid config should fail + try: + sp = SubProducer({'should.fail': False}, 'mytopic') + except KafkaException: + pass + + sp = SubProducer({'log.thread.name': True}, 'mytopic') + sp.produce('someother', value='not hello') + sp.produce_hi() diff --git a/tests/test_TopicPartition.py b/tests/test_TopicPartition.py index b31335942..9f7f12b73 100644 --- a/tests/test_TopicPartition.py +++ b/tests/test_TopicPartition.py @@ -36,3 +36,17 @@ def test_hash(): tp2 = TopicPartition('somethingelse', 12) assert hash(tp1) != hash(tp2) + +def test_subclassing(): + class SubTopicPartition(TopicPartition): + def __init__(self, topic_part_str): + topic, part = topic_part_str.split(":") + super(SubTopicPartition, self).__init__(topic=topic, partition=int(part)) + + st = SubTopicPartition("topic1:0") + assert st.topic == "topic1" + assert st.partition == 0 + + st = SubTopicPartition("topic2:920") + assert st.topic == "topic2" + assert st.partition == 920