Skip to content

Integrate client with Confluent schema registry and Avro #40

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Nov 21, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
language: python
sudo: required
python:
- "2.7"
- "3.4"
before_install:
- bash tools/bootstrap-librdkafka.sh v0.9.2 tmp-build
- pip install --upgrade pip
- pip install pytest-timeout
install:
- pip install -v --global-option=build_ext --global-option="-Itmp-build/include/" --global-option="-Ltmp-build/lib" .
- pip install -v --global-option=build_ext --global-option="-Itmp-build/include/" --global-option="-Ltmp-build/lib" . .[avro]
env:
- LD_LIBRARY_PATH=$PWD/tmp-build/lib
script: py.test -v --timeout 20 --ignore=tmp-build
script: py.test -v --timeout 20 --ignore=tmp-build --import-mode append
20 changes: 20 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,20 @@ while running:
c.close()
```

**AvroProducer**
```
from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer

value_schema = avro.load('ValueSchema.avsc')
key_schema = avro.load('KeySchema.avsc')
value = {"name": "Value"}
key = {"name": "Key"}

avroProducer = AvroProducer({'bootstrap.servers': 'mybroker,mybroker2', 'schema.registry.url': 'http://schema_registry_host:port'}, default_key_schema=key_schema, default_value_schema=value_schema)
avroProducer.produce(topic='my_topic', value=value, key=key)
```

See [examples](examples) for more examples.


Expand Down Expand Up @@ -85,12 +99,18 @@ Install
**Install from PyPi:**

$ pip install confluent-kafka

# for AvroProducer
$ pip install confluent-kafka[avro]


**Install from source / tarball:**

$ pip install .

# for AvroProducer
$ pip install .[avro]


Build
=====
Expand Down
2 changes: 1 addition & 1 deletion confluent_kafka/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
__all__ = ['cimpl','kafkatest']
__all__ = ['cimpl', 'avro', 'kafkatest']
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be conditional based on install type?
What happens if people dont install the avro submodule?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@edenhill This should not affect using the client normally, just with basic producer and consumer.
The only catch is you would not be able to use AvroProducer unless the dependencies are installed.

from .cimpl import *
114 changes: 114 additions & 0 deletions confluent_kafka/avro/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
"""
Avro schema registry module: Deals with encoding and decoding of messages with avro schemas

"""
import sys

from confluent_kafka import Producer

VALID_LEVELS = ['NONE', 'FULL', 'FORWARD', 'BACKWARD']


def loads(schema_str):
    """Parse an avro schema definition given as a string."""
    # The avro package renamed its parser entry point between the
    # Python 2 and Python 3 releases (parse vs. Parse).
    parser = schema.parse if sys.version_info[0] < 3 else schema.Parse
    return parser(schema_str)


def load(fp):
    """Parse an avro schema from the file at path *fp*."""
    with open(fp) as schema_file:
        schema_str = schema_file.read()
    return loads(schema_str)


# avro.schema.RecordSchema and avro.schema.PrimitiveSchema classes are not hashable. Hence defining them explicitly as a quick fix
def _hash_func(self):
return hash(str(self))


try:
    # avro is an optional dependency (installed via confluent-kafka[avro]);
    # if it is absent, the rest of this module must still be importable so
    # the plain Producer/Consumer clients keep working.
    from avro import schema

    # Monkey-patch hashability onto the schema classes so they can be used
    # as cache keys by the schema registry client.
    schema.RecordSchema.__hash__ = _hash_func
    schema.PrimitiveSchema.__hash__ = _hash_func
except ImportError:
    pass


class ClientError(Exception):
    """Raised by Schema Registry clients when a request fails.

    Carries the error message and, when available, the HTTP status
    code returned by the registry.
    """

    def __init__(self, message, http_code=None):
        self.message = message
        self.http_code = http_code
        super(ClientError, self).__init__(self.__str__())

    def __str__(self):
        return self.message

    def __repr__(self):
        return "ClientError(error={error})".format(error=self.message)


from confluent_kafka.avro.cached_schema_registry_client import CachedSchemaRegistryClient
from confluent_kafka.avro.serializer import SerializerError
from confluent_kafka.avro.serializer.message_serializer import MessageSerializer


class AvroProducer(Producer):
    """
    Kafka Producer client which does avro schema encoding to messages.
    Handles schema registration, Message serialization.

    Constructor takes below parameters

    @:param: config: dict object with config parameters containing url for schema registry (schema.registry.url).
    @:param: default_key_schema: Optional avro schema for key
    @:param: default_value_schema: Optional avro schema for value
    """

    def __init__(self, config, default_key_schema=None,
                 default_value_schema=None):
        if 'schema.registry.url' not in config:
            raise ValueError("Missing parameter: schema.registry.url")
        # Work on a copy so the caller's dict is not mutated; the registry
        # URL is consumed here and must not be passed down to librdkafka.
        config = dict(config)
        schema_registry_url = config.pop("schema.registry.url")

        super(AvroProducer, self).__init__(config)
        self._serializer = MessageSerializer(CachedSchemaRegistryClient(url=schema_registry_url))
        self._key_schema = default_key_schema
        self._value_schema = default_value_schema

    def produce(self, **kwargs):
        """
        Sends message to kafka by encoding with specified avro schema

        @:param: topic: topic name
        @:param: value: A dictionary object
        @:param: value_schema : Avro schema for value
        @:param: key: A dictionary object
        @:param: key_schema : Avro schema for key
        @:exception: SerializerError
        """
        # Per-call schemas override the constructor defaults.
        key_schema = kwargs.pop('key_schema', self._key_schema)
        value_schema = kwargs.pop('value_schema', self._value_schema)
        topic = kwargs.pop('topic', None)
        if not topic:
            raise ClientError("Topic name not specified.")
        value = kwargs.pop('value', None)
        key = kwargs.pop('key', None)

        # Compare against None rather than truthiness so falsy-but-valid
        # payloads (e.g. {}, 0, "") are still serialized.
        if value is not None:
            if value_schema:
                value = self._serializer.encode_record_with_schema(topic, value_schema, value)
            else:
                raise SerializerError("Avro schema required for value")

        if key is not None:
            if key_schema:
                key = self._serializer.encode_record_with_schema(topic, key_schema, key, True)
            else:
                raise SerializerError("Avro schema required for key")

        super(AvroProducer, self).produce(topic, value, key, **kwargs)
Loading