diff --git a/README.md b/README.md index 4d7eacdbd..3feb3460a 100644 --- a/README.md +++ b/README.md @@ -60,6 +60,25 @@ avroProducer = AvroProducer({'bootstrap.servers': 'mybroker,mybroker2', 'schema. avroProducer.produce(topic='my_topic', value=value, key=key) ``` +**AvroConsumer** +``` +from confluent_kafka import KafkaError +from confluent_kafka.avro import AvroConsumer + +c = AvroConsumer({'bootstrap.servers': 'mybroker,mybroker2', 'group.id': 'groupid', 'schema.registry.url': 'http://127.0.0.1:9002'}) +c.subscribe(['my_topic']) +running = True +while running: + msg = c.poll(10) + if msg: + if not msg.error(): + print(msg.value()) + elif msg.error().code() != KafkaError._PARTITION_EOF: + print(msg.error()) + running = False +c.close() +``` + See [examples](examples) for more examples. diff --git a/confluent_kafka/avro/__init__.py b/confluent_kafka/avro/__init__.py index 354a030eb..ec4365928 100644 --- a/confluent_kafka/avro/__init__.py +++ b/confluent_kafka/avro/__init__.py @@ -4,7 +4,7 @@ """ import sys -from confluent_kafka import Producer +from confluent_kafka import Producer, Consumer VALID_LEVELS = ['NONE', 'FULL', 'FORWARD', 'BACKWARD'] @@ -112,3 +112,45 @@ def produce(self, **kwargs): raise SerializerError("Avro schema required for key") super(AvroProducer, self).produce(topic, value, key, **kwargs) + + +class AvroConsumer(Consumer): + """ + Kafka Consumer client which does avro schema decoding of messages. + Handles message deserialization. + + Constructor takes below parameters + + @:param: config: dict object with config parameters containing url for schema registry (schema.registry.url).
+ """ + def __init__(self, config): + + if ('schema.registry.url' not in config.keys()): + raise ValueError("Missing parameter: schema.registry.url") + schem_registry_url = config["schema.registry.url"] + del config["schema.registry.url"] + + super(AvroConsumer, self).__init__(config) + self._serializer = MessageSerializer(CachedSchemaRegistryClient(url=schem_registry_url)) + + def poll(self, timeout): + """ + This is an overriden method from confluent_kafka.Consumer class. This handles message + deserialization using avro schema + + @:param timeout + @:return message object with deserialized key and value as dict objects + """ + message = super(AvroConsumer, self).poll(timeout) + if not message: + return message + print(message) + if not message.error(): + if message.value(): + decoded_value = self._serializer.decode_message(message.value()) + message.set_value(decoded_value) + if message.key(): + decoded_key = self._serializer.decode_message(message.key()) + message.set_key(decoded_key) + return message + diff --git a/confluent_kafka/avro/cached_schema_registry_client.py b/confluent_kafka/avro/cached_schema_registry_client.py index 61e258761..97735c329 100644 --- a/confluent_kafka/avro/cached_schema_registry_client.py +++ b/confluent_kafka/avro/cached_schema_registry_client.py @@ -26,6 +26,7 @@ import requests from . import ClientError, VALID_LEVELS +from . 
import loads # Common accept header sent ACCEPT_HDR = "application/vnd.schemaregistry.v1+json, application/vnd.schemaregistry+json, application/json" @@ -161,7 +162,7 @@ def get_by_id(self, schema_id): # need to parse the schema schema_str = result.get("schema") try: - result = avro.loads(schema_str) + result = loads(schema_str) # cache it self._cache_schema(result, schema_id) return result @@ -200,7 +201,7 @@ def get_latest_schema(self, subject): schema = self.id_to_schema[schema_id] else: try: - schema = avro.loads(result['schema']) + schema = loads(result['schema']) except: # bad schema - should not happen raise ClientError("Received bad schema from registry.") diff --git a/confluent_kafka/src/confluent_kafka.c b/confluent_kafka/src/confluent_kafka.c index a323ba2fd..db410ff05 100644 --- a/confluent_kafka/src/confluent_kafka.c +++ b/confluent_kafka/src/confluent_kafka.c @@ -332,6 +332,19 @@ static PyObject *Message_timestamp (Message *self, PyObject *ignore) { self->timestamp); } +static PyObject *Message_set_value (Message *self, PyObject *new_val) { + Py_INCREF(new_val); + Py_XDECREF(self->value); + self->value = new_val; + Py_RETURN_NONE; +} + +static PyObject *Message_set_key (Message *self, PyObject *new_key) { + Py_INCREF(new_key); + Py_XDECREF(self->key); + self->key = new_key; + Py_RETURN_NONE; +} static PyMethodDef Message_methods[] = { { "error", (PyCFunction)Message_error, METH_NOARGS, @@ -391,6 +404,16 @@ static PyMethodDef Message_methods[] = { " :rtype: (int, int)\n" "\n" }, + { "set_value", (PyCFunction)Message_set_value, METH_O, + " :returns: None.\n" + " :rtype: None\n" + "\n" + }, + { "set_key", (PyCFunction)Message_set_key, METH_O, + " :returns: None.\n" + " :rtype: None\n" + "\n" + }, { NULL } }; diff --git a/tests/avro/test_avro_consumer.py b/tests/avro/test_avro_consumer.py new file mode 100644 index 000000000..1581deb06 --- /dev/null +++ b/tests/avro/test_avro_consumer.py @@ -0,0 +1,39 @@ +#!/usr/bin/env python +# +# Copyright 2016 
Confluent Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import sys + +if sys.version_info[0] < 3: + import unittest2 as unittest +else: + import unittest +from confluent_kafka.avro import AvroConsumer + + +class TestAvroConsumer(unittest.TestCase): + def setUp(self): + pass + + def test_instantiation(self): + obj = AvroConsumer({'bootstrap.servers': 'mybroker,mybroker2', 'group.id': 'groupid', + 'schema.registry.url': 'http://127.0.0.1:9002'}) + self.assertTrue(isinstance(obj, AvroConsumer)) + self.assertNotEqual(obj, None) + + +def suite(): + return unittest.TestLoader().loadTestsFromTestCase(TestAvroConsumer)