Skip to content

Commit 9982b55

Browse files
authored
Merge pull request #27 from confluentinc/threadsafe
Thread safety fixes
2 parents fac4543 + 20bba1c commit 9982b55

File tree

6 files changed

+209
-41
lines changed

6 files changed

+209
-41
lines changed

.gitignore

+2
Original file line numberDiff line numberDiff line change
@@ -11,3 +11,5 @@ __pycache__
1111
confluent?kafka.egg-info
1212
*.pyc
1313
.cache
14+
*.log
15+
confluent-kafka-0.*.*

confluent_kafka/src/Consumer.c

+23-18
Original file line numberDiff line numberDiff line change
@@ -366,20 +366,17 @@ static PyObject *Consumer_poll (Handle *self, PyObject *args,
366366
static char *kws[] = { "timeout", NULL };
367367
rd_kafka_message_t *rkm;
368368
PyObject *msgobj;
369+
CallState cs;
369370

370371
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|d", kws, &tmout))
371372
return NULL;
372373

373-
self->callback_crashed = 0;
374-
self->thread_state = PyEval_SaveThread();
374+
CallState_begin(self, &cs);
375375

376376
rkm = rd_kafka_consumer_poll(self->rk, tmout >= 0 ?
377377
(int)(tmout * 1000.0f) : -1);
378378

379-
PyEval_RestoreThread(self->thread_state);
380-
self->thread_state = NULL;
381-
382-
if (self->callback_crashed)
379+
if (!CallState_end(self, &cs))
383380
return NULL;
384381

385382
if (!rkm)
@@ -393,9 +390,15 @@ static PyObject *Consumer_poll (Handle *self, PyObject *args,
393390

394391

395392
static PyObject *Consumer_close (Handle *self, PyObject *ignore) {
396-
self->thread_state = PyEval_SaveThread();
393+
CallState cs;
394+
395+
CallState_begin(self, &cs);
396+
397397
rd_kafka_consumer_close(self->rk);
398-
PyEval_RestoreThread(self->thread_state);
398+
399+
if (!CallState_end(self, &cs))
400+
return NULL;
401+
399402
Py_RETURN_NONE;
400403
}
401404

@@ -593,8 +596,9 @@ static void Consumer_rebalance_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err,
593596
rd_kafka_topic_partition_list_t *c_parts,
594597
void *opaque) {
595598
Handle *self = opaque;
599+
CallState *cs;
596600

597-
PyEval_RestoreThread(self->thread_state);
601+
cs = CallState_get(self);
598602

599603
self->u.Consumer.rebalance_assigned = 0;
600604

@@ -615,8 +619,8 @@ static void Consumer_rebalance_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err,
615619
if (!args) {
616620
cfl_PyErr_Format(RD_KAFKA_RESP_ERR__FAIL,
617621
"Unable to build callback args");
618-
self->thread_state = PyEval_SaveThread();
619-
self->callback_crashed++;
622+
CallState_crash(cs);
623+
CallState_resume(cs);
620624
return;
621625
}
622626

@@ -630,7 +634,7 @@ static void Consumer_rebalance_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err,
630634
if (result)
631635
Py_DECREF(result);
632636
else {
633-
self->callback_crashed++;
637+
CallState_crash(cs);
634638
rd_kafka_yield(rk);
635639
}
636640
}
@@ -646,7 +650,7 @@ static void Consumer_rebalance_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err,
646650
rd_kafka_assign(rk, NULL);
647651
}
648652

649-
self->thread_state = PyEval_SaveThread();
653+
CallState_resume(cs);
650654
}
651655

652656

@@ -655,11 +659,12 @@ static void Consumer_offset_commit_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err,
655659
void *opaque) {
656660
Handle *self = opaque;
657661
PyObject *parts, *k_err, *args, *result;
662+
CallState *cs;
658663

659664
if (!self->u.Consumer.on_commit)
660665
return;
661666

662-
PyEval_RestoreThread(self->thread_state);
667+
cs = CallState_get(self);
663668

664669
/* Instantiate error object */
665670
k_err = KafkaError_new_or_None(err, NULL);
@@ -675,8 +680,8 @@ static void Consumer_offset_commit_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err,
675680
if (!args) {
676681
cfl_PyErr_Format(RD_KAFKA_RESP_ERR__FAIL,
677682
"Unable to build callback args");
678-
self->thread_state = PyEval_SaveThread();
679-
self->callback_crashed++;
683+
CallState_crash(cs);
684+
CallState_resume(cs);
680685
return;
681686
}
682687

@@ -687,11 +692,11 @@ static void Consumer_offset_commit_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err,
687692
if (result)
688693
Py_DECREF(result);
689694
else {
690-
self->callback_crashed++;
695+
CallState_crash(cs);
691696
rd_kafka_yield(rk);
692697
}
693698

694-
self->thread_state = PyEval_SaveThread();
699+
CallState_resume(cs);
695700
}
696701

697702

confluent_kafka/src/Producer.c

+11-15
Original file line numberDiff line numberDiff line change
@@ -131,14 +131,15 @@ static void dr_msg_cb (rd_kafka_t *rk, const rd_kafka_message_t *rkm,
131131
void *opaque) {
132132
struct Producer_msgstate *msgstate = rkm->_private;
133133
Handle *self = opaque;
134+
CallState *cs;
134135
PyObject *args;
135136
PyObject *result;
136137
PyObject *msgobj;
137138

138139
if (!msgstate)
139140
return;
140141

141-
PyEval_RestoreThread(self->thread_state);
142+
cs = CallState_get(self);
142143

143144
if (!msgstate->dr_cb) {
144145
/* No callback defined */
@@ -156,7 +157,7 @@ static void dr_msg_cb (rd_kafka_t *rk, const rd_kafka_message_t *rkm,
156157
if (!args) {
157158
cfl_PyErr_Format(RD_KAFKA_RESP_ERR__FAIL,
158159
"Unable to build callback args");
159-
self->callback_crashed++;
160+
CallState_crash(cs);
160161
goto done;
161162
}
162163

@@ -166,13 +167,13 @@ static void dr_msg_cb (rd_kafka_t *rk, const rd_kafka_message_t *rkm,
166167
if (result)
167168
Py_DECREF(result);
168169
else {
169-
self->callback_crashed++;
170+
CallState_crash(cs);
170171
rd_kafka_yield(rk);
171172
}
172173

173174
done:
174175
Producer_msgstate_destroy(msgstate);
175-
self->thread_state = PyEval_SaveThread();
176+
CallState_resume(cs);
176177
}
177178

178179

@@ -279,9 +280,9 @@ static PyObject *Producer_produce (Handle *self, PyObject *args,
279280
return NULL;
280281
}
281282

282-
if (!dr_cb)
283+
if (!dr_cb || dr_cb == Py_None)
283284
dr_cb = self->u.Producer.default_dr_cb;
284-
if (!partitioner_cb)
285+
if (!partitioner_cb || partitioner_cb == Py_None)
285286
partitioner_cb = self->u.Producer.partitioner_cb;
286287

287288
/* Create msgstate if necessary, may return NULL if no callbacks
@@ -321,20 +322,15 @@ static PyObject *Producer_produce (Handle *self, PyObject *args,
321322
*/
322323
static int Producer_poll0 (Handle *self, int tmout) {
323324
int r;
325+
CallState cs;
324326

325-
self->callback_crashed = 0;
326-
self->thread_state = PyEval_SaveThread();
327+
CallState_begin(self, &cs);
327328

328329
r = rd_kafka_poll(self->rk, tmout);
329330

330-
PyEval_RestoreThread(self->thread_state);
331-
self->thread_state = NULL;
332-
333-
if (PyErr_CheckSignals() == -1)
334-
return -1;
335-
336-
if (self->callback_crashed)
331+
if (!CallState_end(self, &cs)) {
337332
return -1;
333+
}
338334

339335
return r;
340336
}

confluent_kafka/src/confluent_kafka.c

+68-6
Original file line numberDiff line numberDiff line change
@@ -813,8 +813,9 @@ rd_kafka_topic_partition_list_t *py_to_c_parts (PyObject *plist) {
813813
static void error_cb (rd_kafka_t *rk, int err, const char *reason, void *opaque) {
814814
Handle *h = opaque;
815815
PyObject *eo, *result;
816+
CallState *cs;
816817

817-
PyEval_RestoreThread(h->thread_state);
818+
cs = CallState_get(h);
818819
if (!h->error_cb) {
819820
/* No callback defined */
820821
goto done;
@@ -827,12 +828,12 @@ static void error_cb (rd_kafka_t *rk, int err, const char *reason, void *opaque)
827828
if (result) {
828829
Py_DECREF(result);
829830
} else {
830-
h->callback_crashed++;
831+
CallState_crash(cs);
831832
rd_kafka_yield(h->rk);
832833
}
833834

834835
done:
835-
h->thread_state = PyEval_SaveThread();
836+
CallState_resume(cs);
836837
}
837838

838839

@@ -855,6 +856,8 @@ void Handle_clear (Handle *h) {
855856
if (h->error_cb) {
856857
Py_DECREF(h->error_cb);
857858
}
859+
860+
PyThread_delete_key(h->tlskey);
858861
}
859862

860863
/**
@@ -1110,10 +1113,14 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,
11101113
continue;
11111114

11121115
} else if (!strcmp(k, "error_cb")) {
1113-
if (h->error_cb)
1116+
if (h->error_cb) {
11141117
Py_DECREF(h->error_cb);
1115-
h->error_cb = vo;
1116-
Py_INCREF(h->error_cb);
1118+
h->error_cb = NULL;
1119+
}
1120+
if (vo != Py_None) {
1121+
h->error_cb = vo;
1122+
Py_INCREF(h->error_cb);
1123+
}
11171124
Py_DECREF(ks);
11181125
continue;
11191126
}
@@ -1172,12 +1179,67 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,
11721179

11731180
rd_kafka_conf_set_opaque(conf, h);
11741181

1182+
h->tlskey = PyThread_create_key();
1183+
11751184
return conf;
11761185
}
11771186

11781187

11791188

11801189

1190+
/**
1191+
* @brief Initialise a CallState and unlock the GIL prior to a
1192+
* possibly blocking external call.
1193+
*/
1194+
void CallState_begin (Handle *h, CallState *cs) {
1195+
cs->thread_state = PyEval_SaveThread();
1196+
cs->crashed = 0;
1197+
PyThread_set_key_value(h->tlskey, cs);
1198+
}
1199+
1200+
/**
1201+
* @brief Relock the GIL after external call is done, remove TLS state.
1202+
* @returns 0 if a Python signal was raised or a callback crashed, else 1.
1203+
*/
1204+
int CallState_end (Handle *h, CallState *cs) {
1205+
PyThread_delete_key_value(h->tlskey);
1206+
1207+
PyEval_RestoreThread(cs->thread_state);
1208+
1209+
if (PyErr_CheckSignals() == -1 || cs->crashed)
1210+
return 0;
1211+
1212+
return 1;
1213+
}
1214+
1215+
1216+
/**
1217+
* @brief Get the current thread's CallState and re-lock the GIL.
1218+
*/
1219+
CallState *CallState_get (Handle *h) {
1220+
CallState *cs = PyThread_get_key_value(h->tlskey);
1221+
assert(cs != NULL);
1222+
PyEval_RestoreThread(cs->thread_state);
1223+
cs->thread_state = NULL;
1224+
return cs;
1225+
}
1226+
1227+
/**
1228+
* @brief Un-locks the GIL to resume blocking external call.
1229+
*/
1230+
void CallState_resume (CallState *cs) {
1231+
assert(cs->thread_state == NULL);
1232+
cs->thread_state = PyEval_SaveThread();
1233+
}
1234+
1235+
/**
1236+
* @brief Indicate that a callback crashed.
1237+
*/
1238+
void CallState_crash (CallState *cs) {
1239+
cs->crashed++;
1240+
}
1241+
1242+
11811243

11821244
/****************************************************************************
11831245
*

confluent_kafka/src/confluent_kafka.h

+36-2
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
#include <Python.h>
1818
#include <structmember.h>
19+
#include <pythread.h>
1920

2021
#include <librdkafka/rdkafka.h>
2122

@@ -112,9 +113,8 @@ PyObject *KafkaError_new_or_None (rd_kafka_resp_err_t err, const char *str);
112113
typedef struct {
113114
PyObject_HEAD
114115
rd_kafka_t *rk;
115-
int callback_crashed;
116-
PyThreadState *thread_state;
117116
PyObject *error_cb;
117+
int tlskey; /* Thread-Local-Storage key */
118118

119119
union {
120120
/**
@@ -147,6 +147,40 @@ void Handle_clear (Handle *h);
147147
int Handle_traverse (Handle *h, visitproc visit, void *arg);
148148

149149

150+
/**
151+
* @brief Current thread's state for "blocking" calls to librdkafka.
152+
*/
153+
typedef struct {
154+
PyThreadState *thread_state;
155+
int crashed; /* Callback crashed */
156+
} CallState;
157+
158+
/**
159+
* @brief Initialise a CallState and unlock the GIL prior to a
160+
* possibly blocking external call.
161+
*/
162+
void CallState_begin (Handle *h, CallState *cs);
163+
/**
164+
* @brief Relock the GIL after external call is done, remove TLS state.
165+
* @returns 0 if a Python signal was raised or a callback crashed, else 1.
166+
*/
167+
int CallState_end (Handle *h, CallState *cs);
168+
169+
/**
170+
* @brief Get the current thread's CallState and re-lock the GIL.
171+
*/
172+
CallState *CallState_get (Handle *h);
173+
/**
174+
* @brief Un-locks the GIL to resume blocking external call.
175+
*/
176+
void CallState_resume (CallState *cs);
177+
178+
/**
179+
* @brief Indicate that a callback crashed.
180+
*/
181+
void CallState_crash (CallState *cs);
182+
183+
150184
/****************************************************************************
151185
*
152186
*

0 commit comments

Comments
 (0)