From 550688d0cea5910efa23998106e634c4fc2cf068 Mon Sep 17 00:00:00 2001 From: Roopa Hiremath Chandrasekaraiah Date: Mon, 21 Nov 2016 16:13:36 -0800 Subject: [PATCH 1/7] AvroProducer for handling schema registry --- README.md | 18 ++++++++ confluent_kafka/avro/__init__.py | 43 ++++++++++++++++++- .../avro/cached_schema_registry_client.py | 5 ++- confluent_kafka/src/confluent_kafka.c | 25 ++++++++++- 4 files changed, 87 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index 4d7eacdbd..3feb3460a 100644 --- a/README.md +++ b/README.md @@ -60,6 +60,24 @@ avroProducer = AvroProducer({'bootstrap.servers': 'mybroker,mybroker2', 'schema. avroProducer.produce(topic='my_topic', value=value, key=key) ``` +**AvroConsumer** +``` +from confluent_kafka.avro import AvroConsumer + +c = AvroConsumer({'bootstrap.servers': 'mybroker,mybroker2', 'group.id': 'greoupid', 'schema.registry.url': 'http://127.0.0.1:9002'}) +c.subscribe(['my_topic']) +running = True +while running: + msg = c.poll(10) + if msg: + if not msg.error(): + print(msg.value()) + elif msg.error().code() != KafkaError._PARTITION_EOF: + print(msg.error()) + running = False +c.close() +``` + See [examples](examples) for more examples. diff --git a/confluent_kafka/avro/__init__.py b/confluent_kafka/avro/__init__.py index 354a030eb..e3f110e1d 100644 --- a/confluent_kafka/avro/__init__.py +++ b/confluent_kafka/avro/__init__.py @@ -4,7 +4,7 @@ """ import sys -from confluent_kafka import Producer +from confluent_kafka import Producer, Consumer VALID_LEVELS = ['NONE', 'FULL', 'FORWARD', 'BACKWARD'] @@ -112,3 +112,44 @@ def produce(self, **kwargs): raise SerializerError("Avro schema required for key") super(AvroProducer, self).produce(topic, value, key, **kwargs) + + +class AvroConsumer(Consumer): + """ + Kafka Consumer client which does avro schema decoding of messages. + Handles message deserialization. + + Constructor takes below parameters + + @:param: config: dict object with config parameters containing url for schema registry (schema.registry.url). + """ + def __init__(self, config): + + if ('schema.registry.url' not in config.keys()): + raise ValueError("Missing parameter: schema.registry.url") + schem_registry_url = config["schema.registry.url"] + del config["schema.registry.url"] + + super(AvroConsumer, self).__init__(config) + self._serializer = MessageSerializer(CachedSchemaRegistryClient(url=schem_registry_url)) + + def poll(self, timeout): + """ + This is an overriden method from confluent_kafka.Consumer class. This handles message + deserialization using avro schema + + @:param timeout + @:return message object with deserialized key and value as dict objects + """ + message = super(AvroConsumer, self).poll(timeout) + if not message: + return message + print(message) + if not message.error(): + if message.value(): + decoded_value = self._serializer.decode_message(message.value()) + message.set_value(decoded_value) + if message.key(): + decoded_key = self._serializer.decode_message(message.key()) + message.set_key(decoded_key) + return message diff --git a/confluent_kafka/avro/cached_schema_registry_client.py b/confluent_kafka/avro/cached_schema_registry_client.py index 61e258761..97735c329 100644 --- a/confluent_kafka/avro/cached_schema_registry_client.py +++ b/confluent_kafka/avro/cached_schema_registry_client.py @@ -26,6 +26,7 @@ import requests from . import ClientError, VALID_LEVELS +from . import loads # Common accept header sent ACCEPT_HDR = "application/vnd.schemaregistry.v1+json, application/vnd.schemaregistry+json, application/json" @@ -161,7 +162,7 @@ def get_by_id(self, schema_id): # need to parse the schema schema_str = result.get("schema") try: - result = avro.loads(schema_str) + result = loads(schema_str) # cache it self._cache_schema(result, schema_id) return result @@ -200,7 +201,7 @@ def get_latest_schema(self, subject): schema = self.id_to_schema[schema_id] else: try: - schema = avro.loads(result['schema']) + schema = loads(result['schema']) except: # bad schema - should not happen raise ClientError("Received bad schema from registry.") diff --git a/confluent_kafka/src/confluent_kafka.c b/confluent_kafka/src/confluent_kafka.c index a323ba2fd..5717f3c49 100644 --- a/confluent_kafka/src/confluent_kafka.c +++ b/confluent_kafka/src/confluent_kafka.c @@ -332,6 +332,19 @@ static PyObject *Message_timestamp (Message *self, PyObject *ignore) { self->timestamp); } +static PyObject *Message_set_value (Message *self, PyObject *new_val) { + Py_DECREF(self->value); + self->value = new_val; + Py_INCREF(self->value); + Py_RETURN_NONE; +} + +static PyObject *Message_set_key (Message *self, PyObject *new_key) { + Py_DECREF(self->key); + self->key = new_key; + Py_INCREF(self->key); + Py_RETURN_NONE; +} static PyMethodDef Message_methods[] = { { "error", (PyCFunction)Message_error, METH_NOARGS, @@ -391,6 +404,16 @@ static PyMethodDef Message_methods[] = { " :rtype: (int, int)\n" "\n" }, + { "set_value", (PyCFunction)Message_set_value, METH_O, + " :returns: None.\n" + " :rtype: None\n" + "\n" + }, + { "set_key", (PyCFunction)Message_set_key, METH_O, + " :returns: None.\n" + " :rtype: None\n" + "\n" + }, { NULL } }; @@ -1217,7 +1240,7 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype, Py_DECREF(ks); return NULL; } - + if (h->stats_cb) { Py_DECREF(h->stats_cb); h->stats_cb = NULL; From 5b88a9eadbd0e6fdea6047fb19c85f214b8d0ee6 Mon Sep 17 00:00:00 2001 From: Roopa Hiremath Chandrasekaraiah Date: Tue, 22 Nov 2016 10:04:58 -0800 Subject: [PATCH 2/7] Implemented revie feedback --- README.md | 4 +++- confluent_kafka/avro/__init__.py | 1 - confluent_kafka/src/confluent_kafka.c | 18 ++++++++++++------ 3 files changed, 15 insertions(+), 8 deletions(-) diff --git a/README.md b/README.md index 3feb3460a..065c4b4b3 100644 --- a/README.md +++ b/README.md @@ -64,13 +64,15 @@ avroProducer.produce(topic='my_topic', value=value, key=key) ``` from confluent_kafka.avro import AvroConsumer -c = AvroConsumer({'bootstrap.servers': 'mybroker,mybroker2', 'group.id': 'greoupid', 'schema.registry.url': 'http://127.0.0.1:9002'}) +c = AvroConsumer({'bootstrap.servers': 'mybroker,mybroker2', 'group.id': 'groupid', 'schema.registry.url': 'http://127.0.0.1:8081'}) c.subscribe(['my_topic']) running = True while running: msg = c.poll(10) if msg: if not msg.error(): + if (msg.value() && isinstance(msg.value(), dict)): + print("Got back deserialized dict object") print(msg.value()) elif msg.error().code() != KafkaError._PARTITION_EOF: print(msg.error()) diff --git a/confluent_kafka/avro/__init__.py b/confluent_kafka/avro/__init__.py index e3f110e1d..0fc2ab62b 100644 --- a/confluent_kafka/avro/__init__.py +++ b/confluent_kafka/avro/__init__.py @@ -144,7 +144,6 @@ def poll(self, timeout): message = super(AvroConsumer, self).poll(timeout) if not message: return message - print(message) if not message.error(): if message.value(): decoded_value = self._serializer.decode_message(message.value()) diff --git a/confluent_kafka/src/confluent_kafka.c b/confluent_kafka/src/confluent_kafka.c index 5717f3c49..1bf71d98a 100644 --- a/confluent_kafka/src/confluent_kafka.c +++ b/confluent_kafka/src/confluent_kafka.c @@ -333,16 +333,20 @@ static PyObject *Message_timestamp (Message *self, PyObject *ignore) { } static PyObject *Message_set_value (Message *self, PyObject *new_val) { - Py_DECREF(self->value); - self->value = new_val; - Py_INCREF(self->value); + if (self->value) { + Py_DECREF(self->value); + self->value = new_val; + Py_INCREF(self->value); + } Py_RETURN_NONE; } static PyObject *Message_set_key (Message *self, PyObject *new_key) { - Py_DECREF(self->key); - self->key = new_key; - Py_INCREF(self->key); + if (self->key) { + Py_DECREF(self->key); + self->key = new_key; + Py_INCREF(self->key); + } Py_RETURN_NONE; } @@ -405,11 +409,13 @@ static PyMethodDef Message_methods[] = { "\n" }, { "set_value", (PyCFunction)Message_set_value, METH_O, + " Set the field 'Message.value' with new value.\n" " :returns: None.\n" " :rtype: None\n" "\n" }, { "set_key", (PyCFunction)Message_set_key, METH_O, + " Set the field 'Message.key' with new value.\n" " :returns: None.\n" " :rtype: None\n" "\n" From f8ec6016c66b495792f524a15e37f6171111f475 Mon Sep 17 00:00:00 2001 From: Roopa Hiremath Chandrasekaraiah Date: Tue, 22 Nov 2016 10:32:34 -0800 Subject: [PATCH 3/7] Feedback --- README.md | 20 +++++++++++--------- confluent_kafka/src/confluent_kafka.c | 18 ++++++++++-------- 2 files changed, 21 insertions(+), 17 deletions(-) diff --git a/README.md b/README.md index 065c4b4b3..72eb49d41 100644 --- a/README.md +++ b/README.md @@ -68,15 +68,17 @@ c = AvroConsumer({'bootstrap.servers': 'mybroker,mybroker2', 'group.id': 'groupi c.subscribe(['my_topic']) running = True while running: - msg = c.poll(10) - if msg: - if not msg.error(): - if (msg.value() && isinstance(msg.value(), dict)): - print("Got back deserialized dict object") - print(msg.value()) - elif msg.error().code() != KafkaError._PARTITION_EOF: - print(msg.error()) - running = False + try: + msg = c.poll(10) + if msg: + if not msg.error(): + print(msg.value()) + elif msg.error().code() != KafkaError._PARTITION_EOF: + print(msg.error()) + running = False + except SerializerError: + print("Unable to decode the message") + running = False c.close() ``` diff --git a/confluent_kafka/src/confluent_kafka.c b/confluent_kafka/src/confluent_kafka.c index 1bf71d98a..62141429b 100644 --- a/confluent_kafka/src/confluent_kafka.c +++ b/confluent_kafka/src/confluent_kafka.c @@ -333,20 +333,20 @@ static PyObject *Message_timestamp (Message *self, PyObject *ignore) { } static PyObject *Message_set_value (Message *self, PyObject *new_val) { - if (self->value) { + if (self->value) Py_DECREF(self->value); - self->value = new_val; - Py_INCREF(self->value); - } + self->value = new_val; + Py_INCREF(self->value); + Py_RETURN_NONE; } static PyObject *Message_set_key (Message *self, PyObject *new_key) { - if (self->key) { + if (self->key) Py_DECREF(self->key); - self->key = new_key; - Py_INCREF(self->key); - } + self->key = new_key; + Py_INCREF(self->key); + Py_RETURN_NONE; } @@ -410,12 +410,14 @@ static PyMethodDef Message_methods[] = { }, { "set_value", (PyCFunction)Message_set_value, METH_O, " Set the field 'Message.value' with new value.\n" + " :param: PyObject value: Message.value.\n" " :returns: None.\n" " :rtype: None\n" "\n" }, { "set_key", (PyCFunction)Message_set_key, METH_O, " Set the field 'Message.key' with new value.\n" + " :param: PyObject value: Message.key.\n" " :returns: None.\n" " :rtype: None\n" "\n" From 4bac7cdaf7a96dac55193d88c76281b6b91f46d9 Mon Sep 17 00:00:00 2001 From: Roopa Hiremath Chandrasekaraiah Date: Tue, 22 Nov 2016 19:59:44 -0800 Subject: [PATCH 4/7] Implemented review comments --- README.md | 14 ++++++++++---- confluent_kafka/src/confluent_kafka.c | 4 ++-- 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/README.md b/README.md index 72eb49d41..cec61020d 100644 --- a/README.md +++ b/README.md @@ -62,7 +62,11 @@ avroProducer.produce(topic='my_topic', value=value, key=key) **AvroConsumer** ``` +import sys +import traceback +from confluent_kafka import KafkaError from confluent_kafka.avro import AvroConsumer +from confluent_kafka.avro.serializer import SerializerError c = AvroConsumer({'bootstrap.servers': 'mybroker,mybroker2', 'group.id': 'groupid', 'schema.registry.url': 'http://127.0.0.1:8081'}) c.subscribe(['my_topic']) @@ -76,9 +80,11 @@ while running: elif msg.error().code() != KafkaError._PARTITION_EOF: print(msg.error()) running = False - except SerializerError: - print("Unable to decode the message") + except SerializerError as e: + exc_type, exc_value, exc_traceback = sys.exc_info() + print(repr(traceback.format_exception(exc_type, exc_value, exc_traceback))) running = False + c.close() ``` @@ -122,7 +128,7 @@ Install $ pip install confluent-kafka - # for AvroProducer + # for AvroProducer or AvroConsumer $ pip install confluent-kafka[avro] @@ -130,7 +136,7 @@ Install $ pip install . - # for AvroProducer + # for AvroProducer or AvroConsumer $ pip install .[avro] diff --git a/confluent_kafka/src/confluent_kafka.c b/confluent_kafka/src/confluent_kafka.c index 62141429b..3b04c6575 100644 --- a/confluent_kafka/src/confluent_kafka.c +++ b/confluent_kafka/src/confluent_kafka.c @@ -410,14 +410,14 @@ static PyMethodDef Message_methods[] = { }, { "set_value", (PyCFunction)Message_set_value, METH_O, " Set the field 'Message.value' with new value.\n" - " :param: PyObject value: Message.value.\n" + " :param: object value: Message.value.\n" " :returns: None.\n" " :rtype: None\n" "\n" }, { "set_key", (PyCFunction)Message_set_key, METH_O, " Set the field 'Message.key' with new value.\n" - " :param: PyObject value: Message.key.\n" + " :param: object value: Message.key.\n" " :returns: None.\n" " :rtype: None\n" "\n" From 21c0b6485a91bd99cdccc67d5590b7c5610ed973 Mon Sep 17 00:00:00 2001 From: Roopa Hiremath Chandrasekaraiah Date: Tue, 22 Nov 2016 20:11:58 -0800 Subject: [PATCH 5/7] Implemented review comments --- confluent_kafka/avro/__init__.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/confluent_kafka/avro/__init__.py b/confluent_kafka/avro/__init__.py index 0fc2ab62b..2194ea155 100644 --- a/confluent_kafka/avro/__init__.py +++ b/confluent_kafka/avro/__init__.py @@ -145,10 +145,10 @@ def poll(self, timeout): if not message: return message if not message.error(): - if message.value(): + if message.value() is not None: decoded_value = self._serializer.decode_message(message.value()) message.set_value(decoded_value) - if message.key(): + if message.key() is not None: decoded_key = self._serializer.decode_message(message.key()) message.set_key(decoded_key) return message From badd3bb29bc48df53ceeb63f6892447f7bcce467 Mon Sep 17 00:00:00 2001 From: Roopa Hiremath Chandrasekaraiah Date: Fri, 25 Nov 2016 10:18:04 -0800 Subject: [PATCH 6/7] Implemented review comments --- README.md | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/README.md b/README.md index cec61020d..dc40452b6 100644 --- a/README.md +++ b/README.md @@ -81,8 +81,7 @@ while running: print(msg.error()) running = False except SerializerError as e: - exc_type, exc_value, exc_traceback = sys.exc_info() - print(repr(traceback.format_exception(exc_type, exc_value, exc_traceback))) + print("Message deserialization failed for %s: %s" % (msg, e)) running = False c.close() From 9797fefffbaa13e72e42f291e66f3be9aadc0624 Mon Sep 17 00:00:00 2001 From: Roopa Hiremath Chandrasekaraiah Date: Mon, 21 Nov 2016 16:13:36 -0800 Subject: [PATCH 7/7] Consumer client for handling avro schemas --- README.md | 31 +++++++++++++- confluent_kafka/avro/__init__.py | 42 ++++++++++++++++++- .../avro/cached_schema_registry_client.py | 5 ++- confluent_kafka/src/confluent_kafka.c | 33 ++++++++++++++- 4 files changed, 105 insertions(+), 6 deletions(-) diff --git a/README.md b/README.md index 4d7eacdbd..dc40452b6 100644 --- a/README.md +++ b/README.md @@ -60,6 +60,33 @@ avroProducer = AvroProducer({'bootstrap.servers': 'mybroker,mybroker2', 'schema. avroProducer.produce(topic='my_topic', value=value, key=key) ``` +**AvroConsumer** +``` +import sys +import traceback +from confluent_kafka import KafkaError +from confluent_kafka.avro import AvroConsumer +from confluent_kafka.avro.serializer import SerializerError + +c = AvroConsumer({'bootstrap.servers': 'mybroker,mybroker2', 'group.id': 'groupid', 'schema.registry.url': 'http://127.0.0.1:8081'}) +c.subscribe(['my_topic']) +running = True +while running: + try: + msg = c.poll(10) + if msg: + if not msg.error(): + print(msg.value()) + elif msg.error().code() != KafkaError._PARTITION_EOF: + print(msg.error()) + running = False + except SerializerError as e: + print("Message deserialization failed for %s: %s" % (msg, e)) + running = False + +c.close() +``` + See [examples](examples) for more examples. @@ -100,7 +127,7 @@ Install $ pip install confluent-kafka - # for AvroProducer + # for AvroProducer or AvroConsumer $ pip install confluent-kafka[avro] @@ -108,7 +135,7 @@ Install $ pip install . - # for AvroProducer + # for AvroProducer or AvroConsumer $ pip install .[avro] diff --git a/confluent_kafka/avro/__init__.py b/confluent_kafka/avro/__init__.py index 354a030eb..2194ea155 100644 --- a/confluent_kafka/avro/__init__.py +++ b/confluent_kafka/avro/__init__.py @@ -4,7 +4,7 @@ """ import sys -from confluent_kafka import Producer +from confluent_kafka import Producer, Consumer VALID_LEVELS = ['NONE', 'FULL', 'FORWARD', 'BACKWARD'] @@ -112,3 +112,43 @@ def produce(self, **kwargs): raise SerializerError("Avro schema required for key") super(AvroProducer, self).produce(topic, value, key, **kwargs) + + +class AvroConsumer(Consumer): + """ + Kafka Consumer client which does avro schema decoding of messages. + Handles message deserialization. + + Constructor takes below parameters + + @:param: config: dict object with config parameters containing url for schema registry (schema.registry.url). + """ + def __init__(self, config): + + if ('schema.registry.url' not in config.keys()): + raise ValueError("Missing parameter: schema.registry.url") + schem_registry_url = config["schema.registry.url"] + del config["schema.registry.url"] + + super(AvroConsumer, self).__init__(config) + self._serializer = MessageSerializer(CachedSchemaRegistryClient(url=schem_registry_url)) + + def poll(self, timeout): + """ + This is an overriden method from confluent_kafka.Consumer class. This handles message + deserialization using avro schema + + @:param timeout + @:return message object with deserialized key and value as dict objects + """ + message = super(AvroConsumer, self).poll(timeout) + if not message: + return message + if not message.error(): + if message.value() is not None: + decoded_value = self._serializer.decode_message(message.value()) + message.set_value(decoded_value) + if message.key() is not None: + decoded_key = self._serializer.decode_message(message.key()) + message.set_key(decoded_key) + return message diff --git a/confluent_kafka/avro/cached_schema_registry_client.py b/confluent_kafka/avro/cached_schema_registry_client.py index 61e258761..97735c329 100644 --- a/confluent_kafka/avro/cached_schema_registry_client.py +++ b/confluent_kafka/avro/cached_schema_registry_client.py @@ -26,6 +26,7 @@ import requests from . import ClientError, VALID_LEVELS +from . import loads # Common accept header sent ACCEPT_HDR = "application/vnd.schemaregistry.v1+json, application/vnd.schemaregistry+json, application/json" @@ -161,7 +162,7 @@ def get_by_id(self, schema_id): # need to parse the schema schema_str = result.get("schema") try: - result = avro.loads(schema_str) + result = loads(schema_str) # cache it self._cache_schema(result, schema_id) return result @@ -200,7 +201,7 @@ def get_latest_schema(self, subject): schema = self.id_to_schema[schema_id] else: try: - schema = avro.loads(result['schema']) + schema = loads(result['schema']) except: # bad schema - should not happen raise ClientError("Received bad schema from registry.") diff --git a/confluent_kafka/src/confluent_kafka.c b/confluent_kafka/src/confluent_kafka.c index a323ba2fd..3b04c6575 100644 --- a/confluent_kafka/src/confluent_kafka.c +++ b/confluent_kafka/src/confluent_kafka.c @@ -332,6 +332,23 @@ static PyObject *Message_timestamp (Message *self, PyObject *ignore) { self->timestamp); } +static PyObject *Message_set_value (Message *self, PyObject *new_val) { + if (self->value) + Py_DECREF(self->value); + self->value = new_val; + Py_INCREF(self->value); + + Py_RETURN_NONE; +} + +static PyObject *Message_set_key (Message *self, PyObject *new_key) { + if (self->key) + Py_DECREF(self->key); + self->key = new_key; + Py_INCREF(self->key); + + Py_RETURN_NONE; +} static PyMethodDef Message_methods[] = { { "error", (PyCFunction)Message_error, METH_NOARGS, @@ -391,6 +408,20 @@ static PyMethodDef Message_methods[] = { " :rtype: (int, int)\n" "\n" }, + { "set_value", (PyCFunction)Message_set_value, METH_O, + " Set the field 'Message.value' with new value.\n" + " :param: object value: Message.value.\n" + " :returns: None.\n" + " :rtype: None\n" + "\n" + }, + { "set_key", (PyCFunction)Message_set_key, METH_O, + " Set the field 'Message.key' with new value.\n" + " :param: object value: Message.key.\n" + " :returns: None.\n" + " :rtype: None\n" + "\n" + }, { NULL } }; @@ -1217,7 +1248,7 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype, Py_DECREF(ks); return NULL; } - + if (h->stats_cb) { Py_DECREF(h->stats_cb); h->stats_cb = NULL;