
Commit 543d94f

Consumer, Producer, TopicPartition classes are now sub-classable
1 parent 6214e57 commit 543d94f


7 files changed: +223 -148 lines changed

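What this commit enables, at the Python level: with construction moved from tp_new into tp_init and Py_TPFLAGS_BASETYPE added to the type flags, Consumer, Producer and TopicPartition can now be used as base classes. A minimal sketch of the intended usage, assuming a reachable broker; the subclass names, broker address, group id and topic below are illustrative placeholders, not part of this commit:

    from confluent_kafka import Consumer, TopicPartition

    class LoggingConsumer(Consumer):
        """Hypothetical subclass that counts poll() calls."""
        def __init__(self, conf):
            # Construction now happens in __init__ (tp_init), so a subclass
            # can simply forward its config dict to the base class.
            super(LoggingConsumer, self).__init__(conf)
            self.polled = 0

        def poll(self, timeout=-1):
            self.polled += 1
            return super(LoggingConsumer, self).poll(timeout)

    class NamedPartition(TopicPartition):
        """TopicPartition is likewise sub-classable after this commit."""
        pass

    c = LoggingConsumer({'bootstrap.servers': 'localhost:9092',  # placeholder broker
                         'group.id': 'example-group'})
    c.subscribe(['example-topic'])                                # placeholder topic
    msg = c.poll(1.0)
    c.close()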

confluent_kafka/src/Consumer.c

Lines changed: 91 additions & 89 deletions
@@ -539,70 +539,6 @@ static PyMethodDef Consumer_methods[] = {
 };
 
 
-static PyObject *Consumer_new (PyTypeObject *type, PyObject *args,
-                               PyObject *kwargs);
-
-PyTypeObject ConsumerType = {
-        PyVarObject_HEAD_INIT(NULL, 0)
-        "cimpl.Consumer", /*tp_name*/
-        sizeof(Handle), /*tp_basicsize*/
-        0, /*tp_itemsize*/
-        (destructor)Consumer_dealloc, /*tp_dealloc*/
-        0, /*tp_print*/
-        0, /*tp_getattr*/
-        0, /*tp_setattr*/
-        0, /*tp_compare*/
-        0, /*tp_repr*/
-        0, /*tp_as_number*/
-        0, /*tp_as_sequence*/
-        0, /*tp_as_mapping*/
-        0, /*tp_hash */
-        0, /*tp_call*/
-        0, /*tp_str*/
-        0, /*tp_getattro*/
-        0, /*tp_setattro*/
-        0, /*tp_as_buffer*/
-        Py_TPFLAGS_DEFAULT | Py_TPFLAGS_HAVE_GC, /*tp_flags*/
-        "High-level Kafka Consumer\n"
-        "\n"
-        ".. py:function:: Consumer(**kwargs)\n"
-        "\n"
-        " Create new Consumer instance using provided configuration dict.\n"
-        "\n"
-        " Special configuration properties:\n"
-        " ``on_commit``: Optional callback will be called when a commit "
-        "request has succeeded or failed.\n"
-        "\n"
-        "\n"
-        ".. py:function:: on_commit(err, partitions)\n"
-        "\n"
-        " :param Consumer consumer: Consumer instance.\n"
-        " :param KafkaError err: Commit error object, or None on success.\n"
-        " :param list(TopicPartition) partitions: List of partitions with "
-        "their committed offsets or per-partition errors.\n"
-        "\n"
-        "\n", /*tp_doc*/
-        (traverseproc)Consumer_traverse, /* tp_traverse */
-        (inquiry)Consumer_clear, /* tp_clear */
-        0, /* tp_richcompare */
-        0, /* tp_weaklistoffset */
-        0, /* tp_iter */
-        0, /* tp_iternext */
-        Consumer_methods, /* tp_methods */
-        0, /* tp_members */
-        0, /* tp_getset */
-        0, /* tp_base */
-        0, /* tp_dict */
-        0, /* tp_descr_get */
-        0, /* tp_descr_set */
-        0, /* tp_dictoffset */
-        0, /* tp_init */
-        0, /* tp_alloc */
-        Consumer_new /* tp_new */
-};
-
-
-
 static void Consumer_rebalance_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err,
                                    rd_kafka_topic_partition_list_t *c_parts,
                                    void *opaque) {
@@ -715,37 +651,103 @@ static void Consumer_offset_commit_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err,
 
 
 
-static PyObject *Consumer_new (PyTypeObject *type, PyObject *args,
-                               PyObject *kwargs) {
-        Handle *self;
-        char errstr[256];
-        rd_kafka_conf_t *conf;
+static int Consumer_init (PyObject *selfobj, PyObject *args, PyObject *kwargs) {
+        Handle *self = (Handle *)selfobj;
+        char errstr[256];
+        rd_kafka_conf_t *conf;
 
-        self = (Handle *)ConsumerType.tp_alloc(&ConsumerType, 0);
-        if (!self)
-                return NULL;
+        if (self->rk) {
+                PyErr_SetString(PyExc_RuntimeError,
+                                "Consumer already __init__:ialized");
+                return -1;
+        }
 
-        if (!(conf = common_conf_setup(RD_KAFKA_CONSUMER, self,
-                                       args, kwargs))) {
-                Py_DECREF(self);
-                return NULL;
-        }
+        if (!(conf = common_conf_setup(RD_KAFKA_CONSUMER, self,
+                                       args, kwargs))) {
+                Py_DECREF(self);
+                return -1;
+        }
 
-        rd_kafka_conf_set_rebalance_cb(conf, Consumer_rebalance_cb);
-        rd_kafka_conf_set_offset_commit_cb(conf, Consumer_offset_commit_cb);
+        rd_kafka_conf_set_rebalance_cb(conf, Consumer_rebalance_cb);
+        rd_kafka_conf_set_offset_commit_cb(conf, Consumer_offset_commit_cb);
 
-        self->rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf,
-                                errstr, sizeof(errstr));
-        if (!self->rk) {
-                cfl_PyErr_Format(rd_kafka_last_error(),
-                                 "Failed to create consumer: %s", errstr);
-                Py_DECREF(self);
-                return NULL;
-        }
+        self->rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf,
+                                errstr, sizeof(errstr));
+        if (!self->rk) {
+                cfl_PyErr_Format(rd_kafka_last_error(),
+                                 "Failed to create consumer: %s", errstr);
+                rd_kafka_conf_destroy(conf);
+                Py_DECREF(self);
+                return -1;
+        }
 
-        rd_kafka_poll_set_consumer(self->rk);
+        rd_kafka_poll_set_consumer(self->rk);
 
-        return (PyObject *)self;
+        return 0;
 }
 
+static PyObject *Consumer_new (PyTypeObject *type, PyObject *args,
+                               PyObject *kwargs) {
+        return type->tp_alloc(type, 0);
+}
 
+
+PyTypeObject ConsumerType = {
+        PyVarObject_HEAD_INIT(NULL, 0)
+        "cimpl.Consumer", /*tp_name*/
+        sizeof(Handle), /*tp_basicsize*/
+        0, /*tp_itemsize*/
+        (destructor)Consumer_dealloc, /*tp_dealloc*/
+        0, /*tp_print*/
+        0, /*tp_getattr*/
+        0, /*tp_setattr*/
+        0, /*tp_compare*/
+        0, /*tp_repr*/
+        0, /*tp_as_number*/
+        0, /*tp_as_sequence*/
+        0, /*tp_as_mapping*/
+        0, /*tp_hash */
+        0, /*tp_call*/
+        0, /*tp_str*/
+        0, /*tp_getattro*/
+        0, /*tp_setattro*/
+        0, /*tp_as_buffer*/
+        Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE |
+        Py_TPFLAGS_HAVE_GC, /*tp_flags*/
+        "High-level Kafka Consumer\n"
+        "\n"
+        ".. py:function:: Consumer(**kwargs)\n"
+        "\n"
+        " Create new Consumer instance using provided configuration dict.\n"
+        "\n"
+        " Special configuration properties:\n"
+        " ``on_commit``: Optional callback will be called when a commit "
+        "request has succeeded or failed.\n"
+        "\n"
+        "\n"
+        ".. py:function:: on_commit(err, partitions)\n"
+        "\n"
+        " :param Consumer consumer: Consumer instance.\n"
+        " :param KafkaError err: Commit error object, or None on success.\n"
+        " :param list(TopicPartition) partitions: List of partitions with "
+        "their committed offsets or per-partition errors.\n"
+        "\n"
+        "\n", /*tp_doc*/
+        (traverseproc)Consumer_traverse, /* tp_traverse */
+        (inquiry)Consumer_clear, /* tp_clear */
+        0, /* tp_richcompare */
+        0, /* tp_weaklistoffset */
+        0, /* tp_iter */
+        0, /* tp_iternext */
+        Consumer_methods, /* tp_methods */
+        0, /* tp_members */
+        0, /* tp_getset */
+        0, /* tp_base */
+        0, /* tp_dict */
+        0, /* tp_descr_get */
+        0, /* tp_descr_set */
+        0, /* tp_dictoffset */
+        Consumer_init, /* tp_init */
+        0, /* tp_alloc */
+        Consumer_new /* tp_new */
+};
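Note the shape of the change above: Consumer_new now only allocates the object (type->tp_alloc), while Consumer_init performs the actual construction and refuses to run twice via the self->rk check. Seen from Python, a second __init__ call on a live Consumer therefore raises RuntimeError. A small illustration under the same placeholder config as above:

    from confluent_kafka import Consumer

    conf = {'bootstrap.servers': 'localhost:9092',  # placeholder broker
            'group.id': 'example-group'}

    c = Consumer(conf)       # __new__ allocates, __init__ builds the librdkafka handle
    try:
        c.__init__(conf)     # re-initializing the same object
    except RuntimeError as e:
        print(e)             # "Consumer already __init__:ialized", per Consumer_init above
    c.close()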

confluent_kafka/src/Producer.c

Lines changed: 39 additions & 35 deletions
@@ -426,10 +426,44 @@ static Py_ssize_t Producer__len__ (Handle *self) {
 static PySequenceMethods Producer_seq_methods = {
         (lenfunc)Producer__len__ /* sq_length */
 };
-
+
+
+static int Producer_init (PyObject *selfobj, PyObject *args, PyObject *kwargs) {
+        Handle *self = (Handle *)selfobj;
+        char errstr[256];
+        rd_kafka_conf_t *conf;
+
+        if (self->rk) {
+                PyErr_SetString(PyExc_RuntimeError,
+                                "Producer already __init__:ialized");
+                return -1;
+        }
+
+        if (!(conf = common_conf_setup(RD_KAFKA_PRODUCER, self,
+                                       args, kwargs)))
+                return -1;
+
+        rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb);
+
+        self->rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf,
+                                errstr, sizeof(errstr));
+        if (!self->rk) {
+                cfl_PyErr_Format(rd_kafka_last_error(),
+                                 "Failed to create producer: %s", errstr);
+                rd_kafka_conf_destroy(conf);
+                return -1;
+        }
+
+        return 0;
+}
+
 
 static PyObject *Producer_new (PyTypeObject *type, PyObject *args,
-                               PyObject *kwargs);
+                               PyObject *kwargs) {
+        return type->tp_alloc(type, 0);
+}
+
+
 
 PyTypeObject ProducerType = {
         PyVarObject_HEAD_INIT(NULL, 0)
@@ -451,7 +485,8 @@ PyTypeObject ProducerType = {
         0, /*tp_getattro*/
         0, /*tp_setattro*/
         0, /*tp_as_buffer*/
-        Py_TPFLAGS_DEFAULT | Py_TPFLAGS_HAVE_GC, /*tp_flags*/
+        Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE |
+        Py_TPFLAGS_HAVE_GC, /*tp_flags*/
         "Asynchronous Kafka Producer\n"
         "\n"
         ".. py:function:: Producer(**kwargs)\n"
@@ -478,42 +513,11 @@ PyTypeObject ProducerType = {
         0, /* tp_descr_get */
         0, /* tp_descr_set */
         0, /* tp_dictoffset */
-        0, /* tp_init */
+        Producer_init, /* tp_init */
         0, /* tp_alloc */
         Producer_new /* tp_new */
 };
 
 
 
-static PyObject *Producer_new (PyTypeObject *type, PyObject *args,
-                               PyObject *kwargs) {
-        Handle *self;
-        char errstr[256];
-        rd_kafka_conf_t *conf;
-
-        self = (Handle *)ProducerType.tp_alloc(&ProducerType, 0);
-        if (!self)
-                return NULL;
-
-        if (!(conf = common_conf_setup(RD_KAFKA_PRODUCER, self,
-                                       args, kwargs))) {
-                Py_DECREF(self);
-                return NULL;
-        }
-
-        rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb);
-
-        self->rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf,
-                                errstr, sizeof(errstr));
-        if (!self->rk) {
-                cfl_PyErr_Format(rd_kafka_last_error(),
-                                 "Failed to create producer: %s", errstr);
-                Py_DECREF(self);
-                return NULL;
-        }
-
-        return (PyObject *)self;
-}
-
-
 
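Producer gets the same treatment: Producer_init builds the client, Producer_new is reduced to plain allocation, and Py_TPFLAGS_BASETYPE makes the type usable as a base class. A minimal sketch of a subclass this allows, assuming a reachable broker; the JsonProducer class, its produce_json helper and the topic name are hypothetical, not part of the library:

    import json
    from confluent_kafka import Producer

    class JsonProducer(Producer):
        """Hypothetical convenience subclass; serializes values as JSON."""
        def produce_json(self, topic, obj, **kwargs):
            # Delegate to the base produce() after serializing the value.
            self.produce(topic, json.dumps(obj), **kwargs)

    p = JsonProducer({'bootstrap.servers': 'localhost:9092'})  # placeholder broker
    p.produce_json('example-topic', {'hello': 'world'})        # placeholder topic
    p.flush()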
