Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,5 @@ __pycache__
confluent?kafka.egg-info
*.pyc
.cache
*.log
confluent-kafka-0.*.*
41 changes: 23 additions & 18 deletions confluent_kafka/src/Consumer.c
Original file line number Diff line number Diff line change
Expand Up @@ -366,20 +366,17 @@ static PyObject *Consumer_poll (Handle *self, PyObject *args,
static char *kws[] = { "timeout", NULL };
rd_kafka_message_t *rkm;
PyObject *msgobj;
CallState cs;

if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|d", kws, &tmout))
return NULL;

self->callback_crashed = 0;
self->thread_state = PyEval_SaveThread();
CallState_begin(self, &cs);

rkm = rd_kafka_consumer_poll(self->rk, tmout >= 0 ?
(int)(tmout * 1000.0f) : -1);

PyEval_RestoreThread(self->thread_state);
self->thread_state = NULL;

if (self->callback_crashed)
if (!CallState_end(self, &cs))
return NULL;

if (!rkm)
Expand All @@ -393,9 +390,15 @@ static PyObject *Consumer_poll (Handle *self, PyObject *args,


static PyObject *Consumer_close (Handle *self, PyObject *ignore) {
self->thread_state = PyEval_SaveThread();
CallState cs;

CallState_begin(self, &cs);

rd_kafka_consumer_close(self->rk);
PyEval_RestoreThread(self->thread_state);

if (!CallState_end(self, &cs))
return NULL;

Py_RETURN_NONE;
}

Expand Down Expand Up @@ -593,8 +596,9 @@ static void Consumer_rebalance_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err,
rd_kafka_topic_partition_list_t *c_parts,
void *opaque) {
Handle *self = opaque;
CallState *cs;

PyEval_RestoreThread(self->thread_state);
cs = CallState_get(self);

self->u.Consumer.rebalance_assigned = 0;

Expand All @@ -615,8 +619,8 @@ static void Consumer_rebalance_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err,
if (!args) {
cfl_PyErr_Format(RD_KAFKA_RESP_ERR__FAIL,
"Unable to build callback args");
self->thread_state = PyEval_SaveThread();
self->callback_crashed++;
CallState_crash(cs);
CallState_resume(cs);
return;
}

Expand All @@ -630,7 +634,7 @@ static void Consumer_rebalance_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err,
if (result)
Py_DECREF(result);
else {
self->callback_crashed++;
CallState_crash(cs);
rd_kafka_yield(rk);
}
}
Expand All @@ -646,7 +650,7 @@ static void Consumer_rebalance_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err,
rd_kafka_assign(rk, NULL);
}

self->thread_state = PyEval_SaveThread();
CallState_resume(cs);
}


Expand All @@ -655,11 +659,12 @@ static void Consumer_offset_commit_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err,
void *opaque) {
Handle *self = opaque;
PyObject *parts, *k_err, *args, *result;
CallState *cs;

if (!self->u.Consumer.on_commit)
return;

PyEval_RestoreThread(self->thread_state);
cs = CallState_get(self);

/* Insantiate error object */
k_err = KafkaError_new_or_None(err, NULL);
Expand All @@ -675,8 +680,8 @@ static void Consumer_offset_commit_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err,
if (!args) {
cfl_PyErr_Format(RD_KAFKA_RESP_ERR__FAIL,
"Unable to build callback args");
self->thread_state = PyEval_SaveThread();
self->callback_crashed++;
CallState_crash(cs);
CallState_resume(cs);
return;
}

Expand All @@ -687,11 +692,11 @@ static void Consumer_offset_commit_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err,
if (result)
Py_DECREF(result);
else {
self->callback_crashed++;
CallState_crash(cs);
rd_kafka_yield(rk);
}

self->thread_state = PyEval_SaveThread();
CallState_resume(cs);
}


Expand Down
26 changes: 11 additions & 15 deletions confluent_kafka/src/Producer.c
Original file line number Diff line number Diff line change
Expand Up @@ -131,14 +131,15 @@ static void dr_msg_cb (rd_kafka_t *rk, const rd_kafka_message_t *rkm,
void *opaque) {
struct Producer_msgstate *msgstate = rkm->_private;
Handle *self = opaque;
CallState *cs;
PyObject *args;
PyObject *result;
PyObject *msgobj;

if (!msgstate)
return;

PyEval_RestoreThread(self->thread_state);
cs = CallState_get(self);

if (!msgstate->dr_cb) {
/* No callback defined */
Expand All @@ -156,7 +157,7 @@ static void dr_msg_cb (rd_kafka_t *rk, const rd_kafka_message_t *rkm,
if (!args) {
cfl_PyErr_Format(RD_KAFKA_RESP_ERR__FAIL,
"Unable to build callback args");
self->callback_crashed++;
CallState_crash(cs);
goto done;
}

Expand All @@ -166,13 +167,13 @@ static void dr_msg_cb (rd_kafka_t *rk, const rd_kafka_message_t *rkm,
if (result)
Py_DECREF(result);
else {
self->callback_crashed++;
CallState_crash(cs);
rd_kafka_yield(rk);
}

done:
Producer_msgstate_destroy(msgstate);
self->thread_state = PyEval_SaveThread();
CallState_resume(cs);
}


Expand Down Expand Up @@ -279,9 +280,9 @@ static PyObject *Producer_produce (Handle *self, PyObject *args,
return NULL;
}

if (!dr_cb)
if (!dr_cb || dr_cb == Py_None)
dr_cb = self->u.Producer.default_dr_cb;
if (!partitioner_cb)
if (!partitioner_cb || partitioner_cb == Py_None)
partitioner_cb = self->u.Producer.partitioner_cb;

/* Create msgstate if necessary, may return NULL if no callbacks
Expand Down Expand Up @@ -321,20 +322,15 @@ static PyObject *Producer_produce (Handle *self, PyObject *args,
*/
static int Producer_poll0 (Handle *self, int tmout) {
int r;
CallState cs;

self->callback_crashed = 0;
self->thread_state = PyEval_SaveThread();
CallState_begin(self, &cs);

r = rd_kafka_poll(self->rk, tmout);

PyEval_RestoreThread(self->thread_state);
self->thread_state = NULL;

if (PyErr_CheckSignals() == -1)
return -1;

if (self->callback_crashed)
if (!CallState_end(self, &cs)) {
return -1;
}

return r;
}
Expand Down
74 changes: 68 additions & 6 deletions confluent_kafka/src/confluent_kafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -813,8 +813,9 @@ rd_kafka_topic_partition_list_t *py_to_c_parts (PyObject *plist) {
static void error_cb (rd_kafka_t *rk, int err, const char *reason, void *opaque) {
Handle *h = opaque;
PyObject *eo, *result;
CallState *cs;

PyEval_RestoreThread(h->thread_state);
cs = CallState_get(h);
if (!h->error_cb) {
/* No callback defined */
goto done;
Expand All @@ -827,12 +828,12 @@ static void error_cb (rd_kafka_t *rk, int err, const char *reason, void *opaque)
if (result) {
Py_DECREF(result);
} else {
h->callback_crashed++;
CallState_crash(cs);
rd_kafka_yield(h->rk);
}

done:
h->thread_state = PyEval_SaveThread();
CallState_resume(cs);
}


Expand All @@ -855,6 +856,8 @@ void Handle_clear (Handle *h) {
if (h->error_cb) {
Py_DECREF(h->error_cb);
}

PyThread_delete_key(h->tlskey);
}

/**
Expand Down Expand Up @@ -1110,10 +1113,14 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,
continue;

} else if (!strcmp(k, "error_cb")) {
if (h->error_cb)
if (h->error_cb) {
Py_DECREF(h->error_cb);
h->error_cb = vo;
Py_INCREF(h->error_cb);
h->error_cb = NULL;
}
if (vo != Py_None) {
h->error_cb = vo;
Py_INCREF(h->error_cb);
}
Py_DECREF(ks);
continue;
}
Expand Down Expand Up @@ -1172,12 +1179,67 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,

rd_kafka_conf_set_opaque(conf, h);

h->tlskey = PyThread_create_key();

return conf;
}




/**
* @brief Initialiase a CallState and unlock the GIL prior to a
* possibly blocking external call.
*/
void CallState_begin (Handle *h, CallState *cs) {
cs->thread_state = PyEval_SaveThread();
cs->crashed = 0;
PyThread_set_key_value(h->tlskey, cs);
}

/**
* @brief Relock the GIL after external call is done.
* @returns 0 if a Python signal was raised or a callback crashed, else 1.
*/
int CallState_end (Handle *h, CallState *cs) {
PyThread_delete_key_value(h->tlskey);

PyEval_RestoreThread(cs->thread_state);

if (PyErr_CheckSignals() == -1 || cs->crashed)
return 0;

return 1;
}


/**
* @brief Get the current thread's CallState and re-locks the GIL.
*/
CallState *CallState_get (Handle *h) {
CallState *cs = PyThread_get_key_value(h->tlskey);
assert(cs != NULL);
PyEval_RestoreThread(cs->thread_state);
cs->thread_state = NULL;
return cs;
}

/**
* @brief Un-locks the GIL to resume blocking external call.
*/
void CallState_resume (CallState *cs) {
assert(cs->thread_state == NULL);
cs->thread_state = PyEval_SaveThread();
}

/**
* @brief Indicate that call crashed.
*/
void CallState_crash (CallState *cs) {
cs->crashed++;
}



/****************************************************************************
*
Expand Down
38 changes: 36 additions & 2 deletions confluent_kafka/src/confluent_kafka.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#include <Python.h>
#include <structmember.h>
#include <pythread.h>

#include <librdkafka/rdkafka.h>

Expand Down Expand Up @@ -112,9 +113,8 @@ PyObject *KafkaError_new_or_None (rd_kafka_resp_err_t err, const char *str);
typedef struct {
PyObject_HEAD
rd_kafka_t *rk;
int callback_crashed;
PyThreadState *thread_state;
PyObject *error_cb;
int tlskey; /* Thread-Local-Storage key */

union {
/**
Expand Down Expand Up @@ -147,6 +147,40 @@ void Handle_clear (Handle *h);
int Handle_traverse (Handle *h, visitproc visit, void *arg);


/**
* @brief Current thread's state for "blocking" calls to librdkafka.
*/
typedef struct {
PyThreadState *thread_state;
int crashed; /* Callback crashed */
} CallState;

/**
* @brief Initialiase a CallState and unlock the GIL prior to a
* possibly blocking external call.
*/
void CallState_begin (Handle *h, CallState *cs);
/**
* @brief Relock the GIL after external call is done, remove TLS state.
* @returns 0 if a Python signal was raised or a callback crashed, else 1.
*/
int CallState_end (Handle *h, CallState *cs);

/**
* @brief Get the current thread's CallState and re-locks the GIL.
*/
CallState *CallState_get (Handle *h);
/**
* @brief Un-locks the GIL to resume blocking external call.
*/
void CallState_resume (CallState *cs);

/**
* @brief Indicate that call crashed.
*/
void CallState_crash (CallState *cs);


/****************************************************************************
*
*
Expand Down
Loading