class AvroConsumer(Consumer):
    """
    Kafka Consumer client which does avro schema decoding of messages.
    Handles message deserialization.

    Constructor takes below parameters

    @:param: config: dict object with config parameters containing url for schema registry (schema.registry.url).
                     The dict passed in is not modified.
    @:raises ValueError: if 'schema.registry.url' is missing from config.
    """
    def __init__(self, config):
        if 'schema.registry.url' not in config:
            raise ValueError("Missing parameter: schema.registry.url")

        # Work on a shallow copy: the registry url must not be forwarded to the
        # base Consumer, but removing it from the caller's dict would break any
        # later reuse of that same config object.
        consumer_config = dict(config)
        schema_registry_url = consumer_config.pop('schema.registry.url')

        super(AvroConsumer, self).__init__(consumer_config)
        self._serializer = MessageSerializer(CachedSchemaRegistryClient(url=schema_registry_url))

    def poll(self, timeout=None):
        """
        This is an overridden method from confluent_kafka.Consumer class. This handles message
        deserialization using avro schema

        @:param timeout: maximum time to block waiting for a message; when omitted,
                         the base class default is used.
        @:return message object with deserialized key and value as dict objects,
                 or None if the underlying poll returned no message.
                 Messages carrying an error are returned untouched.
        """
        # Only forward the timeout when the caller supplied one, so the base
        # class keeps control of its own default.
        poll_args = () if timeout is None else (timeout,)
        message = super(AvroConsumer, self).poll(*poll_args)

        # Explicit None check instead of truthiness: a real message object that
        # happens to evaluate falsy must still go through decoding.
        if message is None:
            return None

        if not message.error():
            raw_value = message.value()
            if raw_value is not None:
                message.set_value(self._serializer.decode_message(raw_value))

            raw_key = message.key()
            if raw_key is not None:
                message.set_key(self._serializer.decode_message(raw_key))

        return message
import loads # Common accept header sent ACCEPT_HDR = "application/vnd.schemaregistry.v1+json, application/vnd.schemaregistry+json, application/json" @@ -161,7 +162,7 @@ def get_by_id(self, schema_id): # need to parse the schema schema_str = result.get("schema") try: - result = avro.loads(schema_str) + result = loads(schema_str) # cache it self._cache_schema(result, schema_id) return result @@ -200,7 +201,7 @@ def get_latest_schema(self, subject): schema = self.id_to_schema[schema_id] else: try: - schema = avro.loads(result['schema']) + schema = loads(result['schema']) except: # bad schema - should not happen raise ClientError("Received bad schema from registry.") diff --git a/confluent_kafka/src/confluent_kafka.c b/confluent_kafka/src/confluent_kafka.c index a323ba2fd..3b04c6575 100644 --- a/confluent_kafka/src/confluent_kafka.c +++ b/confluent_kafka/src/confluent_kafka.c @@ -332,6 +332,23 @@ static PyObject *Message_timestamp (Message *self, PyObject *ignore) { self->timestamp); } +static PyObject *Message_set_value (Message *self, PyObject *new_val) { + if (self->value) + Py_DECREF(self->value); + self->value = new_val; + Py_INCREF(self->value); + + Py_RETURN_NONE; +} + +static PyObject *Message_set_key (Message *self, PyObject *new_key) { + if (self->key) + Py_DECREF(self->key); + self->key = new_key; + Py_INCREF(self->key); + + Py_RETURN_NONE; +} static PyMethodDef Message_methods[] = { { "error", (PyCFunction)Message_error, METH_NOARGS, @@ -391,6 +408,20 @@ static PyMethodDef Message_methods[] = { " :rtype: (int, int)\n" "\n" }, + { "set_value", (PyCFunction)Message_set_value, METH_O, + " Set the field 'Message.value' with new value.\n" + " :param: object value: Message.value.\n" + " :returns: None.\n" + " :rtype: None\n" + "\n" + }, + { "set_key", (PyCFunction)Message_set_key, METH_O, + " Set the field 'Message.key' with new value.\n" + " :param: object value: Message.key.\n" + " :returns: None.\n" + " :rtype: None\n" + "\n" + }, { NULL } }; @@ -1217,7 
+1248,7 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype, Py_DECREF(ks); return NULL; } - + if (h->stats_cb) { Py_DECREF(h->stats_cb); h->stats_cb = NULL;