From 6dbb5098c4aa47565200996f4c6ecbee65b2b2bb Mon Sep 17 00:00:00 2001
From: Roopa Hiremath Chandrasekaraiah
Date: Fri, 18 Nov 2016 14:10:01 -0800
Subject: [PATCH 1/3] Producer client for handling avro schemas

---
 .travis.yml                                   |   6 +-
 README.md                                     |  20 ++
 confluent_kafka/__init__.py                   |   2 +-
 confluent_kafka/avro/__init__.py              | 114 +++++++
 .../avro/cached_schema_registry_client.py     | 313 ++++++++++++++++++
 confluent_kafka/avro/serializer/__init__.py   |  11 +
 .../avro/serializer/message_serializer.py     | 208 ++++++++++++
 setup.py                                      |  13 +-
 tests/__init__.py                             |   0
 tests/avro/__init__.py                        |   0
 tests/avro/adv_schema.avsc                    |  61 ++++
 tests/avro/basic_schema.avsc                  |  23 ++
 tests/avro/data_gen.py                        | 101 ++++++
 tests/avro/mock_registry.py                   | 189 +++++++++++
 tests/avro/mock_schema_registry_client.py     | 149 +++++++++
 tests/avro/test_avro_producer.py              |  76 +++++
 tests/avro/test_cached_client.py              | 137 ++++++++
 tests/avro/test_message_serializer.py         |  85 +++++
 tests/avro/test_mock_client.py                | 127 +++++++
 tests/avro/test_util.py                       |  45 +++
 tests/test_docs.py                            |  14 +-
 21 files changed, 1684 insertions(+), 10 deletions(-)
 create mode 100644 confluent_kafka/avro/__init__.py
 create mode 100644 confluent_kafka/avro/cached_schema_registry_client.py
 create mode 100644 confluent_kafka/avro/serializer/__init__.py
 create mode 100644 confluent_kafka/avro/serializer/message_serializer.py
 create mode 100644 tests/__init__.py
 create mode 100644 tests/avro/__init__.py
 create mode 100644 tests/avro/adv_schema.avsc
 create mode 100644 tests/avro/basic_schema.avsc
 create mode 100644 tests/avro/data_gen.py
 create mode 100644 tests/avro/mock_registry.py
 create mode 100644 tests/avro/mock_schema_registry_client.py
 create mode 100644 tests/avro/test_avro_producer.py
 create mode 100644 tests/avro/test_cached_client.py
 create mode 100644 tests/avro/test_message_serializer.py
 create mode 100644 tests/avro/test_mock_client.py
 create mode 100644 tests/avro/test_util.py

diff --git a/.travis.yml b/.travis.yml
index 98f7cb4ba..f4dddb78f 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -1,12 +1,14 @@
 language: python
+sudo: required
 python:
 - "2.7"
 - "3.4"
 before_install:
  - bash tools/bootstrap-librdkafka.sh v0.9.2 tmp-build
+ - pip install --upgrade pip
  - pip install pytest-timeout
 install:
- - pip install -v --global-option=build_ext --global-option="-Itmp-build/include/" --global-option="-Ltmp-build/lib" .
+ - pip install -v --global-option=build_ext --global-option="-Itmp-build/include/" --global-option="-Ltmp-build/lib" .[avro]
 env:
 - LD_LIBRARY_PATH=$PWD/tmp-build/lib
-script: py.test -v --timeout 20 --ignore=tmp-build
+script: py.test -v --timeout 20 --ignore=tmp-build --import-mode append
\ No newline at end of file

diff --git a/README.md b/README.md
index 3ca385396..4d7eacdbd 100644
--- a/README.md
+++ b/README.md
@@ -46,6 +46,20 @@ while running:
 c.close()
 ```
 
+**AvroProducer**
+```
+from confluent_kafka import avro
+from confluent_kafka.avro import AvroProducer
+
+value_schema = avro.load('ValueSchema.avsc')
+key_schema = avro.load('KeySchema.avsc')
+value = {"name": "Value"}
+key = {"name": "Key"}
+
+avroProducer = AvroProducer({'bootstrap.servers': 'mybroker,mybroker2', 'schema.registry.url': 'http://schema_registry_host:port'}, default_key_schema=key_schema, default_value_schema=value_schema)
+avroProducer.produce(topic='my_topic', value=value, key=key)
+```
+
 See [examples](examples) for more examples.
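+Since `AvroProducer` subclasses `Producer`, the usual delivery handling still applies;
+a minimal sketch, continuing the producer above:
+```
+avroProducer.flush()  # block until all outstanding produce requests are served
+```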
@@ -85,12 +99,18 @@ Install
 
 **Install from PyPi:**
 
     $ pip install confluent-kafka
+
+    # for AvroProducer
+    $ pip install confluent-kafka[avro]
 
 
 **Install from source / tarball:**
 
    $ pip install .
 
+    # for AvroProducer
+    $ pip install .[avro]
+
 
 Build
 =====

diff --git a/confluent_kafka/__init__.py b/confluent_kafka/__init__.py
index d998bec72..26d218c06 100644
--- a/confluent_kafka/__init__.py
+++ b/confluent_kafka/__init__.py
@@ -1,2 +1,2 @@
-__all__ = ['cimpl','kafkatest']
+__all__ = ['cimpl', 'avro', 'kafkatest']
 from .cimpl import *

diff --git a/confluent_kafka/avro/__init__.py b/confluent_kafka/avro/__init__.py
new file mode 100644
index 000000000..354a030eb
--- /dev/null
+++ b/confluent_kafka/avro/__init__.py
@@ -0,0 +1,114 @@
+"""
+    Avro schema registry module: Deals with encoding and decoding of messages with avro schemas
+
+"""
+import sys
+
+from confluent_kafka import Producer
+
+VALID_LEVELS = ['NONE', 'FULL', 'FORWARD', 'BACKWARD']
+
+
+def loads(schema_str):
+    """ Parse a schema given a schema string """
+    # 'schema' is bound by the try/except import below
+    if sys.version_info[0] < 3:
+        return schema.parse(schema_str)
+    else:
+        return schema.Parse(schema_str)
+
+
+def load(fp):
+    """ Parse a schema from a file path """
+    with open(fp) as f:
+        return loads(f.read())
+
+
+# avro.schema.RecordSchema and avro.schema.PrimitiveSchema are not hashable,
+# so define __hash__ explicitly as a quick fix
+def _hash_func(self):
+    return hash(str(self))
+
+
+try:
+    from avro import schema
+
+    schema.RecordSchema.__hash__ = _hash_func
+    schema.PrimitiveSchema.__hash__ = _hash_func
+except ImportError:
+    pass
+
+
+class ClientError(Exception):
+    """ Error thrown by Schema Registry clients """
+
+    def __init__(self, message, http_code=None):
+        self.message = message
+        self.http_code = http_code
+        super(ClientError, self).__init__(self.__str__())
+
+    def __repr__(self):
+        return "ClientError(error={error})".format(error=self.message)
+
+    def __str__(self):
+        return self.message
+
+
+from confluent_kafka.avro.cached_schema_registry_client import CachedSchemaRegistryClient
+from confluent_kafka.avro.serializer import SerializerError
+from confluent_kafka.avro.serializer.message_serializer import MessageSerializer
+
+
+class AvroProducer(Producer):
+    """
+        Kafka Producer client which encodes messages with Avro schemas.
+        Handles schema registration and message serialization.
+
+        The constructor takes the following parameters:
+
+        @:param: config: dict with config parameters; must contain the schema registry url (schema.registry.url).
+        @:param: default_key_schema: Optional avro schema for key
+        @:param: default_value_schema: Optional avro schema for value
+    """
+
+    def __init__(self, config, default_key_schema=None,
+                 default_value_schema=None):
+        if 'schema.registry.url' not in config:
+            raise ValueError("Missing parameter: schema.registry.url")
+        schema_registry_url = config["schema.registry.url"]
+        del config["schema.registry.url"]
+
+        super(AvroProducer, self).__init__(config)
+        self._serializer = MessageSerializer(CachedSchemaRegistryClient(url=schema_registry_url))
+        self._key_schema = default_key_schema
+        self._value_schema = default_value_schema
+
+    def produce(self, **kwargs):
+        """
+        Send a message to Kafka, encoding it with the specified avro schema(s).
+        @:param: topic: topic name
+        @:param: value: A dictionary object
+        @:param: value_schema : Avro schema for value
+        @:param: key: A dictionary object
+        @:param: key_schema : Avro schema for key
+        @:exception: SerializerError
+        """
+        # get schemas from kwargs if defined
+        key_schema = kwargs.pop('key_schema', self._key_schema)
+        value_schema = kwargs.pop('value_schema', self._value_schema)
+        topic = kwargs.pop('topic', None)
+        if not topic:
+            raise ClientError("Topic name not specified.")
+        value = kwargs.pop('value', None)
+        key = kwargs.pop('key', None)
+        if value is not None:
+            if value_schema:
+                value = self._serializer.encode_record_with_schema(topic, value_schema, value)
+            else:
+                raise SerializerError("Avro schema required for value")
+
+        if key is not None:
+            if key_schema:
+                key = self._serializer.encode_record_with_schema(topic, key_schema, key, True)
+            else:
+                raise SerializerError("Avro schema required for key")
+
+        super(AvroProducer, self).produce(topic, value, key, **kwargs)

diff --git a/confluent_kafka/avro/cached_schema_registry_client.py b/confluent_kafka/avro/cached_schema_registry_client.py
new file mode 100644
index 000000000..61e258761
--- /dev/null
+++ b/confluent_kafka/avro/cached_schema_registry_client.py
@@ -0,0 +1,313 @@
+#!/usr/bin/env python
+#
+# Copyright 2016 Confluent Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+
+#
+# derived from https://github.com/verisign/python-confluent-schemaregistry.git
+#
+import json
+import logging
+from collections import defaultdict
+
+import requests
+
+from . import ClientError, VALID_LEVELS
+
+# Common accept header sent
+ACCEPT_HDR = "application/vnd.schemaregistry.v1+json, application/vnd.schemaregistry+json, application/json"
+log = logging.getLogger(__name__)
+
+
+class CachedSchemaRegistryClient(object):
+    """
+    A client that talks to a Schema Registry over HTTP
+
+    See http://confluent.io/docs/current/schema-registry/docs/intro.html
+
+    Errors communicating to the server will result in a ClientError being raised.
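+
+    A short usage sketch (hypothetical registry URL and subject name):
+
+        client = CachedSchemaRegistryClient(url='http://localhost:8081')
+        schema_id = client.register('my_subject', avro_schema)
+        schema = client.get_by_id(schema_id)
+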
+ + @:param: url: url to schema registry + """ + + def __init__(self, url, max_schemas_per_subject=1000): + """Construct a client by passing in the base URL of the schema registry server""" + + self.url = url.rstrip('/') + + self.max_schemas_per_subject = max_schemas_per_subject + # subj => { schema => id } + self.subject_to_schema_ids = defaultdict(dict) + # id => avro_schema + self.id_to_schema = defaultdict(dict) + # subj => { schema => version } + self.subject_to_schema_versions = defaultdict(dict) + + def _send_request(self, url, method='GET', body=None, headers=None): + if body: + body = json.dumps(body) + body = body.encode('utf8') + _headers = dict() + _headers["Accept"] = ACCEPT_HDR + if body: + _headers["Content-Length"] = str(len(body)) + _headers["Content-Type"] = "application/vnd.schemaregistry.v1+json" + + if headers: + for header_name in headers: + _headers[header_name] = headers[header_name] + if method == 'GET': + response = requests.get(url, headers=_headers) + elif method == 'POST': + response = requests.post(url, body, headers=_headers) + elif method == 'PUT': + response = requests.put(url, body, headers=_headers) + elif method == 'DELETE': + response = requests.delete(url, headers=_headers) + else: + raise ClientError("Invalid HTTP request type") + + result = json.loads(response.text) + return (result, response.status_code) + + def _add_to_cache(self, cache, subject, schema, value): + sub_cache = cache[subject] + sub_cache[schema] = value + + def _cache_schema(self, schema, schema_id, subject=None, version=None): + # don't overwrite anything + if schema_id in self.id_to_schema: + schema = self.id_to_schema[schema_id] + else: + self.id_to_schema[schema_id] = schema + + if subject: + self._add_to_cache(self.subject_to_schema_ids, + subject, schema, schema_id) + if version: + self._add_to_cache(self.subject_to_schema_versions, + subject, schema, version) + + def register(self, subject, avro_schema): + """ + POST /subjects/(string: subject)/versions + Register a schema with the registry under the given subject + and receive a schema id. + + avro_schema must be a parsed schema from the python avro library + + Multiple instances of the same schema will result in cache misses. + + @:param: subject: subject name + @:param: avro_schema: Avro schema to be registered + @:returns: schema_id: int value + """ + + schemas_to_id = self.subject_to_schema_ids[subject] + schema_id = schemas_to_id.get(avro_schema, None) + if schema_id != None: + return schema_id + # send it up + url = '/'.join([self.url, 'subjects', subject, 'versions']) + # body is { schema : json_string } + + body = {'schema': json.dumps(avro_schema.to_json())} + result, code = self._send_request(url, method='POST', body=body) + if code == 409: + raise ClientError("Incompatible Avro schema:" + str(code)) + elif code == 422: + raise ClientError("Invalid Avro schema:" + str(code)) + elif not (code >= 200 and code <= 299): + raise ClientError("Unable to register schema. 
Error code:" + str(code)) + # result is a dict + schema_id = result['id'] + # cache it + self._cache_schema(avro_schema, schema_id, subject) + return schema_id + + def get_by_id(self, schema_id): + """ + GET /schemas/ids/{int: id} + Retrieve a parsed avro schema by id or None if not found + @:param: schema_id: int value + @:returns: Avro schema + """ + if schema_id in self.id_to_schema: + return self.id_to_schema[schema_id] + # fetch from the registry + url = '/'.join([self.url, 'schemas', 'ids', str(schema_id)]) + + result, code = self._send_request(url) + if code == 404: + log.error("Schema not found:" + str(code)) + return None + elif not (code >= 200 and code <= 299): + log.error("Unable to get schema for the specific ID:" + str(code)) + return None + else: + # need to parse the schema + schema_str = result.get("schema") + try: + result = avro.loads(schema_str) + # cache it + self._cache_schema(result, schema_id) + return result + except: + # bad schema - should not happen + raise ClientError("Received bad schema from registry.") + + def get_latest_schema(self, subject): + """ + GET /subjects/(string: subject)/versions/(versionId: version) + + Return the latest 3-tuple of: + (the schema id, the parsed avro schema, the schema version) + for a particular subject. + + This call always contacts the registry. + + If the subject is not found, (None,None,None) is returned. + @:param: subject: subject name + @:returns: (schema_id, schema, version) + """ + url = '/'.join([self.url, 'subjects', subject, 'versions', 'latest']) + + result, code = self._send_request(url) + if code == 404: + log.error("Schema not found:" + str(code)) + return (None, None, None) + elif code == 422: + log.error("Invalid version:" + str(code)) + return (None, None, None) + elif not (code >= 200 and code <= 299): + return (None, None, None) + schema_id = result['id'] + version = result['version'] + if schema_id in self.id_to_schema: + schema = self.id_to_schema[schema_id] + else: + try: + schema = avro.loads(result['schema']) + except: + # bad schema - should not happen + raise ClientError("Received bad schema from registry.") + + self._cache_schema(schema, schema_id, subject, version) + return (schema_id, schema, version) + + def get_version(self, subject, avro_schema): + """ + POST /subjects/(string: subject) + + Get the version of a schema for a given subject. + + Returns None if not found. + @:param: subject: subject name + @:param: avro_schema: Avro schema + @:returns: version + """ + schemas_to_version = self.subject_to_schema_versions[subject] + version = schemas_to_version.get(avro_schema, None) + if version != None: + return version + + url = '/'.join([self.url, 'subjects', subject]) + body = {'schema': json.dumps(avro_schema.to_json())} + + result, code = self._send_request(url, method='POST', body=body) + if code == 404: + log.error("Not found:" + str(code)) + return None + elif not (code >= 200 and code <= 299): + log.error("Unable to get version of a schema:" + str(code)) + return None + schema_id = result['id'] + version = result['version'] + self._cache_schema(avro_schema, schema_id, subject, version) + return version + + def test_compatibility(self, subject, avro_schema, version='latest'): + """ + POST /compatibility/subjects/(string: subject)/versions/(versionId: version) + + Test the compatibility of a candidate parsed schema for a given subject. + + By default the latest version is checked against. 
+        @:param: subject: subject name
+        @:param: avro_schema: Avro schema
+        @:return: True if compatible, False if not compatible or on error
+        """
+        url = '/'.join([self.url, 'compatibility', 'subjects', subject,
+                        'versions', str(version)])
+        body = {'schema': json.dumps(avro_schema.to_json())}
+        try:
+            result, code = self._send_request(url, method='POST', body=body)
+            if code == 404:
+                log.error(("Subject or version not found:" + str(code)))
+                return False
+            elif code == 422:
+                log.error(("Invalid subject or schema:" + str(code)))
+                return False
+            elif code >= 200 and code <= 299:
+                return result.get('is_compatible')
+            else:
+                log.error("Unable to check the compatibility")
+                return False
+        except:
+            return False
+
+    def update_compatibility(self, level, subject=None):
+        """
+        PUT /config/(string: subject)
+
+        Update the compatibility level for a subject.
+
+        @:param: level: one of 'NONE', 'FULL', 'FORWARD', or 'BACKWARD'
+        """
+        if level not in VALID_LEVELS:
+            raise ClientError("Invalid level specified: %s" % (str(level)))
+
+        url = '/'.join([self.url, 'config'])
+        if subject:
+            url += '/' + subject
+
+        body = {"compatibility": level}
+        result, code = self._send_request(url, method='PUT', body=body)
+        if code >= 200 and code <= 299:
+            return result['compatibility']
+        else:
+            raise ClientError("Unable to update level: %s. Error code: %d" % (str(level), code))
+
+    def get_compatibility(self, subject=None):
+        """
+        GET /config
+        Get the current compatibility level for a subject.
+
+        @:param: subject: subject name
+        @:return: one of 'NONE', 'FULL', 'FORWARD', or 'BACKWARD'
+        """
+        url = '/'.join([self.url, 'config'])
+        if subject:
+            url += '/' + subject
+
+        result, code = self._send_request(url)
+        if code >= 200 and code <= 299:
+            compatibility = result.get('compatibility', None)
+
+            if not compatibility:
+                compatibility = result.get('compatibilityLevel')
+
+            return compatibility

diff --git a/confluent_kafka/avro/serializer/__init__.py b/confluent_kafka/avro/serializer/__init__.py
new file mode 100644
index 000000000..40070430e
--- /dev/null
+++ b/confluent_kafka/avro/serializer/__init__.py
@@ -0,0 +1,11 @@
+class SerializerError(Exception):
+    """Generic error from serializer package"""
+
+    def __init__(self, message):
+        self.message = message
+
+    def __repr__(self):
+        return 'SerializerError(error={error})'.format(error=self.message)
+
+    def __str__(self):
+        return self.message

diff --git a/confluent_kafka/avro/serializer/message_serializer.py b/confluent_kafka/avro/serializer/message_serializer.py
new file mode 100644
index 000000000..2a8bf3d5a
--- /dev/null
+++ b/confluent_kafka/avro/serializer/message_serializer.py
@@ -0,0 +1,208 @@
+#!/usr/bin/env python
+#
+# Copyright 2016 Confluent Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+
+#
+# derived from https://github.com/verisign/python-confluent-schemaregistry.git
+#
+import io
+import logging
+import struct
+import sys
+import traceback
+
+import avro
+import avro.io
+
+from confluent_kafka.avro import ClientError
+from . import SerializerError
+
+log = logging.getLogger(__name__)
+
+MAGIC_BYTE = 0
+
+HAS_FAST = False
+try:
+    from fastavro.reader import read_data
+
+    HAS_FAST = True
+except ImportError:
+    pass
+
+
+class ContextStringIO(io.BytesIO):
+    """
+    Wrapper to allow use of BytesIO via 'with' constructs.
+    """
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, *args):
+        self.close()
+        return False
+
+
+class MessageSerializer(object):
+    """
+    A helper class that can serialize and deserialize messages
+    that need to be encoded or decoded using the schema registry.
+
+    All encode_* methods return a buffer that can be sent to kafka.
+    All decode_* methods expect a buffer received from kafka.
+    """
+
+    def __init__(self, registry_client):
+        self.registry_client = registry_client
+        self.id_to_decoder_func = {}
+        self.id_to_writers = {}
+
+    def encode_record_with_schema(self, topic, schema, record, is_key=False):
+        """
+        Given a parsed avro schema, encode a record for the given topic. The
+        record is expected to be a dictionary.
+
+        The schema is registered under the subject 'topic-value'
+        (or 'topic-key' when is_key is True).
+        @:param topic : Topic name
+        @:param schema : Avro Schema
+        @:param record : A dictionary object
+        @:returns : Encoded record with schema ID as bytes
+        """
+        if not isinstance(record, dict):
+            raise SerializerError("record must be a dictionary")
+        subject_suffix = ('-key' if is_key else '-value')
+        subject = topic + subject_suffix
+        # register the schema (a cache hit if already registered)
+        schema_id = self.registry_client.register(subject, schema)
+        if not schema_id:
+            message = "Unable to retrieve schema id for subject %s" % (subject)
+            raise SerializerError(message)
+
+        # cache writer
+        self.id_to_writers[schema_id] = avro.io.DatumWriter(schema)
+
+        return self.encode_record_with_schema_id(schema_id, record)
+
+    def encode_record_with_schema_id(self, schema_id, record):
+        """
+        Encode a record with a given schema id. The record must
+        be a python dictionary.
+        @:param: schema_id : integer ID
+        @:param: record : A dictionary object
+        @:returns: Encoded record with schema ID as bytes
+        """
+        if not isinstance(record, dict):
+            raise SerializerError("record must be a dictionary")
+        # use slow avro
+        if schema_id not in self.id_to_writers:
+            # get the writer + schema
+            try:
+                schema = self.registry_client.get_by_id(schema_id)
+                if not schema:
+                    raise SerializerError("Schema does not exist")
+                self.id_to_writers[schema_id] = avro.io.DatumWriter(schema)
+            except ClientError as e:
+                exc_type, exc_value, exc_traceback = sys.exc_info()
+                raise SerializerError("Error fetching schema from registry:" + repr(
+                    traceback.format_exception(exc_type, exc_value, exc_traceback)))
+
+        # get the writer
+        writer = self.id_to_writers[schema_id]
+        with ContextStringIO() as outf:
+            # write the header: magic byte
+            outf.write(struct.pack('b', MAGIC_BYTE))
+
+            # write the schema ID in network byte order (big endian)
+            outf.write(struct.pack('>I', schema_id))
+
+            # write the record to the rest of the buffer
+            # Create an encoder that we'll write to
+            encoder = avro.io.BinaryEncoder(outf)
+            # write the object in 'record' as Avro to the buffer
+ writer.write(record, encoder) + + return outf.getvalue() + + # Decoder support + def _get_decoder_func(self, schema_id, payload): + if schema_id in self.id_to_decoder_func: + return self.id_to_decoder_func[schema_id] + + # fetch from schema reg + try: + schema = self.registry_client.get_by_id(schema_id) + except: + schema = None + + if not schema: + err = "unable to fetch schema with id %d" % (schema_id) + raise SerializerError(err) + + curr_pos = payload.tell() + if HAS_FAST: + # try to use fast avro + try: + schema_dict = schema.to_json() + obj = read_data(payload, schema_dict) + # here means we passed so this is something fastavro can do + # seek back since it will be called again for the + # same payload - one time hit + + payload.seek(curr_pos) + decoder_func = lambda p: read_data(p, schema_dict) + self.id_to_decoder_func[schema_id] = decoder_func + return self.id_to_decoder_func[schema_id] + except: + pass + + # here means we should just delegate to slow avro + # rewind + payload.seek(curr_pos) + avro_reader = avro.io.DatumReader(schema) + + def decoder(p): + bin_decoder = avro.io.BinaryDecoder(p) + return avro_reader.read(bin_decoder) + + self.id_to_decoder_func[schema_id] = decoder + return self.id_to_decoder_func[schema_id] + + def decode_message(self, message): + """ + Decode a message from kafka that has been encoded for use with + the schema registry. + @:param: message + """ + if len(message) <= 5: + raise SerializerError("message is too small to decode") + + with ContextStringIO(message) as payload: + magic, schema_id = struct.unpack('>bI', payload.read(5)) + if magic != MAGIC_BYTE: + raise SerializerError("message does not start with magic byte") + decoder_func = self._get_decoder_func(schema_id, payload) + return decoder_func(payload) diff --git a/setup.py b/setup.py index c1e0d9c70..53a2abaaa 100755 --- a/setup.py +++ b/setup.py @@ -2,7 +2,11 @@ from setuptools import setup, find_packages from distutils.core import Extension - +import sys +if sys.version_info[0] < 3: + avro = 'avro' +else: + avro = 'avro-python3' module = Extension('confluent_kafka.cimpl', libraries= ['rdkafka'], @@ -17,5 +21,8 @@ author_email='support@confluent.io', url='https://github.com/confluentinc/confluent-kafka-python', ext_modules=[module], - packages=find_packages(), - data_files = [('', ['LICENSE'])]) + packages=find_packages(exclude=("tests",)), + data_files = [('', ['LICENSE'])], + extras_require={ + 'avro': ['fastavro', 'requests', avro, 'unittest2'] + }) diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/avro/__init__.py b/tests/avro/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/avro/adv_schema.avsc b/tests/avro/adv_schema.avsc new file mode 100644 index 000000000..a5f975f7a --- /dev/null +++ b/tests/avro/adv_schema.avsc @@ -0,0 +1,61 @@ +{ + "name": "advanced", + "type": "record", + "doc": "advanced schema for tests", + "namespace": "python.test.advanced", + "fields": [ + { + "name": "number", + "doc": "age", + "type": [ + "long", + "null" + ] + }, + { + "name": "name", + "doc": "a name", + "type": [ + "string" + ] + }, + { + "name": "friends", + "doc": "friends", + "type" : { + "type": "map", + "values" : { + "name": "basicPerson", + "type": "record", + "namespace": "python.test.advanced", + "fields": [ + { + "name": "number", + "doc": "friend age", + "type": [ + "long", + "null" + ] + }, + { + "name": "name", + "doc": "friend name", + "type": [ + "string" + ] + } + ] + } + } + }, + { 
+      "name" : "family",
+      "doc" : "family",
+      "type" : {
+        "namespace" : "python.test.advanced",
+        "type" : "map",
+        "values" : "basicPerson"
+      }
+    }
+  ]
+}

diff --git a/tests/avro/basic_schema.avsc b/tests/avro/basic_schema.avsc
new file mode 100644
index 000000000..1dfced457
--- /dev/null
+++ b/tests/avro/basic_schema.avsc
@@ -0,0 +1,23 @@
+{
+  "name": "basic",
+  "type": "record",
+  "doc": "basic schema for tests",
+  "namespace": "python.test.basic",
+  "fields": [
+    {
+      "name": "number",
+      "doc": "age",
+      "type": [
+        "long",
+        "null"
+      ]
+    },
+    {
+      "name": "name",
+      "doc": "a name",
+      "type": [
+        "string"
+      ]
+    }
+  ]
+}
\ No newline at end of file

diff --git a/tests/avro/data_gen.py b/tests/avro/data_gen.py
new file mode 100644
index 000000000..8027b73ea
--- /dev/null
+++ b/tests/avro/data_gen.py
@@ -0,0 +1,101 @@
+#!/usr/bin/env python
+#
+# Copyright 2016 Confluent Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+
+#
+# derived from https://github.com/verisign/python-confluent-schemaregistry.git
+#
+import os
+import os.path
+import random
+import sys
+
+NAMES = ['stefan', 'melanie', 'nick', 'darrel', 'kent', 'simon']
+AGES = list(range(1, 10)) + [None]
+
+
+def get_schema_path(fname):
+    dname = os.path.dirname(os.path.realpath(__file__))
+    return os.path.join(dname, fname)
+
+
+def load_schema_file(fname):
+    fname = get_schema_path(fname)
+    with open(fname) as f:
+        return f.read()
+
+
+avsc_dir = os.path.dirname(os.path.realpath(__file__))
+
+BASIC_SCHEMA = load_schema_file(os.path.join(avsc_dir, 'basic_schema.avsc'))
+
+
+def create_basic_item(i):
+    return {
+        'name': random.choice(NAMES) + '-' + str(i),
+        'number': random.choice(AGES)
+    }
+
+
+# materialize as lists so the data can be iterated more than once on python 3
+BASIC_ITEMS = list(map(create_basic_item, range(1, 20)))
+
+ADVANCED_SCHEMA = load_schema_file(os.path.join(avsc_dir, 'adv_schema.avsc'))
+
+
+def create_adv_item(i):
+    friends = map(create_basic_item, range(1, 3))
+    family = map(create_basic_item, range(1, 3))
+    basic = create_basic_item(i)
+    basic['family'] = dict(map(lambda bi: (bi['name'], bi), family))
+    basic['friends'] = dict(map(lambda bi: (bi['name'], bi), friends))
+    return basic
+
+
+ADVANCED_ITEMS = list(map(create_adv_item, range(1, 20)))
+
+from avro import schema
+from avro.datafile import DataFileWriter
+from avro.io import DatumWriter
+
+
+def _write_items(base_name, schema_str, items):
+    if sys.version_info[0] < 3:
+        avro_schema = schema.parse(schema_str)
+    else:
+        avro_schema = schema.Parse(schema_str)
+    avro_file = base_name + '.avro'
+    # avro container files are binary, so open in binary mode;
+    # the with-block closes the writer on exit
+    with DataFileWriter(open(avro_file, "wb"), DatumWriter(), avro_schema) as writer:
+        for i in items:
+            writer.append(i)
+    return avro_file
+
+
+def write_basic_items(base_name):
+    return _write_items(base_name, BASIC_SCHEMA, BASIC_ITEMS)
+
+
+def write_advanced_items(base_name):
+    return _write_items(base_name, ADVANCED_SCHEMA, ADVANCED_ITEMS)
+
+
+def cleanup(files):
+    for f in files:
+        try:
+            os.remove(f)
+        except OSError:
+            pass
+
+
+if __name__ == "__main__":
+    write_advanced_items("advanced")

diff --git a/tests/avro/mock_registry.py b/tests/avro/mock_registry.py
new file mode 100644
index 000000000..55ee031d0
---
/dev/null +++ b/tests/avro/mock_registry.py @@ -0,0 +1,189 @@ +#!/usr/bin/env python +# +# Copyright 2016 Confluent Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + + +# +# derived from https://github.com/verisign/python-confluent-schemaregistry.git +# + +import sys + +if sys.version_info[0] < 3: + import BaseHTTPServer as HTTPSERVER +else: + import http.server as HTTPSERVER +import json +import re +from threading import Thread + +from tests.avro.mock_schema_registry_client import MockSchemaRegistryClient +from confluent_kafka import avro + + +class ReqHandler(HTTPSERVER.BaseHTTPRequestHandler): + protocol_version = "HTTP/1.0" + + def do_GET(self): + self.server._run_routes(self) + + def do_POST(self): + self.server._run_routes(self) + + def log_message(self, format, *args): + pass + + +class MockServer(HTTPSERVER.HTTPServer, object): + def __init__(self, *args, **kwargs): + super(MockServer, self).__init__(*args, **kwargs) + self.counts = {} + self.registry = MockSchemaRegistryClient() + self.schema_cache = {} + self.all_routes = { + 'GET': [ + (r"/schemas/ids/(\d+)", 'get_schema_by_id'), + (r"/subjects/(\w+)/versions/latest", 'get_latest') + ], + 'POST': [ + (r"/subjects/(\w+)/versions", 'register'), + (r"/subjects/(\w+)", 'get_version') + ] + } + + def _send_response(self, resp, status, body): + resp.send_response(status) + resp.send_header("Content-Type", "application/json") + resp.end_headers() + resp.wfile.write(json.dumps(body).encode()) + resp.finish() + + def _create_error(self, msg, status=400, err_code=1): + return (status, { + "error_code": err_code, + "message": msg + }) + + def _run_routes(self, req): + self.add_count((req.command, req.path)) + routes = self.all_routes.get(req.command, []) + for r in routes: + m = re.match(r[0], req.path) + if m: + func = getattr(self, r[1]) + status, body = func(req, m.groups()) + return self._send_response(req, status, body) + + # here means we got a bad req + status, body = self._create_error("bad path specified") + self._send_response(req, status, body) + + def get_schema_by_id(self, req, groups): + schema_id = int(groups[0]) + schema = self.registry.get_by_id(schema_id) + if not schema: + return self._create_error("schema not found", 404) + result = { + "schema": json.dumps(schema.to_json()) + } + return (200, result) + + def _get_identity_schema(self, avro_schema): + # normalized + schema_str = json.dumps(avro_schema.to_json()) + if schema_str in self.schema_cache: + return self.schema_cache[schema_str] + self.schema_cache[schema_str] = avro_schema + return avro_schema + + def _get_schema_from_body(self, req): + length = int(req.headers['content-length']) + data = req.rfile.read(length) + data = json.loads(data.decode("utf-8")) + schema = data.get("schema", None) + if not schema: + return None + try: + avro_schema = avro.loads(schema) + return self._get_identity_schema(avro_schema) + except: + return None + + def register(self, req, groups): + avro_schema = self._get_schema_from_body(req) + if not avro_schema: + return 
self._create_error("Invalid avro schema", 422, 42201)
+        subject = groups[0]
+        schema_id = self.registry.register(subject, avro_schema)
+        return (200, {'id': schema_id})
+
+    def get_version(self, req, groups):
+        avro_schema = self._get_schema_from_body(req)
+        if not avro_schema:
+            return self._create_error("Invalid avro schema", 422, 42201)
+        subject = groups[0]
+        version = self.registry.get_version(subject, avro_schema)
+        if version == -1:
+            return self._create_error("Not found", 404)
+        schema_id = self.registry.get_id_for_schema(subject, avro_schema)
+
+        result = {
+            "schema": json.dumps(avro_schema.to_json()),
+            "subject": subject,
+            "id": schema_id,
+            "version": version
+        }
+        return (200, result)
+
+    def get_latest(self, req, groups):
+        subject = groups[0]
+        schema_id, avro_schema, version = self.registry.get_latest_schema(subject)
+        if schema_id is None:
+            return self._create_error("Not found", 404)
+        result = {
+            "schema": json.dumps(avro_schema.to_json()),
+            "subject": subject,
+            "id": schema_id,
+            "version": version
+        }
+        return (200, result)
+
+    def add_count(self, path):
+        if path not in self.counts:
+            self.counts[path] = 0
+        self.counts[path] += 1
+
+
+class ServerThread(Thread):
+    def __init__(self, port):
+        Thread.__init__(self)
+        self.server = None
+        self.port = port
+        self.daemon = True
+
+    def run(self):
+        self.server = MockServer(('127.0.0.1', self.port), ReqHandler)
+        self.server.serve_forever()
+
+    def shutdown(self):
+        if self.server:
+            self.server.shutdown()
+            self.server.socket.close()
+
+
+if __name__ == '__main__':
+    s = ServerThread(0)
+    s.start()

diff --git a/tests/avro/mock_schema_registry_client.py b/tests/avro/mock_schema_registry_client.py
new file mode 100644
index 000000000..0180c7af3
--- /dev/null
+++ b/tests/avro/mock_schema_registry_client.py
@@ -0,0 +1,149 @@
+#!/usr/bin/env python
+#
+# Copyright 2016 Confluent Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+
+#
+# derived from https://github.com/verisign/python-confluent-schemaregistry.git
+#
+
+from confluent_kafka.avro import ClientError
+
+
+class MockSchemaRegistryClient(object):
+    """
+    A client that acts as a schema registry locally.
+
+    Compatibility related methods are not implemented at this time.
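+
+    A short sketch of how the tests drive it (hypothetical subject name):
+
+        client = MockSchemaRegistryClient()
+        schema_id = client.register('my_subject', parsed_schema)
+        assert client.get_by_id(schema_id) is parsed_schema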
+ """ + + def __init__(self, max_schemas_per_subject=1000): + self.max_schemas_per_subject = max_schemas_per_subject + # subj => { schema => id } + self.subject_to_schema_ids = {} + # id => avro_schema + self.id_to_schema = {} + # subj => { schema => version } + self.subject_to_schema_versions = {} + + self.subject_to_latest_schema = {} + + # counters + self.next_id = 1 + self.schema_to_id = {} + + def _get_next_id(self, schema): + if schema in self.schema_to_id: + return self.schema_to_id[schema] + result = self.next_id + self.next_id += 1 + self.schema_to_id[schema] = result + return result + + def _get_next_version(self, subject): + if subject not in self.subject_to_schema_versions: + self.subject_to_schema_versions[subject] = {} + return len(self.subject_to_schema_versions[subject]) + + def _get_all_versions(self, subject): + versions = self.subject_to_schema_versions.get(subject, {}) + return sorted(versions) + + def _add_to_cache(self, cache, subject, schema, value): + if subject not in cache: + cache[subject] = {} + sub_cache = cache[subject] + sub_cache[schema] = value + + def _cache_schema(self, schema, schema_id, subject, version): + # don't overwrite anything + if schema_id in self.id_to_schema: + schema = self.id_to_schema[schema_id] + else: + self.id_to_schema[schema_id] = schema + + self._add_to_cache(self.subject_to_schema_ids, + subject, schema, schema_id) + + self._add_to_cache(self.subject_to_schema_versions, + subject, schema, version) + + if subject in self.subject_to_latest_schema: + si, s, v = self.subject_to_latest_schema[subject] + if v > version: + return + self.subject_to_latest_schema[subject] = (schema_id, schema, version) + + def register(self, subject, avro_schema): + """ + Register a schema with the registry under the given subject + and receive a schema id. + + avro_schema must be a parsed schema from the python avro library + + Multiple instances of the same schema will result in inconsistencies. + """ + schemas_to_id = self.subject_to_schema_ids.get(subject, {}) + schema_id = schemas_to_id.get(avro_schema, -1) + if schema_id != -1: + return schema_id + + # add it + version = self._get_next_version(subject) + schema_id = self._get_next_id(avro_schema) + + # cache it + self._cache_schema(avro_schema, schema_id, subject, version) + return schema_id + + def get_by_id(self, schema_id): + """Retrieve a parsed avro schema by id or None if not found""" + return self.id_to_schema.get(schema_id, None) + + def get_latest_schema(self, subject): + """ + Return the latest 3-tuple of: + (the schema id, the parsed avro schema, the schema version) + for a particular subject. + + If the subject is not found, (None,None,None) is returned. + """ + return self.subject_to_latest_schema.get(subject, (None, None, None)) + + def get_version(self, subject, avro_schema): + """ + Get the version of a schema for a given subject. + + Returns -1 if not found. 
+        """
+        schemas_to_version = self.subject_to_schema_versions.get(subject, {})
+        return schemas_to_version.get(avro_schema, -1)
+
+    def get_id_for_schema(self, subject, avro_schema):
+        """
+        Get the ID of a parsed schema
+        """
+        schemas_to_id = self.subject_to_schema_ids.get(subject, {})
+        return schemas_to_id.get(avro_schema, -1)
+
+    def test_compatibility(self, subject, avro_schema, version='latest'):
+        raise ClientError("not implemented")
+
+    def update_compatibility(self, level, subject=None):
+        raise ClientError("not implemented")
+
+    def get_compatibility(self, subject=None):
+        raise ClientError("not implemented")

diff --git a/tests/avro/test_avro_producer.py b/tests/avro/test_avro_producer.py
new file mode 100644
index 000000000..ad055404e
--- /dev/null
+++ b/tests/avro/test_avro_producer.py
@@ -0,0 +1,76 @@
+#!/usr/bin/env python
+#
+# Copyright 2016 Confluent Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+
+#
+# derived from https://github.com/verisign/python-confluent-schemaregistry.git
+#
+import os
+import sys
+
+from confluent_kafka import avro
+
+if sys.version_info[0] < 3:
+    import unittest
+else:
+    import unittest2 as unittest
+from confluent_kafka.avro import AvroProducer
+from confluent_kafka.avro.serializer import SerializerError
+
+avsc_dir = os.path.dirname(os.path.realpath(__file__))
+
+
+class TestAvroProducer(unittest.TestCase):
+    def setUp(self):
+        pass
+
+    def test_instantiation(self):
+        obj = AvroProducer({'schema.registry.url': 'http://127.0.0.1:0'})
+        self.assertTrue(isinstance(obj, AvroProducer))
+        self.assertNotEqual(obj, None)
+
+    def test_Produce(self):
+        producer = AvroProducer({'schema.registry.url': 'http://127.0.0.1:0'})
+        valueSchema = avro.load(os.path.join(avsc_dir, "basic_schema.avsc"))
+        # producing with a key but no key_schema must fail; assertRaises is used
+        # so the failure is not swallowed by a blanket except block
+        with self.assertRaises(Exception):
+            producer.produce(topic='test', value={"name": 'abc"'}, value_schema=valueSchema, key='mykey')
+
+    def test_produce_arguments(self):
+        value_schema = avro.load(os.path.join(avsc_dir, "basic_schema.avsc"))
+        producer = AvroProducer({'schema.registry.url': 'http://127.0.0.1:0'}, default_value_schema=value_schema)
+
+        try:
+            producer.produce(topic='test', value={"name": 'abc"'})
+        except Exception as e:
+            exc_type, exc_obj, exc_tb = sys.exc_info()
+            if exc_type.__name__ == 'SerializerError':
+                self.fail()
+
+    def test_produce_arguments_list(self):
+        producer = AvroProducer({'schema.registry.url': 'http://127.0.0.1:0'})
+        # no schema is configured at all, so the serializer must refuse the value
+        with self.assertRaises(SerializerError):
+            producer.produce(topic='test', value={"name": 'abc"'}, key='mykey')
+
+
+def suite():
+    return unittest.TestLoader().loadTestsFromTestCase(TestAvroProducer)

diff --git a/tests/avro/test_cached_client.py b/tests/avro/test_cached_client.py
new file mode 100644
index 000000000..97dbcc241
--- /dev/null
+++ b/tests/avro/test_cached_client.py
@@ -0,0 +1,137 @@
+#!/usr/bin/env python
+#
+# Copyright 2016 Confluent Inc.
+# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + + +# +# derived from https://github.com/verisign/python-confluent-schemaregistry.git +# + +import sys +import time + +if sys.version_info[0] < 3: + import unittest +else: + import unittest2 as unittest +from tests.avro import mock_registry +from tests.avro import data_gen +from confluent_kafka.avro.cached_schema_registry_client import CachedSchemaRegistryClient +from confluent_kafka import avro + + +class TestCacheSchemaRegistryClient(unittest.TestCase): + def setUp(self): + self.server = mock_registry.ServerThread(0) + self.server.start() + time.sleep(1) + self.client = CachedSchemaRegistryClient('http://127.0.0.1:' + str(self.server.server.server_port)) + + def tearDown(self): + self.server.shutdown() + self.server.join() + + def test_register(self): + parsed = avro.loads(data_gen.BASIC_SCHEMA) + client = self.client + schema_id = client.register('test', parsed) + self.assertTrue(schema_id > 0) + self.assertEqual(len(client.id_to_schema), 1) + + def test_multi_subject_register(self): + parsed = avro.loads(data_gen.BASIC_SCHEMA) + client = self.client + schema_id = client.register('test', parsed) + self.assertTrue(schema_id > 0) + + # register again under different subject + dupe_id = client.register('other', parsed) + self.assertEqual(schema_id, dupe_id) + self.assertEqual(len(client.id_to_schema), 1) + + def test_dupe_register(self): + parsed = avro.loads(data_gen.BASIC_SCHEMA) + subject = 'test' + client = self.client + schema_id = client.register(subject, parsed) + self.assertTrue(schema_id > 0) + latest = client.get_latest_schema(subject) + + # register again under same subject + dupe_id = client.register(subject, parsed) + self.assertEqual(schema_id, dupe_id) + dupe_latest = client.get_latest_schema(subject) + self.assertEqual(latest, dupe_latest) + + def assertLatest(self, meta_tuple, sid, schema, version): + self.assertNotEqual(sid, -1) + self.assertNotEqual(version, -1) + self.assertEqual(meta_tuple[0], sid) + self.assertEqual(meta_tuple[1], schema) + self.assertEqual(meta_tuple[2], version) + + def test_getters(self): + parsed = avro.loads(data_gen.BASIC_SCHEMA) + client = self.client + subject = 'test' + version = client.get_version(subject, parsed) + self.assertEqual(version, None) + schema = client.get_by_id(1) + self.assertEqual(schema, None) + latest = client.get_latest_schema(subject) + self.assertEqual(latest, (None, None, None)) + + # register + schema_id = client.register(subject, parsed) + latest = client.get_latest_schema(subject) + version = client.get_version(subject, parsed) + self.assertLatest(latest, schema_id, parsed, version) + + fetched = client.get_by_id(schema_id) + self.assertEqual(fetched, parsed) + + def test_multi_register(self): + basic = avro.loads(data_gen.BASIC_SCHEMA) + adv = avro.loads(data_gen.ADVANCED_SCHEMA) + subject = 'test' + client = self.client + + id1 = client.register(subject, basic) + latest1 = client.get_latest_schema(subject) + v1 = client.get_version(subject, basic) + 
self.assertLatest(latest1, id1, basic, v1) + + id2 = client.register(subject, adv) + latest2 = client.get_latest_schema(subject) + v2 = client.get_version(subject, adv) + self.assertLatest(latest2, id2, adv, v2) + + self.assertNotEqual(id1, id2) + self.assertNotEqual(latest1, latest2) + # ensure version is higher + self.assertTrue(latest1[2] < latest2[2]) + + client.register(subject, basic) + latest3 = client.get_latest_schema(subject) + # latest should not change with a re-reg + self.assertEqual(latest2, latest3) + + def hash_func(self): + return hash(str(self)) + + +def suite(): + return unittest.TestLoader().loadTestsFromTestCase(TestCacheSchemaRegistryClient) diff --git a/tests/avro/test_message_serializer.py b/tests/avro/test_message_serializer.py new file mode 100644 index 000000000..2ec7f126b --- /dev/null +++ b/tests/avro/test_message_serializer.py @@ -0,0 +1,85 @@ +#!/usr/bin/env python +# +# Copyright 2016 Confluent Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + + +# +# derived from https://github.com/verisign/python-confluent-schemaregistry.git +# +import struct +import sys + +if sys.version_info[0] < 3: + import unittest +else: + import unittest2 as unittest +from tests.avro import data_gen +from confluent_kafka.avro.serializer.message_serializer import MessageSerializer +from tests.avro.mock_schema_registry_client import MockSchemaRegistryClient +from confluent_kafka import avro + + +class TestMessageSerializer(unittest.TestCase): + def setUp(self): + # need to set up the serializer + self.client = MockSchemaRegistryClient() + self.ms = MessageSerializer(self.client) + + def assertMessageIsSame(self, message, expected, schema_id): + self.assertTrue(message) + self.assertTrue(len(message) > 5) + magic, sid = struct.unpack('>bI', message[0:5]) + self.assertEqual(magic, 0) + self.assertEqual(sid, schema_id) + decoded = self.ms.decode_message(message) + self.assertTrue(decoded) + self.assertEqual(decoded, expected) + + def test_encode_with_schema_id(self): + adv = avro.loads(data_gen.ADVANCED_SCHEMA) + basic = avro.loads(data_gen.BASIC_SCHEMA) + subject = 'test' + schema_id = self.client.register(subject, basic) + + records = data_gen.BASIC_ITEMS + for record in records: + message = self.ms.encode_record_with_schema_id(schema_id, record) + self.assertMessageIsSame(message, record, schema_id) + + subject = 'test_adv' + adv_schema_id = self.client.register(subject, adv) + self.assertNotEqual(adv_schema_id, schema_id) + records = data_gen.ADVANCED_ITEMS + for record in records: + message = self.ms.encode_record_with_schema_id(adv_schema_id, record) + self.assertMessageIsSame(message, record, adv_schema_id) + + def test_encode_record_with_schema(self): + topic = 'test' + basic = avro.loads(data_gen.BASIC_SCHEMA) + subject = 'test-value' + schema_id = self.client.register(subject, basic) + records = data_gen.BASIC_ITEMS + for record in records: + message = self.ms.encode_record_with_schema(topic, basic, record) + self.assertMessageIsSame(message, record, schema_id) + + def 
hash_func(self):
+        return hash(str(self))
+
+
+def suite():
+    return unittest.TestLoader().loadTestsFromTestCase(TestMessageSerializer)

diff --git a/tests/avro/test_mock_client.py b/tests/avro/test_mock_client.py
new file mode 100644
index 000000000..823b0bb5d
--- /dev/null
+++ b/tests/avro/test_mock_client.py
@@ -0,0 +1,127 @@
+#!/usr/bin/env python
+#
+# Copyright 2016 Confluent Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+
+#
+# derived from https://github.com/verisign/python-confluent-schemaregistry.git
+#
+import sys
+
+if sys.version_info[0] < 3:
+    import unittest
+else:
+    import unittest2 as unittest
+from tests.avro import data_gen
+from tests.avro.mock_schema_registry_client import MockSchemaRegistryClient
+from confluent_kafka import avro
+
+
+class TestMockSchemaRegistryClient(unittest.TestCase):
+    def setUp(self):
+        self.client = MockSchemaRegistryClient()
+
+    def test_register(self):
+        parsed = avro.loads(data_gen.BASIC_SCHEMA)
+        client = self.client
+        schema_id = client.register('test', parsed)
+        self.assertTrue(schema_id > 0)
+        self.assertEqual(len(client.id_to_schema), 1)
+
+    def test_multi_subject_register(self):
+        parsed = avro.loads(data_gen.BASIC_SCHEMA)
+        client = self.client
+        schema_id = client.register('test', parsed)
+        self.assertTrue(schema_id > 0)
+
+        # register again under different subject
+        dupe_id = client.register('other', parsed)
+        self.assertEqual(schema_id, dupe_id)
+        self.assertEqual(len(client.id_to_schema), 1)
+
+    def test_dupe_register(self):
+        parsed = avro.loads(data_gen.BASIC_SCHEMA)
+        subject = 'test'
+        client = self.client
+        schema_id = client.register(subject, parsed)
+        self.assertTrue(schema_id > 0)
+        latest = client.get_latest_schema(subject)
+
+        # register again under same subject
+        dupe_id = client.register(subject, parsed)
+        self.assertEqual(schema_id, dupe_id)
+        dupe_latest = client.get_latest_schema(subject)
+        self.assertEqual(latest, dupe_latest)
+
+    def assertLatest(self, meta_tuple, sid, schema, version):
+        self.assertNotEqual(sid, -1)
+        self.assertNotEqual(version, -1)
+        self.assertEqual(meta_tuple[0], sid)
+        self.assertEqual(meta_tuple[1], schema)
+        self.assertEqual(meta_tuple[2], version)
+
+    def test_getters(self):
+        parsed = avro.loads(data_gen.BASIC_SCHEMA)
+        client = self.client
+        subject = 'test'
+        version = client.get_version(subject, parsed)
+        self.assertEqual(version, -1)
+        schema = client.get_by_id(1)
+        self.assertEqual(schema, None)
+        latest = client.get_latest_schema(subject)
+        self.assertEqual(latest, (None, None, None))
+
+        # register
+        schema_id = client.register(subject, parsed)
+        latest = client.get_latest_schema(subject)
+        version = client.get_version(subject, parsed)
+        self.assertLatest(latest, schema_id, parsed, version)
+
+        fetched = client.get_by_id(schema_id)
+        self.assertEqual(fetched, parsed)
+
+    def test_multi_register(self):
+        basic = avro.loads(data_gen.BASIC_SCHEMA)
+        adv = avro.loads(data_gen.ADVANCED_SCHEMA)
+        subject = 'test'
+        client = self.client
+
+        id1 = client.register(subject,
basic) + latest1 = client.get_latest_schema(subject) + v1 = client.get_version(subject, basic) + self.assertLatest(latest1, id1, basic, v1) + + id2 = client.register(subject, adv) + latest2 = client.get_latest_schema(subject) + v2 = client.get_version(subject, adv) + self.assertLatest(latest2, id2, adv, v2) + + self.assertNotEqual(id1, id2) + self.assertNotEqual(latest1, latest2) + # ensure version is higher + self.assertTrue(latest1[2] < latest2[2]) + + client.register(subject, basic) + latest3 = client.get_latest_schema(subject) + # latest should not change with a re-reg + self.assertEqual(latest2, latest3) + + def hash_func(self): + return hash(str(self)) + + +def suite(): + return unittest.TestLoader().loadTestsFromTestCase(TestMockSchemaRegistryClient) diff --git a/tests/avro/test_util.py b/tests/avro/test_util.py new file mode 100644 index 000000000..9ed9045d3 --- /dev/null +++ b/tests/avro/test_util.py @@ -0,0 +1,45 @@ +#!/usr/bin/env python +# +# Copyright 2016 Confluent Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + + +# +# derived from https://github.com/verisign/python-confluent-schemaregistry.git +# + +import sys + +if sys.version_info[0] < 3: + import unittest +else: + import unittest2 as unittest +from avro import schema +from tests.avro import data_gen +from confluent_kafka import avro + + +class TestUtil(unittest.TestCase): + def test_schema_from_string(self): + parsed = avro.loads(data_gen.BASIC_SCHEMA) + self.assertTrue(isinstance(parsed, schema.Schema)) + + def test_schema_from_file(self): + parsed = avro.load(data_gen.get_schema_path('adv_schema.avsc')) + self.assertTrue(isinstance(parsed, schema.Schema)) + + +def suite(): + return unittest.TestLoader().loadTestsFromTestCase(TestUtil) diff --git a/tests/test_docs.py b/tests/test_docs.py index 1c78d37c4..1c22baf72 100644 --- a/tests/test_docs.py +++ b/tests/test_docs.py @@ -1,10 +1,12 @@ #!/usr/bin/env python -import confluent_kafka import re -from types import ModuleType -from collections import defaultdict import sys +from collections import defaultdict +from types import ModuleType + +import confluent_kafka + def build_doctree (tree, prefix, parent): """ Build doctree dict with format: @@ -22,7 +24,11 @@ def build_doctree (tree, prefix, parent): tree[full].append(o) if hasattr(o, '__dict__'): - build_doctree(tree, full + '.', o) + is_module = isinstance(o, ModuleType) + is_ck_package = o.__dict__.get('__module__', '').startswith('confluent_kafka.') + is_cimpl_package = o.__dict__.get('__module__', '').startswith('cimpl.') + if not is_module or is_ck_package or is_cimpl_package: + build_doctree(tree, full + '.', o) def test_verify_docs(): From e7c7c0cd9b2469629792993be404eb05cb10cd51 Mon Sep 17 00:00:00 2001 From: Roopa Hiremath Chandrasekaraiah Date: Mon, 21 Nov 2016 14:18:02 -0800 Subject: [PATCH 2/3] Client code for AvroConsumer --- README.md | 18 ++++++++ confluent_kafka/avro/__init__.py | 44 ++++++++++++++++++- .../avro/cached_schema_registry_client.py | 5 ++- confluent_kafka/src/confluent_kafka.c | 23 
++++++++++
 tests/avro/test_avro_consumer.py                   | 39 ++++++++++++++++
 5 files changed, 126 insertions(+), 3 deletions(-)
 create mode 100644 tests/avro/test_avro_consumer.py

diff --git a/README.md b/README.md
index 4d7eacdbd..3feb3460a 100644
--- a/README.md
+++ b/README.md
@@ -60,6 +60,24 @@ avroProducer = AvroProducer({'bootstrap.servers': 'mybroker,mybroker2', 'schema.
 avroProducer.produce(topic='my_topic', value=value, key=key)
 ```
 
+**AvroConsumer**
+```
+from confluent_kafka import KafkaError
+from confluent_kafka.avro import AvroConsumer
+
+c = AvroConsumer({'bootstrap.servers': 'mybroker,mybroker2', 'group.id': 'groupid', 'schema.registry.url': 'http://127.0.0.1:9002'})
+c.subscribe(['my_topic'])
+running = True
+while running:
+    msg = c.poll(10)
+    if msg:
+        if not msg.error():
+            print(msg.value())
+        elif msg.error().code() != KafkaError._PARTITION_EOF:
+            print(msg.error())
+            running = False
+c.close()
+```
+
 See [examples](examples) for more examples.

diff --git a/confluent_kafka/avro/__init__.py b/confluent_kafka/avro/__init__.py
index 354a030eb..ec4365928 100644
--- a/confluent_kafka/avro/__init__.py
+++ b/confluent_kafka/avro/__init__.py
@@ -4,7 +4,7 @@
 """
 import sys
 
-from confluent_kafka import Producer
+from confluent_kafka import Producer, Consumer
 
 VALID_LEVELS = ['NONE', 'FULL', 'FORWARD', 'BACKWARD']
 
@@ -112,3 +112,45 @@ def produce(self, **kwargs):
                 raise SerializerError("Avro schema required for key")
 
         super(AvroProducer, self).produce(topic, value, key, **kwargs)
+
+
+class AvroConsumer(Consumer):
+    """
+    Kafka Consumer client which does avro schema decoding of messages.
+    Handles message deserialization.
+
+    The constructor takes the following parameters:
+
+    @:param: config: dict with config parameters; must contain the schema registry url (schema.registry.url).
+    """
+    def __init__(self, config):
+        if 'schema.registry.url' not in config:
+            raise ValueError("Missing parameter: schema.registry.url")
+        schema_registry_url = config["schema.registry.url"]
+        del config["schema.registry.url"]
+
+        super(AvroConsumer, self).__init__(config)
+        self._serializer = MessageSerializer(CachedSchemaRegistryClient(url=schema_registry_url))
+
+    def poll(self, timeout):
+        """
+        This is an overridden method from confluent_kafka.Consumer class. This handles message
+        deserialization using avro schema
+
+        @:param timeout
+        @:return message object with deserialized key and value as dict objects
+        """
+        message = super(AvroConsumer, self).poll(timeout)
+        if not message:
+            return message
+        if not message.error():
+            if message.value():
+                decoded_value = self._serializer.decode_message(message.value())
+                message.set_value(decoded_value)
+            if message.key():
+                decoded_key = self._serializer.decode_message(message.key())
+                message.set_key(decoded_key)
+        return message

diff --git a/confluent_kafka/avro/cached_schema_registry_client.py b/confluent_kafka/avro/cached_schema_registry_client.py
index 61e258761..97735c329 100644
--- a/confluent_kafka/avro/cached_schema_registry_client.py
+++ b/confluent_kafka/avro/cached_schema_registry_client.py
@@ -26,6 +26,7 @@
 import requests
 
 from . import ClientError, VALID_LEVELS
+from . import loads
 
 # Common accept header sent
 ACCEPT_HDR = "application/vnd.schemaregistry.v1+json, application/vnd.schemaregistry+json, application/json"
@@ -161,7 +162,7 @@ def get_by_id(self, schema_id):
             # need to parse the schema
             schema_str = result.get("schema")
             try:
-                result = avro.loads(schema_str)
+                result = loads(schema_str)
                 # cache it
                 self._cache_schema(result, schema_id)
                 return result
@@ -200,7 +201,7 @@ def get_latest_schema(self, subject):
             schema = self.id_to_schema[schema_id]
         else:
             try:
-                schema = avro.loads(result['schema'])
+                schema = loads(result['schema'])
             except:
                 # bad schema - should not happen
                 raise ClientError("Received bad schema from registry.")
diff --git a/confluent_kafka/src/confluent_kafka.c b/confluent_kafka/src/confluent_kafka.c
index a323ba2fd..db410ff05 100644
--- a/confluent_kafka/src/confluent_kafka.c
+++ b/confluent_kafka/src/confluent_kafka.c
@@ -332,6 +332,19 @@ static PyObject *Message_timestamp (Message *self, PyObject *ignore) {
 			self->timestamp);
 }
 
+static PyObject *Message_set_value (Message *self, PyObject *new_val) {
+	Py_XDECREF(self->value);  /* value may be NULL for empty payloads */
+	Py_INCREF(new_val);
+	self->value = new_val;
+	Py_RETURN_NONE;
+}
+
+static PyObject *Message_set_key (Message *self, PyObject *new_key) {
+	Py_XDECREF(self->key);  /* key may be NULL for keyless messages */
+	Py_INCREF(new_key);
+	self->key = new_key;
+	Py_RETURN_NONE;
+}
 
 static PyMethodDef Message_methods[] = {
 	{ "error", (PyCFunction)Message_error, METH_NOARGS,
@@ -391,6 +404,16 @@ static PyMethodDef Message_methods[] = {
 	  "  :rtype: (int, int)\n"
 	  "\n"
 	},
+	{ "set_value", (PyCFunction)Message_set_value, METH_O,
+	  "  Set the message value (payload), replacing the existing one.\n"
+	  "  :rtype: None\n"
+	  "\n"
+	},
+	{ "set_key", (PyCFunction)Message_set_key, METH_O,
+	  "  Set the message key, replacing the existing one.\n"
+	  "  :rtype: None\n"
+	  "\n"
+	},
 	{ NULL }
 };
diff --git a/tests/avro/test_avro_consumer.py b/tests/avro/test_avro_consumer.py
new file mode 100644
index 000000000..1581deb06
--- /dev/null
+++ b/tests/avro/test_avro_consumer.py
@@ -0,0 +1,39 @@
+#!/usr/bin/env python
+#
+# Copyright 2016 Confluent Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import sys
+
+# unittest2 backports the newer unittest features to Python 2
+if sys.version_info[0] < 3:
+    import unittest2 as unittest
+else:
+    import unittest
+from confluent_kafka.avro import AvroConsumer
+
+
+class TestAvroConsumer(unittest.TestCase):
+    def setUp(self):
+        pass
+
+    def test_instantiation(self):
+        obj = AvroConsumer({'bootstrap.servers': 'mybroker,mybroker2', 'group.id': 'groupid',
+                            'schema.registry.url': 'http://127.0.0.1:9002'})
+        self.assertTrue(isinstance(obj, AvroConsumer))
+        self.assertNotEqual(obj, None)
+
+
+def suite():
+    return unittest.TestLoader().loadTestsFromTestCase(TestAvroConsumer)
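
The consumer added in this patch decodes messages transparently inside `poll()`. Below is a slightly fuller loop than the README snippet, with deserialization failures handled explicitly; it is a sketch with placeholder broker, group and registry settings, and it assumes that a payload which cannot be decoded surfaces as a `SerializerError` raised out of `poll()`.

```
from confluent_kafka import KafkaError
from confluent_kafka.avro import AvroConsumer
from confluent_kafka.avro.serializer import SerializerError

# Placeholder broker, group and registry settings.
c = AvroConsumer({'bootstrap.servers': 'mybroker',
                  'group.id': 'groupid',
                  'schema.registry.url': 'http://127.0.0.1:8081'})
c.subscribe(['my_topic'])

while True:
    try:
        msg = c.poll(10)
    except SerializerError as e:
        # The message could not be decoded against its registered schema.
        print("Message deserialization failed: {}".format(e))
        break
    if msg is None:
        continue
    if msg.error():
        if msg.error().code() == KafkaError._PARTITION_EOF:
            continue
        print(msg.error())
        break
    print(msg.value())  # value() now returns the decoded dict

c.close()
```
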
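The serializer that backs both clients can also be exercised directly, which is useful in tests. The round trip below is a sketch: it assumes a schema registry is reachable at the placeholder address, and that `encode_record_with_schema()` from the serializer module added in the first patch registers the schema under the `<topic>-value` subject before framing the Avro-encoded record with the magic byte and 4-byte schema id.

```
from confluent_kafka import avro
from confluent_kafka.avro import CachedSchemaRegistryClient
from confluent_kafka.avro.serializer.message_serializer import MessageSerializer

# Placeholder registry address.
client = CachedSchemaRegistryClient(url='http://127.0.0.1:8081')
serializer = MessageSerializer(client)

schema = avro.loads('{"type": "record", "name": "User",'
                    ' "fields": [{"name": "name", "type": "string"}]}')

# Registers the schema and emits <magic byte><4-byte schema id><avro payload>.
encoded = serializer.encode_record_with_schema('my_topic', schema, {"name": "Alice"})

# Reads the schema id back out of the framing and fetches the schema by id.
decoded = serializer.decode_message(encoded)
assert decoded == {"name": "Alice"}
```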