Skip to content

Commit a9601c4

Browse files
roopahcedenhill
authored andcommitted
AvroConsumer for handling schema registry (#80)
* AvroProducer for handling schema registry * Implemented revie feedback * Feedback * Implemented review comments * Implemented review comments * Implemented review comments * Consumer client for handling avro schemas
1 parent 271483d commit a9601c4

File tree

4 files changed

+105
-6
lines changed

4 files changed

+105
-6
lines changed

README.md

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,33 @@ avroProducer = AvroProducer({'bootstrap.servers': 'mybroker,mybroker2', 'schema.
6060
avroProducer.produce(topic='my_topic', value=value, key=key)
6161
```
6262

63+
**AvroConsumer**
64+
```
65+
import sys
66+
import traceback
67+
from confluent_kafka import KafkaError
68+
from confluent_kafka.avro import AvroConsumer
69+
from confluent_kafka.avro.serializer import SerializerError
70+
71+
c = AvroConsumer({'bootstrap.servers': 'mybroker,mybroker2', 'group.id': 'groupid', 'schema.registry.url': 'http://127.0.0.1:8081'})
72+
c.subscribe(['my_topic'])
73+
running = True
74+
while running:
75+
try:
76+
msg = c.poll(10)
77+
if msg:
78+
if not msg.error():
79+
print(msg.value())
80+
elif msg.error().code() != KafkaError._PARTITION_EOF:
81+
print(msg.error())
82+
running = False
83+
except SerializerError as e:
84+
print("Message deserialization failed for %s: %s" % (msg, e))
85+
running = False
86+
87+
c.close()
88+
```
89+
6390
See [examples](examples) for more examples.
6491

6592

@@ -100,15 +127,15 @@ Install
100127

101128
$ pip install confluent-kafka
102129

103-
# for AvroProducer
130+
# for AvroProducer or AvroConsumer
104131
$ pip install confluent-kafka[avro]
105132

106133

107134
**Install from source / tarball:**
108135

109136
$ pip install .
110137

111-
# for AvroProducer
138+
# for AvroProducer or AvroConsumer
112139
$ pip install .[avro]
113140

114141

confluent_kafka/avro/__init__.py

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
"""
55
import sys
66

7-
from confluent_kafka import Producer
7+
from confluent_kafka import Producer, Consumer
88

99
VALID_LEVELS = ['NONE', 'FULL', 'FORWARD', 'BACKWARD']
1010

@@ -112,3 +112,43 @@ def produce(self, **kwargs):
112112
raise SerializerError("Avro schema required for key")
113113

114114
super(AvroProducer, self).produce(topic, value, key, **kwargs)
115+
116+
117+
class AvroConsumer(Consumer):
118+
"""
119+
Kafka Consumer client which does avro schema decoding of messages.
120+
Handles message deserialization.
121+
122+
Constructor takes below parameters
123+
124+
@:param: config: dict object with config parameters containing url for schema registry (schema.registry.url).
125+
"""
126+
def __init__(self, config):
127+
128+
if ('schema.registry.url' not in config.keys()):
129+
raise ValueError("Missing parameter: schema.registry.url")
130+
schem_registry_url = config["schema.registry.url"]
131+
del config["schema.registry.url"]
132+
133+
super(AvroConsumer, self).__init__(config)
134+
self._serializer = MessageSerializer(CachedSchemaRegistryClient(url=schem_registry_url))
135+
136+
def poll(self, timeout):
137+
"""
138+
This is an overriden method from confluent_kafka.Consumer class. This handles message
139+
deserialization using avro schema
140+
141+
@:param timeout
142+
@:return message object with deserialized key and value as dict objects
143+
"""
144+
message = super(AvroConsumer, self).poll(timeout)
145+
if not message:
146+
return message
147+
if not message.error():
148+
if message.value() is not None:
149+
decoded_value = self._serializer.decode_message(message.value())
150+
message.set_value(decoded_value)
151+
if message.key() is not None:
152+
decoded_key = self._serializer.decode_message(message.key())
153+
message.set_key(decoded_key)
154+
return message

confluent_kafka/avro/cached_schema_registry_client.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import requests
2727

2828
from . import ClientError, VALID_LEVELS
29+
from . import loads
2930

3031
# Common accept header sent
3132
ACCEPT_HDR = "application/vnd.schemaregistry.v1+json, application/vnd.schemaregistry+json, application/json"
@@ -161,7 +162,7 @@ def get_by_id(self, schema_id):
161162
# need to parse the schema
162163
schema_str = result.get("schema")
163164
try:
164-
result = avro.loads(schema_str)
165+
result = loads(schema_str)
165166
# cache it
166167
self._cache_schema(result, schema_id)
167168
return result
@@ -200,7 +201,7 @@ def get_latest_schema(self, subject):
200201
schema = self.id_to_schema[schema_id]
201202
else:
202203
try:
203-
schema = avro.loads(result['schema'])
204+
schema = loads(result['schema'])
204205
except:
205206
# bad schema - should not happen
206207
raise ClientError("Received bad schema from registry.")

confluent_kafka/src/confluent_kafka.c

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -332,6 +332,23 @@ static PyObject *Message_timestamp (Message *self, PyObject *ignore) {
332332
self->timestamp);
333333
}
334334

335+
static PyObject *Message_set_value (Message *self, PyObject *new_val) {
336+
if (self->value)
337+
Py_DECREF(self->value);
338+
self->value = new_val;
339+
Py_INCREF(self->value);
340+
341+
Py_RETURN_NONE;
342+
}
343+
344+
static PyObject *Message_set_key (Message *self, PyObject *new_key) {
345+
if (self->key)
346+
Py_DECREF(self->key);
347+
self->key = new_key;
348+
Py_INCREF(self->key);
349+
350+
Py_RETURN_NONE;
351+
}
335352

336353
static PyMethodDef Message_methods[] = {
337354
{ "error", (PyCFunction)Message_error, METH_NOARGS,
@@ -391,6 +408,20 @@ static PyMethodDef Message_methods[] = {
391408
" :rtype: (int, int)\n"
392409
"\n"
393410
},
411+
{ "set_value", (PyCFunction)Message_set_value, METH_O,
412+
" Set the field 'Message.value' with new value.\n"
413+
" :param: object value: Message.value.\n"
414+
" :returns: None.\n"
415+
" :rtype: None\n"
416+
"\n"
417+
},
418+
{ "set_key", (PyCFunction)Message_set_key, METH_O,
419+
" Set the field 'Message.key' with new value.\n"
420+
" :param: object value: Message.key.\n"
421+
" :returns: None.\n"
422+
" :rtype: None\n"
423+
"\n"
424+
},
394425
{ NULL }
395426
};
396427

@@ -1219,7 +1250,7 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,
12191250
Py_DECREF(ks);
12201251
return NULL;
12211252
}
1222-
1253+
12231254
if (h->stats_cb) {
12241255
Py_DECREF(h->stats_cb);
12251256
h->stats_cb = NULL;

0 commit comments

Comments
 (0)