Skip to content

Commit d12fad9

Browse files
authored
Merge pull request #15 from confluentinc/error_cb
Added error callback for propagating generic errors from librdkafka to app
2 parents cb0d0af + 353ac3f commit d12fad9

File tree

8 files changed

+285
-142
lines changed

8 files changed

+285
-142
lines changed

confluent_kafka/src/Consumer.c

Lines changed: 62 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -28,23 +28,26 @@
2828
****************************************************************************/
2929

3030

31-
static int Consumer_clear (Consumer *self) {
32-
if (self->on_assign) {
33-
Py_DECREF(self->on_assign);
34-
self->on_assign = NULL;
31+
static int Consumer_clear (Handle *self) {
32+
if (self->u.Consumer.on_assign) {
33+
Py_DECREF(self->u.Consumer.on_assign);
34+
self->u.Consumer.on_assign = NULL;
3535
}
36-
if (self->on_revoke) {
37-
Py_DECREF(self->on_revoke);
38-
self->on_revoke = NULL;
36+
if (self->u.Consumer.on_revoke) {
37+
Py_DECREF(self->u.Consumer.on_revoke);
38+
self->u.Consumer.on_revoke = NULL;
3939
}
40-
if (self->on_commit) {
41-
Py_DECREF(self->on_commit);
42-
self->on_commit = NULL;
40+
if (self->u.Consumer.on_commit) {
41+
Py_DECREF(self->u.Consumer.on_commit);
42+
self->u.Consumer.on_commit = NULL;
4343
}
44+
45+
Handle_clear(self);
46+
4447
return 0;
4548
}
4649

47-
static void Consumer_dealloc (Consumer *self) {
50+
static void Consumer_dealloc (Handle *self) {
4851
PyObject_GC_UnTrack(self);
4952

5053
Consumer_clear(self);
@@ -55,12 +58,17 @@ static void Consumer_dealloc (Consumer *self) {
5558
Py_TYPE(self)->tp_free((PyObject *)self);
5659
}
5760

58-
static int Consumer_traverse (Consumer *self,
59-
visitproc visit, void *arg) {
60-
if (self->on_assign)
61-
Py_VISIT(self->on_assign);
62-
if (self->on_revoke)
63-
Py_VISIT(self->on_revoke);
61+
static int Consumer_traverse (Handle *self,
62+
visitproc visit, void *arg) {
63+
if (self->u.Consumer.on_assign)
64+
Py_VISIT(self->u.Consumer.on_assign);
65+
if (self->u.Consumer.on_revoke)
66+
Py_VISIT(self->u.Consumer.on_revoke);
67+
if (self->u.Consumer.on_commit)
68+
Py_VISIT(self->u.Consumer.on_commit);
69+
70+
Handle_traverse(self, visit, arg);
71+
6472
return 0;
6573
}
6674

@@ -69,7 +77,7 @@ static int Consumer_traverse (Consumer *self,
6977

7078

7179

72-
static PyObject *Consumer_subscribe (Consumer *self, PyObject *args,
80+
static PyObject *Consumer_subscribe (Handle *self, PyObject *args,
7381
PyObject *kwargs) {
7482

7583
rd_kafka_topic_partition_list_t *topics;
@@ -130,29 +138,29 @@ static PyObject *Consumer_subscribe (Consumer *self, PyObject *args,
130138
/*
131139
* Update rebalance callbacks
132140
*/
133-
if (self->on_assign) {
134-
Py_DECREF(self->on_assign);
135-
self->on_assign = NULL;
141+
if (self->u.Consumer.on_assign) {
142+
Py_DECREF(self->u.Consumer.on_assign);
143+
self->u.Consumer.on_assign = NULL;
136144
}
137145
if (on_assign) {
138-
self->on_assign = on_assign;
139-
Py_INCREF(self->on_assign);
146+
self->u.Consumer.on_assign = on_assign;
147+
Py_INCREF(self->u.Consumer.on_assign);
140148
}
141149

142-
if (self->on_revoke) {
143-
Py_DECREF(self->on_revoke);
144-
self->on_revoke = NULL;
150+
if (self->u.Consumer.on_revoke) {
151+
Py_DECREF(self->u.Consumer.on_revoke);
152+
self->u.Consumer.on_revoke = NULL;
145153
}
146154
if (on_revoke) {
147-
self->on_revoke = on_revoke;
148-
Py_INCREF(self->on_revoke);
155+
self->u.Consumer.on_revoke = on_revoke;
156+
Py_INCREF(self->u.Consumer.on_revoke);
149157
}
150158

151159
Py_RETURN_NONE;
152160
}
153161

154162

155-
static PyObject *Consumer_unsubscribe (Consumer *self,
163+
static PyObject *Consumer_unsubscribe (Handle *self,
156164
PyObject *ignore) {
157165

158166
rd_kafka_resp_err_t err;
@@ -169,15 +177,15 @@ static PyObject *Consumer_unsubscribe (Consumer *self,
169177
}
170178

171179

172-
static PyObject *Consumer_assign (Consumer *self, PyObject *tlist) {
180+
static PyObject *Consumer_assign (Handle *self, PyObject *tlist) {
173181

174182
rd_kafka_topic_partition_list_t *c_parts;
175183
rd_kafka_resp_err_t err;
176184

177185
if (!(c_parts = py_to_c_parts(tlist)))
178186
return NULL;
179187

180-
self->rebalance_assigned++;
188+
self->u.Consumer.rebalance_assigned++;
181189

182190
err = rd_kafka_assign(self->rk, c_parts);
183191

@@ -194,11 +202,11 @@ static PyObject *Consumer_assign (Consumer *self, PyObject *tlist) {
194202
}
195203

196204

197-
static PyObject *Consumer_unassign (Consumer *self, PyObject *ignore) {
205+
static PyObject *Consumer_unassign (Handle *self, PyObject *ignore) {
198206

199207
rd_kafka_resp_err_t err;
200208

201-
self->rebalance_assigned++;
209+
self->u.Consumer.rebalance_assigned++;
202210

203211
err = rd_kafka_assign(self->rk, NULL);
204212
if (err) {
@@ -213,7 +221,7 @@ static PyObject *Consumer_unassign (Consumer *self, PyObject *ignore) {
213221

214222

215223

216-
static PyObject *Consumer_commit (Consumer *self, PyObject *args,
224+
static PyObject *Consumer_commit (Handle *self, PyObject *args,
217225
PyObject *kwargs) {
218226

219227
rd_kafka_resp_err_t err;
@@ -281,7 +289,7 @@ static PyObject *Consumer_commit (Consumer *self, PyObject *args,
281289

282290

283291

284-
static PyObject *Consumer_committed (Consumer *self, PyObject *args,
292+
static PyObject *Consumer_committed (Handle *self, PyObject *args,
285293
PyObject *kwargs) {
286294

287295
PyObject *plist;
@@ -317,7 +325,7 @@ static PyObject *Consumer_committed (Consumer *self, PyObject *args,
317325
}
318326

319327

320-
static PyObject *Consumer_position (Consumer *self, PyObject *args,
328+
static PyObject *Consumer_position (Handle *self, PyObject *args,
321329
PyObject *kwargs) {
322330

323331
PyObject *plist;
@@ -352,7 +360,7 @@ static PyObject *Consumer_position (Consumer *self, PyObject *args,
352360

353361

354362

355-
static PyObject *Consumer_poll (Consumer *self, PyObject *args,
363+
static PyObject *Consumer_poll (Handle *self, PyObject *args,
356364
PyObject *kwargs) {
357365
double tmout = -1.0f;
358366
static char *kws[] = { "timeout", NULL };
@@ -384,7 +392,7 @@ static PyObject *Consumer_poll (Consumer *self, PyObject *args,
384392
}
385393

386394

387-
static PyObject *Consumer_close (Consumer *self, PyObject *ignore) {
395+
static PyObject *Consumer_close (Handle *self, PyObject *ignore) {
388396
self->thread_state = PyEval_SaveThread();
389397
rd_kafka_consumer_close(self->rk);
390398
PyEval_RestoreThread(self->thread_state);
@@ -523,7 +531,7 @@ static PyObject *Consumer_new (PyTypeObject *type, PyObject *args,
523531
PyTypeObject ConsumerType = {
524532
PyVarObject_HEAD_INIT(NULL, 0)
525533
"cimpl.Consumer", /*tp_name*/
526-
sizeof(Consumer), /*tp_basicsize*/
534+
sizeof(Handle), /*tp_basicsize*/
527535
0, /*tp_itemsize*/
528536
(destructor)Consumer_dealloc, /*tp_dealloc*/
529537
0, /*tp_print*/
@@ -584,14 +592,16 @@ PyTypeObject ConsumerType = {
584592
static void Consumer_rebalance_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err,
585593
rd_kafka_topic_partition_list_t *c_parts,
586594
void *opaque) {
587-
Consumer *self = opaque;
595+
Handle *self = opaque;
588596

589597
PyEval_RestoreThread(self->thread_state);
590598

591-
self->rebalance_assigned = 0;
599+
self->u.Consumer.rebalance_assigned = 0;
592600

593-
if ((err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS && self->on_assign) ||
594-
(err == RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS && self->on_revoke)) {
601+
if ((err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS &&
602+
self->u.Consumer.on_assign) ||
603+
(err == RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS &&
604+
self->u.Consumer.on_revoke)) {
595605
PyObject *parts;
596606
PyObject *args, *result;
597607

@@ -612,7 +622,8 @@ static void Consumer_rebalance_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err,
612622

613623
result = PyObject_CallObject(
614624
err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS ?
615-
self->on_assign : self->on_revoke, args);
625+
self->u.Consumer.on_assign :
626+
self->u.Consumer.on_revoke, args);
616627

617628
Py_DECREF(args);
618629

@@ -628,7 +639,7 @@ static void Consumer_rebalance_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err,
628639
* to synchronize state, if the user did not do this from callback,
629640
* or there was no callback, or the callback failed, then we perform
630641
* that assign() call here instead. */
631-
if (!self->rebalance_assigned) {
642+
if (!self->u.Consumer.rebalance_assigned) {
632643
if (err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS)
633644
rd_kafka_assign(rk, c_parts);
634645
else
@@ -642,10 +653,10 @@ static void Consumer_rebalance_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err,
642653
static void Consumer_offset_commit_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err,
643654
rd_kafka_topic_partition_list_t *c_parts,
644655
void *opaque) {
645-
Consumer *self = opaque;
656+
Handle *self = opaque;
646657
PyObject *parts, *k_err, *args, *result;
647658

648-
if (!self->on_commit)
659+
if (!self->u.Consumer.on_commit)
649660
return;
650661

651662
PyEval_RestoreThread(self->thread_state);
@@ -669,7 +680,7 @@ static void Consumer_offset_commit_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err,
669680
return;
670681
}
671682

672-
result = PyObject_CallObject(self->on_commit, args);
683+
result = PyObject_CallObject(self->u.Consumer.on_commit, args);
673684

674685
Py_DECREF(args);
675686

@@ -687,16 +698,16 @@ static void Consumer_offset_commit_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err,
687698

688699
static PyObject *Consumer_new (PyTypeObject *type, PyObject *args,
689700
PyObject *kwargs) {
690-
Consumer *self;
701+
Handle *self;
691702
char errstr[256];
692703
rd_kafka_conf_t *conf;
693704

694-
self = (Consumer *)ConsumerType.tp_alloc(&ConsumerType, 0);
705+
self = (Handle *)ConsumerType.tp_alloc(&ConsumerType, 0);
695706
if (!self)
696707
return NULL;
697708

698709
if (!(conf = common_conf_setup(RD_KAFKA_CONSUMER, self,
699-
args, kwargs))) {
710+
args, kwargs))) {
700711
Py_DECREF(self);
701712
return NULL;
702713
}

0 commit comments

Comments
 (0)