Skip to content

Consumer, Producer, TopicPartition classes are now sub-classable #63

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Nov 14, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
179 changes: 89 additions & 90 deletions confluent_kafka/src/Consumer.c
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ static void Consumer_dealloc (Handle *self) {
CallState_end(self, &cs);
}

Py_TYPE(self)->tp_free((PyObject *)self);
Py_TYPE(self)->tp_free((PyObject *)self);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We seem to be getting mixed tabs & spaces...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The file originally used tabs for indentation, which is unfortunate — I probably wrote it with the wrong .emacs configuration in place.
New changes use spaces for indentation instead.
We can do a wholesale untabify of the file at some point.

}

static int Consumer_traverse (Handle *self,
Expand Down Expand Up @@ -550,70 +550,6 @@ static PyMethodDef Consumer_methods[] = {
};


static PyObject *Consumer_new (PyTypeObject *type, PyObject *args,
PyObject *kwargs);

PyTypeObject ConsumerType = {
PyVarObject_HEAD_INIT(NULL, 0)
"cimpl.Consumer", /*tp_name*/
sizeof(Handle), /*tp_basicsize*/
0, /*tp_itemsize*/
(destructor)Consumer_dealloc, /*tp_dealloc*/
0, /*tp_print*/
0, /*tp_getattr*/
0, /*tp_setattr*/
0, /*tp_compare*/
0, /*tp_repr*/
0, /*tp_as_number*/
0, /*tp_as_sequence*/
0, /*tp_as_mapping*/
0, /*tp_hash */
0, /*tp_call*/
0, /*tp_str*/
0, /*tp_getattro*/
0, /*tp_setattro*/
0, /*tp_as_buffer*/
Py_TPFLAGS_DEFAULT | Py_TPFLAGS_HAVE_GC, /*tp_flags*/
"High-level Kafka Consumer\n"
"\n"
".. py:function:: Consumer(**kwargs)\n"
"\n"
" Create new Consumer instance using provided configuration dict.\n"
"\n"
" Special configuration properties:\n"
" ``on_commit``: Optional callback will be called when a commit "
"request has succeeded or failed.\n"
"\n"
"\n"
".. py:function:: on_commit(err, partitions)\n"
"\n"
" :param Consumer consumer: Consumer instance.\n"
" :param KafkaError err: Commit error object, or None on success.\n"
" :param list(TopicPartition) partitions: List of partitions with "
"their committed offsets or per-partition errors.\n"
"\n"
"\n", /*tp_doc*/
(traverseproc)Consumer_traverse, /* tp_traverse */
(inquiry)Consumer_clear, /* tp_clear */
0, /* tp_richcompare */
0, /* tp_weaklistoffset */
0, /* tp_iter */
0, /* tp_iternext */
Consumer_methods, /* tp_methods */
0, /* tp_members */
0, /* tp_getset */
0, /* tp_base */
0, /* tp_dict */
0, /* tp_descr_get */
0, /* tp_descr_set */
0, /* tp_dictoffset */
0, /* tp_init */
0, /* tp_alloc */
Consumer_new /* tp_new */
};



static void Consumer_rebalance_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err,
rd_kafka_topic_partition_list_t *c_parts,
void *opaque) {
Expand Down Expand Up @@ -726,37 +662,100 @@ static void Consumer_offset_commit_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err,



/*
 * Consumer_init: tp_init slot for cimpl.Consumer.
 *
 * All real construction happens here (rather than in tp_new) so that the
 * Consumer type can be sub-classed from Python: tp_new only allocates,
 * and Python-level __init__() in a subclass can cooperate normally.
 *
 * @param selfobj  Newly allocated Consumer object (a Handle).
 * @param args     Positional arguments forwarded to common_conf_setup().
 * @param kwargs   Configuration dict / keyword arguments.
 * @returns 0 on success, or -1 with a Python exception set on failure.
 */
static int Consumer_init (PyObject *selfobj, PyObject *args, PyObject *kwargs) {
        Handle *self = (Handle *)selfobj;
        char errstr[256];
        rd_kafka_conf_t *conf;

        /* Guard against __init__() being invoked twice on the same object:
         * a second initialization would leak the existing rd_kafka_t. */
        if (self->rk) {
                PyErr_SetString(PyExc_RuntimeError,
                                "Consumer already __init__:ialized");
                return -1;
        }

        if (!(conf = common_conf_setup(RD_KAFKA_CONSUMER, self,
                                       args, kwargs)))
                return -1; /* Exception raised by ..conf_setup() */

        rd_kafka_conf_set_rebalance_cb(conf, Consumer_rebalance_cb);
        rd_kafka_conf_set_offset_commit_cb(conf, Consumer_offset_commit_cb);

        self->rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf,
                                errstr, sizeof(errstr));
        if (!self->rk) {
                cfl_PyErr_Format(rd_kafka_last_error(),
                                 "Failed to create consumer: %s", errstr);
                /* conf is only owned by rd_kafka_new() on success;
                 * destroy it ourselves on failure. */
                rd_kafka_conf_destroy(conf);
                return -1;
        }

        /* Redirect all consumer events (rebalance, commits, ..) to the
         * consumer queue so they are served by consumer poll calls. */
        rd_kafka_poll_set_consumer(self->rk);

        return 0;
}
/*
 * Consumer_new: tp_new slot for cimpl.Consumer.
 * Performs plain allocation only; all configuration is deferred to the
 * tp_init slot (Consumer_init) so that the type can be sub-classed.
 */
static PyObject *Consumer_new (PyTypeObject *type, PyObject *args,
                               PyObject *kwargs) {
        PyObject *obj = type->tp_alloc(type, 0);
        return obj;
}


/*
 * Python type object for cimpl.Consumer.
 *
 * Slots are filled positionally, so the ordering below must match the
 * PyTypeObject layout of the CPython headers this file is built against
 * (note the /*tp_...*/ trailer comments labeling each slot).
 * Construction is split between Consumer_new (tp_new, allocation only)
 * and Consumer_init (tp_init, configuration) to support sub-classing.
 */
PyTypeObject ConsumerType = {
PyVarObject_HEAD_INIT(NULL, 0)
"cimpl.Consumer", /*tp_name*/
sizeof(Handle), /*tp_basicsize*/
0, /*tp_itemsize*/
(destructor)Consumer_dealloc, /*tp_dealloc*/
0, /*tp_print*/
0, /*tp_getattr*/
0, /*tp_setattr*/
0, /*tp_compare*/
0, /*tp_repr*/
0, /*tp_as_number*/
0, /*tp_as_sequence*/
0, /*tp_as_mapping*/
0, /*tp_hash */
0, /*tp_call*/
0, /*tp_str*/
0, /*tp_getattro*/
0, /*tp_setattro*/
0, /*tp_as_buffer*/
/* Py_TPFLAGS_BASETYPE makes the type sub-classable from Python. */
Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE |
Py_TPFLAGS_HAVE_GC, /*tp_flags*/
"High-level Kafka Consumer\n"
"\n"
".. py:function:: Consumer(**kwargs)\n"
"\n"
" Create new Consumer instance using provided configuration dict.\n"
"\n"
" Special configuration properties:\n"
" ``on_commit``: Optional callback will be called when a commit "
"request has succeeded or failed.\n"
"\n"
"\n"
".. py:function:: on_commit(err, partitions)\n"
"\n"
" :param Consumer consumer: Consumer instance.\n"
" :param KafkaError err: Commit error object, or None on success.\n"
" :param list(TopicPartition) partitions: List of partitions with "
"their committed offsets or per-partition errors.\n"
"\n"
"\n", /*tp_doc*/
(traverseproc)Consumer_traverse, /* tp_traverse */
(inquiry)Consumer_clear, /* tp_clear */
0, /* tp_richcompare */
0, /* tp_weaklistoffset */
0, /* tp_iter */
0, /* tp_iternext */
Consumer_methods, /* tp_methods */
0, /* tp_members */
0, /* tp_getset */
0, /* tp_base */
0, /* tp_dict */
0, /* tp_descr_get */
0, /* tp_descr_set */
0, /* tp_dictoffset */
Consumer_init, /* tp_init */
0, /* tp_alloc */
Consumer_new /* tp_new */
};
74 changes: 39 additions & 35 deletions confluent_kafka/src/Producer.c
Original file line number Diff line number Diff line change
Expand Up @@ -426,10 +426,44 @@ static Py_ssize_t Producer__len__ (Handle *self) {
static PySequenceMethods Producer_seq_methods = {
(lenfunc)Producer__len__ /* sq_length */
};



/*
 * Producer_init: tp_init slot for cimpl.Producer.
 * tp_new only allocates; the rd_kafka_t handle is created here so the
 * Producer type can be sub-classed from Python.
 *
 * @returns 0 on success, -1 with a Python exception set on failure.
 */
static int Producer_init (PyObject *selfobj, PyObject *args, PyObject *kwargs) {
        Handle *self = (Handle *)selfobj;
        rd_kafka_conf_t *rk_conf;
        char errbuf[256];

        /* A second __init__() call would orphan the existing handle. */
        if (self->rk) {
                PyErr_SetString(PyExc_RuntimeError,
                                "Producer already __init__:ialized");
                return -1;
        }

        rk_conf = common_conf_setup(RD_KAFKA_PRODUCER, self, args, kwargs);
        if (!rk_conf)
                return -1; /* Exception already raised by conf setup */

        /* Route per-message delivery reports to our callback. */
        rd_kafka_conf_set_dr_msg_cb(rk_conf, dr_msg_cb);

        self->rk = rd_kafka_new(RD_KAFKA_PRODUCER, rk_conf,
                                errbuf, sizeof(errbuf));
        if (self->rk)
                return 0;

        cfl_PyErr_Format(rd_kafka_last_error(),
                         "Failed to create producer: %s", errbuf);
        rd_kafka_conf_destroy(rk_conf);
        return -1;
}


/*
 * Producer_new: tp_new slot for cimpl.Producer.
 * Allocation only; real setup is deferred to Producer_init (tp_init)
 * so the type can be safely sub-classed.
 */
static PyObject *Producer_new (PyTypeObject *type, PyObject *args,
                               PyObject *kwargs) {
        PyObject *obj = type->tp_alloc(type, 0);
        return obj;
}



PyTypeObject ProducerType = {
PyVarObject_HEAD_INIT(NULL, 0)
Expand All @@ -451,7 +485,8 @@ PyTypeObject ProducerType = {
0, /*tp_getattro*/
0, /*tp_setattro*/
0, /*tp_as_buffer*/
Py_TPFLAGS_DEFAULT | Py_TPFLAGS_HAVE_GC, /*tp_flags*/
Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE |
Py_TPFLAGS_HAVE_GC, /*tp_flags*/
"Asynchronous Kafka Producer\n"
"\n"
".. py:function:: Producer(**kwargs)\n"
Expand All @@ -478,42 +513,11 @@ PyTypeObject ProducerType = {
0, /* tp_descr_get */
0, /* tp_descr_set */
0, /* tp_dictoffset */
0, /* tp_init */
Producer_init, /* tp_init */
0, /* tp_alloc */
Producer_new /* tp_new */
};



/*
 * NOTE(review): this is the PRE-change Producer_new shown as deleted lines
 * in the diff — superseded by the Producer_new/Producer_init split above.
 * It performed allocation AND rd_kafka_t creation in tp_new, which
 * prevented sub-classing from Python.
 */
static PyObject *Producer_new (PyTypeObject *type, PyObject *args,
PyObject *kwargs) {
Handle *self;
char errstr[256];
rd_kafka_conf_t *conf;

/* Allocates against the concrete ProducerType, ignoring `type` —
 * one reason subclass instances could not be created. */
self = (Handle *)ProducerType.tp_alloc(&ProducerType, 0);
if (!self)
return NULL;

if (!(conf = common_conf_setup(RD_KAFKA_PRODUCER, self,
args, kwargs))) {
Py_DECREF(self);
return NULL;
}

rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb);

self->rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf,
errstr, sizeof(errstr));
if (!self->rk) {
cfl_PyErr_Format(rd_kafka_last_error(),
"Failed to create producer: %s", errstr);
Py_DECREF(self);
return NULL;
}

return (PyObject *)self;
}



Loading