Schema registry client : Consumer code #79

Closed · wants to merge 4 commits
18 changes: 18 additions & 0 deletions README.md
@@ -60,6 +60,24 @@ avroProducer = AvroProducer({'bootstrap.servers': 'mybroker,mybroker2', 'schema.
avroProducer.produce(topic='my_topic', value=value, key=key)
```

**AvroConsumer**
```
from confluent_kafka import KafkaError
from confluent_kafka.avro import AvroConsumer

c = AvroConsumer({'bootstrap.servers': 'mybroker,mybroker2',
                  'group.id': 'groupid',
                  'schema.registry.url': 'http://127.0.0.1:9002'})
c.subscribe(['my_topic'])
running = True
while running:
    msg = c.poll(10)
    if msg:
        if not msg.error():
            print(msg.value())
        elif msg.error().code() != KafkaError._PARTITION_EOF:
            print(msg.error())
            running = False
c.close()
```

See [examples](examples) for more examples.


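For long-running consumers it can help to guarantee `close()` via `try`/`finally`. The sketch below is an illustrative variant of the README loop above, not part of the PR; it uses the same hypothetical broker and registry addresses and only the API shown in this diff.

```
from confluent_kafka import KafkaError
from confluent_kafka.avro import AvroConsumer

c = AvroConsumer({'bootstrap.servers': 'mybroker,mybroker2',
                  'group.id': 'groupid',
                  'schema.registry.url': 'http://127.0.0.1:9002'})
c.subscribe(['my_topic'])
try:
    while True:
        msg = c.poll(10)
        if msg is None:
            continue
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                continue  # reached end of a partition, keep polling
            raise SystemExit(msg.error())
        print(msg.key(), msg.value())  # key and value are already Avro-decoded
except KeyboardInterrupt:
    pass
finally:
    c.close()
```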
44 changes: 43 additions & 1 deletion confluent_kafka/avro/__init__.py
@@ -4,7 +4,7 @@
"""
import sys

from confluent_kafka import Producer
from confluent_kafka import Producer, Consumer

VALID_LEVELS = ['NONE', 'FULL', 'FORWARD', 'BACKWARD']

@@ -112,3 +112,45 @@ def produce(self, **kwargs):
raise SerializerError("Avro schema required for key")

super(AvroProducer, self).produce(topic, value, key, **kwargs)


class AvroConsumer(Consumer):
    """
    Kafka Consumer client which does avro schema decoding of messages.
    Handles message deserialization.

    Constructor takes the following parameter:

    :param dict config: Config parameters containing the URL for the schema registry (schema.registry.url).
    """
    def __init__(self, config):
        if 'schema.registry.url' not in config:
            raise ValueError("Missing parameter: schema.registry.url")
        schema_registry_url = config["schema.registry.url"]
        del config["schema.registry.url"]

        super(AvroConsumer, self).__init__(config)
        self._serializer = MessageSerializer(CachedSchemaRegistryClient(url=schema_registry_url))

    def poll(self, timeout):
        """
        Overridden method from the confluent_kafka.Consumer class. It handles
        message deserialization using the avro schema.

        :param float timeout: Poll timeout in seconds.
        :returns: message object with deserialized key and value as dict objects
        """
        message = super(AvroConsumer, self).poll(timeout)
        if not message:
            return message
        if not message.error():
            if message.value():
                decoded_value = self._serializer.decode_message(message.value())
                message.set_value(decoded_value)
            if message.key():
                decoded_key = self._serializer.decode_message(message.key())
                message.set_key(decoded_key)
        return message
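`MessageSerializer.decode_message` is not part of this diff. Purely as an illustration of the kind of work such a serializer performs, the sketch below decodes the Confluent Avro wire format under the usual assumptions (a zero magic byte, a 4-byte big-endian schema id, then the Avro-encoded payload); `fetch_schema` is a hypothetical stand-in for a registry lookup such as `CachedSchemaRegistryClient.get_by_id`.

```
import io
import struct

import avro.io  # the `avro` package this module already depends on


def decode_confluent_avro(payload, fetch_schema):
    """Illustrative decoder; fetch_schema(schema_id) -> parsed Avro schema."""
    magic, schema_id = struct.unpack('>bI', payload[:5])
    if magic != 0:
        raise ValueError("Unexpected magic byte: %r" % magic)
    schema = fetch_schema(schema_id)           # e.g. a schema registry lookup
    reader = avro.io.DatumReader(schema)       # schema used as the writer schema
    decoder = avro.io.BinaryDecoder(io.BytesIO(payload[5:]))
    return reader.read(decoder)
```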

5 changes: 3 additions & 2 deletions confluent_kafka/avro/cached_schema_registry_client.py
@@ -26,6 +26,7 @@
import requests

from . import ClientError, VALID_LEVELS
from . import loads

# Common accept header sent
ACCEPT_HDR = "application/vnd.schemaregistry.v1+json, application/vnd.schemaregistry+json, application/json"
@@ -161,7 +162,7 @@ def get_by_id(self, schema_id):
# need to parse the schema
schema_str = result.get("schema")
try:
result = avro.loads(schema_str)
result = loads(schema_str)
# cache it
self._cache_schema(result, schema_id)
return result
@@ -200,7 +201,7 @@ def get_latest_schema(self, subject):
schema = self.id_to_schema[schema_id]
else:
try:
schema = avro.loads(result['schema'])
schema = loads(result['schema'])
except:
# bad schema - should not happen
raise ClientError("Received bad schema from registry.")
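The hunks above switch from `avro.loads(...)` to a `loads` helper imported from the package `__init__`; that helper is not shown in this diff. As a rough sketch only, a schema-string parser along these lines could back it, using the `avro` package's `schema.parse`/`schema.Parse` (the name differs between the Python 2 and Python 3 distributions):

```
import json
import sys

from avro import schema


def loads(schema_str):
    """Illustrative sketch: parse a JSON Avro schema string into a schema object."""
    if sys.version_info[0] < 3:
        return schema.parse(schema_str)
    return schema.Parse(schema_str)


# Example: parse a trivial record schema and read its name.
user_schema = loads(json.dumps({
    "type": "record",
    "name": "User",
    "fields": [{"name": "name", "type": "string"}],
}))
print(user_schema.name)  # -> User
```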
23 changes: 23 additions & 0 deletions confluent_kafka/src/confluent_kafka.c
@@ -332,6 +332,19 @@ static PyObject *Message_timestamp (Message *self, PyObject *ignore) {
self->timestamp);
}

static PyObject *Message_set_value (Message *self, PyObject *new_val) {
	/* Take a reference to the new value before releasing the old one;
	 * Py_XDECREF also tolerates a NULL (unset) value. */
	Py_INCREF(new_val);
	Py_XDECREF(self->value);
	self->value = new_val;
	Py_RETURN_NONE;
}

static PyObject *Message_set_key (Message *self, PyObject *new_key) {
	Py_INCREF(new_key);
	Py_XDECREF(self->key);
	self->key = new_key;
	Py_RETURN_NONE;
}

static PyMethodDef Message_methods[] = {
{ "error", (PyCFunction)Message_error, METH_NOARGS,
@@ -391,6 +404,16 @@ static PyMethodDef Message_methods[] = {
" :rtype: (int, int)\n"
"\n"
},
{ "set_value", (PyCFunction)Message_set_value, METH_O,
" :returns: None.\n"
" :rtype: None\n"
"\n"
},
{ "set_key", (PyCFunction)Message_set_key, METH_O,
" :returns: None.\n"
" :rtype: None\n"
"\n"
},
{ NULL }
};

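The new `set_value()`/`set_key()` methods let `AvroConsumer.poll()` swap the raw payload bytes for the decoded objects in place. A pure-Python stand-in (illustrative only, not the extension's actual `Message` type) shows the intended semantics:

```
class FakeMessage(object):
    """Minimal stand-in mirroring the accessor/setter pairs added in C."""

    def __init__(self, key, value):
        self._key = key
        self._value = value

    def key(self):
        return self._key

    def value(self):
        return self._value

    def set_key(self, new_key):      # mirrors Message_set_key
        self._key = new_key

    def set_value(self, new_value):  # mirrors Message_set_value
        self._value = new_value


msg = FakeMessage(b'raw-key-bytes', b'raw-value-bytes')
msg.set_value({'name': 'alice'})   # e.g. the dict produced by an Avro decoder
assert msg.value() == {'name': 'alice'}
```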
39 changes: 39 additions & 0 deletions tests/avro/test_avro_consumer.py
@@ -0,0 +1,39 @@
#!/usr/bin/env python
#
# Copyright 2016 Confluent Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import sys

if sys.version_info[0] < 3:
    import unittest2 as unittest
else:
    import unittest
from confluent_kafka.avro import AvroConsumer


class TestAvroConsumer(unittest.TestCase):
    def setUp(self):
        pass

    def test_instantiation(self):
        obj = AvroConsumer({'bootstrap.servers': 'mybroker,mybroker2', 'group.id': 'groupid',
                            'schema.registry.url': 'http://127.0.0.1:9002'})
        self.assertTrue(isinstance(obj, AvroConsumer))
        self.assertNotEqual(obj, None)


def suite():
return unittest.TestLoader().loadTestsFromTestCase(TestAvroConsumer)
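The module defines `suite()` but no entry point. A minimal way to run it directly (an assumption, not part of the PR) would be to append a standard runner block to `tests/avro/test_avro_consumer.py`:

```
if __name__ == '__main__':
    unittest.TextTestRunner(verbosity=2).run(suite())
```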