Conversation

mhowlett
Contributor

@ewencp @edenhill

This PR is primarily a refactor of the Producer class.

Intended as an incremental step forward - still working out stuff, lots of open questions + notes to self.

Templating Producer on TKey, TValue seems almost certain at this point.


using (Producer producer = new Producer(new Dictionary<string, string> { { "bootstrap.servers", brokerList } }))
using (Topic topic = producer.Topic(topicName))
var config = new Dictionary<string, string> { { "bootstrap.servers", brokerList } };
Contributor

will this support generic values?
E.g.:
var config = new Dictionary<string, ...> { { "bootstrap.servers": "localhost", "session.timeout.ms": 6000, "enable.auto.commit": false } }

Contributor Author

@mhowlett Nov 15, 2016

No. There is a discussion on this in the previous PR. I'm in favor of changing this to Dictionary<string, object>, and having an extension method that takes Dictionary<string, string> and furthermore understands the dotnet nested configuration notation of using ':' as a separator.
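A rough sketch of the typed-values part of that direction (ToKafkaConfig is an illustrative name and the ':' handling is left out; none of this is in the PR):

using System;
using System.Collections.Generic;
using System.Globalization;
using System.Linq;

public static class ConfigExtensions
{
    // Hypothetical helper: flatten a typed config into the Dictionary<string, string>
    // form that librdkafka ultimately consumes.
    public static Dictionary<string, string> ToKafkaConfig(this Dictionary<string, object> config)
        => config.ToDictionary(
            kv => kv.Key,
            kv => kv.Value is bool b
                ? (b ? "true" : "false")   // normalize booleans to "true"/"false"
                : Convert.ToString(kv.Value, CultureInfo.InvariantCulture));
}

// Usage with typed values, as suggested above:
// var config = new Dictionary<string, object>
// {
//     { "bootstrap.servers", "localhost" },
//     { "session.timeout.ms", 6000 },
//     { "enable.auto.commit", false }
// }.ToKafkaConfig();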

producer.KeySerializer = (ISerializer<string>)new Confluent.Kafka.Serialization.Utf8StringSerializer();
producer.ValueSerializer = producer.KeySerializer;

Console.WriteLine($"{producer.Name} producing on {topicName}. q to exit.");
Contributor

Can we use ctrl-c instead? 'q' isn't very out-of-bandish.

Contributor Author

ok, i was just updating existing examples, which were like this, but i'll make this change now.
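For reference, the standard way to hook ctrl-c in a console app (a minimal sketch, nothing PR-specific):

using System;
using System.Threading;

var cts = new CancellationTokenSource();

Console.CancelKeyPress += (_, e) =>
{
    e.Cancel = true;   // don't terminate the process; let the loop below exit cleanly
    cts.Cancel();
};

while (!cts.IsCancellationRequested)
{
    // read input / produce messages here
}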

}

Task<DeliveryReport> deliveryReport = topic.Produce(data, key);
Task<DeliveryReport> deliveryReport = producer.Produce(topicName, key, text);
Contributor

Seems like the key part wasn't removed from 'text'

Contributor Author

yeah i noticed that too, but just left the example as is (doesn't really matter). ok i'll change it.

using (var topic = producer.Topic(topicName))
var config = new Dictionary<string, string> { { "bootstrap.servers", broker } };

using (var producer = new Producer<Empty, byte[]>(config, null))
Contributor

Are the key and value types specified per producer?
What if you want to produce to two different topics with different object types?
I don't think any other client does it this way.

Contributor Author

Yes. I talked at length with @ewencp about this. You can argue with him :-). I think I'm in favor of it, but I'm uneasy that it doesn't model what's going on properly. The Java producer works like this.

I want to expose a way of writing byte[], byte[] efficiently (+ offset / length) so people can write something more general if they really need it.

@ewencp points out that usually the formatter is the same (e.g. JSON, Avro), even though the schema might be different.

i'm still uneasy about it.

ewencp

@edenhill Why would that be an issue? You just use Producer<Object, Object> if you need something more general. With Avro types it also wouldn't be uncommon to have something like Producer<string, SpecificRecord> if you need to capture multiple concrete types.

The value in doing this is that you get type safety in the vast majority of cases. While I think it's important to support producing different messages to different topics, I think it's important to optimize for the common case. I'm pretty sure producing messages of a single type to a single topic is the dominant usage pattern.

Contributor Author

I expect we're going to leave this as is. Regardless, I'm keen to not undo these changes in this PR.

Contributor

@ewencp So if you use <Object,Object> you lose type safety and you are back to square one, right?

Producer instances are quite heavy and I want to avoid giving people the impression that they should use one per K,V type.

Additionally, if we add interceptor support that is exposed to the .NET client, and there's an interceptor that needs to produce to some arbitrary topic using its own format, that wouldn't really work with specific types for <K,V>, right?

callbackCts.Cancel();
callbackTask.Wait();

// TODO: Why is this necessary only when disposing?
Contributor

I'm guessing the finalizer path (disposing=false) shouldn't block.

Contributor Author

yep, I expect you're right.
I assume the callbackCts.Cancel / callbackTask can never block. I'm not 100% sure of this though, and want to verify further so will leave the todo above this one in (but take this one out).

Contributor

I'm not sure how this works in .NET, but for the other clients we typically don't want to flush/wait for outstanding requests to finish before shutting down if the client instance is automatically destructed (goes out of scope, GCed, etc).

Is there an explicit dispose you call? If so, that's the case where we want to block and flush, and that seems to be the case in the code already, right?

ewencp

This is part of the Dispose pattern. See the discussion of the Dispose(Boolean) method: https://msdn.microsoft.com/en-us/library/fs2xkftw(v=vs.110).aspx

Contributor Author

@mhowlett Nov 15, 2016

This was a note to self that I want to check this very carefully. Still deferring it until later. What's happening in this implementation is not strictly what the Dispose pattern dictates (and I looked a bit closer now and I think it's incorrect). Also, I don't like that there are producer-specific things happening here in Handle, which is the base class of Producer / Consumer.

Further notes about the dispose method pattern:

The method can block. If it blocks indefinitely, the .NET runtime will deal with it and give up eventually.

Before the if statement we're supposed to dispose any unmanaged resources owned directly by this class regardless of whether the method is called by the finalizer or via Dispose. In the implementation, the poll thread is cancelled instead. I think it's possible that the callbackCts and callbackTask objects have already been collected in the event the method is being called by the finalizer (not good!), because these are managed objects and cleanup order is not deterministic. Therefore, I think they should be in the if statement (see below).

In the if statement we're supposed to call Dispose methods by any managed objects that are IDisposable.

Also, as noted, I probably don't like the factoring here of having Handle as a base class of both Producer and Consumer. But I have to think that through further.

For now I'm going to move the callbackCts and callbackTask stuff inside the if statement.
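For reference, the general shape the Dispose(bool) pattern dictates, with the poll-loop handling inside the if statement as described above (a generic sketch, not the actual Handle implementation):

using System;
using System.Threading;
using System.Threading.Tasks;

public class HandleSketch : IDisposable
{
    private bool disposed;
    private readonly CancellationTokenSource callbackCts = new CancellationTokenSource();
    private Task callbackTask;   // the poll loop, as in the PR

    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }

    protected virtual void Dispose(bool disposing)
    {
        if (disposed)
        {
            return;
        }

        if (disposing)
        {
            // Managed objects: only safe to touch on an explicit Dispose() call.
            // On the finalizer path they may already have been collected.
            callbackCts.Cancel();
            callbackTask?.Wait();
            callbackCts.Dispose();
        }

        // Unmanaged resources (e.g. the librdkafka handle) are released here,
        // on both the Dispose() and finalizer paths.

        disposed = true;
    }

    ~HandleSketch()
    {
        Dispose(false);
    }
}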

Contributor Author

Actually, I'm going to leave it as is and put a TODO noting the above potential flaw. I'll work all this through in a future PR.

}
}

private Topic getKafkaTopic(string topic)
Contributor

Maybe we should provide this abstraction in librdkafka instead, seeing how all bindings are now doing this exact same thing.

Contributor Author

yeah.

// TODO: Support the other function overloads in Topic.
// TODO: I'd like a way to produce as (byte[], offset, length) as well if possible all the way down to librdkafka (need to investigate).
Contributor

Are there slices in .NET?

Contributor Author

There is ArraySegment, which is a value type, so it's probably no less efficient; you probably have a good point.
There is a related PR (that got reverted) in rdkafka-dotnet.
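For reference, ArraySegment<byte> is the closest built-in slice equivalent (plain BCL usage, independent of this PR):

using System;
using System.Text;

byte[] buffer = Encoding.UTF8.GetBytes("key:payload");

// A struct wrapping (array, offset, count) - no copy of the underlying bytes is made.
var value = new ArraySegment<byte>(buffer, 4, buffer.Length - 4);

Console.WriteLine($"offset={value.Offset}, count={value.Count}");   // offset=4, count=7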

Contributor

I think I saw some work very recently to fix that PR

Contributor Author

yes. i'm monitoring it.


// TODO: Support the other function overloads in Topic.
// TODO: I'd like a way to produce as (byte[], offset, length) as well if possible all the way down to librdkafka (need to investigate).
// TODO: Name these Produce or Send?
Contributor

the other bindings use [pP]roduce()

Contributor Author

sounds good to me, will remove todo.

=> getKafkaTopic(topic).Produce(ValueSerializer.Serialize(val), KeySerializer.Serialize(key));

public Topic Topic(string topic, IEnumerable<KeyValuePair<string, string>> config = null) => new Topic(handle, this, topic, config);
// TODO: do we need both the callback way of doing this and the Task way?
Contributor

Qs:

  • What's the performance and memory impact of one-task-per-message?
  • How do you poll completion for multiple tasks? (thousands)
  • Can you bind variables to Tasks?
  • Can you bind variables to DeliveryHandlers (callbacks)?

Contributor Author

#1 I don't know, but it worries me (question for another PR).
#2 It's easy to wait on many tasks with Task.WaitAll.
#3, #4 I don't think so. The only way I can think of right now is managing this yourself in a dictionary external to the Task or callback (not straightforward).
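To illustrate #2 and the external-dictionary idea for #3/#4 (a sketch that assumes the producer/topicName from the examples above and the Task<DeliveryReport>-returning Produce used in this PR):

using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

var pending = new Dictionary<string, Task<DeliveryReport>>();

for (int i = 0; i < 1000; i++)
{
    string key = $"key-{i}";
    // Correlate each task with whatever state you need via an external dictionary.
    pending[key] = producer.Produce(topicName, key, $"value-{i}");
}

// Block until every outstanding delivery report has arrived.
Task.WaitAll(pending.Values.ToArray());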

Contributor

So re 4 (and maybe 3, not sure how though), we would need an opaque value for the produce call that is later supplied to the callback

ewencp

Don't you want both since tasks give an easy way to block and wait for completion but callbacks let you follow up on something you're doing with the message as soon as the delivery report happens?

Contributor

@ewencp We argued about the same for the Python client and concluded that it is easy enough to add Tasks/Futures using callbacks, so we should be fine with just providing callbacks.
But that was for Python, with a dead-slow Futures implementation.

Contributor Author

I just remembered Task has the ContinueWith method, which effectively provides a way of specifying a callback when the task completes (it's non-blocking). I'll add a note about this in the todo (and think about all the details later).
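i.e. something like (a sketch, same assumptions as above):

producer.Produce(topicName, key, text).ContinueWith(t =>
{
    // Runs once the delivery report is available, without blocking the caller.
    // The closure captures key/text, which also covers binding variables (#3/#4 above).
    DeliveryReport report = t.Result;
    Console.WriteLine($"delivered message with key '{key}'");
});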

ewencp

@edenhill Also, the expectations across languages are different, so there are other possible tradeoffs as well. But I agree it'd be good to have an idea of the performance impact. @mhowlett What about async/await? If this is expected to work these days, then we may not be able to avoid the Task?

Contributor Author

@ewencp No, we can't avoid Task.
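With a Task-returning Produce, callers can simply await it (sketch, same assumptions as above):

public static async Task ProduceOneAsync(Producer<string, string> producer, string topicName)
{
    // await frees the calling thread until the delivery report arrives.
    DeliveryReport report = await producer.Produce(topicName, "my-key", "my-value");
    Console.WriteLine("delivered");
}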

using Confluent.Kafka.Internal;
using Confluent.Kafka.Interop;

// TODO: probably move this to Confluent.Kafka.Internal and also create Confluent.Kafka.KafkaException.
Contributor

would calling it just "Exception" (and the underlying KafkaError -> Error) be problematic?
I know it is in Python, but not in Go.

Contributor Author

It's normal to prefix specific exception types with what they are, so KafkaException would be more idiomatic. Actually this applies to Producer as well (see new comment there). Generally, things are often prefixed, even when the prefix is redundant with the namespace.

Contributor

+1 on KafkaException and KafkaError to avoid confusion,
-1 on Consumer and Producer, unnecessarily redundant in my view (and disjoint from other bindings).

Console.WriteLine($"{producer.Name} producing on {topic.Name}. q to exit.");
// TODO: work out why explicit cast is needed here.
// TODO: remove need to explicitly specify string serializers - assume Utf8StringSerializer in Producer as default.
producer.KeySerializer = (ISerializer<string>)new Confluent.Kafka.Serialization.Utf8StringSerializer();
ewencp

I would consider changing the way this setup works. In the Java client serializers are handled specially as the only special thing you can include in constructors in addition to the config (and can be auto-instantiated via reflection if you specify them in your config). Having half of the producer setup after the constructor feels odd (and means you can't make those fields readonly).

Contributor Author

right. i'll fix this in a future PR though.


{
Console.WriteLine($"{producer.Name} producing on {topic.Name}. q to exit.");
// TODO: figure out why the cast below is necessary and how to avoid it.
// TODO: There should be no need to specify a serializer for common types like string - I think it should default to the UTF8 serializer.
ewencp

Java serializer doesn't do this since it supports multiple encodings, with UTF8 being the default.

Aside from string are there any other cases where you could reasonably set a default? Seems like everything else would be specific to the serialization format.

Contributor Author

Int, Long, Empty

@@ -0,0 +1,4 @@
namespace Confluent.Kafka
{
public class Empty {}
ewencp

It occurred to me that you could also call this Null. There's apparently an implicit null type which is the type for the null value, but you can't actually use that. I think the ideal implementation of this class would be sealed and make it impossible to instantiate it at all so it guarantees you have to use null, though I'm not sure you can do that in C#.

Contributor Author

great points.

We can have a sealed class with a private constructor.

If we ever want to pass in a value for this type - which we must do with the current Produce method overloads if TValue is Empty and TKey isn't - it either needs to be an instance of Empty or null.

That leads me to think Null is a better name: null should be the value, and Null should be a class that can't be instantiated.
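i.e. something like (sketch):

namespace Confluent.Kafka
{
    /// <summary>
    ///     Placeholder type for a null key or value. Sealed with a private
    ///     constructor, so null is the only possible value of type Null.
    /// </summary>
    public sealed class Null
    {
        private Null() {}
    }
}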


public void Dispose(bool disposing)
{
// TODO: Think carefully about whether the implementation of this method is correct.
if (this.Disposed)
ewencp

Do you even need this? Isn't it invalid to invoke Dispose twice?

Contributor Author

I don't need a finalizer, so this is all irrelevant now.

}

// this is atomic.
readDictionary = writeDictionary;
ewencp

Is writeDictionary ever replaced? I feel like I must be missing something, but it doesn't seem like this provides the guarantees you want?
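For context, this is the shape a copy-on-write swap needs to have for the "this is atomic" comment to give the intended guarantee (a generic sketch; whether the PR actually replaces writeDictionary rather than mutating it is exactly the question here):

using System.Collections.Generic;

class CopyOnWriteTopicCache
{
    // Readers only ever dereference readDictionary; writers build a fresh copy
    // and publish it with a single reference assignment, which is atomic in .NET.
    private volatile Dictionary<string, object> readDictionary = new Dictionary<string, object>();
    private readonly object writeLock = new object();

    public void Add(string name, object topicHandle)
    {
        lock (writeLock)
        {
            // Copy, mutate the copy, then swap - never mutate the published instance.
            var copy = new Dictionary<string, object>(readDictionary) { [name] = topicHandle };
            readDictionary = copy;
        }
    }

    public bool TryGet(string name, out object topicHandle)
        => readDictionary.TryGetValue(name, out topicHandle);
}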


@@ -0,0 +1,11 @@
namespace Confluent.Kafka.Serialization
{
public class EmptySerializer : ISerializer<Empty>
ewencp

Do we actually need this class?

Contributor Author

@mhowlett Nov 15, 2016

I think so. In the case where we have Producer<string, Null>, the only applicable produce method overload will still require a value argument, which will be put through the serializer.

public static byte[] result = new byte[0];
public byte[] Serialize(Empty val)
{
return result;
ewencp

Shouldn't this just be null?

Contributor Author

there might be a debate here, but good point, null will work.
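i.e. (sketch of the suggested change):

namespace Confluent.Kafka.Serialization
{
    public class EmptySerializer : ISerializer<Empty>
    {
        public byte[] Serialize(Empty val)
        {
            // A null payload maps to a null message value on the wire,
            // which is what Empty/Null is meant to represent.
            return null;
        }
    }
}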
