Producer Refactor Mk II #8
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,4 +1,5 @@ | ||
using System; | ||
using System.Text; | ||
using System.Collections.Generic; | ||
using System.Threading.Tasks; | ||
using Confluent.Kafka.Serialization; | ||
|
@@ -14,23 +15,22 @@ public static void Main(string[] args) | |
|
||
var config = new Dictionary<string, object> { { "bootstrap.servers", brokerList } }; | ||
|
||
using (var producer = new Producer<Null, string>(config)) | ||
using (var producer = new Producer<Null, string>(config, new NullSerializer(), new StringSerializer(Encoding.UTF8))) | ||
{ | ||
// TODO: figure out why the cast below is necessary and how to avoid it. | ||
// TODO: There should be no need to specify a serializer for common types like string - I think it should default to the UTF8 serializer. | ||
producer.ValueSerializer = (ISerializer<string>)new Confluent.Kafka.Serialization.Utf8StringSerializer(); | ||
|
||
Console.WriteLine($"{producer.Name} producing on {topicName}. q to exit."); | ||
Ctrl-c baby, not "q".

I've decided I disagree. The purpose of these examples is to demonstrate usage of the client in a straightforward, easy to understand way. Turns out that detecting Ctrl-C is a bit convoluted, in fact there are as many lines dedicated to doing this properly as demonstrating the producer. None of the code is rocket science of course, so i'm sort of indifferent, but on balance, I think using q to exit is better.

This might seem like a tiny nitpick thing, but there are two proper reasons:

Is

regarding #1 - i wouldn't call using 'q' to exit 'incorrect' as such. how about i leave it in the advanced producer example and keep it out of the simple producer example - keep that as dead simple as possible - i want the first example people look at to be inviting and not scary in any way.

👍

@ewencp AppDomain is changed a lot or doesn't exist in .net core. http://www.michael-whelan.net/replacing-appdomain-in-dotnet-core/ The new assembly unload event mentioned in the above article is not effective at catching Ctrl-C. Examples I see around the web use Console.CancelKeyPress. I'm not certain there is not a better way, but it seems likely CancelKeyPress is good. Another reason not to include this: there are higher priorities than figuring this out.
||
|
||
string text; | ||
while ((text = Console.ReadLine()) != "q") | ||
{ | ||
Task<DeliveryReport> deliveryReport = producer.Produce(topicName, text); | ||
Task<DeliveryReport> deliveryReport = producer.ProduceAsync(topicName, null, text); | ||
var unused = deliveryReport.ContinueWith(task => | ||
{ | ||
Console.WriteLine($"Partition: {task.Result.Partition}, Offset: {task.Result.Offset}"); | ||
}); | ||
} | ||
|
||
// Tasks are not waited on, so they may still be in progress here. | ||
producer.Flush(); | ||
} | ||
} | ||
} | ||
|
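For reference, the `Console.CancelKeyPress` approach mentioned in the review thread could look roughly like the sketch below. It uses only `System.Console` and `System.Threading`; the `CtrlCSketch` class and `RunUntilCancelled` helper are hypothetical names for illustration, and the loop body is a placeholder rather than a real producer call.

```csharp
using System;
using System.Threading;

public static class CtrlCSketch
{
    // Calls `work` repeatedly until `token` is cancelled and returns
    // the number of completed iterations. (Hypothetical helper.)
    public static int RunUntilCancelled(CancellationToken token, Action work)
    {
        int iterations = 0;
        while (!token.IsCancellationRequested)
        {
            work();
            iterations++;
        }
        return iterations;
    }

    public static void Main(string[] args)
    {
        var cts = new CancellationTokenSource();

        Console.CancelKeyPress += (sender, e) =>
        {
            // Only suppress termination for Ctrl-C, so the loop below
            // gets a chance to exit and clean up.
            if (e.SpecialKey == ConsoleSpecialKey.ControlC)
            {
                e.Cancel = true;
            }
            cts.Cancel();
        };

        Console.WriteLine("Ctrl-C to exit.");
        int n = RunUntilCancelled(cts.Token, () =>
        {
            // Placeholder: a real example would produce a message here.
            Thread.Sleep(100);
        });
        Console.WriteLine($"Stopped after {n} iterations.");
    }
}
```

This is about as many lines as the producer example itself, which is the trade-off raised in the thread.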
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,50 @@ | ||
using System.Text; | ||
using System.Collections.Generic; | ||
using Confluent.Kafka.Serialization; | ||
|
||
namespace Confluent.Kafka.Wrapped | ||
{ | ||
/// <summary> | ||
/// An example showing how to wrap a single Producer to produce messages using | ||
/// different serializers. | ||
/// </summary> | ||
/// <remarks> | ||
/// If you only want to use a single pair of serializers in your application, | ||
/// you should use the Producer<TKey, TValue> constructor instead. | ||
/// </remarks> | ||
public class Program | ||
{ | ||
public static void Main(string[] args) | ||
{ | ||
var config = new Dictionary<string, object> { { "bootstrap.servers", args[0] } }; | ||
|
||
using (var producer = new Producer(config)) | ||
{ | ||
// sProducer1 is a lightweight wrapper around a Producer instance that adds | ||
// (string, string) serialization. Note that sProducer1 does not need to be | ||
// (and cannot be) disposed. | ||
var sProducer1 = producer.Wrap<string, string>(new StringSerializer(Encoding.UTF8), new StringSerializer(Encoding.UTF8)); | ||
|
||
// sProducer2 is another lightweight wrapper around producer that adds | ||
// (null, int) serialization. When you do not wish to write any data to a key | ||
// or value, the Null type should be used. | ||
var sProducer2 = producer.Wrap<Null, int>(new NullSerializer(), new IntSerializer()); | ||
|
||
// write (string, string) data to topic "first-topic", statically type checked. | ||
sProducer1.ProduceAsync("first-topic", "my-key-value", "my-value"); | ||
oh, i forgot to wait on these tasks.

As mentioned elsewhere, ideally this could just be a For that matter, this does raise the question of how "aggregate" operations behave for the wrapper producers. i.e. would

I don't think we want If we have a flush method, it doesn't make sense to put it on ISerializingProducer. It'd be on the concrete Producer and Producer<TKey, TValue> only.

The dispose method of Producer effectively flushes.

I think it makes sense to have an explicit Flush() method, and not to do an implicit Flush() when disposing, to allow applications to exit quickly without waiting for message transmission (which may block for a long time)

Good point about not wanting to flush in Dispose... shutdown is not the only consideration - a using statement will typically be used to wrap a producer and this is equivalent to try / finally. We probably don't want messages being flushed before an exception gets handled.
I guess Flush is a good thing to have in addition to the Tasks, as some people will probably just want to fire and forget and ignore the Tasks.

On the other hand not flushing in .Dispose makes the API more error prone as users will often be in a situation where there may be messages in flight when they want to exit because all the calls are async. So, calling Flush will usually be an appropriate thing to do right at the end of the Producer using block. Also, if an exception makes it outside the Producer scope, there is no reference to the producer, so no option to Flush. Thoughts on having a property .FlushOnDispose, which by default is true?

None of the other clients does an implicit flush on dispose, so I don't think we should alter that behaviour in this client. Also note that flush, and thus dispose, might block for up to message.timeout.ms which defaults to 5 minutes.

Yeah I agree. I think I'm only still thinking about flush on dispose because that is the existing behavior of rdkafka-dotnet (and there are some positives to the idea). But now I'm seeing it from a different point of view - it's not normal to wait a long time on dispose, so it's a counter intuitive thing to do. I'll work out something similar to the other clients.

as indication that it's counter intuitive to flush in dispose, see my first comment in this thread "oh, i forgot to wait on these tasks." - this was before I started thinking about what was happening in the dispose method and before people started suggesting a flush method. my intuition then was dispose was not going to wait on anything.
||
|
||
// write (null, int) data to topic "second-topic". statically type checked, using | ||
// the same underlying producer as sProducer1. | ||
sProducer2.ProduceAsync("second-topic", null, 42); | ||
|
||
// producers are NOT tied to topics. Although it's unusual that you might want to | ||
// do so, you can use different serializing producers to write to the same topic. | ||
sProducer2.ProduceAsync("first-topic", null, 107); | ||
|
||
// ProduceAsync tasks are not waited on - there is a good chance they are still | ||
// in flight. | ||
producer.Flush(); | ||
} | ||
} | ||
} | ||
} |
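To make the Flush-versus-Dispose outcome of the thread above concrete, here is a minimal sketch with a stand-in type. `SketchProducer` and its members are hypothetical and are not the Confluent.Kafka API; the in-memory queue only simulates messages in flight. It assumes the behaviour the reviewers converged on: `Dispose` returns promptly without waiting, and the caller flushes explicitly with a bounded timeout.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;

// Hypothetical stand-in for a producer; NOT the Confluent.Kafka API.
public sealed class SketchProducer : IDisposable
{
    private readonly Queue<string> pending = new Queue<string>();

    public int Delivered { get; private set; }
    public int Dropped { get; private set; }

    // Queues a message; nothing is "delivered" until Flush runs.
    public void Produce(string value)
    {
        pending.Enqueue(value);
    }

    // Delivers queued messages until the queue is empty or the timeout
    // elapses. Returns the number of messages still queued.
    public int Flush(TimeSpan timeout)
    {
        var clock = Stopwatch.StartNew();
        while (pending.Count > 0 && clock.Elapsed < timeout)
        {
            pending.Dequeue(); // simulate one delivery
            Delivered++;
        }
        return pending.Count;
    }

    // Releases resources immediately; does not wait for delivery.
    public void Dispose()
    {
        Dropped += pending.Count;
        pending.Clear();
    }
}

public static class FlushSketch
{
    public static void Main(string[] args)
    {
        using (var producer = new SketchProducer())
        {
            producer.Produce("a");
            producer.Produce("b");

            // Explicit flush at the end of the using block, as suggested
            // in the thread; the timeout bounds how long exit can block.
            int remaining = producer.Flush(TimeSpan.FromSeconds(10));
            Console.WriteLine($"Still queued after flush: {remaining}");
        }
    }
}
```

Omitting the `Flush` call in this sketch drops both messages on `Dispose`, which is the error-prone case raised in the discussion.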
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,25 @@ | ||
{ | ||
"version": "1.0.0", | ||
"authors": ["Confluent Inc"], | ||
|
||
"buildOptions": { | ||
"emitEntryPoint": true | ||
}, | ||
|
||
"dependencies": { | ||
"Confluent.Kafka": { | ||
"target": "project" | ||
} | ||
}, | ||
|
||
"frameworks": { | ||
"netcoreapp1.0": { | ||
"dependencies": { | ||
"Microsoft.NETCore.App": { | ||
"type": "platform", | ||
"version": "1.0.0" | ||
} | ||
} | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,17 @@ | ||
namespace Confluent.Kafka | ||
{ | ||
/* | ||
TODO: (via ewencp): The equivalent in the Python client fills in more information -- | ||
the callbacks accept (err, msg) parameters, where the latter is | ||
http://docs.confluent.io/3.1.0/clients/confluent-kafka-python/index.html#confluent_kafka.Message | ||
Same deal with Go where the DR channel gets one of these: | ||
http://docs.confluent.io/3.1.0/clients/confluent-kafka-go/index.html#Message Is this being | ||
kept more minimal intentionally? | ||
*/ | ||
As discussed on Slack, the message metadata contains an ever growing number of fields, so passing a rich Message object to the delivery report, like the other clients, is most likely the best way forward.

right. however if a very common use case is to just produce a key and value (I think it is), then it's worth having a Producer.ProduceAsync overload for this as well to make the interface easier to use.

It is hard to make assumptions on what information the dr callback needs based on the produce() arguments. We should provide whatever librdkafka provides in the dr.
||
|
||
public struct DeliveryReport | ||
The equivalent in the Python client fills in more information -- the callbacks accept

This is unchanged from rdkafka-dotnet.

A delivery report will need at least:
Other stuff that might be useful:
Wrapping this in a Message type is consistent with other clients.
||
{ | ||
public int Partition; | ||
public long Offset; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -69,6 +69,16 @@ internal void Init(RdKafkaType type, IntPtr config, Config.LogCallback logger) | |
callbackTask = StartCallbackTask(callbackCts.Token); | ||
} | ||
|
||
// TODO: Add timeout parameter (with default option == block indefinitely) when using rd_kafka_flush. | ||
public void Flush() | ||
{ | ||
// TODO: use rd_kafka_flush here instead.. | ||
It would've been less code calling rd_kafka_flush() than adding these comments ;)
||
while (OutQueueLength > 0) | ||
there is actually a rd_kafka_flush() call that should be used.

Yeah, I checked and this wasn't exposed in the LibRdKafka layer yet whereas the OutQueueLength stuff was. Agreed that we should use the correct internal version though.

Yep. I was just doing whatever ah- was here. will change.

I'll put a todo. I want to prioritize getting the high level API right, and i'm going to be reviewing / addressing a lot more lower level stuff in future PRs
||
{ | ||
handle.Poll((IntPtr) 100); | ||
} | ||
} | ||
|
||
public void Dispose() | ||
{ | ||
Dispose(true); | ||
|
@@ -83,12 +93,6 @@ protected virtual void Dispose(bool disposing) | |
|
||
if (disposing) | ||
{ | ||
// Wait until all outstanding sends have completed. | ||
while (OutQueueLength > 0) | ||
{ | ||
handle.Poll((IntPtr) 100); | ||
} | ||
|
||
handle.Dispose(); | ||
} | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,17 @@ | ||
using System.Threading.Tasks; | ||
using Confluent.Kafka.Serialization; | ||
|
||
|
||
namespace Confluent.Kafka | ||
{ | ||
public interface ISerializingProducer<TKey, TValue> | ||
{ | ||
string Name { get; } | ||
|
||
ISerializer<TKey> KeySerializer { get; } | ||
|
||
ISerializer<TValue> ValueSerializer { get; } | ||
|
||
Task<DeliveryReport> ProduceAsync(string topic, TKey key, TValue val, int? partition = null, bool blockIfQueueFull = true); | ||
} | ||
Do we really need this plethora of variants? And then the full one: Or even better: Produce(Message) where Message is a class that we expand as necessary when new functionality (e.g. timestamp, soon headers) are added

I've been thinking we should get rid of the val only variant because it's not common enough to justify. @ewencp please agree/disagree. The reason this variant existed is i'm bringing forward the capability from rdkafka-dotnet. So now i only have one ProduceAsync variant on the typed producer (takes TKey, TValue) I wanted to avoid forcing people to do new Message(...), but you bring up a good point about extensibility. However, I propose keeping things the way they are and not introducing a Message class/struct now. I think in the future it will still be the most common use case to send (key, value) and not set timestamp / header explicitly - so it would be nice to have this short hand way of doing that. In the future, we introduce a new overload that takes a

Okay, sort of makes sense, but timestamp is available today already so it should probably be supported.
||
} |
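The two positions in the thread above (a single extensible `Produce(Message)` versus a short (key, value) form) are not mutually exclusive. The sketch below shows one way they could coexist. All names here (`Message<TKey, TValue>`, `SketchSender`, `LastSent`) are hypothetical and only illustrate the overload shape; they are not the types this PR introduces.

```csharp
using System;

// Hypothetical message type; fields can be added over time
// (e.g. headers) without changing the Produce signature.
public class Message<TKey, TValue>
{
    public string Topic { get; set; }
    public TKey Key { get; set; }
    public TValue Value { get; set; }
    public DateTime? Timestamp { get; set; } // optional; null means "not set"
}

// Hypothetical sender that records the last message instead of sending it.
public class SketchSender<TKey, TValue>
{
    public Message<TKey, TValue> LastSent { get; private set; }

    // Full form: takes the extensible message object.
    public void Produce(Message<TKey, TValue> message)
    {
        if (message == null) throw new ArgumentNullException(nameof(message));
        LastSent = message;
    }

    // Shorthand for the common (key, value) case; delegates to the full form.
    public void Produce(string topic, TKey key, TValue value)
    {
        Produce(new Message<TKey, TValue> { Topic = topic, Key = key, Value = value });
    }
}

public static class OverloadSketch
{
    public static void Main(string[] args)
    {
        var sender = new SketchSender<string, int>();

        sender.Produce("first-topic", "my-key", 42);
        Console.WriteLine(sender.LastSent.Timestamp.HasValue); // False

        sender.Produce(new Message<string, int>
        {
            Topic = "first-topic",
            Key = "my-key",
            Value = 107,
            Timestamp = DateTime.UtcNow
        });
        Console.WriteLine(sender.LastSent.Timestamp.HasValue); // True
    }
}
```

With this shape, supporting a timestamp today only touches the message type and the full overload; the shorthand stays unchanged.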
Is there a reason for setting these properties?
"mirrors the librdkafka performance test example.". Was trying to get something directly comparable to your numbers.