Skip to content

Commit 550688d

Browse files
author
Roopa Hiremath Chandrasekaraiah
committed
AvroProducer for handling schema registry
1 parent 406a2bd commit 550688d

File tree

4 files changed

+87
-4
lines changed

4 files changed

+87
-4
lines changed

README.md

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,24 @@ avroProducer = AvroProducer({'bootstrap.servers': 'mybroker,mybroker2', 'schema.
6060
avroProducer.produce(topic='my_topic', value=value, key=key)
6161
```
6262

63+
**AvroConsumer**
64+
```
65+
from confluent_kafka.avro import AvroConsumer
66+
67+
c = AvroConsumer({'bootstrap.servers': 'mybroker,mybroker2', 'group.id': 'groupid', 'schema.registry.url': 'http://127.0.0.1:9002'})
68+
c.subscribe(['my_topic'])
69+
running = True
70+
while running:
71+
msg = c.poll(10)
72+
if msg:
73+
if not msg.error():
74+
print(msg.value())
75+
elif msg.error().code() != KafkaError._PARTITION_EOF:
76+
print(msg.error())
77+
running = False
78+
c.close()
79+
```
80+
6381
See [examples](examples) for more examples.
6482

6583

confluent_kafka/avro/__init__.py

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
"""
55
import sys
66

7-
from confluent_kafka import Producer
7+
from confluent_kafka import Producer, Consumer
88

99
VALID_LEVELS = ['NONE', 'FULL', 'FORWARD', 'BACKWARD']
1010

@@ -112,3 +112,44 @@ def produce(self, **kwargs):
112112
raise SerializerError("Avro schema required for key")
113113

114114
super(AvroProducer, self).produce(topic, value, key, **kwargs)
115+
116+
117+
class AvroConsumer(Consumer):
118+
"""
119+
Kafka Consumer client which does avro schema decoding of messages.
120+
Handles message deserialization.
121+
122+
Constructor takes below parameters
123+
124+
@:param: config: dict object with config parameters containing url for schema registry (schema.registry.url).
125+
"""
126+
def __init__(self, config):
127+
128+
if ('schema.registry.url' not in config.keys()):
129+
raise ValueError("Missing parameter: schema.registry.url")
130+
schem_registry_url = config["schema.registry.url"]
131+
del config["schema.registry.url"]
132+
133+
super(AvroConsumer, self).__init__(config)
134+
self._serializer = MessageSerializer(CachedSchemaRegistryClient(url=schem_registry_url))
135+
136+
def poll(self, timeout):
137+
"""
138+
This is an overridden method from confluent_kafka.Consumer class. This handles message
139+
deserialization using avro schema
140+
141+
@:param timeout
142+
@:return message object with deserialized key and value as dict objects
143+
"""
144+
message = super(AvroConsumer, self).poll(timeout)
145+
if not message:
146+
return message
147+
print(message)
148+
if not message.error():
149+
if message.value():
150+
decoded_value = self._serializer.decode_message(message.value())
151+
message.set_value(decoded_value)
152+
if message.key():
153+
decoded_key = self._serializer.decode_message(message.key())
154+
message.set_key(decoded_key)
155+
return message

confluent_kafka/avro/cached_schema_registry_client.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import requests
2727

2828
from . import ClientError, VALID_LEVELS
29+
from . import loads
2930

3031
# Common accept header sent
3132
ACCEPT_HDR = "application/vnd.schemaregistry.v1+json, application/vnd.schemaregistry+json, application/json"
@@ -161,7 +162,7 @@ def get_by_id(self, schema_id):
161162
# need to parse the schema
162163
schema_str = result.get("schema")
163164
try:
164-
result = avro.loads(schema_str)
165+
result = loads(schema_str)
165166
# cache it
166167
self._cache_schema(result, schema_id)
167168
return result
@@ -200,7 +201,7 @@ def get_latest_schema(self, subject):
200201
schema = self.id_to_schema[schema_id]
201202
else:
202203
try:
203-
schema = avro.loads(result['schema'])
204+
schema = loads(result['schema'])
204205
except:
205206
# bad schema - should not happen
206207
raise ClientError("Received bad schema from registry.")

confluent_kafka/src/confluent_kafka.c

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -332,6 +332,19 @@ static PyObject *Message_timestamp (Message *self, PyObject *ignore) {
332332
self->timestamp);
333333
}
334334

335+
static PyObject *Message_set_value (Message *self, PyObject *new_val) {
336+
Py_DECREF(self->value);
337+
self->value = new_val;
338+
Py_INCREF(self->value);
339+
Py_RETURN_NONE;
340+
}
341+
342+
static PyObject *Message_set_key (Message *self, PyObject *new_key) {
343+
Py_DECREF(self->key);
344+
self->key = new_key;
345+
Py_INCREF(self->key);
346+
Py_RETURN_NONE;
347+
}
335348

336349
static PyMethodDef Message_methods[] = {
337350
{ "error", (PyCFunction)Message_error, METH_NOARGS,
@@ -391,6 +404,16 @@ static PyMethodDef Message_methods[] = {
391404
" :rtype: (int, int)\n"
392405
"\n"
393406
},
407+
{ "set_value", (PyCFunction)Message_set_value, METH_O,
408+
" :returns: None.\n"
409+
" :rtype: None\n"
410+
"\n"
411+
},
412+
{ "set_key", (PyCFunction)Message_set_key, METH_O,
413+
" :returns: None.\n"
414+
" :rtype: None\n"
415+
"\n"
416+
},
394417
{ NULL }
395418
};
396419

@@ -1217,7 +1240,7 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,
12171240
Py_DECREF(ks);
12181241
return NULL;
12191242
}
1220-
1243+
12211244
if (h->stats_cb) {
12221245
Py_DECREF(h->stats_cb);
12231246
h->stats_cb = NULL;

0 commit comments

Comments
 (0)