Skip to content

Commit 2938212

Browse files
johnistan authored and edenhill committed
Message header support (#299, @johnistan)
* first stab at header support * Initial support for message headers * Fix temp testing changes * Fix temp testing changes * Add method doc * add error handling and remove debug statements * responding to PR comments and fixed up tests * responding to PR comments and fixed up tests * Responding to requests * Fix c_headers destroy and added tests for set_headers * use new detach header api * moves detach headers to consumer code to keep callback consistent. Message headers are not supported in delivery callbacks. Adds documentation to report this limitation * fix consumer.consume header support * adds ifdef gaurds for older versions of librdkafka * update rd_kafka_producev to handle timestamp support but not header support * cleanup pep8 error * responded to PR comments
1 parent e2d196c commit 2938212

File tree

7 files changed

+250
-12
lines changed

7 files changed

+250
-12
lines changed

confluent_kafka/src/Consumer.c

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -885,6 +885,11 @@ static PyObject *Consumer_poll (Handle *self, PyObject *args,
885885
Py_RETURN_NONE;
886886

887887
msgobj = Message_new0(self, rkm);
888+
#ifdef RD_KAFKA_V_HEADERS
889+
// Have to detach headers outside Message_new0 because it declares the
890+
// rk message as a const
891+
rd_kafka_message_detach_headers(rkm, &((Message *)msgobj)->c_headers);
892+
#endif
888893
rd_kafka_message_destroy(rkm);
889894

890895
return msgobj;
@@ -946,6 +951,11 @@ static PyObject *Consumer_consume (Handle *self, PyObject *args,
946951

947952
for (i = 0; i < n; i++) {
948953
PyObject *msgobj = Message_new0(self, rkmessages[i]);
954+
#ifdef RD_KAFKA_V_HEADERS
955+
// Have to detach headers outside Message_new0 because it declares the
956+
// rk message as a const
957+
rd_kafka_message_detach_headers(rkmessages[i], &((Message *)msgobj)->c_headers);
958+
#endif
949959
PyList_SET_ITEM(msglist, i, msgobj);
950960
rd_kafka_message_destroy(rkmessages[i]);
951961
}

confluent_kafka/src/Producer.c

Lines changed: 45 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -258,7 +258,11 @@ Producer_producev (Handle *self,
258258
const char *topic, int32_t partition,
259259
const void *value, size_t value_len,
260260
const void *key, size_t key_len,
261-
void *opaque, int64_t timestamp) {
261+
void *opaque, int64_t timestamp
262+
#if RD_KAFKA_V_HEADERS
263+
,rd_kafka_headers_t *headers
264+
#endif
265+
) {
262266

263267
return rd_kafka_producev(self->rk,
264268
RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
@@ -268,6 +272,9 @@ Producer_producev (Handle *self,
268272
RD_KAFKA_V_VALUE((void *)value,
269273
(size_t)value_len),
270274
RD_KAFKA_V_TIMESTAMP(timestamp),
275+
#if RD_KAFKA_V_HEADERS
276+
RD_KAFKA_V_HEADERS(headers),
277+
#endif
271278
RD_KAFKA_V_OPAQUE(opaque),
272279
RD_KAFKA_V_END);
273280
}
@@ -302,27 +309,32 @@ static PyObject *Producer_produce (Handle *self, PyObject *args,
302309
const char *topic, *value = NULL, *key = NULL;
303310
int value_len = 0, key_len = 0;
304311
int partition = RD_KAFKA_PARTITION_UA;
305-
PyObject *dr_cb = NULL, *dr_cb2 = NULL, *partitioner_cb = NULL;
312+
PyObject *headers = NULL, *dr_cb = NULL, *dr_cb2 = NULL, *partitioner_cb = NULL;
306313
long long timestamp = 0;
307314
rd_kafka_resp_err_t err;
308315
struct Producer_msgstate *msgstate;
316+
#ifdef RD_KAFKA_V_HEADERS
317+
rd_kafka_headers_t *rd_headers = NULL;
318+
#endif
319+
309320
static char *kws[] = { "topic",
310321
"value",
311322
"key",
312323
"partition",
313324
"callback",
314325
"on_delivery", /* Alias */
315326
"partitioner",
316-
"timestamp",
327+
"timestamp",
328+
"headers",
317329
NULL };
318330

319331
if (!PyArg_ParseTupleAndKeywords(args, kwargs,
320-
"s|z#z#iOOOL"
332+
"s|z#z#iOOOLO"
321333
, kws,
322334
&topic, &value, &value_len,
323335
&key, &key_len, &partition,
324336
&dr_cb, &dr_cb2, &partitioner_cb,
325-
&timestamp))
337+
&timestamp, &headers))
326338
return NULL;
327339

328340
#if !HAVE_PRODUCEV
@@ -337,6 +349,24 @@ static PyObject *Producer_produce (Handle *self, PyObject *args,
337349
}
338350
#endif
339351

352+
#ifndef RD_KAFKA_V_HEADERS
353+
if (headers) {
354+
PyErr_Format(PyExc_NotImplementedError,
355+
"Producer message headers requires "
356+
"confluent-kafka-python built for librdkafka "
357+
"version >=v0.11.4 (librdkafka runtime 0x%x, "
358+
"buildtime 0x%x)",
359+
rd_kafka_version(), RD_KAFKA_VERSION);
360+
return NULL;
361+
}
362+
#else
363+
if (headers) {
364+
if(!(rd_headers = py_headers_to_c(headers)))
365+
return NULL;
366+
}
367+
#endif
368+
369+
340370
if (dr_cb2 && !dr_cb) /* Alias */
341371
dr_cb = dr_cb2;
342372

@@ -345,6 +375,7 @@ static PyObject *Producer_produce (Handle *self, PyObject *args,
345375
if (!partitioner_cb || partitioner_cb == Py_None)
346376
partitioner_cb = self->u.Producer.partitioner_cb;
347377

378+
348379
/* Create msgstate if necessary, may return NULL if no callbacks
349380
* are wanted. */
350381
msgstate = Producer_msgstate_new(self, dr_cb, partitioner_cb);
@@ -354,13 +385,16 @@ static PyObject *Producer_produce (Handle *self, PyObject *args,
354385
err = Producer_producev(self, topic, partition,
355386
value, value_len,
356387
key, key_len,
357-
msgstate, timestamp);
388+
msgstate, timestamp
389+
#if RD_KAFKA_V_HEADERS
390+
,rd_headers
391+
#endif
392+
);
358393
#else
359394
err = Producer_produce0(self, topic, partition,
360395
value, value_len,
361396
key, key_len,
362397
msgstate);
363-
364398
#endif
365399

366400
if (err) {
@@ -458,6 +492,10 @@ static PyMethodDef Producer_methods[] = {
458492
"``callback`` (alias ``on_delivery``) argument to pass a function "
459493
"(or lambda) that will be called from :py:func:`poll()` when the "
460494
"message has been successfully delivered or permanently fails delivery.\n"
495+
"\n"
496+
" Currently message headers are not supported on the message returned to the "
497+
"callback. The ``msg.headers()`` will return None even if the original message "
498+
"had headers set.\n"
461499
"\n"
462500
" :param str topic: Topic to produce message to\n"
463501
" :param str|bytes value: Message payload\n"

confluent_kafka/src/confluent_kafka.c

Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -333,6 +333,34 @@ static PyObject *Message_timestamp (Message *self, PyObject *ignore) {
333333
self->timestamp);
334334
}
335335

336+
static PyObject *Message_headers (Message *self, PyObject *ignore) {
337+
#ifdef RD_KAFKA_V_HEADERS
338+
if (self->headers) {
339+
Py_INCREF(self->headers);
340+
return self->headers;
341+
} else if (self->c_headers) {
342+
self->headers = c_headers_to_py(self->c_headers);
343+
rd_kafka_headers_destroy(self->c_headers);
344+
self->c_headers = NULL;
345+
Py_INCREF(self->headers);
346+
return self->headers;
347+
} else {
348+
Py_RETURN_NONE;
349+
}
350+
#else
351+
Py_RETURN_NONE;
352+
#endif
353+
}
354+
355+
static PyObject *Message_set_headers (Message *self, PyObject *new_headers) {
356+
if (self->headers)
357+
Py_DECREF(self->headers);
358+
self->headers = new_headers;
359+
Py_INCREF(self->headers);
360+
361+
Py_RETURN_NONE;
362+
}
363+
336364
static PyObject *Message_set_value (Message *self, PyObject *new_val) {
337365
if (self->value)
338366
Py_DECREF(self->value);
@@ -409,6 +437,21 @@ static PyMethodDef Message_methods[] = {
409437
" :rtype: (int, int)\n"
410438
"\n"
411439
},
440+
{ "headers", (PyCFunction)Message_headers, METH_NOARGS,
441+
" Retrieve the headers set on a message. Each header is a key value"
442+
" pair. Please note that header keys are ordered and can repeat.\n"
443+
"\n"
444+
" :returns: list of two-tuples, one (key, value) pair for each header.\n"
445+
" :rtype: [(str, bytes),...] or None.\n"
446+
"\n"
447+
},
448+
{ "set_headers", (PyCFunction)Message_set_headers, METH_O,
449+
" Set the field 'Message.headers' with new value.\n"
450+
" :param: object value: Message.headers.\n"
451+
" :returns: None.\n"
452+
" :rtype: None\n"
453+
"\n"
454+
},
412455
{ "set_value", (PyCFunction)Message_set_value, METH_O,
413456
" Set the field 'Message.value' with new value.\n"
414457
" :param: object value: Message.value.\n"
@@ -443,6 +486,16 @@ static int Message_clear (Message *self) {
443486
Py_DECREF(self->error);
444487
self->error = NULL;
445488
}
489+
if (self->headers) {
490+
Py_DECREF(self->headers);
491+
self->headers = NULL;
492+
}
493+
#ifdef RD_KAFKA_V_HEADERS
494+
if (self->c_headers){
495+
rd_kafka_headers_destroy(self->c_headers);
496+
self->c_headers = NULL;
497+
}
498+
#endif
446499
return 0;
447500
}
448501

@@ -464,6 +517,8 @@ static int Message_traverse (Message *self,
464517
Py_VISIT(self->key);
465518
if (self->error)
466519
Py_VISIT(self->error);
520+
if (self->headers)
521+
Py_VISIT(self->headers);
467522
return 0;
468523
}
469524

@@ -883,6 +938,81 @@ rd_kafka_topic_partition_list_t *py_to_c_parts (PyObject *plist) {
883938
return c_parts;
884939
}
885940

941+
#ifdef RD_KAFKA_V_HEADERS
942+
/**
943+
* @brief Convert Python list[(header_key, header_value),...] to C rd_kafka_headers_t.
944+
*
945+
* @returns The new C rd_kafka_headers_t object on success or NULL on error.
946+
*/
947+
rd_kafka_headers_t *py_headers_to_c (PyObject *headers_plist) {
948+
int i, len;
949+
rd_kafka_headers_t *rd_headers = NULL;
950+
rd_kafka_resp_err_t err;
951+
const char *header_key, *header_value = NULL;
952+
int header_key_len = 0, header_value_len = 0;
953+
954+
len = PyList_Size(headers_plist);
955+
rd_headers = rd_kafka_headers_new(len);
956+
957+
for (i = 0; i < len; i++) {
958+
959+
if(!PyArg_ParseTuple(PyList_GET_ITEM(headers_plist, i), "s#z#", &header_key,
960+
&header_key_len, &header_value, &header_value_len)){
961+
rd_kafka_headers_destroy(rd_headers);
962+
PyErr_SetString(PyExc_TypeError,
963+
"Headers are expected to be a tuple of (key, value)");
964+
return NULL;
965+
}
966+
967+
err = rd_kafka_header_add(rd_headers, header_key, header_key_len, header_value, header_value_len);
968+
if (err) {
969+
rd_kafka_headers_destroy(rd_headers);
970+
cfl_PyErr_Format(err,
971+
"Unable to create message headers: %s",
972+
rd_kafka_err2str(err));
973+
return NULL;
974+
}
975+
}
976+
return rd_headers;
977+
}
978+
979+
/**
980+
* @brief Convert rd_kafka_headers_t to Python list[(header_key, header_value),...])
981+
*
982+
* @returns The new C headers on success or NULL on error.
983+
*/
984+
PyObject *c_headers_to_py (rd_kafka_headers_t *headers) {
985+
size_t idx = 0;
986+
size_t header_size = 0;
987+
const char *header_key;
988+
const void *header_value;
989+
size_t header_value_size;
990+
PyObject *header_list;
991+
992+
header_size = rd_kafka_header_cnt(headers);
993+
header_list = PyList_New(header_size);
994+
995+
while (!rd_kafka_header_get_all(headers, idx++,
996+
&header_key, &header_value, &header_value_size)) {
997+
// Create one (key, value) tuple for each header
998+
PyObject *header_tuple = PyTuple_New(2);
999+
PyTuple_SetItem(header_tuple, 0,
1000+
cfl_PyUnistr(_FromString(header_key))
1001+
);
1002+
1003+
if (header_value) {
1004+
PyTuple_SetItem(header_tuple, 1,
1005+
cfl_PyBin(_FromStringAndSize(header_value, header_value_size))
1006+
);
1007+
} else {
1008+
PyTuple_SetItem(header_tuple, 1, Py_None);
1009+
}
1010+
PyList_SET_ITEM(header_list, idx-1, header_tuple);
1011+
}
1012+
1013+
return header_list;
1014+
}
1015+
#endif
8861016

8871017
/****************************************************************************
8881018
*

confluent_kafka/src/confluent_kafka.h

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -267,7 +267,10 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,
267267
PyObject *c_parts_to_py (const rd_kafka_topic_partition_list_t *c_parts);
268268
rd_kafka_topic_partition_list_t *py_to_c_parts (PyObject *plist);
269269

270-
270+
#ifdef RD_KAFKA_V_HEADERS
271+
rd_kafka_headers_t *py_headers_to_c (PyObject *headers_plist);
272+
PyObject *c_headers_to_py (rd_kafka_headers_t *headers);
273+
#endif
271274
/****************************************************************************
272275
*
273276
*
@@ -286,6 +289,10 @@ typedef struct {
286289
PyObject *topic;
287290
PyObject *value;
288291
PyObject *key;
292+
PyObject *headers;
293+
#ifdef RD_KAFKA_V_HEADERS
294+
rd_kafka_headers_t *c_headers;
295+
#endif
289296
PyObject *error;
290297
int32_t partition;
291298
int64_t offset;

docs/index.rst

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,9 @@ The Python bindings also provide some additional configuration properties:
101101
delivery result (success or failure).
102102
This property may also be set per-message by passing ``callback=callable``
103103
(or ``on_delivery=callable``) to the confluent_kafka.Producer.produce() function.
104+
Currently message headers are not supported on the message returned to the
105+
callback. The ``msg.headers()`` will return None even if the original message
106+
had headers set.
104107

105108
* ``on_commit(kafka.KafkaError, list(kafka.TopicPartition))`` (**Consumer**): Callback used to indicate success or failure
106109
of commit requests.

examples/integration_test.py

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -117,11 +117,14 @@ def verify_producer():
117117
p = confluent_kafka.Producer(**conf)
118118
print('producer at %s' % p)
119119

120+
headers = [('foo1', 'bar'), ('foo1', 'bar2'), ('foo2', b'1')]
121+
120122
# Produce some messages
121-
p.produce(topic, 'Hello Python!')
123+
p.produce(topic, 'Hello Python!', headers=headers)
124+
p.produce(topic, key='Just a key and headers', headers=headers)
122125
p.produce(topic, key='Just a key')
123126
p.produce(topic, partition=1, value='Strictly for partition 1',
124-
key='mykey')
127+
key='mykey', headers=headers)
125128

126129
# Produce more messages, now with delivery report callbacks in various forms.
127130
mydr = MyTestDr()
@@ -479,9 +482,16 @@ def print_wmark(consumer, parts):
479482
break
480483

481484
tstype, timestamp = msg.timestamp()
482-
print('%s[%d]@%d: key=%s, value=%s, tstype=%d, timestamp=%s' %
485+
headers = msg.headers()
486+
if headers:
487+
example_header = headers
488+
489+
msg.set_headers([('foo', 'bar')])
490+
assert msg.headers() == [('foo', 'bar')]
491+
492+
print('%s[%d]@%d: key=%s, value=%s, tstype=%d, timestamp=%s headers=%s' %
483493
(msg.topic(), msg.partition(), msg.offset(),
484-
msg.key(), msg.value(), tstype, timestamp))
494+
msg.key(), msg.value(), tstype, timestamp, headers))
485495

486496
if first_msg is None:
487497
first_msg = msg
@@ -511,6 +521,9 @@ def print_wmark(consumer, parts):
511521
print('max_msgcnt %d reached' % msgcnt)
512522
break
513523

524+
assert example_header, "We should have received at least one header"
525+
assert example_header == [(u'foo1', 'bar'), (u'foo1', 'bar2'), (u'foo2', '1')]
526+
514527
# Get current assignment
515528
assignment = c.assignment()
516529

0 commit comments

Comments
 (0)