Commit 9c53580

Add support for passing librdkafka logs to the standard logging module (#148)
Parent: b2bf9bf

7 files changed: +163 additions, -15 deletions

confluent_kafka/src/Consumer.c (+5)

```diff
@@ -1363,6 +1363,11 @@ static int Consumer_init (PyObject *selfobj, PyObject *args, PyObject *kwargs) {
 		return -1;
 	}
 
+	/* Forward log messages to main queue which is then forwarded
+	 * to the consumer queue */
+	if (self->logger)
+		rd_kafka_set_log_queue(self->rk, NULL);
+
 	rd_kafka_poll_set_consumer(self->rk);
 
 	self->u.Consumer.rkqu = rd_kafka_queue_get_consumer(self->rk);
```
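For context: `rd_kafka_set_log_queue(self->rk, NULL)` moves librdkafka's log output off its internal threads and onto the main queue the consumer polls, so the Python logging callback only ever runs while the application is inside `poll()`. A minimal Python-level sketch of the resulting behaviour (broker address, group id and debug context are placeholders, not part of this commit):

```python
import logging
import confluent_kafka

logger = logging.getLogger('consumer')
logger.setLevel(logging.DEBUG)
logger.addHandler(logging.StreamHandler())

c = confluent_kafka.Consumer({'bootstrap.servers': 'localhost:9092',
                              'group.id': 'demo',
                              'debug': 'all'},
                             logger=logger)
# Nothing is logged from librdkafka's background threads; queued log
# records are emitted synchronously from inside poll().
c.poll(timeout=1.0)
c.close()
```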

confluent_kafka/src/Producer.c (+4)

```diff
@@ -588,6 +588,10 @@ static int Producer_init (PyObject *selfobj, PyObject *args, PyObject *kwargs) {
 		return -1;
 	}
 
+	/* Forward log messages to poll queue */
+	if (self->logger)
+		rd_kafka_set_log_queue(self->rk, NULL);
+
 	return 0;
 }
```
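The producer takes the same approach, routing logs to the queue served by `poll()`. A brief sketch mirroring the test file further down (no broker is configured, so `debug='all'` alone generates log traffic):

```python
import logging
import confluent_kafka

logger = logging.getLogger('producer')
logger.setLevel(logging.DEBUG)
logger.addHandler(logging.StreamHandler())

p = confluent_kafka.Producer({'debug': 'all'}, logger=logger)
p.poll(timeout=0.5)  # log records (and delivery callbacks) are served here
```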

confluent_kafka/src/confluent_kafka.c (+56 -1)

```diff
@@ -1076,6 +1076,39 @@ static int stats_cb(rd_kafka_t *rk, char *json, size_t json_len, void *opaque) {
 	return 0;
 }
 
+static void log_cb (const rd_kafka_t *rk, int level,
+                    const char *fac, const char *buf) {
+	Handle *h = rd_kafka_opaque(rk);
+	PyObject *result;
+	CallState *cs;
+	static const int level_map[8] = {
+		/* Map syslog levels to python logging levels */
+		[0] = 50, /* LOG_EMERG   -> logging.CRITICAL */
+		[1] = 50, /* LOG_ALERT   -> logging.CRITICAL */
+		[2] = 50, /* LOG_CRIT    -> logging.CRITICAL */
+		[3] = 40, /* LOG_ERR     -> logging.ERROR */
+		[4] = 30, /* LOG_WARNING -> logging.WARNING */
+		[5] = 20, /* LOG_NOTICE  -> logging.INFO */
+		[6] = 20, /* LOG_INFO    -> logging.INFO */
+		[7] = 10, /* LOG_DEBUG   -> logging.DEBUG */
+	};
+
+	cs = CallState_get(h);
+	result = PyObject_CallMethod(h->logger, "log", "issss",
+	                             level_map[level],
+	                             "%s [%s] %s",
+	                             fac, rd_kafka_name(rk), buf);
+
+	if (result)
+		Py_DECREF(result);
+	else {
+		CallState_crash(cs);
+		rd_kafka_yield(h->rk);
+	}
+
+	CallState_resume(cs);
+}
+
 /****************************************************************************
  *
  *
```
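The `level_map[]` table translates the syslog severities librdkafka emits into stdlib `logging` levels; the hard-coded integers above are exactly the `logging` constants. The same mapping expressed in Python, for reference:

```python
import logging

# syslog severity (0-7) -> python logging level, mirroring level_map[] above
SYSLOG_TO_LOGGING = {
    0: logging.CRITICAL,  # LOG_EMERG
    1: logging.CRITICAL,  # LOG_ALERT
    2: logging.CRITICAL,  # LOG_CRIT
    3: logging.ERROR,     # LOG_ERR
    4: logging.WARNING,   # LOG_WARNING
    5: logging.INFO,      # LOG_NOTICE
    6: logging.INFO,      # LOG_INFO
    7: logging.DEBUG,     # LOG_DEBUG
}
```

Mapping both LOG_NOTICE and LOG_INFO to `logging.INFO` flattens the scale slightly, since the stdlib has no NOTICE level.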
```diff
@@ -1098,6 +1131,8 @@ void Handle_clear (Handle *h) {
 	if (h->stats_cb)
 		Py_DECREF(h->stats_cb);
 
+	Py_XDECREF(h->logger);
+
 	if (h->initiated)
 		PyThread_delete_key(h->tlskey);
 }
```
```diff
@@ -1443,7 +1478,20 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,
 			Py_XDECREF(ks8);
 			Py_DECREF(ks);
 			continue;
-		}
+		} else if (!strcmp(k, "logger")) {
+			if (h->logger) {
+				Py_DECREF(h->logger);
+				h->logger = NULL;
+			}
+
+			if (vo != Py_None) {
+				h->logger = vo;
+				Py_INCREF(h->logger);
+			}
+			Py_XDECREF(ks8);
+			Py_DECREF(ks);
+			continue;
+		}
 
 		/* Special handling for certain config keys. */
 		if (ktype == RD_KAFKA_PRODUCER)
```
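Because `common_conf_setup()` intercepts the `logger` key here, before the remaining properties are handed to librdkafka, the logger can be supplied either inside the config dict or as a keyword argument, and passing `None` clears a previously set logger. A sketch of the two equivalent constructor forms (the logger itself is a plain stdlib logger; the otherwise-empty config is for illustration only):

```python
import logging
import confluent_kafka

logger = logging.getLogger('kafka')

# Both forms are intercepted in common_conf_setup() and never reach
# librdkafka as a configuration property:
p1 = confluent_kafka.Producer({'logger': logger})
p2 = confluent_kafka.Producer({}, logger=logger)
```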
```diff
@@ -1509,6 +1557,13 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,
 	if (h->stats_cb)
 		rd_kafka_conf_set_stats_cb(conf, stats_cb);
 
+	if (h->logger) {
+		/* Write logs to log queue (which is forwarded
+		 * to the polled queue in the Producer/Consumer constructors) */
+		rd_kafka_conf_set(conf, "log.queue", "true", NULL, 0);
+		rd_kafka_conf_set_log_cb(conf, log_cb);
+	}
+
 	rd_kafka_topic_conf_set_opaque(tconf, h);
 	rd_kafka_conf_set_default_topic_conf(conf, tconf);
```

confluent_kafka/src/confluent_kafka.h (+2)

```diff
@@ -163,6 +163,8 @@ typedef struct {
 	int tlskey;            /* Thread-Local-Storage key */
 	rd_kafka_type_t type;  /* Producer or consumer */
 
+	PyObject *logger;
+
 	union {
 		/**
 		 * Producer
```

docs/index.rst (+9 -13)

```diff
@@ -77,7 +77,7 @@ providing a dict of configuration properties to the instance constructor, e.g.::
             'group.id': 'mygroup', 'session.timeout.ms': 6000,
             'on_commit': my_commit_callback,
             'default.topic.config': {'auto.offset.reset': 'smallest'}}
-    consumer = confluent_kafka.Consumer(**conf)
+    consumer = confluent_kafka.Consumer(conf)
 
 The supported configuration values are dictated by the underlying
 librdkafka C library. For the full range of configuration properties
@@ -108,16 +108,12 @@ The Python bindings also provide some additional configuration properties:
 * ``on_commit(kafka.KafkaError, list(kafka.TopicPartition))`` (**Consumer**): Callback used to indicate success or failure
   of commit requests.
 
-Changelog
-=========
-
-Version 3.0.1
-^^^^^^^^^^^^^
-
-* `PR-3 <https://github.com/confluentinc/confluent-kafka-python/pull/3>`_ - Add /usr/local/lib to library_dirs in setup
-* `PR-4 <https://github.com/confluentinc/confluent-kafka-python/pull/4>`_ - Py3: use bytes for Message payload and key
-* `PR-5 <https://github.com/confluentinc/confluent-kafka-python/pull/5>`_ - Removed hard coded c extentions lib/include paths
-* `PR-9 <https://github.com/confluentinc/confluent-kafka-python/pull/9>`_ - Use consistent syntax highlighting (e.g. prefix commands with `$`)
-* `PR-17 <https://github.com/confluentinc/confluent-kafka-python/pull/17>`_ - Version bump to 0.9.1.2
-
+* ``logger=logging.Handler`` kwarg: forward logs from the Kafka client to the
+  provided ``logging.Handler`` instance.
+  To avoid spontaneous calls from non-Python threads the log messages
+  will only be forwarded when ``client.poll()`` is called.
 
+    mylogger = logging.getLogger()
+    mylogger.addHandler(logging.StreamHandler())
+    producer = confluent_kafka.Producer({'bootstrap.servers': 'mybroker.com'},
+                                        logger=mylogger)
```
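A slightly fuller version of the documented pattern, with a level and formatter attached (the handler, format string, broker address and debug context here are illustrative, not part of the commit):

```python
import logging
import confluent_kafka

mylogger = logging.getLogger('kafka')
mylogger.setLevel(logging.DEBUG)
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter('%(asctime)s %(levelname)s %(message)s'))
mylogger.addHandler(handler)

producer = confluent_kafka.Producer({'bootstrap.servers': 'mybroker.com',
                                     'debug': 'broker'},
                                    logger=mylogger)
producer.poll(0)  # queued log records are delivered from inside poll()
```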

examples/consumer.py (+10 -1)

```diff
@@ -22,6 +22,7 @@
 import sys
 import getopt
 import json
+import logging
 from pprint import pformat
 
 
@@ -70,8 +71,16 @@ def print_usage_and_exit(program_name):
             conf['stats_cb'] = stats_cb
             conf['statistics.interval.ms'] = int(opt[1])
 
+    # Create logger for consumer (logs will be emitted when poll() is called)
+    logger = logging.getLogger('consumer')
+    logger.setLevel(logging.DEBUG)
+    handler = logging.StreamHandler()
+    handler.setFormatter(logging.Formatter('%(asctime)-15s %(levelname)-8s %(message)s'))
+    logger.addHandler(handler)
+
     # Create Consumer instance
-    c = Consumer(**conf)
+    # Hint: try debug='fetch' to generate some log messages
+    c = Consumer(conf, logger=logger)
 
     def print_assignment(consumer, partitions):
         print('Assignment:', partitions)
```

tests/test_log.py (new file, +77)

```diff
@@ -0,0 +1,77 @@
+#!/usr/bin/env python
+
+import confluent_kafka
+import logging
+
+
+class CountingFilter(logging.Filter):
+    def __init__(self, name):
+        self.name = name
+        self.cnt = 0
+
+    def filter(self, record):
+        print(self.name, record.getMessage())
+        self.cnt += 1
+        print(record)
+
+
+def test_logging_consumer():
+    """ Tests that logging works """
+
+    logger = logging.getLogger('consumer')
+    logger.setLevel(logging.DEBUG)
+    f = CountingFilter('consumer')
+    logger.addFilter(f)
+
+    kc = confluent_kafka.Consumer({'group.id': 'test',
+                                   'debug': 'all'},
+                                  logger=logger)
+    while f.cnt == 0:
+        kc.poll(timeout=0.5)
+
+    print('%s: %d log messages seen' % (f.name, f.cnt))
+
+    kc.close()
+
+
+def test_logging_producer():
+    """ Tests that logging works """
+
+    logger = logging.getLogger('producer')
+    logger.setLevel(logging.DEBUG)
+    f = CountingFilter('producer')
+    logger.addFilter(f)
+
+    p = confluent_kafka.Producer({'debug': 'all'}, logger=logger)
+
+    while f.cnt == 0:
+        p.poll(timeout=0.5)
+
+    print('%s: %d log messages seen' % (f.name, f.cnt))
+
+
+def test_logging_constructor():
+    """ Verify different forms of constructors """
+
+    for how in ['dict', 'dict+kwarg', 'kwarg']:
+        logger = logging.getLogger('producer: ' + how)
+        logger.setLevel(logging.DEBUG)
+        f = CountingFilter(logger.name)
+        logger.addFilter(f)
+
+        if how == 'dict':
+            p = confluent_kafka.Producer({'debug': 'all', 'logger': logger})
+        elif how == 'dict+kwarg':
+            p = confluent_kafka.Producer({'debug': 'all'}, logger=logger)
+        elif how == 'kwarg':
+            conf = {'debug': 'all', 'logger': logger}
+            p = confluent_kafka.Producer(**conf)
+        else:
+            raise RuntimeError('Not reached')
+
+        print('Test %s with %s' % (p, how))
+
+        while f.cnt == 0:
+            p.poll(timeout=0.5)
+
+        print('%s: %s: %d log messages seen' % (how, f.name, f.cnt))
```
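One detail worth noting about the test helper: `logging.Filter.filter()` is expected to return a truthy value for records that should be processed, and `CountingFilter.filter()` returns `None`, so records are counted (and printed) but then dropped by the logging machinery, which is all these tests need. A hypothetical variant that counts while still letting records reach the handlers:

```python
import logging

class PassThroughCountingFilter(logging.Filter):
    """Hypothetical variant of CountingFilter; not part of this commit."""

    def __init__(self, name=''):
        super(PassThroughCountingFilter, self).__init__(name)
        self.cnt = 0

    def filter(self, record):
        self.cnt += 1  # count every record seen by the logger
        return True    # truthy: let the record propagate to handlers
```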
