Skip to content

Conversation

mhowlett
Copy link
Contributor

@ewencp @edenhill
new producer API:

  • create serializing producer: new Producer<TKey, TValue>(...)
  • create untyped producer: new Producer(...)
  • ability to wrap untyped producer to add serializers if desired.
  • ArraySegment capability to SafeTopicHandle and up (marshaling impl. similar to open PR on rdkafka-dotnet).

return (long) LibRdKafka.produce(
handle,
partition,
(IntPtr) (MsgFlags.MSG_F_COPY | (blockIfQueueFull ? MsgFlags.MSG_F_BLOCK : 0)),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how long does librdkafka need the memory for? rather than copying it, we could pin it until after the delivery report comes back. this may or may not be more performant.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Until the delivery report callback returns.

Re .._MSG_F_BLOCK:
since the message queue threshold limit also includes delivery reports, some other thread of the application will need to call poll() for a blocking produce() to ever unblock.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Re: .._MSG_F_BLOCK - there is a thread in Handle which is devoted to calling poll. If the Task based ProduceAsync methods are used, I believe the continuations are always on a different thread, so there will never be a problem. If the callback ProduceAsync methods are used, and the callbacks produce messages, then there is potentially a problem though.

Re memcpy vs pinning, I'm currently thinking I'll leave as is for this version and possibly investigate in a future version.


if (val != null)
{
gchValue = GCHandle.Alloc(val, GCHandleType.Pinned);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I expect marshaling of byte[] does this guff behind the scenes.

if (val != null)
{
gchValue = GCHandle.Alloc(val, GCHandleType.Pinned);
pValue = Marshal.UnsafeAddrOfPinnedArrayElement(val, valOffset);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the safety of this is enforced higher up by use of ArraySegment which performs run time bounds checking on parameters.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this should arguably (would be better) enforced here. But then we get run time bounds checking twice. yay! maybe we do want (byte[], int, int) in the API after all instead of ArraySegment.

public Task<DeliveryReport> ProduceAsync(string topic, byte[] key, byte[] val, int? partition = null, bool blockIfQueueFull = true)
=> getKafkaTopic(topic).Produce(val, 0, val.Length, key, 0, key.Length, partition ?? RD_KAFKA_PARTITION_UA, blockIfQueueFull);

public Task<DeliveryReport> ProduceAsync(string topic, ArraySegment<byte> key, ArraySegment<byte> val, int? partition = null, bool blockIfQueueFull = true)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've used ArraySegment here rather than (byte[], int, int) parameters explicitly. This is at odds with many methods in the .NET Framework (eg. BitConverter.ToString). I don't know if there is a good reason for this or whether it's a function of the relative age of ArraySegment struct compared to these other methods. One benefit is ArraySegment provides runtime bounds checking (we could do this explicitly of course though). Note that ArraySegment is a struct (stack allocated), so no GC overhead.

var config = new Dictionary<string, object> { { "bootstrap.servers", brokerList } };

using (var producer = new Producer<string, string>(config))
using (var producer = new Producer<string, string>(config, new StringSerializer(), new StringSerializer()))
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ewencp argues that these should be explicit, as most of the time defaults won't be obvious so most of the time if they're left off this has a good chance of being a user error. I agree, though It's kind of a shame.

var sProducer2 = producer.Wrap<Null, int>(new NullSerializer(), new IntSerializer());

// write (string, string) data to topic "first-topic", statically type checked.
sProducer1.ProduceAsync("first-topic", "my-key-value", "my-value");
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh, i forgot to wait on these tasks.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As mentioned elsewhere, ideally this could just be a Flush() on the unwrapped producer.

For that matter, this does raise the question of how "aggregate" operations behave for the wrapper producers. i.e. would Flush() in any way isolate itself to messages for the single format (presumably no, given the way this is implemented)? How about things like Dispose?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we want Flush (though I'm not 100% sure i'm seeing the world correctly here). I think we just want to wait on the tasks, or a collection of tasks.

If we have a flush method, it doesn't make sense to put it on ISerializingProducer. It'd be on the concrete Producer and Producer<TKey, TValue> only.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The dispose method of Producer effectively flushes.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it makes sense to have en explicit Flush() method, and not to do an implicit Flush() when disposing, to allow applications to exit quickly without waiting for message transmission (which may block for a long time)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point about not wanting to flush in Dispose... shutdown is not the only consideration - a using statement will typically be used to wrap a producer and this is equivalent to try / finally. We probably don't want messages being flushed before an exception gets handled.

I guess Flush is a good thing to have in addition to the Tasks, as some people will probably just want to fire and forget and ignore the Tasks.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On the other hand not flushing in .Dispose makes the API more error prone as users will often be in a situation where there may be messages in flight when they want to exit because all the calls are async. So, calling Flush will usually be an appropriate thing to do right at the end of the Producer using block.

Also, if an exception makes it outside the Producer scope, there is no reference to the producer, so no option to Flush.

Thoughts on having a property .FlushOnDispose, which by default is true?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

None of the other clients does an implicit flush on dispose, so I dont think we should alter that behaviour in this client.

Also note that flush, and thus dispose, might block for up to message.timeout.ms which defaults to 5 minutes.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I agree.

I think I'm only still thinking about flush on dispose because that is the existing behavior of rdkafka-dotnet (and there are some positives to the idea).

But now I'm seeing it from a different point of view - it's not normal to wait a long time on dispose, so it's a counter intuitive thing to do.

I'll work out something similar to the other clients.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as indication that it's counter intuitive to flush in dispose, see my first comment in this thread "oh, i forgot to wait on these tasks." - this was before I started thinking about what was happening in the dispose method and before people started suggesting a flush method. my intuition then was dispose was not going to wait on anything.

public static void Produce(string broker, string topicName, long numMessages)
{
var deliveryHandler = new DeliveryHandler();
public static void WaitForAllDeliveryReports()
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we just lacking Flush() right now? That's how I'd normally expect to wait for all sends to complete.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For this benchmark, I'm using the non-Task ProduceAsync method. Probably a premature optimization to even have this option. Maybe we want to get rid of this. One property of this is all the callbacks happen on the same thread.

Anyway, with Tasks, you can do WaitAll on a collection of Tasks, and that is probably the idiomatic way to 'flush'.

Also, if order is guaranteed, you could also just wait on the last Task. I think if messages are produced to different partitions though, order is not guaranteed?

If we include the non task based methods, you're right, Flush is probably arguably necessary.

currently thinking they should be removed.

Copy link
Contributor Author

@mhowlett mhowlett Nov 22, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just did another benchmark test using the Task produce method / WaitAll. It's substantially terser, and there is no noticeable change in perf. But I can't easily profile memory usage - The Task way makes an addition 5M objects.

The callback methods are here because they're in rdkafka-dotnet and I wanted to take them out only after careful consideration.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm taking them out. I can't think of any strong reason to have them and if it turns out there is one, it'll be much easier to change the API to add them back in than take them out.


byte cnt = 0;
var val = new byte[100].Select(a => ++cnt).ToArray();
var key = new byte[0];
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this required to be byte[0] rather than null? I'd normally expect null. I think they end up having the same overhead assuming compression is off, but strictly speaking they are different since null gets encoded as a length of -1 and a zero length array is encoded as 0 followed by 0 bytes. Just want to make sure we're not losing the ability to encode null in this patch.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you sure about the len == -1 encoding of null? cc @edenhill - quickly looking in rdkafka_msg.c it looks like it gets set to 0 to me if the data is null.

rdkafka-dotnet handles null ok and sets the length to 0.

and whoops, i'd previously noticed this and forgot to make null work.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

librdkafka treats key=NULL as Kafka null key, while key!=NULL and size=0 as an empty key, which are not the same thing.
We should allow for the same semantics in this client.

Value is identical.

var sProducer2 = producer.Wrap<Null, int>(new NullSerializer(), new IntSerializer());

// write (string, string) data to topic "first-topic", statically type checked.
sProducer1.ProduceAsync("first-topic", "my-key-value", "my-value");
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As mentioned elsewhere, ideally this could just be a Flush() on the unwrapped producer.

For that matter, this does raise the question of how "aggregate" operations behave for the wrapper producers. i.e. would Flush() in any way isolate itself to messages for the single format (presumably no, given the way this is implemented)? How about things like Dispose?

@@ -0,0 +1,8 @@
namespace Confluent.Kafka
{
public struct DeliveryReport
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The equivalent in the Python client fills in more information -- the callbacks accept (err, msg) parameters, where the latter is http://docs.confluent.io/3.1.0/clients/confluent-kafka-python/index.html#confluent_kafka.Message Same deal with Go where the DR channel gets one of these: http://docs.confluent.io/3.1.0/clients/confluent-kafka-go/index.html#Message Is this being kept more minimal intentionally?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is unchanged from rdkafka-dotnet.
If we include the message in the delivery report (given the precedent, we should), we're going to need to think about generics here too. I propose doing this in the next PR which is going to be a refactor of the Consumer.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A delivery report will need at least:

  • topic
  • partition
  • offset
  • error
  • msg opaque (or bound variable through other means)

Other stuff that might be useful:

  • value-object
  • key-object
  • value
  • key
  • timestamp
  • future fields that the community makes up, e.g. headers

Wrapping this in a Message type is consistent with other clients.

{
public sealed class Null
{
public static Null Instance = new Null();
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need this? The point of this is that you can't instantiate it, right? It's not Singleton, it's Null.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's 'cause I screwed up thinking about the deserializer. it's unnecessary.

}


public class Producer<TKey, TValue> : ISerializingProducer<TKey, TValue>, IDisposable
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this should implement IDisposable. Supposedly you should only implement IDisposable if you directly handle unmanaged resources. In this case you are just using an IDisposable.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This definitely needs to implement IDisposable otherwise producer's resources can't be deterministically cleaned up.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ack, I think it's easy to lose track of where the disposable pattern vs SafeHandle needs to be used. Seems we have one layer of SafeHandles around the underlying C resources and then use IDisposable everywhere else to allow proactive cleanup.

I think part of the confusion comes from overloading what IDisposable means. The docs for IDisposable even say

Implement IDisposable only if you are using unmanaged resources directly. If your app simply uses an object that implements IDisposable, don't provide an IDisposable implementation.

Unfortunately it seems people also use this to also be the equivalent of Closeable in Java (which doesn't need to imply anything about whether something will be garbage collected even if you don't call close().

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. Note there is still some cleaning up to do I think when I get to reviewing Handle. Also wanted to note that inheritance makes this more difficult to think through properly.


public void Dispose()
{
producer.Dispose();
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm pretty sure this implementation is incorrect anyway. If I call wrap() twice and then Dispose() on the first wrapper, I'll dispose all the underlying resources and the other ISerializingProducer will break. Docs seem to indicate a SafeHandle or implementing Finalize on the wrapped class is the way to fix this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can't call Dispose on a wrapped object (Wrap returns ISerializingProducer, and concretely the internal SerializingProducer).

If you call Dispose on producer that is wrapped, the SerializingProducer will no longer work and will throw some sort of an exception if it's tried to be used. I could be more explicit about detecting this and throw a more explicit exception.

No one's going to do that in practice, I don't see it as a problem with the concept.

}


public class Producer<TKey, TValue> : ISerializingProducer<TKey, TValue>, IDisposable
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How do docs work for classes with the same name but different # of generic parameters? Are we going to have to constantly maintain duplicate docstrings (as I assume we are going to fill these all in to get automated generation of docs)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not 100% sure, but guess it's going to mean duplication.
Yes, I'll fill all these out, but want to get the API right first.

{
public Null Deserialize(byte[] data)
{
return Null.Instance;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is kind of weird. I can put a null into a serializer and get an object back out of the deserializer...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, you're right, this is completely weird and unnecessary. I didn't think too hard about this yet as deserializers aren't used yet.


public class Producer<TKey, TValue> : ISerializingProducer<TKey, TValue>, IDisposable
{
private Producer producer;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Both should be readonly as they are always set in the constructor and should never change.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yep.

{ "queue.buffering.max.messages", 500000 },
{ "message.send.max.retries", 3 },
{ "retry.backoff.ms", 500 },
{ "queued.min.messages", 1000000 },
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is a consumer property, and so is session.timeout.ms.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh, your example probably tests consumer as well. removed.

var config = new Dictionary<string, object>
{
{ "bootstrap.servers", broker },
{ "queue.buffering.max.messages", 500000 },
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason for setting these properties?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"mirrors the librdkafka performance test example.". Was trying to get something directly comparable to your numbers.


byte cnt = 0;
var val = new byte[100].Select(a => ++cnt).ToArray();
var key = new byte[0];
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

librdkafka treats key=NULL as Kafka null key, while key!=NULL and size=0 as an empty key, which are not the same thing.
We should allow for the same semantics in this client.

Value is identical.

var config = new Dictionary<string, object> { { "bootstrap.servers", brokerList } };

using (var producer = new Producer<Null, string>(config))
using (var producer = new Producer<Null, string>(config, new NullSerializer(), new StringSerializer()))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what's a NullSerializer and how is it different from not setting a serializer?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The choice is to either have a NullSerializer class or an explicit check whether the serializer is null in the ProduceAsync method. I'm sort of on the fence here, erring on the side of having NullSerializer.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or simply defaulting to NullSerializer if null is passed as serializer in the constructor.
Not sure if this matters though. @ewencp ?

// TODO: There should be no need to specify a serializer for common types like string - I think it should default to the UTF8 serializer.
producer.ValueSerializer = (ISerializer<string>)new Confluent.Kafka.Serialization.Utf8StringSerializer();

Console.WriteLine($"{producer.Name} producing on {topicName}. q to exit.");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ctrl-c baby, not "q".

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've decided I disagree. The purpose of these examples is to demonstrate usage of the client in a straightforward, easy to understand way. Turns out that detecting Ctrl-C is a bit convoluted, in fact there are as many lines dedicated to doing this properly as demonstrating the producer. None of the code is rocket science of course, so i'm sort of indifferent, but on balance, I think using q to exit is better.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This might seem like a tiny nitpick thing, but there are two proper reasons:

  • examples should be correct, even if for unrelated stuff, people will base their own code on this.
  • out-of-band cancellation shows an interesting problem: how do I break out of the consume loop. If we can't show people how to do that in an effective and correct manner they will get it wrong and that will bite us back.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is AppDomain.ProcessExit not a workable solution that will be relatively small?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

regarding #1 - i wouldn't call using 'q' to exit 'incorrect' as such.
regarding #2 - you've convinced me it's useful. People won't be using Console.ReadLine, but many will be making console apps, and the CancelKeyPress handler is the way to detect Ctrl-C.

how about i leave it in the advanced producer example and keep it out of the simple producer example - keep that as dead simple as possible - i want the first example people look at to be inviting and not scary in any way.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ewencp AppDomain is changed a lot or doesn't exist in .net core. http://www.michael-whelan.net/replacing-appdomain-in-dotnet-core/

The new assembly unload event mentioned in the above article is not effective at catching Ctrl-C. Examples I see around the web use Console.CancelKeyPress. I'm not certain there is not a better way, but it seems likely CancelKeyPress is good.

Another reason not to include this: there are higher priorities than figuring this out.

// TODO: specify serializers via config file as well? In which case, default keySerializer and valueSerizlizer params to null.

if (KeySerializer == null)
{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we really require a KeySerializer? Can't we allow this to be null instead of having the phony NullSerializer thingie?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added that option of using null rather than NullSerializer. Can remove NullSerializer if that is the consensus. See other comment.

/// <paramref name="val" /> cannot be null.
/// TODO: well it shouldn't be other there is ambiguity on deserialization. check this.
/// </remarks>
public class StringSerializer : ISerializer<string>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it safe to assume that people will know this means UTF-8? Maybe being explicit about it is better

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i'm on the fence on this. removed.


namespace Confluent.Kafka.Serialization
{
public class IntSerializer : ISerializer<int>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Document what serialization this is in practise.
Big endian? varint?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

depends on architecture. added remark (will do propper docs later).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Huhm, you sure? A serializer shouldn't depend on the architecture, that's what makes the serialized format portable.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes. maybe i need to go home for a nap.

}

public Task<DeliveryReport> Produce(byte[] val, int valLength, byte[] key = null, int keyCount = 0, Int32 partition = RD_KAFKA_PARTITION_UA, bool blockIfQueueFull = true)
public Task<DeliveryReport> Produce(byte[] val, int valOffset, int valLength, byte[] key = null, int keyOffset = 0, int keyLength = 0, Int32 partition = RD_KAFKA_PARTITION_UA, bool blockIfQueueFull = true)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This valOffset API is funky, aren't there slices or similar in .NET?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll leave this for now. will get rid of it when I refactor out the topic method.

{
throw RdKafkaException.FromErr(LibRdKafka.last_error(), "Could not produce message");
}
return;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use proper error string (rd_kafka_err2str(..last_error))

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll put a todo for this, have a separate JIRA for sorting out exceptions / errors.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The risk of doing it outside the review process is that we might loose track of requested changes and that means we'll eventually end up having to re-review the entire code base.
If things are fixed in a followup commit in the same PR it is much easier to track.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, this class is going to be completely removed, so this is going to need to be re-reviewed at some point anyway - I'm in the middle of a big refactor - I see a lot of value in getting things broadly in place first then focussing on the detail. I see capturing things as todo's which get coppied around pretty efficient (keeping in mind my first point in this comment). Also, it's the best way for me to get a holistic view of the whole project, which I believe reduces risk in making bad design decisions. This is particularly important for me as i'm new to clients so can foresee less than I otherwise might.


public void Flush()
{
while (OutQueueLength > 0)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there is actually a rd_kafka_flush() call that should beused.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I checked and this wasn't exposed in the LibRdKafka layer yet whereas the OutQueueLength stuff was. Agreed that we should use the correct internal version though.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep. I was just doing whatever ah- was here. will change.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll put a todo. I want to prioritize getting the high level API right, and i'm going to be reviewing / addressing a lot more lower level stuff in future PRs

{
producer.Dispose();
}
public bool FlushOnDispose => producer.FlushOnDispose;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should really try to keep everything as config dict properties.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems very application code specific to me (users of an application shouldn't ever want to set it) and setting it in the config unnatural, so I think best left as a property. @ewencp ?

Copy link
Contributor

@edenhill edenhill Nov 22, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, but this is what we do in librdkafka, Python and Go.
Since it is up to the application to allow users to set configuration properties it can also block these ones if it so desires (we could add a helper that assists in this: rd_kafka_conf_property_is_probably_not_for_the_user(str) bool).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But it's more difficult to use and not at all idimoatic the way you suggest.
And it should never be exposed outside the app (why should a user even know what Dispose is - it's a c-sharp thing - let alone how and when the app uses it).

public class IntDeserializer : IDeserializer<int>
{
/// <remark>
/// Endianness depends on architecture
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The serializer must be stable and not dependent on arch.
If we have a producer on little endian and a consumer on big endian they need to be compatible using the same Serializer.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

right you are.

// (null, int) serialization. When you do not wish to write any data to a key
// or value, the Null type should be used.
var sProducer2 = producer.Wrap<Null, int>(null, new IntSerializer());
var sProducer2 = producer.Wrap<Null, int>(null, new IntSerializer(Endianness.LittleEndian));
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've made a new enum Edianness. Alternatively could use a bool here. This is more self-descriptive, but if the value is determined at runtime, could be a bit more annoying to use. opinions?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the IntSerializer is only aimed at being compatible with itself then it must not have a configuration option to specify endian-ness, but instead be hardcoded to little or big endian.
Otoh if we think this Serializer will need to be compatible with IntSerializers in other languages we shall investigate if there is any prior art and if so adhere to that endian ness.
E.g., Avro uses little endian, Kafka uses big endian.

I dont really see the point of having the endian configurable, that'll create more problem than it solves.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good point. let's just make it network order.

// TODO: Add timout parameter (with default option == block indefinitely) when use rd_kafka_flush.
public void Flush()
{
// TODO: use rd_kafka_flush here instead..
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would've been less code calling rd_kafka_flush() than adding these comments ;)

public int Deserialize(byte[] data)
{
return BitConverter.ToInt32(data, 0);
// network byte order -> big endian -> most significant byte in the smallest address.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There must be a system lib function for this (which also avoids doing anything on big-endian systems)

http://stackoverflow.com/questions/2420227/ntohs-and-ntohl-equivalent

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What i'm wondering is if calling that then BitConverter will be faster or slower than what I've got, also considering I expect most people won't be running on big endian systems (i have no idea).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Something irks me about using this function that takes an int and returns and int, because what it's returning semantically isn't actually an int ... I'm also not convinced it's going to be faster in the the arithmetic expression I've got (i think it's fine to optimize for little endian systems).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using System.Net.IPAddress.HostToNetworkOrder() also brings in a dependency on System.Net which is not currently required. I don't have a clear idea of what this means. It should always be on the host system because it's part of the platform, but it might mean a larger memory footprint (dll loaded when it otherwise wouldn't have been).

return BitConverter.ToInt32(data, 0);
// network byte order -> big endian -> most significant byte in the smallest address.
return
(((int)data[0]) << 24) +
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Assuming this is for a little-endian host, I think this might be wrong, it should be other way around:

return   (data[3] << 24) | (data[2] << 16) | (data[1] << 8) | (data[0]);

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Endianness of the host doesn't matter with << and >> operators.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe I have it the right way around.
Good point about the | operator though, that'll be quicker.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, yes, you are right, sorry.

Same deal with Go where the DR channel gets one of these:
http://docs.confluent.io/3.1.0/clients/confluent-kafka-go/index.html#Message Is this being
kept more minimal intentionally?
*/
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As discussed on Slack, the message metadata contains an ever growing number of fields, so passing a rich Message object to the delivery report , like the other clients, is most likely the best way forward.
And that Message object should be the same as returned by consumer.poll()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

right. however if a very common use case is to just produce a key and value (I think it is), then it's worth having a Producer.ProducerAsync overload for this as well to make the interface easier to use.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is hard to make assumptions on what information the dr callback needs based on the produce() arguments. We should provide whatever librdkafka provides in the dr.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants