Skip to content

Commit 864a935

Browse files
committed
Consumer, Producer, TopicPartition classes are now sub-classable
1 parent 5aa3385 commit 864a935

File tree

7 files changed

+221
-149
lines changed

7 files changed

+221
-149
lines changed

confluent_kafka/src/Consumer.c

Lines changed: 89 additions & 90 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ static void Consumer_dealloc (Handle *self) {
6767
CallState_end(self, &cs);
6868
}
6969

70-
Py_TYPE(self)->tp_free((PyObject *)self);
70+
Py_TYPE(self)->tp_free((PyObject *)self);
7171
}
7272

7373
static int Consumer_traverse (Handle *self,
@@ -550,70 +550,6 @@ static PyMethodDef Consumer_methods[] = {
550550
};
551551

552552

553-
static PyObject *Consumer_new (PyTypeObject *type, PyObject *args,
554-
PyObject *kwargs);
555-
556-
PyTypeObject ConsumerType = {
557-
PyVarObject_HEAD_INIT(NULL, 0)
558-
"cimpl.Consumer", /*tp_name*/
559-
sizeof(Handle), /*tp_basicsize*/
560-
0, /*tp_itemsize*/
561-
(destructor)Consumer_dealloc, /*tp_dealloc*/
562-
0, /*tp_print*/
563-
0, /*tp_getattr*/
564-
0, /*tp_setattr*/
565-
0, /*tp_compare*/
566-
0, /*tp_repr*/
567-
0, /*tp_as_number*/
568-
0, /*tp_as_sequence*/
569-
0, /*tp_as_mapping*/
570-
0, /*tp_hash */
571-
0, /*tp_call*/
572-
0, /*tp_str*/
573-
0, /*tp_getattro*/
574-
0, /*tp_setattro*/
575-
0, /*tp_as_buffer*/
576-
Py_TPFLAGS_DEFAULT | Py_TPFLAGS_HAVE_GC, /*tp_flags*/
577-
"High-level Kafka Consumer\n"
578-
"\n"
579-
".. py:function:: Consumer(**kwargs)\n"
580-
"\n"
581-
" Create new Consumer instance using provided configuration dict.\n"
582-
"\n"
583-
" Special configuration properties:\n"
584-
" ``on_commit``: Optional callback will be called when a commit "
585-
"request has succeeded or failed.\n"
586-
"\n"
587-
"\n"
588-
".. py:function:: on_commit(err, partitions)\n"
589-
"\n"
590-
" :param Consumer consumer: Consumer instance.\n"
591-
" :param KafkaError err: Commit error object, or None on success.\n"
592-
" :param list(TopicPartition) partitions: List of partitions with "
593-
"their committed offsets or per-partition errors.\n"
594-
"\n"
595-
"\n", /*tp_doc*/
596-
(traverseproc)Consumer_traverse, /* tp_traverse */
597-
(inquiry)Consumer_clear, /* tp_clear */
598-
0, /* tp_richcompare */
599-
0, /* tp_weaklistoffset */
600-
0, /* tp_iter */
601-
0, /* tp_iternext */
602-
Consumer_methods, /* tp_methods */
603-
0, /* tp_members */
604-
0, /* tp_getset */
605-
0, /* tp_base */
606-
0, /* tp_dict */
607-
0, /* tp_descr_get */
608-
0, /* tp_descr_set */
609-
0, /* tp_dictoffset */
610-
0, /* tp_init */
611-
0, /* tp_alloc */
612-
Consumer_new /* tp_new */
613-
};
614-
615-
616-
617553
static void Consumer_rebalance_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err,
618554
rd_kafka_topic_partition_list_t *c_parts,
619555
void *opaque) {
@@ -726,37 +662,100 @@ static void Consumer_offset_commit_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err,
726662

727663

728664

729-
static PyObject *Consumer_new (PyTypeObject *type, PyObject *args,
730-
PyObject *kwargs) {
731-
Handle *self;
732-
char errstr[256];
733-
rd_kafka_conf_t *conf;
665+
static int Consumer_init (PyObject *selfobj, PyObject *args, PyObject *kwargs) {
666+
Handle *self = (Handle *)selfobj;
667+
char errstr[256];
668+
rd_kafka_conf_t *conf;
734669

735-
self = (Handle *)ConsumerType.tp_alloc(&ConsumerType, 0);
736-
if (!self)
737-
return NULL;
670+
if (self->rk) {
671+
PyErr_SetString(PyExc_RuntimeError,
672+
"Consumer already __init__:ialized");
673+
return -1;
674+
}
738675

739-
if (!(conf = common_conf_setup(RD_KAFKA_CONSUMER, self,
740-
args, kwargs))) {
741-
Py_DECREF(self);
742-
return NULL;
743-
}
676+
if (!(conf = common_conf_setup(RD_KAFKA_CONSUMER, self,
677+
args, kwargs)))
678+
return -1; /* Exception raised by ..conf_setup() */
744679

745-
rd_kafka_conf_set_rebalance_cb(conf, Consumer_rebalance_cb);
746-
rd_kafka_conf_set_offset_commit_cb(conf, Consumer_offset_commit_cb);
680+
rd_kafka_conf_set_rebalance_cb(conf, Consumer_rebalance_cb);
681+
rd_kafka_conf_set_offset_commit_cb(conf, Consumer_offset_commit_cb);
747682

748-
self->rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf,
749-
errstr, sizeof(errstr));
750-
if (!self->rk) {
751-
cfl_PyErr_Format(rd_kafka_last_error(),
752-
"Failed to create consumer: %s", errstr);
753-
Py_DECREF(self);
754-
return NULL;
755-
}
683+
self->rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf,
684+
errstr, sizeof(errstr));
685+
if (!self->rk) {
686+
cfl_PyErr_Format(rd_kafka_last_error(),
687+
"Failed to create consumer: %s", errstr);
688+
rd_kafka_conf_destroy(conf);
689+
return -1;
690+
}
691+
692+
rd_kafka_poll_set_consumer(self->rk);
756693

757-
rd_kafka_poll_set_consumer(self->rk);
694+
return 0;
695+
}
758696

759-
return (PyObject *)self;
697+
static PyObject *Consumer_new (PyTypeObject *type, PyObject *args,
698+
PyObject *kwargs) {
699+
return type->tp_alloc(type, 0);
760700
}
761701

762702

703+
PyTypeObject ConsumerType = {
704+
PyVarObject_HEAD_INIT(NULL, 0)
705+
"cimpl.Consumer", /*tp_name*/
706+
sizeof(Handle), /*tp_basicsize*/
707+
0, /*tp_itemsize*/
708+
(destructor)Consumer_dealloc, /*tp_dealloc*/
709+
0, /*tp_print*/
710+
0, /*tp_getattr*/
711+
0, /*tp_setattr*/
712+
0, /*tp_compare*/
713+
0, /*tp_repr*/
714+
0, /*tp_as_number*/
715+
0, /*tp_as_sequence*/
716+
0, /*tp_as_mapping*/
717+
0, /*tp_hash */
718+
0, /*tp_call*/
719+
0, /*tp_str*/
720+
0, /*tp_getattro*/
721+
0, /*tp_setattro*/
722+
0, /*tp_as_buffer*/
723+
Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE |
724+
Py_TPFLAGS_HAVE_GC, /*tp_flags*/
725+
"High-level Kafka Consumer\n"
726+
"\n"
727+
".. py:function:: Consumer(**kwargs)\n"
728+
"\n"
729+
" Create new Consumer instance using provided configuration dict.\n"
730+
"\n"
731+
" Special configuration properties:\n"
732+
" ``on_commit``: Optional callback will be called when a commit "
733+
"request has succeeded or failed.\n"
734+
"\n"
735+
"\n"
736+
".. py:function:: on_commit(err, partitions)\n"
737+
"\n"
738+
" :param Consumer consumer: Consumer instance.\n"
739+
" :param KafkaError err: Commit error object, or None on success.\n"
740+
" :param list(TopicPartition) partitions: List of partitions with "
741+
"their committed offsets or per-partition errors.\n"
742+
"\n"
743+
"\n", /*tp_doc*/
744+
(traverseproc)Consumer_traverse, /* tp_traverse */
745+
(inquiry)Consumer_clear, /* tp_clear */
746+
0, /* tp_richcompare */
747+
0, /* tp_weaklistoffset */
748+
0, /* tp_iter */
749+
0, /* tp_iternext */
750+
Consumer_methods, /* tp_methods */
751+
0, /* tp_members */
752+
0, /* tp_getset */
753+
0, /* tp_base */
754+
0, /* tp_dict */
755+
0, /* tp_descr_get */
756+
0, /* tp_descr_set */
757+
0, /* tp_dictoffset */
758+
Consumer_init, /* tp_init */
759+
0, /* tp_alloc */
760+
Consumer_new /* tp_new */
761+
};

confluent_kafka/src/Producer.c

Lines changed: 39 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -426,10 +426,44 @@ static Py_ssize_t Producer__len__ (Handle *self) {
426426
static PySequenceMethods Producer_seq_methods = {
427427
(lenfunc)Producer__len__ /* sq_length */
428428
};
429-
429+
430+
431+
static int Producer_init (PyObject *selfobj, PyObject *args, PyObject *kwargs) {
432+
Handle *self = (Handle *)selfobj;
433+
char errstr[256];
434+
rd_kafka_conf_t *conf;
435+
436+
if (self->rk) {
437+
PyErr_SetString(PyExc_RuntimeError,
438+
"Producer already __init__:ialized");
439+
return -1;
440+
}
441+
442+
if (!(conf = common_conf_setup(RD_KAFKA_PRODUCER, self,
443+
args, kwargs)))
444+
return -1;
445+
446+
rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb);
447+
448+
self->rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf,
449+
errstr, sizeof(errstr));
450+
if (!self->rk) {
451+
cfl_PyErr_Format(rd_kafka_last_error(),
452+
"Failed to create producer: %s", errstr);
453+
rd_kafka_conf_destroy(conf);
454+
return -1;
455+
}
456+
457+
return 0;
458+
}
459+
430460

431461
/**
 * @brief tp_new for Producer: bare allocation only.
 *
 * All real initialization is deferred to Producer_init (tp_init) so that
 * Python subclasses work the standard way (__new__ then __init__).
 */
static PyObject *Producer_new (PyTypeObject *type, PyObject *args,
                               PyObject *kwargs) {
        return type->tp_alloc(type, 0);
}
465+
466+
433467

434468
PyTypeObject ProducerType = {
435469
PyVarObject_HEAD_INIT(NULL, 0)
@@ -451,7 +485,8 @@ PyTypeObject ProducerType = {
451485
0, /*tp_getattro*/
452486
0, /*tp_setattro*/
453487
0, /*tp_as_buffer*/
454-
Py_TPFLAGS_DEFAULT | Py_TPFLAGS_HAVE_GC, /*tp_flags*/
488+
Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE |
489+
Py_TPFLAGS_HAVE_GC, /*tp_flags*/
455490
"Asynchronous Kafka Producer\n"
456491
"\n"
457492
".. py:function:: Producer(**kwargs)\n"
@@ -478,42 +513,11 @@ PyTypeObject ProducerType = {
478513
0, /* tp_descr_get */
479514
0, /* tp_descr_set */
480515
0, /* tp_dictoffset */
481-
0, /* tp_init */
516+
Producer_init, /* tp_init */
482517
0, /* tp_alloc */
483518
Producer_new /* tp_new */
484519
};
485520

486521

487522

488-
static PyObject *Producer_new (PyTypeObject *type, PyObject *args,
489-
PyObject *kwargs) {
490-
Handle *self;
491-
char errstr[256];
492-
rd_kafka_conf_t *conf;
493-
494-
self = (Handle *)ProducerType.tp_alloc(&ProducerType, 0);
495-
if (!self)
496-
return NULL;
497-
498-
if (!(conf = common_conf_setup(RD_KAFKA_PRODUCER, self,
499-
args, kwargs))) {
500-
Py_DECREF(self);
501-
return NULL;
502-
}
503-
504-
rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb);
505-
506-
self->rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf,
507-
errstr, sizeof(errstr));
508-
if (!self->rk) {
509-
cfl_PyErr_Format(rd_kafka_last_error(),
510-
"Failed to create producer: %s", errstr);
511-
Py_DECREF(self);
512-
return NULL;
513-
}
514-
515-
return (PyObject *)self;
516-
}
517-
518-
519523

0 commit comments

Comments (0)