Skip to content

AvroConsumer for handling schema registry #80

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Nov 28, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 29 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,33 @@ avroProducer = AvroProducer({'bootstrap.servers': 'mybroker,mybroker2', 'schema.
avroProducer.produce(topic='my_topic', value=value, key=key)
```

**AvroConsumer**
```
import sys
import traceback
from confluent_kafka import KafkaError
from confluent_kafka.avro import AvroConsumer
from confluent_kafka.avro.serializer import SerializerError

c = AvroConsumer({'bootstrap.servers': 'mybroker,mybroker2', 'group.id': 'groupid', 'schema.registry.url': 'http://127.0.0.1:8081'})
c.subscribe(['my_topic'])
running = True
while running:
try:
msg = c.poll(10)
if msg:
if not msg.error():
print(msg.value())
elif msg.error().code() != KafkaError._PARTITION_EOF:
print(msg.error())
running = False
except SerializerError as e:
print("Message deserialization failed for %s: %s" % (msg, e))
running = False

c.close()
```

See [examples](examples) for more examples.


Expand Down Expand Up @@ -100,15 +127,15 @@ Install

$ pip install confluent-kafka

# for AvroProducer
# for AvroProducer or AvroConsumer
$ pip install confluent-kafka[avro]


**Install from source / tarball:**

$ pip install .

# for AvroProducer
# for AvroProducer or AvroConsumer
$ pip install .[avro]


Expand Down
42 changes: 41 additions & 1 deletion confluent_kafka/avro/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
"""
import sys

from confluent_kafka import Producer
from confluent_kafka import Producer, Consumer

VALID_LEVELS = ['NONE', 'FULL', 'FORWARD', 'BACKWARD']

Expand Down Expand Up @@ -112,3 +112,43 @@ def produce(self, **kwargs):
raise SerializerError("Avro schema required for key")

super(AvroProducer, self).produce(topic, value, key, **kwargs)


class AvroConsumer(Consumer):
"""
Kafka Consumer client which does avro schema decoding of messages.
Handles message deserialization.

Constructor takes below parameters

@:param: config: dict object with config parameters containing url for schema registry (schema.registry.url).
"""
def __init__(self, config):

if ('schema.registry.url' not in config.keys()):
raise ValueError("Missing parameter: schema.registry.url")
schem_registry_url = config["schema.registry.url"]
del config["schema.registry.url"]

super(AvroConsumer, self).__init__(config)
self._serializer = MessageSerializer(CachedSchemaRegistryClient(url=schem_registry_url))

def poll(self, timeout):
"""
This is an overriden method from confluent_kafka.Consumer class. This handles message
deserialization using avro schema

@:param timeout
@:return message object with deserialized key and value as dict objects
"""
message = super(AvroConsumer, self).poll(timeout)
if not message:
return message
if not message.error():
if message.value() is not None:
decoded_value = self._serializer.decode_message(message.value())
message.set_value(decoded_value)
if message.key() is not None:
decoded_key = self._serializer.decode_message(message.key())
message.set_key(decoded_key)
return message
5 changes: 3 additions & 2 deletions confluent_kafka/avro/cached_schema_registry_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import requests

from . import ClientError, VALID_LEVELS
from . import loads

# Common accept header sent
ACCEPT_HDR = "application/vnd.schemaregistry.v1+json, application/vnd.schemaregistry+json, application/json"
Expand Down Expand Up @@ -161,7 +162,7 @@ def get_by_id(self, schema_id):
# need to parse the schema
schema_str = result.get("schema")
try:
result = avro.loads(schema_str)
result = loads(schema_str)
# cache it
self._cache_schema(result, schema_id)
return result
Expand Down Expand Up @@ -200,7 +201,7 @@ def get_latest_schema(self, subject):
schema = self.id_to_schema[schema_id]
else:
try:
schema = avro.loads(result['schema'])
schema = loads(result['schema'])
except:
# bad schema - should not happen
raise ClientError("Received bad schema from registry.")
Expand Down
33 changes: 32 additions & 1 deletion confluent_kafka/src/confluent_kafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,23 @@ static PyObject *Message_timestamp (Message *self, PyObject *ignore) {
self->timestamp);
}

static PyObject *Message_set_value (Message *self, PyObject *new_val) {
if (self->value)
Py_DECREF(self->value);
self->value = new_val;
Py_INCREF(self->value);

Py_RETURN_NONE;
}

static PyObject *Message_set_key (Message *self, PyObject *new_key) {
if (self->key)
Py_DECREF(self->key);
self->key = new_key;
Py_INCREF(self->key);

Py_RETURN_NONE;
}

static PyMethodDef Message_methods[] = {
{ "error", (PyCFunction)Message_error, METH_NOARGS,
Expand Down Expand Up @@ -391,6 +408,20 @@ static PyMethodDef Message_methods[] = {
" :rtype: (int, int)\n"
"\n"
},
{ "set_value", (PyCFunction)Message_set_value, METH_O,
" Set the field 'Message.value' with new value.\n"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

" :param: object value: Message.value.\n"
" :returns: None.\n"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing docstring saying what the method actually does

" :rtype: None\n"
"\n"
},
{ "set_key", (PyCFunction)Message_set_key, METH_O,
" Set the field 'Message.key' with new value.\n"
" :param: object value: Message.key.\n"
" :returns: None.\n"
" :rtype: None\n"
"\n"
},
{ NULL }
};

Expand Down Expand Up @@ -1217,7 +1248,7 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,
Py_DECREF(ks);
return NULL;
}

if (h->stats_cb) {
Py_DECREF(h->stats_cb);
h->stats_cb = NULL;
Expand Down