Skip to content

AvroConsumer for handling schema registry #80

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Nov 28, 2016

Conversation

roopahc
Copy link
Contributor

@roopahc roopahc commented Nov 22, 2016

Fixes #36

@roopahc roopahc force-pushed the schema_registry_producer branch from 926e55e to 550688d Compare November 22, 2016 00:41
```
from confluent_kafka.avro import AvroConsumer

c = AvroConsumer({'bootstrap.servers': 'mybroker,mybroker2', 'group.id': 'greoupid', 'schema.registry.url': 'http://127.0.0.1:9002'})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typo: greoupid

the schema registry url port almost looks like 9092 (kafka port), while the default port is 8081, so might want to use that instead

msg = c.poll(10)
if msg:
if not msg.error():
print(msg.value())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want an example that actually uses the deserialized type? This looks just like a standard non-avro consumer.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@edenhill How do we show the type of msg? msg value is going to be a dict object.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No idea, but it is boring if the example is no different than the vanilla example, right? :)

message = super(AvroConsumer, self).poll(timeout)
if not message:
return message
print(message)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove print

@@ -332,6 +332,19 @@ static PyObject *Message_timestamp (Message *self, PyObject *ignore) {
self->timestamp);
}

static PyObject *Message_set_value (Message *self, PyObject *new_val) {
Py_DECREF(self->value);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe both value and key can be NULL, do:

if (self->value)
  Py_DECREF(self->value);
```

and same for key

@@ -391,6 +404,16 @@ static PyMethodDef Message_methods[] = {
" :rtype: (int, int)\n"
"\n"
},
{ "set_value", (PyCFunction)Message_set_value, METH_O,
" :returns: None.\n"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing docstring saying what the method actually does

c.subscribe(['my_topic'])
running = True
while running:
msg = c.poll(10)
if msg:
if not msg.error():
if (msg.value() && isinstance(msg.value(), dict)):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this ever not true?
From what I understand the deserializer will raise an exception if it can't deserialize a message.
Speaking of which, this example should probably have a proper try:..except block catching such errors, right?

Py_INCREF(self->value);
if (self->value) {
Py_DECREF(self->value);
self->value = new_val;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This only sets a new value if there was an old one, which isnt right.
Look at my initial comment.

@@ -405,11 +409,13 @@ static PyMethodDef Message_methods[] = {
"\n"
},
{ "set_value", (PyCFunction)Message_set_value, METH_O,
" Set the field 'Message.value' with new value.\n"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

elif msg.error().code() != KafkaError._PARTITION_EOF:
print(msg.error())
running = False
except SerializerError:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

where is SerializerError defined? I can't seem to find it

print(msg.error())
running = False
except SerializerError:
print("Unable to decode the message")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

print the exception message

@@ -410,12 +410,14 @@ static PyMethodDef Message_methods[] = {
},
{ "set_value", (PyCFunction)Message_set_value, METH_O,
" Set the field 'Message.value' with new value.\n"
" :param: PyObject value: Message.value.\n"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PyObject is a C thing, right? Maybe you 'object' or 'anyobject' or such

if not message:
return message
if not message.error():
if message.value():
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you want to check if message.value() is not None here. An empty bytes can be a valid message but wouldn't be deserialized here. (In particular this can be an issue if the serialization format does not encode fields that contain their default values, resulting in an empty, but non-null, value.) Same for the check of the key below.

@roopahc
Copy link
Contributor Author

roopahc commented Nov 23, 2016

@edenhill @ewencp Let me know if it looks OK now.

except SerializerError:
print("Unable to decode the message")
except SerializerError as e:
exc_type, exc_value, exc_traceback = sys.exc_info()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is nice, but maybe a bit excessive for an example, I just ment to print the exception string, something like:
print("Message deserialization failed for %s: %s" % (msg, e))

@edenhill
Copy link
Contributor

I'm happy with this. @ewencp ?

@ewencp
Copy link
Contributor

ewencp commented Nov 28, 2016

LGTM

Roopa Hiremath Chandrasekaraiah added 2 commits November 28, 2016 11:40
@edenhill edenhill merged commit a9601c4 into confluentinc:master Nov 28, 2016
@edenhill
Copy link
Contributor

Thanks again, great work!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants