From c57e904bae326d5215ba76a6790f9330c9fb1730 Mon Sep 17 00:00:00 2001 From: Matt Howlett Date: Wed, 16 Nov 2016 13:07:44 -0800 Subject: [PATCH 1/5] Producer Refactor Mk II --- examples/AdvancedProducer/Program.cs | 12 +- examples/Benchmark/Program.cs | 120 ++++++++----- examples/Misc/Program.cs | 6 +- examples/SimpleProducer/Program.cs | 8 +- examples/Wrapped/Program.cs | 48 +++++ examples/Wrapped/project.json | 25 +++ src/Confluent.Kafka/DeliveryReport.cs | 8 + src/Confluent.Kafka/ISerializingProducer.cs | 21 +++ src/Confluent.Kafka/Impl/LibRdKafka.cs | 11 +- src/Confluent.Kafka/Impl/SafeTopicHandle.cs | 51 +++++- src/Confluent.Kafka/Null.cs | 2 + src/Confluent.Kafka/Producer.cs | 166 +++++++++++++----- .../Serialization/ByteArraySerializer.cs | 21 --- .../Serialization/IntDeserializer.cs | 13 ++ .../Serialization/IntSerializer.cs | 12 ++ .../Serialization/NullDeserializer.cs | 13 ++ .../Serialization/StringDeserializer.cs | 24 +++ .../Serialization/StringSerializer.cs | 37 ++++ .../Serialization/Utf8StringDeserializer.cs | 12 -- .../Serialization/Utf8StringSerializer.cs | 26 --- src/Confluent.Kafka/Topic.cs | 68 ++----- src/Confluent.Kafka/project.json | 1 + 22 files changed, 482 insertions(+), 223 deletions(-) create mode 100644 examples/Wrapped/Program.cs create mode 100644 examples/Wrapped/project.json create mode 100644 src/Confluent.Kafka/DeliveryReport.cs create mode 100644 src/Confluent.Kafka/ISerializingProducer.cs delete mode 100644 src/Confluent.Kafka/Serialization/ByteArraySerializer.cs create mode 100644 src/Confluent.Kafka/Serialization/IntDeserializer.cs create mode 100644 src/Confluent.Kafka/Serialization/IntSerializer.cs create mode 100644 src/Confluent.Kafka/Serialization/NullDeserializer.cs create mode 100644 src/Confluent.Kafka/Serialization/StringDeserializer.cs create mode 100644 src/Confluent.Kafka/Serialization/StringSerializer.cs delete mode 100644 src/Confluent.Kafka/Serialization/Utf8StringDeserializer.cs delete mode 100644 src/Confluent.Kafka/Serialization/Utf8StringSerializer.cs diff --git a/examples/AdvancedProducer/Program.cs b/examples/AdvancedProducer/Program.cs index bf303a29f..55da08584 100644 --- a/examples/AdvancedProducer/Program.cs +++ b/examples/AdvancedProducer/Program.cs @@ -36,14 +36,8 @@ public static void Main(string[] args) var config = new Dictionary { { "bootstrap.servers", brokerList } }; - using (var producer = new Producer(config)) + using (var producer = new Producer(config, new StringSerializer(), new StringSerializer())) { - // TODO: work out why explicit cast is needed here. - // TODO: remove need to explicitly specify string serializers - assume Utf8StringSerializer in Producer as default. - // TODO: allow be be set only in constructor. make readonly. 
- producer.KeySerializer = (ISerializer)new Confluent.Kafka.Serialization.Utf8StringSerializer(); - producer.ValueSerializer = producer.KeySerializer; - Console.WriteLine("\n-----------------------------------------------------------------------"); Console.WriteLine($"Producer {producer.Name} producing on topic {topicName}."); Console.WriteLine("-----------------------------------------------------------------------"); @@ -82,10 +76,10 @@ public static void Main(string[] args) if (index != -1) { key = text.Substring(0, index); - val = text.Substring(index); + val = text.Substring(index + 1); } - Task deliveryReport = producer.Produce(topicName, key, val); + Task deliveryReport = producer.ProduceAsync(topicName, key, val); var result = deliveryReport.Result; // synchronously waits for message to be produced. Console.WriteLine($"Partition: {result.Partition}, Offset: {result.Offset}"); } diff --git a/examples/Benchmark/Program.cs b/examples/Benchmark/Program.cs index 7e4dbf6cc..19bd89f99 100644 --- a/examples/Benchmark/Program.cs +++ b/examples/Benchmark/Program.cs @@ -1,50 +1,105 @@ using System; +using System.Linq; using System.Collections.Generic; using System.Diagnostics; using System.Threading; using System.Threading.Tasks; -using Confluent.Kafka.Serialization; namespace Confluent.Kafka.Benchmark { public class Program { - public class DeliveryHandler : IDeliveryHandler + + public class BenchmarkProducer { - public void SetException(Exception exception) + class DeliveryHandler : IDeliveryHandler { - throw exception; - } + private static long startTime; + private static int messageCount; + private static AutoResetEvent autoEvent; - public void SetResult(DeliveryReport deliveryReport) - { - } - } + public static void Init(int numberOfMessagesToProduce) + { + startTime = 0; + messageCount = numberOfMessagesToProduce; + autoEvent = new AutoResetEvent(false); + } - public static void Produce(string broker, string topicName, long numMessages) - { - var deliveryHandler = new DeliveryHandler(); + public static void WaitForAllDeliveryReports() + { + autoEvent.WaitOne(); + } - var config = new Dictionary { { "bootstrap.servers", broker } }; + public static long Duration { get; private set; } - using (var producer = new Producer(config)) - { - // TODO: remove need to explicitly specify this serializer. - producer.ValueSerializer = (ISerializer)new ByteArraySerializer(); + public void SetException(Exception exception) + { + throw exception; + } - Console.WriteLine($"{producer.Name} producing on {topicName}"); - // TODO: think more about exactly what we want to benchmark. - var payload = new byte[] { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 }; - for (int i = 0; i < numMessages; i++) + public void SetResult(DeliveryReport deliveryReport) { - producer.ProduceWithDeliveryReport(topicName, payload, deliveryHandler); + messageCount -= 1; + + if (startTime == 0) + { + startTime = DateTime.Now.Ticks; + } + + if (messageCount == 0) + { + Duration = DateTime.Now.Ticks - startTime; + autoEvent.Set(); + } } + } - Console.WriteLine("Shutting down"); + public static void Run(string broker, string topicName, int numberOfMessagesToProduce, int numberOfTests) + { + // mirrors the librdkafka performance test example. 
+ var config = new Dictionary + { + { "bootstrap.servers", broker }, + { "queue.buffering.max.messages", 500000 }, + { "message.send.max.retries", 3 }, + { "retry.backoff.ms", 500 }, + { "queued.min.messages", 1000000 }, + { "session.timeout.ms", 6000 } + }; + + var deliveryHandler = new DeliveryHandler(); + + using (var producer = new Producer(config)) + { + for (var j=0; j ++cnt).ToArray(); + var key = new byte[0]; + + for (int i = 0; i < numberOfMessagesToProduce; i++) + { + producer.ProduceAsync(topicName, key, val, deliveryHandler); + } + + DeliveryHandler.WaitForAllDeliveryReports(); + + Console.WriteLine($"Produced {numberOfMessagesToProduce} in {DeliveryHandler.Duration/10000.0:F0}ms"); + Console.WriteLine($"{numberOfMessagesToProduce / (DeliveryHandler.Duration/10000.0):F0} messages/ms"); + } + + Console.WriteLine("Disposing producer"); + } } } + + // TODO: Update Consumer benchmark for new Consumer when it's written. public static async Task Consume(string broker, string topic) { long n = 0; @@ -86,24 +141,7 @@ public static void Main(string[] args) string brokerList = args[0]; string topic = args[1]; - long numMessages = 1000000; - - var stopwatch = new Stopwatch(); - - // TODO: we really want time from first ack. as it is, this includes producer startup time. - stopwatch.Start(); - Produce(brokerList, topic, numMessages); - stopwatch.Stop(); - - Console.WriteLine($"Sent {numMessages} messages in {stopwatch.Elapsed}"); - Console.WriteLine($"{numMessages / stopwatch.Elapsed.TotalSeconds:F0} messages/second"); - - stopwatch.Restart(); - long n = Consume(brokerList, topic).Result; - stopwatch.Stop(); - - Console.WriteLine($"Received {n} messages in {stopwatch.Elapsed}"); - Console.WriteLine($"{n / stopwatch.Elapsed.TotalSeconds:F0} messages/second"); + BenchmarkProducer.Run(brokerList, topic, 5000000, 4); } } } diff --git a/examples/Misc/Program.cs b/examples/Misc/Program.cs index 9f86612d8..ca0ac7049 100644 --- a/examples/Misc/Program.cs +++ b/examples/Misc/Program.cs @@ -13,7 +13,7 @@ static async Task ListGroups(string brokerList) { var config = new Dictionary { { "bootstrap.servers", brokerList } }; - using (var producer = new Producer(config)) + using (var producer = new Producer(config)) { var groups = await producer.ListGroups(TimeSpan.FromSeconds(10)); Console.WriteLine($"Consumer Groups:"); @@ -27,9 +27,7 @@ static async Task ListGroups(string brokerList) { Console.WriteLine($" {m.MemberId} {m.ClientId} {m.ClientHost}"); Console.WriteLine($" Metadata: {m.MemberMetadata.Length} bytes"); - //Console.WriteLine(System.Text.Encoding.UTF8.GetString(m.MemberMetadata)); Console.WriteLine($" Assignment: {m.MemberAssignment.Length} bytes"); - //Console.WriteLine(System.Text.Encoding.UTF8.GetString(m.MemberAssignment)); } } } @@ -38,7 +36,7 @@ static async Task ListGroups(string brokerList) static async Task PrintMetadata(string brokerList) { var config = new Dictionary { { "bootstrap.servers", brokerList } }; - using (var producer = new Producer(config)) + using (var producer = new Producer(config)) { var meta = await producer.Metadata(); Console.WriteLine($"{meta.OriginatingBrokerId} {meta.OriginatingBrokerName}"); diff --git a/examples/SimpleProducer/Program.cs b/examples/SimpleProducer/Program.cs index 9aa122554..622b3da1b 100644 --- a/examples/SimpleProducer/Program.cs +++ b/examples/SimpleProducer/Program.cs @@ -14,18 +14,14 @@ public static void Main(string[] args) var config = new Dictionary { { "bootstrap.servers", brokerList } }; - using (var producer = new 
Producer(config)) + using (var producer = new Producer(config, new NullSerializer(), new StringSerializer())) { - // TODO: figure out why the cast below is necessary and how to avoid it. - // TODO: There should be no need to specify a serializer for common types like string - I think it should default to the UTF8 serializer. - producer.ValueSerializer = (ISerializer)new Confluent.Kafka.Serialization.Utf8StringSerializer(); - Console.WriteLine($"{producer.Name} producing on {topicName}. q to exit."); string text; while ((text = Console.ReadLine()) != "q") { - Task deliveryReport = producer.Produce(topicName, text); + Task deliveryReport = producer.ProduceAsync(topicName, text); var unused = deliveryReport.ContinueWith(task => { Console.WriteLine($"Partition: {task.Result.Partition}, Offset: {task.Result.Offset}"); diff --git a/examples/Wrapped/Program.cs b/examples/Wrapped/Program.cs new file mode 100644 index 000000000..a0f050a07 --- /dev/null +++ b/examples/Wrapped/Program.cs @@ -0,0 +1,48 @@ +using System; +using System.Collections.Generic; +using System.Threading.Tasks; +using Confluent.Kafka.Serialization; + +namespace Confluent.Kafka.Wrapped +{ + /// + /// An example showing how to wrap a single Producer to produce messages using + /// different serializers. + /// + /// + /// If you only want to use a single pair of serializers in your application, + /// you should use the Producer<TKey, TValue> constructor instead. + /// + public class Program + { + public static void Main(string[] args) + { + var config = new Dictionary { { "bootstrap.servers", args[0] } }; + + using (var producer = new Producer(config)) + { + // sProducer1 is a lightweight wrapper around a Producer instance that adds + // (string, string) serialization. + // Note that sProducer1 does not need to be (and cannot be) disposed. + var sProducer1 = producer.Wrap(new StringSerializer(), new StringSerializer()); + + // sProducer2 is another lightweight wrapper around kafkaProducer that adds + // (null, int) serialization. + // When you do not wish to write any data to a key or value, the Null type should be + // used. + var sProducer2 = producer.Wrap(new NullSerializer(), new IntSerializer()); + + // write (string, string) data to topic "first-topic", statically type checked. + sProducer1.ProduceAsync("first-topic", "my-key-value", "my-value"); + + // write (null, int) data to topic "second-data". statically type checked, using + // the same underlying producer as the producer1. + sProducer2.ProduceAsync("second-topic", 42); + + // producers are NOT tied to topics. Although it's unusual that you might want to + // do so, you can use different serializing producers to write to the same topic. 
+ sProducer2.ProduceAsync("first-topic", 107); + } + } + } +} diff --git a/examples/Wrapped/project.json b/examples/Wrapped/project.json new file mode 100644 index 000000000..08578640a --- /dev/null +++ b/examples/Wrapped/project.json @@ -0,0 +1,25 @@ +{ + "version": "1.0.0", + "authors": ["Confluent Inc"], + + "buildOptions": { + "emitEntryPoint": true + }, + + "dependencies": { + "Confluent.Kafka": { + "target": "project" + } + }, + + "frameworks": { + "netcoreapp1.0": { + "dependencies": { + "Microsoft.NETCore.App": { + "type": "platform", + "version": "1.0.0" + } + } + } + } +} diff --git a/src/Confluent.Kafka/DeliveryReport.cs b/src/Confluent.Kafka/DeliveryReport.cs new file mode 100644 index 000000000..d0e7af11e --- /dev/null +++ b/src/Confluent.Kafka/DeliveryReport.cs @@ -0,0 +1,8 @@ +namespace Confluent.Kafka +{ + public struct DeliveryReport + { + public int Partition; + public long Offset; + } +} diff --git a/src/Confluent.Kafka/ISerializingProducer.cs b/src/Confluent.Kafka/ISerializingProducer.cs new file mode 100644 index 000000000..883f13abd --- /dev/null +++ b/src/Confluent.Kafka/ISerializingProducer.cs @@ -0,0 +1,21 @@ +using System.Threading.Tasks; +using Confluent.Kafka.Serialization; + + +namespace Confluent.Kafka +{ + public interface ISerializingProducer + { + string Name { get; } + + ISerializer KeySerializer { get; } + + ISerializer ValueSerializer { get; } + + Task ProduceAsync(string topic, TValue val, int? partition, bool blockIfQueueFull); + + Task ProduceAsync(string topic, TKey key, TValue val, int? partition, bool blockIfQueueFull); + + void ProduceAsync(string topic, TKey key, TValue val, IDeliveryHandler deliveryHandler, int? partition, bool blockIfQueueFull); + } +} diff --git a/src/Confluent.Kafka/Impl/LibRdKafka.cs b/src/Confluent.Kafka/Impl/LibRdKafka.cs index fd683c7a4..4ab495957 100644 --- a/src/Confluent.Kafka/Impl/LibRdKafka.cs +++ b/src/Confluent.Kafka/Impl/LibRdKafka.cs @@ -341,14 +341,14 @@ internal static ErrorCode committed(IntPtr rk, IntPtr partitions, IntPtr timeout internal static ErrorCode position(IntPtr rk, IntPtr partitions) => _position(rk, partitions); - private static Func _produce; internal static IntPtr produce( IntPtr rkt, int partition, IntPtr msgflags, - byte[] val, UIntPtr len, - byte[] key, UIntPtr keylen, + IntPtr val, UIntPtr len, + IntPtr key, UIntPtr keylen, IntPtr msg_opaque) => _produce(rkt, partition, msgflags, val, len, key, keylen, msg_opaque); @@ -607,8 +607,8 @@ internal static extern IntPtr rd_kafka_produce( IntPtr rkt, int partition, IntPtr msgflags, - byte[] val, UIntPtr len, - byte[] key, UIntPtr keylen, + IntPtr val, UIntPtr len, + IntPtr key, UIntPtr keylen, IntPtr msg_opaque); [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] @@ -644,5 +644,6 @@ internal static extern IntPtr rd_kafka_brokers_add(IntPtr rk, [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] internal static extern IntPtr rd_kafka_wait_destroyed(IntPtr timeout_ms); } + } } diff --git a/src/Confluent.Kafka/Impl/SafeTopicHandle.cs b/src/Confluent.Kafka/Impl/SafeTopicHandle.cs index fc2227046..5e608216d 100644 --- a/src/Confluent.Kafka/Impl/SafeTopicHandle.cs +++ b/src/Confluent.Kafka/Impl/SafeTopicHandle.cs @@ -28,14 +28,49 @@ protected override bool ReleaseHandle() internal string GetName() => Marshal.PtrToStringAnsi(LibRdKafka.topic_name(handle)); - internal long Produce(byte[] val, int valLength, byte[] key, int keyCount, int partition, IntPtr opaque, bool blockIfQueueFull) - => (long) LibRdKafka.produce( - handle, - 
partition, - (IntPtr) (MsgFlags.MSG_F_COPY | (blockIfQueueFull ? MsgFlags.MSG_F_BLOCK : 0)), - val, (UIntPtr) valLength, - key, (UIntPtr) keyCount, - opaque); + internal long Produce(byte[] val, int valOffset, int valLength, byte[] key, int keyOffset, int keyLength, int partition, IntPtr opaque, bool blockIfQueueFull) + { + var pValue = IntPtr.Zero; + var pKey = IntPtr.Zero; + + var gchValue = default(GCHandle); + var gchKey = default(GCHandle); + + if (val != null) + { + gchValue = GCHandle.Alloc(val, GCHandleType.Pinned); + pValue = Marshal.UnsafeAddrOfPinnedArrayElement(val, valOffset); + } + + if (key != null) + { + gchKey = GCHandle.Alloc(key, GCHandleType.Pinned); + pKey = Marshal.UnsafeAddrOfPinnedArrayElement(key, keyOffset); + } + + try + { + return (long) LibRdKafka.produce( + handle, + partition, + (IntPtr) (MsgFlags.MSG_F_COPY | (blockIfQueueFull ? MsgFlags.MSG_F_BLOCK : 0)), + pValue, (UIntPtr) valLength, + pKey, (UIntPtr) keyLength, + opaque); + } + finally + { + if (val != null) + { + gchValue.Free(); + } + + if (key != null) + { + gchKey.Free(); + } + } + } internal bool PartitionAvailable(int partition) => LibRdKafka.topic_partition_available(handle, partition); } diff --git a/src/Confluent.Kafka/Null.cs b/src/Confluent.Kafka/Null.cs index 8b5a71a3b..28341e04c 100644 --- a/src/Confluent.Kafka/Null.cs +++ b/src/Confluent.Kafka/Null.cs @@ -2,6 +2,8 @@ namespace Confluent.Kafka { public sealed class Null { + public static Null Instance = new Null(); + private Null() {} } } diff --git a/src/Confluent.Kafka/Producer.cs b/src/Confluent.Kafka/Producer.cs index a83180c5d..920055537 100644 --- a/src/Confluent.Kafka/Producer.cs +++ b/src/Confluent.Kafka/Producer.cs @@ -9,34 +9,26 @@ namespace Confluent.Kafka { - /// - /// High-level, asynchronous message producer. - /// - public class Producer : Handle + public class Producer : Handle { - // TODO: allow be be set only in constructor. make readonly. - // TODO: These should arguably be left as properties here. - // TODO: And/or these could be in config + use reflection to find the class. - public ISerializer KeySerializer { get; set; } - - // TODO: allow be be set only in constructor. make readonly. - public ISerializer ValueSerializer { get; set; } + private const int RD_KAFKA_PARTITION_UA = -1; private IEnumerable> topicConfig; + // TODO: get rid of the Topic class. private SafeDictionary topics = new SafeDictionary(); public Producer(IEnumerable> config) { this.topicConfig = (IEnumerable>)config.FirstOrDefault(prop => prop.Key == "default.topic.config").Value; - var rdKafkaConfig = new Config(config.Where(prop => prop.Key != "default.topic.config")); - // TODO: If serializers aren't specified in config, then we could use defaults associated with TKey, TValue if - // we have matching Confluent.Kafka.Serialization serializers available. + // setup the rdkafka config object, including pointer to the delivery report callback. + var rdKafkaConfig = new Config(config.Where(prop => prop.Key != "default.topic.config")); + IntPtr rdKafkaConfigPtr = rdKafkaConfig.handle.Dup(); + // TODO: we might want to defer setting the report callback until we know we actually need it. 
+ LibRdKafka.conf_set_dr_msg_cb(rdKafkaConfigPtr, DeliveryReportCallback); - IntPtr cfgPtr = rdKafkaConfig.handle.Dup(); - LibRdKafka.conf_set_dr_msg_cb(cfgPtr, DeliveryReportDelegate); - Init(RdKafkaType.Producer, cfgPtr, rdKafkaConfig.Logger); + Init(RdKafkaType.Producer, rdKafkaConfigPtr, rdKafkaConfig.Logger); } private Topic getKafkaTopic(string topic) @@ -52,28 +44,15 @@ private Topic getKafkaTopic(string topic) return kafkaTopic; } + private static readonly LibRdKafka.DeliveryReportCallback DeliveryReportCallback = DeliveryReportCallbackImpl; - // TODO: Support the other function overloads in Topic. - // TODO: I'd like a way to produce as (byte[], offset, length) as well if possible all the way down to librdkafka (need to investigate). - // TODO: should we append the Produce methods with Async? Seems to be a convention. - - public Task Produce(string topic, TValue val) - => getKafkaTopic(topic).Produce(ValueSerializer.Serialize(val)); - - public Task Produce(string topic, TKey key, TValue val) - => getKafkaTopic(topic).Produce(ValueSerializer.Serialize(val), KeySerializer.Serialize(key)); - - // TODO: do we need both the callback way of doing this and the Task way? - // i think this was added late to rdkafka-dotnet, so there is probably a need. - // What about Task.ContinueWith? I belive this can even handle exceptions? - public void ProduceWithDeliveryReport(string topic, TValue val, IDeliveryHandler deliveryHandler) - => getKafkaTopic(topic).Produce(ValueSerializer.Serialize(val), deliveryHandler); - - // Explicitly keep reference to delegate so it stays alive - private static readonly LibRdKafka.DeliveryReportCallback DeliveryReportDelegate = DeliveryReportCallback; - - private static void DeliveryReportCallback(IntPtr rk, ref rd_kafka_message rkmessage, IntPtr opaque) + private static void DeliveryReportCallbackImpl(IntPtr rk, ref rd_kafka_message rkmessage, IntPtr opaque) { + if (rkmessage._private == IntPtr.Zero) + { + return; + } + // msg_opaque was set by Topic.Produce var gch = GCHandle.FromIntPtr(rkmessage._private); var deliveryHandler = (IDeliveryHandler) gch.Target; @@ -88,10 +67,115 @@ private static void DeliveryReportCallback(IntPtr rk, ref rd_kafka_message rkmes return; } - deliveryHandler.SetResult(new DeliveryReport { - Offset = rkmessage.offset, - Partition = rkmessage.partition - }); + deliveryHandler.SetResult( + new DeliveryReport + { + Offset = rkmessage.offset, + Partition = rkmessage.partition + } + ); + } + + public ISerializingProducer Wrap(ISerializer keySerializer, ISerializer valueSerializer) + { + return new SerializingProducer(this, keySerializer, valueSerializer); + } + + public Task ProduceAsync(string topic, byte[] val, int? partition = null, bool blockIfQueueFull = true) + => getKafkaTopic(topic).Produce(val, 0, val.Length, null, 0, 0, partition ?? RD_KAFKA_PARTITION_UA, blockIfQueueFull); + + public Task ProduceAsync(string topic, byte[] key, byte[] val, int? partition = null, bool blockIfQueueFull = true) + => getKafkaTopic(topic).Produce(val, 0, val.Length, key, 0, key.Length, partition ?? RD_KAFKA_PARTITION_UA, blockIfQueueFull); + + public Task ProduceAsync(string topic, ArraySegment key, ArraySegment val, int? partition = null, bool blockIfQueueFull = true) + => getKafkaTopic(topic).Produce(val.Array, val.Offset, val.Count, key.Array, key.Offset, key.Count, partition ?? RD_KAFKA_PARTITION_UA, blockIfQueueFull); + + // There are (or have been) 3 different patterns for doing async requests in .NET (two of them depreciated). 
+ // https://msdn.microsoft.com/en-us/library/jj152938(v=vs.110).aspx + // None of them are to use a callback (as below), but I believe this makes the most sense in the current situation. + // A reason we might want these methods is the callbacks all happen on the same thread, whereas the Task completion ones don't. + // This is used by the benchmark example to avoid locking the counter (maybe this is a common thing we want to do?) + public void ProduceAsync(string topic, byte[] key, byte[] val, IDeliveryHandler deliveryHandler = null, int? partition = null, bool blockIfQueueFull = true) + => getKafkaTopic(topic).Produce(val, 0, val.Length, deliveryHandler, key, 0, key.Length, partition ?? RD_KAFKA_PARTITION_UA, blockIfQueueFull); + + public void ProduceAsync(string topic, ArraySegment key, ArraySegment val, IDeliveryHandler deliveryHandler = null, int? partition = null, bool blockIfQueueFull = true) + => getKafkaTopic(topic).Produce(val.Array, val.Offset, val.Count, deliveryHandler, key.Array, key.Offset, key.Count, partition ?? RD_KAFKA_PARTITION_UA, blockIfQueueFull); + } + + + internal class SerializingProducer : ISerializingProducer + { + protected Producer producer; + + public ISerializer KeySerializer { get; } + + public ISerializer ValueSerializer { get; } + + public SerializingProducer(Producer producer, ISerializer keySerializer, ISerializer valueSerializer) + { + this.producer = producer; + KeySerializer = keySerializer; + ValueSerializer = valueSerializer; + + // TODO: specify serializers via config file as well? In which case, default keySerializer and valueSerizlizer params to null. + + if (KeySerializer == null) + { + throw new ArgumentNullException("Key serializer must be specified."); + } + + if (ValueSerializer == null) + { + throw new ArgumentNullException("Value serializer must be specified."); + } + } + + public Task ProduceAsync(string topic, TValue val, int? partition = null, bool blockIfQueueFull = true) + => producer.ProduceAsync(topic, ValueSerializer.Serialize(val), partition, blockIfQueueFull); + + public Task ProduceAsync(string topic, TKey key, TValue val, int? partition = null, bool blockIfQueueFull = true) + => producer.ProduceAsync(topic, KeySerializer.Serialize(key), ValueSerializer.Serialize(val), partition, blockIfQueueFull); + + public void ProduceAsync(string topic, TKey key, TValue val, IDeliveryHandler deliveryHandler = null, int? partition = null, bool blockIfQueueFull = true) + => producer.ProduceAsync(topic, KeySerializer.Serialize(key), ValueSerializer.Serialize(val), deliveryHandler, partition, blockIfQueueFull); + + public string Name => producer.Name; + } + + + public class Producer : ISerializingProducer, IDisposable + { + private Producer producer; + private ISerializingProducer serializingProducer; + + + public ISerializer KeySerializer => serializingProducer.KeySerializer; + + public ISerializer ValueSerializer => serializingProducer.ValueSerializer; + + public string Name => serializingProducer.Name; + + public Task ProduceAsync(string topic, TValue val, int? partition = null, bool blockIfQueueFull = true) + => serializingProducer.ProduceAsync(topic, val, partition, blockIfQueueFull); + + public Task ProduceAsync(string topic, TKey key, TValue val, int? partition = null, bool blockIfQueueFull = true) + => serializingProducer.ProduceAsync(topic, key, val, partition, blockIfQueueFull); + + public void ProduceAsync(string topic, TKey key, TValue val, IDeliveryHandler deliveryHandler = null, int? 
partition = null, bool blockIfQueueFull = true) + => serializingProducer.ProduceAsync(topic, key, val, deliveryHandler, partition, blockIfQueueFull); + + public Producer( + IEnumerable> config, + ISerializer keySerializer, + ISerializer valueSerializer) + { + producer = new Producer(config); + serializingProducer = producer.Wrap(keySerializer, valueSerializer); + } + + public void Dispose() + { + producer.Dispose(); } } } diff --git a/src/Confluent.Kafka/Serialization/ByteArraySerializer.cs b/src/Confluent.Kafka/Serialization/ByteArraySerializer.cs deleted file mode 100644 index 3fb1d23b6..000000000 --- a/src/Confluent.Kafka/Serialization/ByteArraySerializer.cs +++ /dev/null @@ -1,21 +0,0 @@ -using System.Text; - -namespace Confluent.Kafka.Serialization -{ - /// - /// A byte[] pass-through serializer. - /// - /// - /// The byte[] to pass through - /// - /// - /// A reference to . - /// - public class ByteArraySerializer : ISerializer - { - public byte[] Serialize(byte[] val) - { - return val; - } - } -} diff --git a/src/Confluent.Kafka/Serialization/IntDeserializer.cs b/src/Confluent.Kafka/Serialization/IntDeserializer.cs new file mode 100644 index 000000000..5ba2e9266 --- /dev/null +++ b/src/Confluent.Kafka/Serialization/IntDeserializer.cs @@ -0,0 +1,13 @@ +using System; + + +namespace Confluent.Kafka.Serialization +{ + public class IntDeserializer : IDeserializer + { + public int Deserialize(byte[] data) + { + return BitConverter.ToInt32(data, 0); + } + } +} diff --git a/src/Confluent.Kafka/Serialization/IntSerializer.cs b/src/Confluent.Kafka/Serialization/IntSerializer.cs new file mode 100644 index 000000000..cf599de15 --- /dev/null +++ b/src/Confluent.Kafka/Serialization/IntSerializer.cs @@ -0,0 +1,12 @@ +using System; + +namespace Confluent.Kafka.Serialization +{ + public class IntSerializer : ISerializer + { + public byte[] Serialize(int val) + { + return BitConverter.GetBytes(val); + } + } +} diff --git a/src/Confluent.Kafka/Serialization/NullDeserializer.cs b/src/Confluent.Kafka/Serialization/NullDeserializer.cs new file mode 100644 index 000000000..945c50c75 --- /dev/null +++ b/src/Confluent.Kafka/Serialization/NullDeserializer.cs @@ -0,0 +1,13 @@ +using Confluent.Kafka; +using Confluent.Kafka.Serialization; + +namespace Confluent.Kafka.Serialization +{ + public class NullDeserializer : IDeserializer + { + public Null Deserialize(byte[] data) + { + return Null.Instance; + } + } +} diff --git a/src/Confluent.Kafka/Serialization/StringDeserializer.cs b/src/Confluent.Kafka/Serialization/StringDeserializer.cs new file mode 100644 index 000000000..a5bd1a695 --- /dev/null +++ b/src/Confluent.Kafka/Serialization/StringDeserializer.cs @@ -0,0 +1,24 @@ +using System.Text; + +namespace Confluent.Kafka.Serialization +{ + public class StringDeserializer : IDeserializer + { + Encoding encoding; + + StringDeserializer() + { + encoding = Encoding.UTF8; + } + + StringDeserializer(Encoding encoding) + { + this.encoding = encoding; + } + + public string Deserialize(byte[] data) + { + return Encoding.UTF8.GetString(data); + } + } +} diff --git a/src/Confluent.Kafka/Serialization/StringSerializer.cs b/src/Confluent.Kafka/Serialization/StringSerializer.cs new file mode 100644 index 000000000..d7b9a27f4 --- /dev/null +++ b/src/Confluent.Kafka/Serialization/StringSerializer.cs @@ -0,0 +1,37 @@ +using System.Text; + +namespace Confluent.Kafka.Serialization +{ + /// + /// String serializer. + /// + /// + /// The string value to serialize. + /// + /// + /// encoded in a byte array. 
+ /// + /// + /// cannot be null. + /// TODO: well it shouldn't be other there is ambiguity on deserialization. check this. + /// + public class StringSerializer : ISerializer + { + private Encoding encoding; + + public StringSerializer() + { + encoding = Encoding.UTF8; + } + + public StringSerializer(Encoding encoding) + { + this.encoding = encoding; + } + + public byte[] Serialize(string val) + { + return encoding.GetBytes(val); + } + } +} diff --git a/src/Confluent.Kafka/Serialization/Utf8StringDeserializer.cs b/src/Confluent.Kafka/Serialization/Utf8StringDeserializer.cs deleted file mode 100644 index 1104dcfa9..000000000 --- a/src/Confluent.Kafka/Serialization/Utf8StringDeserializer.cs +++ /dev/null @@ -1,12 +0,0 @@ -using System.Text; - -namespace Confluent.Kafka.Serialization -{ - public class Utf8StringDeserializer : IDeserializer - { - public string Deserialize(byte[] data) - { - return Encoding.UTF8.GetString(data); - } - } -} diff --git a/src/Confluent.Kafka/Serialization/Utf8StringSerializer.cs b/src/Confluent.Kafka/Serialization/Utf8StringSerializer.cs deleted file mode 100644 index 969736e40..000000000 --- a/src/Confluent.Kafka/Serialization/Utf8StringSerializer.cs +++ /dev/null @@ -1,26 +0,0 @@ -using System.Text; - -namespace Confluent.Kafka.Serialization -{ - // TODO: Perhaps have a general string serializer + encoding as a type parameter - - /// - /// A UTF-8 encoding string serializer. - /// - /// - /// The string value to serialize. - /// - /// - /// encoded in a UTF-8 byte array. - /// - /// - /// cannot be null. - /// - public class Utf8StringSerializer : ISerializer - { - public byte[] Serialize(string val) - { - return Encoding.UTF8.GetBytes(val); - } - } -} diff --git a/src/Confluent.Kafka/Topic.cs b/src/Confluent.Kafka/Topic.cs index 76cd39785..4f8ebdfe4 100644 --- a/src/Confluent.Kafka/Topic.cs +++ b/src/Confluent.Kafka/Topic.cs @@ -6,12 +6,6 @@ namespace Confluent.Kafka { - public struct DeliveryReport - { - public int Partition; - public long Offset; - } - // TODO: We want to get rid of this class entirely. /// @@ -62,48 +56,15 @@ public void Dispose() public string Name => handle.GetName(); - public Task Produce(byte[] val, byte[] key = null, Int32 partition = RD_KAFKA_PARTITION_UA, bool blockIfQueueFull = true) - { - return Produce(val, val?.Length ?? 0, key, key?.Length ?? 0, partition, blockIfQueueFull); - } - - public Task Produce(byte[] val, int valLength, byte[] key = null, int keyCount = 0, Int32 partition = RD_KAFKA_PARTITION_UA, bool blockIfQueueFull = true) + public Task Produce(byte[] val, int valOffset, int valLength, byte[] key = null, int keyOffset = 0, int keyLength = 0, Int32 partition = RD_KAFKA_PARTITION_UA, bool blockIfQueueFull = true) { // Passes the TaskCompletionSource to the delivery report callback // via the msg_opaque pointer var deliveryCompletionSource = new TaskDeliveryHandler(); - Produce(val, valLength, key, keyCount, partition, deliveryCompletionSource, blockIfQueueFull); + Produce(val, valOffset, valLength, key, keyOffset, keyLength, partition, deliveryCompletionSource, blockIfQueueFull); return deliveryCompletionSource.Task; } - /// - /// Produces a keyed message to a partition of the current Topic and notifies the caller of progress via a callback interface. - /// - /// - /// Value to send to Kafka. Can be null. - /// - /// - /// IDeliveryHandler implementation used to notify the caller when the given produce request completes or an error occurs. 
- /// - /// - /// (Optional) The key associated with (or null if no key is specified). - /// - /// - /// (Optional) The topic partition to which will be sent (or -1 if no partition is specified). - /// - /// - /// Thrown if is null. - /// - /// - /// Methods of will be executed in an RdKafka internal thread and will block other operations - /// - consider this when implementing IDeliveryHandler. - /// Use this overload for high-performance use cases as it does not use TPL and reduces the number of allocations. - /// - public void Produce(byte[] val, IDeliveryHandler deliveryHandler, byte[] key = null, Int32 partition = RD_KAFKA_PARTITION_UA, bool blockIfQueueFull = true) - { - Produce(val, val?.Length ?? 0, deliveryHandler, key, key?.Length ?? 0, partition, blockIfQueueFull); - } - /// /// Produces a keyed message to a partition of the current Topic and notifies the caller of progress via a callback interface. /// @@ -115,11 +76,12 @@ public void Produce(byte[] val, IDeliveryHandler deliveryHandler, byte[] key = n /// /// /// IDeliveryHandler implementation used to notify the caller when the given produce request completes or an error occurs. + /// Can be null. /// /// /// (Optional) The key associated with (or null if no key is specified). /// - /// + /// /// Number of bytes to use from key buffer /// /// @@ -133,20 +95,25 @@ public void Produce(byte[] val, IDeliveryHandler deliveryHandler, byte[] key = n /// - consider this when implementing IDeliveryHandler. /// Use this overload for high-performance use cases as it does not use TPL and reduces the number of allocations. /// - public void Produce(byte[] value, int valLength, IDeliveryHandler deliveryHandler, byte[] key = null, int keyCount = 0, Int32 partition = RD_KAFKA_PARTITION_UA, bool blockIfQueueFull = true) + public void Produce(byte[] val, int valOffset, int valLength, IDeliveryHandler deliveryHandler, byte[] key = null, int keyOffset = 0, int keyLength = 0, Int32 partition = RD_KAFKA_PARTITION_UA, bool blockIfQueueFull = true) { - if (deliveryHandler == null) - throw new ArgumentNullException(nameof(deliveryHandler)); - Produce(value, valLength, key, keyCount, partition, deliveryHandler, blockIfQueueFull); + Produce(val, valOffset, valLength, key, keyOffset, keyLength, partition, deliveryHandler, blockIfQueueFull); } - - private void Produce(byte[] val, int valLength, byte[] key, int keyCount, Int32 partition, object deliveryHandler, bool blockIfQueueFull) + private void Produce(byte[] val, int valOffset, int valLength, byte[] key, int keyOffset, int keyLength, Int32 partition, object deliveryHandler, bool blockIfQueueFull) { + if (deliveryHandler == null) + { + if (handle.Produce(val, valOffset, valLength, key, keyOffset, keyLength, partition, IntPtr.Zero, blockIfQueueFull) != 0) + { + throw RdKafkaException.FromErr(LibRdKafka.last_error(), "Could not produce message"); + } + return; + } + var gch = GCHandle.Alloc(deliveryHandler); var ptr = GCHandle.ToIntPtr(gch); - - if (handle.Produce(val, valLength, key, keyCount, partition, ptr, blockIfQueueFull) != 0) + if (handle.Produce(val, valOffset, valLength, key, keyOffset, keyLength, partition, ptr, blockIfQueueFull) != 0) { var err = LibRdKafka.last_error(); gch.Free(); @@ -162,5 +129,6 @@ private void Produce(byte[] val, int valLength, byte[] key, int keyCount, Int32 /// This function must only be called from inside a partitioner function. 
/// public bool PartitionAvailable(int partition) => handle.PartitionAvailable(partition); + } } diff --git a/src/Confluent.Kafka/project.json b/src/Confluent.Kafka/project.json index e44a12723..5a951b93b 100644 --- a/src/Confluent.Kafka/project.json +++ b/src/Confluent.Kafka/project.json @@ -18,6 +18,7 @@ "System.Console": "4.0.0", "System.Linq": "4.1.0", "System.Runtime.InteropServices": "4.1.0", + "System.Runtime.Extensions": "4.1.0", "System.Threading": "4.0.11" } } From 74cdb4338b00c88bc31d31feef802ec77d0da630 Mon Sep 17 00:00:00 2001 From: Matt Howlett Date: Tue, 22 Nov 2016 08:26:12 -0800 Subject: [PATCH 2/5] review changes --- examples/AdvancedProducer/Program.cs | 26 ++------ examples/Benchmark/Program.cs | 62 +++---------------- examples/SimpleProducer/Program.cs | 5 +- examples/Wrapped/Program.cs | 20 +++--- src/Confluent.Kafka/Handle.cs | 16 +++-- src/Confluent.Kafka/ISerializingProducer.cs | 6 +- src/Confluent.Kafka/Null.cs | 2 - src/Confluent.Kafka/Producer.cs | 56 +++++------------ .../Serialization/IntDeserializer.cs | 3 + .../Serialization/IntSerializer.cs | 3 + .../Serialization/NullDeserializer.cs | 5 +- .../Serialization/NullSerializer.cs | 2 +- .../Serialization/StringSerializer.cs | 5 -- src/Confluent.Kafka/Topic.cs | 60 +++--------------- 14 files changed, 66 insertions(+), 205 deletions(-) diff --git a/examples/AdvancedProducer/Program.cs b/examples/AdvancedProducer/Program.cs index 55da08584..c1fe67277 100644 --- a/examples/AdvancedProducer/Program.cs +++ b/examples/AdvancedProducer/Program.cs @@ -36,7 +36,7 @@ public static void Main(string[] args) var config = new Dictionary { { "bootstrap.servers", brokerList } }; - using (var producer = new Producer(config, new StringSerializer(), new StringSerializer())) + using (var producer = new Producer(config, new StringSerializer(Encoding.UTF8), new StringSerializer(Encoding.UTF8))) { Console.WriteLine("\n-----------------------------------------------------------------------"); Console.WriteLine($"Producer {producer.Name} producing on topic {topicName}."); @@ -45,29 +45,11 @@ public static void Main(string[] args) Console.WriteLine("> key value"); Console.WriteLine("To create a kafka message with empty key and UTF-8 encoded value:"); Console.WriteLine("> value"); - Console.WriteLine("Ctrl-C to quit.\n"); + Console.WriteLine("'q' to quit.\n"); - var cancelled = false; - Console.CancelKeyPress += (object sender, ConsoleCancelEventArgs e) => { - e.Cancel = true; // prevent the process from terminating. - cancelled = true; - }; - - while (!cancelled) + string text; + while ((text = Console.ReadLine()) != "q") { - Console.Write("> "); - - string text; - try - { - text = Console.ReadLine(); - } - catch - { - // IO exception is thrown when ConsoleCancelEventArgs.Cancel == true. 
- break; - } - var key = ""; var val = text; diff --git a/examples/Benchmark/Program.cs b/examples/Benchmark/Program.cs index 19bd89f99..73f4821a5 100644 --- a/examples/Benchmark/Program.cs +++ b/examples/Benchmark/Program.cs @@ -10,51 +10,8 @@ namespace Confluent.Kafka.Benchmark { public class Program { - public class BenchmarkProducer { - class DeliveryHandler : IDeliveryHandler - { - private static long startTime; - private static int messageCount; - private static AutoResetEvent autoEvent; - - public static void Init(int numberOfMessagesToProduce) - { - startTime = 0; - messageCount = numberOfMessagesToProduce; - autoEvent = new AutoResetEvent(false); - } - - public static void WaitForAllDeliveryReports() - { - autoEvent.WaitOne(); - } - - public static long Duration { get; private set; } - - public void SetException(Exception exception) - { - throw exception; - } - - public void SetResult(DeliveryReport deliveryReport) - { - messageCount -= 1; - - if (startTime == 0) - { - startTime = DateTime.Now.Ticks; - } - - if (messageCount == 0) - { - Duration = DateTime.Now.Ticks - startTime; - autoEvent.Set(); - } - } - } - public static void Run(string broker, string topicName, int numberOfMessagesToProduce, int numberOfTests) { // mirrors the librdkafka performance test example. @@ -64,33 +21,29 @@ public static void Run(string broker, string topicName, int numberOfMessagesToPr { "queue.buffering.max.messages", 500000 }, { "message.send.max.retries", 3 }, { "retry.backoff.ms", 500 }, - { "queued.min.messages", 1000000 }, { "session.timeout.ms", 6000 } }; - var deliveryHandler = new DeliveryHandler(); - using (var producer = new Producer(config)) { for (var j=0; j ++cnt).ToArray(); - var key = new byte[0]; + var startTime = DateTime.Now.Ticks; + var tasks = new Task[numberOfMessagesToProduce]; for (int i = 0; i < numberOfMessagesToProduce; i++) { - producer.ProduceAsync(topicName, key, val, deliveryHandler); + tasks[i] = producer.ProduceAsync(topicName, null, val); } + Task.WaitAll(tasks); + var duration = DateTime.Now.Ticks - startTime; - DeliveryHandler.WaitForAllDeliveryReports(); - - Console.WriteLine($"Produced {numberOfMessagesToProduce} in {DeliveryHandler.Duration/10000.0:F0}ms"); - Console.WriteLine($"{numberOfMessagesToProduce / (DeliveryHandler.Duration/10000.0):F0} messages/ms"); + Console.WriteLine($"Produced {numberOfMessagesToProduce} in {duration/10000.0:F0}ms"); + Console.WriteLine($"{numberOfMessagesToProduce / (duration/10000.0):F0} messages/ms"); } Console.WriteLine("Disposing producer"); @@ -98,7 +51,6 @@ public static void Run(string broker, string topicName, int numberOfMessagesToPr } } - // TODO: Update Consumer benchmark for new Consumer when it's written. public static async Task Consume(string broker, string topic) { diff --git a/examples/SimpleProducer/Program.cs b/examples/SimpleProducer/Program.cs index 622b3da1b..6108d4438 100644 --- a/examples/SimpleProducer/Program.cs +++ b/examples/SimpleProducer/Program.cs @@ -1,4 +1,5 @@ using System; +using System.Text; using System.Collections.Generic; using System.Threading.Tasks; using Confluent.Kafka.Serialization; @@ -14,14 +15,14 @@ public static void Main(string[] args) var config = new Dictionary { { "bootstrap.servers", brokerList } }; - using (var producer = new Producer(config, new NullSerializer(), new StringSerializer())) + using (var producer = new Producer(config, new NullSerializer(), new StringSerializer(Encoding.UTF8))) { Console.WriteLine($"{producer.Name} producing on {topicName}. 
q to exit."); string text; while ((text = Console.ReadLine()) != "q") { - Task deliveryReport = producer.ProduceAsync(topicName, text); + Task deliveryReport = producer.ProduceAsync(topicName, null, text); var unused = deliveryReport.ContinueWith(task => { Console.WriteLine($"Partition: {task.Result.Partition}, Offset: {task.Result.Offset}"); diff --git a/examples/Wrapped/Program.cs b/examples/Wrapped/Program.cs index a0f050a07..45abd28e8 100644 --- a/examples/Wrapped/Program.cs +++ b/examples/Wrapped/Program.cs @@ -1,6 +1,5 @@ -using System; +using System.Text; using System.Collections.Generic; -using System.Threading.Tasks; using Confluent.Kafka.Serialization; namespace Confluent.Kafka.Wrapped @@ -22,26 +21,25 @@ public static void Main(string[] args) using (var producer = new Producer(config)) { // sProducer1 is a lightweight wrapper around a Producer instance that adds - // (string, string) serialization. - // Note that sProducer1 does not need to be (and cannot be) disposed. - var sProducer1 = producer.Wrap(new StringSerializer(), new StringSerializer()); + // (string, string) serialization. Note that sProducer1 does not need to be + // (and cannot be) disposed. + var sProducer1 = producer.Wrap(new StringSerializer(Encoding.UTF8), new StringSerializer(Encoding.UTF8)); // sProducer2 is another lightweight wrapper around kafkaProducer that adds - // (null, int) serialization. - // When you do not wish to write any data to a key or value, the Null type should be - // used. - var sProducer2 = producer.Wrap(new NullSerializer(), new IntSerializer()); + // (null, int) serialization. When you do not wish to write any data to a key + // or value, the Null type should be used. + var sProducer2 = producer.Wrap(null, new IntSerializer()); // write (string, string) data to topic "first-topic", statically type checked. sProducer1.ProduceAsync("first-topic", "my-key-value", "my-value"); // write (null, int) data to topic "second-data". statically type checked, using // the same underlying producer as the producer1. - sProducer2.ProduceAsync("second-topic", 42); + sProducer2.ProduceAsync("second-topic", null, 42); // producers are NOT tied to topics. Although it's unusual that you might want to // do so, you can use different serializing producers to write to the same topic. - sProducer2.ProduceAsync("first-topic", 107); + sProducer2.ProduceAsync("first-topic", null, 107); } } } diff --git a/src/Confluent.Kafka/Handle.cs b/src/Confluent.Kafka/Handle.cs index e616274f6..700af7672 100644 --- a/src/Confluent.Kafka/Handle.cs +++ b/src/Confluent.Kafka/Handle.cs @@ -69,6 +69,16 @@ internal void Init(RdKafkaType type, IntPtr config, Config.LogCallback logger) callbackTask = StartCallbackTask(callbackCts.Token); } + public bool FlushOnDispose { get; set; } = true; + + public void Flush() + { + while (OutQueueLength > 0) + { + handle.Poll((IntPtr) 100); + } + } + public void Dispose() { Dispose(true); @@ -83,12 +93,10 @@ protected virtual void Dispose(bool disposing) if (disposing) { - // Wait until all outstanding sends have completed. 
- while (OutQueueLength > 0) + if (FlushOnDispose) { - handle.Poll((IntPtr) 100); + Flush(); } - handle.Dispose(); } } diff --git a/src/Confluent.Kafka/ISerializingProducer.cs b/src/Confluent.Kafka/ISerializingProducer.cs index 883f13abd..006d4975e 100644 --- a/src/Confluent.Kafka/ISerializingProducer.cs +++ b/src/Confluent.Kafka/ISerializingProducer.cs @@ -12,10 +12,6 @@ public interface ISerializingProducer ISerializer ValueSerializer { get; } - Task ProduceAsync(string topic, TValue val, int? partition, bool blockIfQueueFull); - - Task ProduceAsync(string topic, TKey key, TValue val, int? partition, bool blockIfQueueFull); - - void ProduceAsync(string topic, TKey key, TValue val, IDeliveryHandler deliveryHandler, int? partition, bool blockIfQueueFull); + Task ProduceAsync(string topic, TKey key, TValue val, int? partition = null, bool blockIfQueueFull = true); } } diff --git a/src/Confluent.Kafka/Null.cs b/src/Confluent.Kafka/Null.cs index 28341e04c..8b5a71a3b 100644 --- a/src/Confluent.Kafka/Null.cs +++ b/src/Confluent.Kafka/Null.cs @@ -2,8 +2,6 @@ namespace Confluent.Kafka { public sealed class Null { - public static Null Instance = new Null(); - private Null() {} } } diff --git a/src/Confluent.Kafka/Producer.cs b/src/Confluent.Kafka/Producer.cs index 920055537..49a48847a 100644 --- a/src/Confluent.Kafka/Producer.cs +++ b/src/Confluent.Kafka/Producer.cs @@ -25,7 +25,9 @@ public Producer(IEnumerable> config) // setup the rdkafka config object, including pointer to the delivery report callback. var rdKafkaConfig = new Config(config.Where(prop => prop.Key != "default.topic.config")); IntPtr rdKafkaConfigPtr = rdKafkaConfig.handle.Dup(); - // TODO: we might want to defer setting the report callback until we know we actually need it. + + // TODO: some use cases may never need DeliveryReport callbacks. In this case, we may + // be able to avoid setting this callback entirely. LibRdKafka.conf_set_dr_msg_cb(rdKafkaConfigPtr, DeliveryReportCallback); Init(RdKafkaType.Producer, rdKafkaConfigPtr, rdKafkaConfig.Logger); @@ -48,11 +50,6 @@ private Topic getKafkaTopic(string topic) private static void DeliveryReportCallbackImpl(IntPtr rk, ref rd_kafka_message rkmessage, IntPtr opaque) { - if (rkmessage._private == IntPtr.Zero) - { - return; - } - // msg_opaque was set by Topic.Produce var gch = GCHandle.FromIntPtr(rkmessage._private); var deliveryHandler = (IDeliveryHandler) gch.Target; @@ -81,31 +78,18 @@ public ISerializingProducer Wrap(ISerializer k return new SerializingProducer(this, keySerializer, valueSerializer); } - public Task ProduceAsync(string topic, byte[] val, int? partition = null, bool blockIfQueueFull = true) - => getKafkaTopic(topic).Produce(val, 0, val.Length, null, 0, 0, partition ?? RD_KAFKA_PARTITION_UA, blockIfQueueFull); - public Task ProduceAsync(string topic, byte[] key, byte[] val, int? partition = null, bool blockIfQueueFull = true) - => getKafkaTopic(topic).Produce(val, 0, val.Length, key, 0, key.Length, partition ?? RD_KAFKA_PARTITION_UA, blockIfQueueFull); + => getKafkaTopic(topic).Produce(val, 0, val?.Length ?? 0, key, 0, key?.Length ?? 0, partition ?? RD_KAFKA_PARTITION_UA, blockIfQueueFull); public Task ProduceAsync(string topic, ArraySegment key, ArraySegment val, int? partition = null, bool blockIfQueueFull = true) => getKafkaTopic(topic).Produce(val.Array, val.Offset, val.Count, key.Array, key.Offset, key.Count, partition ?? 
RD_KAFKA_PARTITION_UA, blockIfQueueFull); - // There are (or have been) 3 different patterns for doing async requests in .NET (two of them depreciated). - // https://msdn.microsoft.com/en-us/library/jj152938(v=vs.110).aspx - // None of them are to use a callback (as below), but I believe this makes the most sense in the current situation. - // A reason we might want these methods is the callbacks all happen on the same thread, whereas the Task completion ones don't. - // This is used by the benchmark example to avoid locking the counter (maybe this is a common thing we want to do?) - public void ProduceAsync(string topic, byte[] key, byte[] val, IDeliveryHandler deliveryHandler = null, int? partition = null, bool blockIfQueueFull = true) - => getKafkaTopic(topic).Produce(val, 0, val.Length, deliveryHandler, key, 0, key.Length, partition ?? RD_KAFKA_PARTITION_UA, blockIfQueueFull); - - public void ProduceAsync(string topic, ArraySegment key, ArraySegment val, IDeliveryHandler deliveryHandler = null, int? partition = null, bool blockIfQueueFull = true) - => getKafkaTopic(topic).Produce(val.Array, val.Offset, val.Count, deliveryHandler, key.Array, key.Offset, key.Count, partition ?? RD_KAFKA_PARTITION_UA, blockIfQueueFull); } internal class SerializingProducer : ISerializingProducer { - protected Producer producer; + protected readonly Producer producer; public ISerializer KeySerializer { get; } @@ -117,7 +101,7 @@ public SerializingProducer(Producer producer, ISerializer keySerializer, I KeySerializer = keySerializer; ValueSerializer = valueSerializer; - // TODO: specify serializers via config file as well? In which case, default keySerializer and valueSerizlizer params to null. + // TODO: allow serializers to be set in the producer config IEnumerable>. if (KeySerializer == null) { @@ -130,14 +114,8 @@ public SerializingProducer(Producer producer, ISerializer keySerializer, I } } - public Task ProduceAsync(string topic, TValue val, int? partition = null, bool blockIfQueueFull = true) - => producer.ProduceAsync(topic, ValueSerializer.Serialize(val), partition, blockIfQueueFull); - public Task ProduceAsync(string topic, TKey key, TValue val, int? partition = null, bool blockIfQueueFull = true) - => producer.ProduceAsync(topic, KeySerializer.Serialize(key), ValueSerializer.Serialize(val), partition, blockIfQueueFull); - - public void ProduceAsync(string topic, TKey key, TValue val, IDeliveryHandler deliveryHandler = null, int? partition = null, bool blockIfQueueFull = true) - => producer.ProduceAsync(topic, KeySerializer.Serialize(key), ValueSerializer.Serialize(val), deliveryHandler, partition, blockIfQueueFull); + => producer.ProduceAsync(topic, KeySerializer?.Serialize(key), ValueSerializer?.Serialize(val), partition, blockIfQueueFull); public string Name => producer.Name; } @@ -145,8 +123,8 @@ public void ProduceAsync(string topic, TKey key, TValue val, IDeliveryHandler de public class Producer : ISerializingProducer, IDisposable { - private Producer producer; - private ISerializingProducer serializingProducer; + private readonly Producer producer; + private readonly ISerializingProducer serializingProducer; public ISerializer KeySerializer => serializingProducer.KeySerializer; @@ -155,15 +133,9 @@ public class Producer : ISerializingProducer, IDispo public string Name => serializingProducer.Name; - public Task ProduceAsync(string topic, TValue val, int? 
partition = null, bool blockIfQueueFull = true) - => serializingProducer.ProduceAsync(topic, val, partition, blockIfQueueFull); - public Task ProduceAsync(string topic, TKey key, TValue val, int? partition = null, bool blockIfQueueFull = true) => serializingProducer.ProduceAsync(topic, key, val, partition, blockIfQueueFull); - public void ProduceAsync(string topic, TKey key, TValue val, IDeliveryHandler deliveryHandler = null, int? partition = null, bool blockIfQueueFull = true) - => serializingProducer.ProduceAsync(topic, key, val, deliveryHandler, partition, blockIfQueueFull); - public Producer( IEnumerable> config, ISerializer keySerializer, @@ -173,9 +145,11 @@ public Producer( serializingProducer = producer.Wrap(keySerializer, valueSerializer); } - public void Dispose() - { - producer.Dispose(); - } + public bool FlushOnDispose => producer.FlushOnDispose; + + public void Flush() => producer.Flush(); + + public void Dispose() => producer.Dispose(); + } } diff --git a/src/Confluent.Kafka/Serialization/IntDeserializer.cs b/src/Confluent.Kafka/Serialization/IntDeserializer.cs index 5ba2e9266..8e4d15c61 100644 --- a/src/Confluent.Kafka/Serialization/IntDeserializer.cs +++ b/src/Confluent.Kafka/Serialization/IntDeserializer.cs @@ -5,6 +5,9 @@ namespace Confluent.Kafka.Serialization { public class IntDeserializer : IDeserializer { + /// + /// Endianness depends on architecture + /// public int Deserialize(byte[] data) { return BitConverter.ToInt32(data, 0); diff --git a/src/Confluent.Kafka/Serialization/IntSerializer.cs b/src/Confluent.Kafka/Serialization/IntSerializer.cs index cf599de15..a1df7bfa1 100644 --- a/src/Confluent.Kafka/Serialization/IntSerializer.cs +++ b/src/Confluent.Kafka/Serialization/IntSerializer.cs @@ -4,6 +4,9 @@ namespace Confluent.Kafka.Serialization { public class IntSerializer : ISerializer { + /// + /// Endianness depends on architecture + /// public byte[] Serialize(int val) { return BitConverter.GetBytes(val); diff --git a/src/Confluent.Kafka/Serialization/NullDeserializer.cs b/src/Confluent.Kafka/Serialization/NullDeserializer.cs index 945c50c75..cea351d43 100644 --- a/src/Confluent.Kafka/Serialization/NullDeserializer.cs +++ b/src/Confluent.Kafka/Serialization/NullDeserializer.cs @@ -1,13 +1,10 @@ -using Confluent.Kafka; -using Confluent.Kafka.Serialization; - namespace Confluent.Kafka.Serialization { public class NullDeserializer : IDeserializer { public Null Deserialize(byte[] data) { - return Null.Instance; + return null; } } } diff --git a/src/Confluent.Kafka/Serialization/NullSerializer.cs b/src/Confluent.Kafka/Serialization/NullSerializer.cs index 7b478c261..1ac6a9243 100644 --- a/src/Confluent.Kafka/Serialization/NullSerializer.cs +++ b/src/Confluent.Kafka/Serialization/NullSerializer.cs @@ -7,4 +7,4 @@ public byte[] Serialize(Null val) return null; } } -} +} \ No newline at end of file diff --git a/src/Confluent.Kafka/Serialization/StringSerializer.cs b/src/Confluent.Kafka/Serialization/StringSerializer.cs index d7b9a27f4..397f6f225 100644 --- a/src/Confluent.Kafka/Serialization/StringSerializer.cs +++ b/src/Confluent.Kafka/Serialization/StringSerializer.cs @@ -19,11 +19,6 @@ public class StringSerializer : ISerializer { private Encoding encoding; - public StringSerializer() - { - encoding = Encoding.UTF8; - } - public StringSerializer(Encoding encoding) { this.encoding = encoding; diff --git a/src/Confluent.Kafka/Topic.cs b/src/Confluent.Kafka/Topic.cs index 4f8ebdfe4..946b045ae 100644 --- a/src/Confluent.Kafka/Topic.cs +++ b/src/Confluent.Kafka/Topic.cs 
@@ -58,67 +58,21 @@ public void Dispose() public Task Produce(byte[] val, int valOffset, int valLength, byte[] key = null, int keyOffset = 0, int keyLength = 0, Int32 partition = RD_KAFKA_PARTITION_UA, bool blockIfQueueFull = true) { - // Passes the TaskCompletionSource to the delivery report callback - // via the msg_opaque pointer - var deliveryCompletionSource = new TaskDeliveryHandler(); - Produce(val, valOffset, valLength, key, keyOffset, keyLength, partition, deliveryCompletionSource, blockIfQueueFull); - return deliveryCompletionSource.Task; - } + // Passes the TaskCompletionSource to the delivery report callback via the msg_opaque pointer - /// - /// Produces a keyed message to a partition of the current Topic and notifies the caller of progress via a callback interface. - /// - /// - /// Value to send to Kafka. Can be null. - /// - /// - /// Number of bytes to use from val buffer - /// - /// - /// IDeliveryHandler implementation used to notify the caller when the given produce request completes or an error occurs. - /// Can be null. - /// - /// - /// (Optional) The key associated with (or null if no key is specified). - /// - /// - /// Number of bytes to use from key buffer - /// - /// - /// (Optional) The topic partition to which will be sent (or -1 if no partition is specified). - /// - /// - /// Thrown if is null. - /// - /// - /// Methods of will be executed in an RdKafka-internal thread and will block other operations - /// - consider this when implementing IDeliveryHandler. - /// Use this overload for high-performance use cases as it does not use TPL and reduces the number of allocations. - /// - public void Produce(byte[] val, int valOffset, int valLength, IDeliveryHandler deliveryHandler, byte[] key = null, int keyOffset = 0, int keyLength = 0, Int32 partition = RD_KAFKA_PARTITION_UA, bool blockIfQueueFull = true) - { - Produce(val, valOffset, valLength, key, keyOffset, keyLength, partition, deliveryHandler, blockIfQueueFull); - } - - private void Produce(byte[] val, int valOffset, int valLength, byte[] key, int keyOffset, int keyLength, Int32 partition, object deliveryHandler, bool blockIfQueueFull) - { - if (deliveryHandler == null) - { - if (handle.Produce(val, valOffset, valLength, key, keyOffset, keyLength, partition, IntPtr.Zero, blockIfQueueFull) != 0) - { - throw RdKafkaException.FromErr(LibRdKafka.last_error(), "Could not produce message"); - } - return; - } - - var gch = GCHandle.Alloc(deliveryHandler); + var deliveryCompletionSource = new TaskDeliveryHandler(); + var gch = GCHandle.Alloc(deliveryCompletionSource); var ptr = GCHandle.ToIntPtr(gch); + if (handle.Produce(val, valOffset, valLength, key, keyOffset, keyLength, partition, ptr, blockIfQueueFull) != 0) { var err = LibRdKafka.last_error(); gch.Free(); + // TODO: Use proper error string (rd_kafka_err2str(..last_error)) throw RdKafkaException.FromErr(err, "Could not produce message"); } + + return deliveryCompletionSource.Task; } /// From acd0ab28661ebb715aac56e11f95e0f1c012d455 Mon Sep 17 00:00:00 2001 From: Matt Howlett Date: Tue, 22 Nov 2016 14:08:09 -0800 Subject: [PATCH 3/5] review II --- examples/AdvancedProducer/Program.cs | 25 ++++++++++++++++--- examples/Benchmark/Program.cs | 1 - examples/SimpleProducer/Program.cs | 3 +++ examples/Wrapped/Program.cs | 10 +++++--- src/Confluent.Kafka/Handle.cs | 8 ++---- src/Confluent.Kafka/Producer.cs | 2 -- .../Serialization/IntDeserializer.cs | 12 ++++----- .../Serialization/IntSerializer.cs | 12 +++++---- .../Serialization/StringDeserializer.cs | 5 ---- 9 files 
From acd0ab28661ebb715aac56e11f95e0f1c012d455 Mon Sep 17 00:00:00 2001
From: Matt Howlett
Date: Tue, 22 Nov 2016 14:08:09 -0800
Subject: [PATCH 3/5] review II

---
 examples/AdvancedProducer/Program.cs          | 25 ++++++++++++++---
 examples/Benchmark/Program.cs                 |  1 -
 examples/SimpleProducer/Program.cs            |  3 +++
 examples/Wrapped/Program.cs                   | 10 +++++---
 src/Confluent.Kafka/Handle.cs                 |  8 ++----
 src/Confluent.Kafka/Producer.cs               |  2 --
 .../Serialization/IntDeserializer.cs          | 12 ++++-----
 .../Serialization/IntSerializer.cs            | 12 +++++----
 .../Serialization/StringDeserializer.cs       |  5 ----
 9 files changed, 47 insertions(+), 31 deletions(-)

diff --git a/examples/AdvancedProducer/Program.cs b/examples/AdvancedProducer/Program.cs
index c1fe67277..20787656c 100644
--- a/examples/AdvancedProducer/Program.cs
+++ b/examples/AdvancedProducer/Program.cs
@@ -1,4 +1,5 @@
 using System;
+using System.IO;
 using System.Text;
 using System.Collections.Generic;
 using System.Threading.Tasks;
@@ -45,11 +46,29 @@ public static void Main(string[] args)
             Console.WriteLine("> key value");
             Console.WriteLine("To create a kafka message with empty key and UTF-8 encoded value:");
             Console.WriteLine("> value");
-            Console.WriteLine("'q' to quit.\n");
+            Console.WriteLine("Ctrl-C to quit.\n");
 
-            string text;
-            while ((text = Console.ReadLine()) != "q")
+            var cancelled = false;
+            Console.CancelKeyPress += (object sender, ConsoleCancelEventArgs e) =>
             {
+                e.Cancel = true; // prevent the process from terminating.
+                cancelled = true;
+            };
+
+            while (!cancelled)
+            {
+                Console.Write("> ");
+
+                string text;
+                try
+                {
+                    text = Console.ReadLine();
+                }
+                catch (IOException)
+                {
+                    // IO exception is thrown when ConsoleCancelEventArgs.Cancel == true.
+                    break;
+                }
+
                 var key = "";
                 var val = text;
diff --git a/examples/Benchmark/Program.cs b/examples/Benchmark/Program.cs
index 73f4821a5..a75c6456d 100644
--- a/examples/Benchmark/Program.cs
+++ b/examples/Benchmark/Program.cs
@@ -1,7 +1,6 @@
 using System;
 using System.Linq;
 using System.Collections.Generic;
-using System.Diagnostics;
 using System.Threading;
 using System.Threading.Tasks;
diff --git a/examples/SimpleProducer/Program.cs b/examples/SimpleProducer/Program.cs
index 6108d4438..ed18cb2af 100644
--- a/examples/SimpleProducer/Program.cs
+++ b/examples/SimpleProducer/Program.cs
@@ -28,6 +28,9 @@ public static void Main(string[] args)
                     Console.WriteLine($"Partition: {task.Result.Partition}, Offset: {task.Result.Offset}");
                 });
             }
+
+            // Tasks are not waited on; it's possible they may still be in progress here.
+            producer.Flush();
         }
     }
 }
diff --git a/examples/Wrapped/Program.cs b/examples/Wrapped/Program.cs
index 45abd28e8..748af0f8e 100644
--- a/examples/Wrapped/Program.cs
+++ b/examples/Wrapped/Program.cs
@@ -28,18 +28,22 @@ public static void Main(string[] args)
             // sProducer2 is another lightweight wrapper around kafkaProducer that adds
             // (null, int) serialization. When you do not wish to write any data to a key
             // or value, the Null type should be used.
-            var sProducer2 = producer.Wrap(null, new IntSerializer());
+            var sProducer2 = producer.Wrap(new NullSerializer(), new IntSerializer());
 
             // write (string, string) data to topic "first-topic", statically type checked.
             sProducer1.ProduceAsync("first-topic", "my-key-value", "my-value");
 
             // write (null, int) data to topic "second-topic", statically type checked, using
-            // the same underlying producer as sProducer1.
+            // the same underlying producer as sProducer1.
             sProducer2.ProduceAsync("second-topic", null, 42);
 
             // producers are NOT tied to topics. Although it's unusual that you might want to
-            // do so, you can use different serializing producers to write to the same topic.
+            // do so, you can use different serializing producers to write to the same topic.
             sProducer2.ProduceAsync("first-topic", null, 107);
+
+            // ProduceAsync tasks are not waited on - there is a good chance they are still
+            // in flight.
+            producer.Flush();
         }
     }
 }
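Since the ProduceAsync calls in this example are fire-and-forget, Flush() is what guarantees delivery before disposal. An equivalent approach, using only APIs already shown in this patch set, is to keep the returned tasks and block on them directly:

    // Wait on the delivery report tasks instead of (or as well as) Flush().
    var tasks = new []
    {
        sProducer1.ProduceAsync("first-topic", "my-key-value", "my-value"),
        sProducer2.ProduceAsync("second-topic", null, 42)
    };
    Task.WaitAll(tasks); // returns once a delivery report has arrived for every message.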
diff --git a/src/Confluent.Kafka/Handle.cs b/src/Confluent.Kafka/Handle.cs
index 700af7672..849c73a25 100644
--- a/src/Confluent.Kafka/Handle.cs
+++ b/src/Confluent.Kafka/Handle.cs
@@ -69,10 +69,10 @@ internal void Init(RdKafkaType type, IntPtr config, Config.LogCallback logger)
             callbackTask = StartCallbackTask(callbackCts.Token);
         }
 
-        public bool FlushOnDispose { get; set; } = true;
-
+        // TODO: Add timeout parameter (with default option == block indefinitely) when using rd_kafka_flush.
         public void Flush()
         {
+            // TODO: use rd_kafka_flush here instead.
             while (OutQueueLength > 0)
             {
                 handle.Poll((IntPtr) 100);
@@ -93,10 +93,6 @@ protected virtual void Dispose(bool disposing)
         {
             if (disposing)
             {
-                if (FlushOnDispose)
-                {
-                    Flush();
-                }
                 handle.Dispose();
             }
         }
diff --git a/src/Confluent.Kafka/Producer.cs b/src/Confluent.Kafka/Producer.cs
index 49a48847a..c7730a4a4 100644
--- a/src/Confluent.Kafka/Producer.cs
+++ b/src/Confluent.Kafka/Producer.cs
@@ -145,8 +145,6 @@ public Producer(
             serializingProducer = producer.Wrap(keySerializer, valueSerializer);
         }
 
-        public bool FlushOnDispose => producer.FlushOnDispose;
-
         public void Flush() => producer.Flush();
 
         public void Dispose() => producer.Dispose();
diff --git a/src/Confluent.Kafka/Serialization/IntDeserializer.cs b/src/Confluent.Kafka/Serialization/IntDeserializer.cs
index 8e4d15c61..9f86b67d2 100644
--- a/src/Confluent.Kafka/Serialization/IntDeserializer.cs
+++ b/src/Confluent.Kafka/Serialization/IntDeserializer.cs
@@ -1,16 +1,16 @@
-using System;
-
 namespace Confluent.Kafka.Serialization
 {
     public class IntDeserializer : IDeserializer<int>
     {
-        /// <summary>
-        /// Endianness depends on architecture
-        /// </summary>
         public int Deserialize(byte[] data)
         {
-            return BitConverter.ToInt32(data, 0);
+            // network byte order -> big endian -> most significant byte in the smallest address.
+            return
+                (((int)data[0]) << 24) +
+                (((int)data[1]) << 16) +
+                (((int)data[2]) << 8) +
+                (int)data[3];
         }
     }
 }
diff --git a/src/Confluent.Kafka/Serialization/IntSerializer.cs b/src/Confluent.Kafka/Serialization/IntSerializer.cs
index a1df7bfa1..347db699c 100644
--- a/src/Confluent.Kafka/Serialization/IntSerializer.cs
+++ b/src/Confluent.Kafka/Serialization/IntSerializer.cs
@@ -1,15 +1,17 @@
-using System;
 
 namespace Confluent.Kafka.Serialization
 {
     public class IntSerializer : ISerializer<int>
     {
-        /// <summary>
-        /// Endianness depends on architecture
-        /// </summary>
         public byte[] Serialize(int val)
         {
-            return BitConverter.GetBytes(val);
+            var result = new byte[4]; // int is always 32 bits on .NET.
+            // network byte order -> big endian -> most significant byte in the smallest address.
+            result[0] = (byte)(val >> 24);
+            result[1] = (byte)(val >> 16);
+            result[2] = (byte)(val >> 8);
+            result[3] = (byte)val;
+            return result;
         }
     }
 }
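A quick round-trip sanity check of the big-endian encoding above (a usage sketch; the negative value exercises the sign bit):

    var bytes = new IntSerializer().Serialize(-42);
    // bytes == { 0xFF, 0xFF, 0xFF, 0xD6 }: most significant byte in the smallest address.
    var value = new IntDeserializer().Deserialize(bytes); // == -42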
diff --git a/src/Confluent.Kafka/Serialization/StringDeserializer.cs b/src/Confluent.Kafka/Serialization/StringDeserializer.cs
index a5bd1a695..9c0466b1d 100644
--- a/src/Confluent.Kafka/Serialization/StringDeserializer.cs
+++ b/src/Confluent.Kafka/Serialization/StringDeserializer.cs
@@ -6,11 +6,6 @@ public class StringDeserializer : IDeserializer<string>
     {
         Encoding encoding;
 
-        StringDeserializer()
-        {
-            encoding = Encoding.UTF8;
-        }
-
         StringDeserializer(Encoding encoding)
         {
             this.encoding = encoding;

From 6c7553bc5177c0c7d793ca6cd002c0672d022ab2 Mon Sep 17 00:00:00 2001
From: Matt Howlett
Date: Mon, 28 Nov 2016 08:52:10 -0800
Subject: [PATCH 4/5] review III

---
 src/Confluent.Kafka/DeliveryReport.cs                   | 9 +++++++++
 src/Confluent.Kafka/Impl/SafeTopicHandle.cs             | 3 +++
 src/Confluent.Kafka/Producer.cs                         | 4 ++++
 src/Confluent.Kafka/Serialization/ISerializer.cs        | 1 +
 src/Confluent.Kafka/Serialization/IntSerializer.cs      | 2 ++
 src/Confluent.Kafka/Serialization/StringDeserializer.cs | 3 +++
 src/Confluent.Kafka/Serialization/StringSerializer.cs   | 2 ++
 7 files changed, 24 insertions(+)

diff --git a/src/Confluent.Kafka/DeliveryReport.cs b/src/Confluent.Kafka/DeliveryReport.cs
index d0e7af11e..80e751cd5 100644
--- a/src/Confluent.Kafka/DeliveryReport.cs
+++ b/src/Confluent.Kafka/DeliveryReport.cs
@@ -1,5 +1,14 @@
 namespace Confluent.Kafka
 {
+    /*
+        TODO: (via ewencp): The equivalent in the Python client fills in more information --
+        the callbacks accept (err, msg) parameters, where the latter is
+        http://docs.confluent.io/3.1.0/clients/confluent-kafka-python/index.html#confluent_kafka.Message
+        Same deal with Go where the DR channel gets one of these:
+        http://docs.confluent.io/3.1.0/clients/confluent-kafka-go/index.html#Message Is this being
+        kept more minimal intentionally?
+    */
+
     public struct DeliveryReport
     {
         public int Partition;
diff --git a/src/Confluent.Kafka/Impl/SafeTopicHandle.cs b/src/Confluent.Kafka/Impl/SafeTopicHandle.cs
index 5e608216d..120737a87 100644
--- a/src/Confluent.Kafka/Impl/SafeTopicHandle.cs
+++ b/src/Confluent.Kafka/Impl/SafeTopicHandle.cs
@@ -50,6 +50,9 @@ internal long Produce(byte[] val, int valOffset, int valLength, byte[] key, int
 
             try
             {
+                // TODO: when refactor complete, reassess the below note.
+                // Note: since the message queue threshold limit also includes delivery reports, it is important that another
+                // thread of the application calls poll() for a blocking produce() to ever unblock.
                 return (long) LibRdKafka.produce(
                     handle,
                     partition,
diff --git a/src/Confluent.Kafka/Producer.cs b/src/Confluent.Kafka/Producer.cs
index c7730a4a4..5a41415cc 100644
--- a/src/Confluent.Kafka/Producer.cs
+++ b/src/Confluent.Kafka/Producer.cs
@@ -133,6 +133,10 @@ public class Producer<TKey, TValue> : ISerializingProducer<TKey, TValue>, IDisposable
 
         public string Name => serializingProducer.Name;
 
+        // TODO: In the future, we can introduce a new overload that takes a Message
+        // rather than T, V and add timestamp, headers etc into that.
+        // TODO: put partition in there also and implement in this version.
+
         public Task<DeliveryReport> ProduceAsync(string topic, TKey key, TValue val, int? partition = null, bool blockIfQueueFull = true)
             => serializingProducer.ProduceAsync(topic, key, val, partition, blockIfQueueFull);
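To make these TODOs concrete, a richer message type along the lines of the Python and Go clients referenced above could look something like the sketch below. This is purely illustrative; none of these members exist in this patch set:

    // Hypothetical Message type that a future ProduceAsync overload could
    // accept, carrying the fields the TODOs mention.
    public struct Message<TKey, TValue>
    {
        public string Topic;
        public int? Partition;  // explicit partition, or null to use the partitioner.
        public long Timestamp;
        public TKey Key;
        public TValue Value;
    }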
diff --git a/src/Confluent.Kafka/Serialization/ISerializer.cs b/src/Confluent.Kafka/Serialization/ISerializer.cs
index 63fb8e7e4..e672f7c8a 100644
--- a/src/Confluent.Kafka/Serialization/ISerializer.cs
+++ b/src/Confluent.Kafka/Serialization/ISerializer.cs
@@ -1,5 +1,6 @@
 namespace Confluent.Kafka.Serialization
 {
+    // TODO: Consider not having this. Consider replacing with Func<T, byte[]>. This would be more normal, and I don't think we need the flexibility provided by having an interface.
     public interface ISerializer<T>
     {
         byte[] Serialize(T data);
diff --git a/src/Confluent.Kafka/Serialization/IntSerializer.cs b/src/Confluent.Kafka/Serialization/IntSerializer.cs
index 347db699c..bac36bbf4 100644
--- a/src/Confluent.Kafka/Serialization/IntSerializer.cs
+++ b/src/Confluent.Kafka/Serialization/IntSerializer.cs
@@ -1,4 +1,6 @@
 
+// TODO: Consider whether we want to allow this serializer to handle nulls. Perhaps a separate serializer that can?
+
 namespace Confluent.Kafka.Serialization
 {
     public class IntSerializer : ISerializer<int>
diff --git a/src/Confluent.Kafka/Serialization/StringDeserializer.cs b/src/Confluent.Kafka/Serialization/StringDeserializer.cs
index 9c0466b1d..fa77cd06e 100644
--- a/src/Confluent.Kafka/Serialization/StringDeserializer.cs
+++ b/src/Confluent.Kafka/Serialization/StringDeserializer.cs
@@ -1,5 +1,8 @@
 using System.Text;
 
+// TODO: Deserializer needs to be able to handle nulls and differentiate between null and empty string.
+
+
 namespace Confluent.Kafka.Serialization
 {
     public class StringDeserializer : IDeserializer<string>
diff --git a/src/Confluent.Kafka/Serialization/StringSerializer.cs b/src/Confluent.Kafka/Serialization/StringSerializer.cs
index 397f6f225..98acb1512 100644
--- a/src/Confluent.Kafka/Serialization/StringSerializer.cs
+++ b/src/Confluent.Kafka/Serialization/StringSerializer.cs
@@ -1,5 +1,7 @@
 using System.Text;
 
+// TODO: Serializer needs to be able to handle nulls and differentiate between null and empty string.
+
 namespace Confluent.Kafka.Serialization
 {
     /// <summary>

From 207db7f681b018ea600cd3095547fa5a3debfd06 Mon Sep 17 00:00:00 2001
From: Matt Howlett
Date: Mon, 28 Nov 2016 11:10:27 -0800
Subject: [PATCH 5/5] review IV

---
 .../Serialization/IntDeserializer.cs        |  6 +-
 .../Serialization/IntSerializer.cs          |  9 ++-
 test/Confluent.Kafka.Tests/IntSerdeTests.cs | 64 +++++++++++++++++++
 3 files changed, 73 insertions(+), 6 deletions(-)
 create mode 100644 test/Confluent.Kafka.Tests/IntSerdeTests.cs

diff --git a/src/Confluent.Kafka/Serialization/IntDeserializer.cs b/src/Confluent.Kafka/Serialization/IntDeserializer.cs
index 9f86b67d2..822786ebf 100644
--- a/src/Confluent.Kafka/Serialization/IntDeserializer.cs
+++ b/src/Confluent.Kafka/Serialization/IntDeserializer.cs
@@ -7,9 +7,9 @@ public int Deserialize(byte[] data)
         {
             // network byte order -> big endian -> most significant byte in the smallest address.
             return
-                (((int)data[0]) << 24) +
-                (((int)data[1]) << 16) +
-                (((int)data[2]) << 8) +
+                (((int)data[0]) << 24) |
+                (((int)data[1]) << 16) |
+                (((int)data[2]) << 8) |
                 (int)data[3];
         }
     }
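If the Func<T, byte[]> idea floated in the ISerializer TODO above were adopted, existing implementations could be bridged with a small adapter. A sketch only, assuming the ISerializer<T> interface as defined in this patch; FuncSerializer is not part of the patch set:

    using System;

    namespace Confluent.Kafka.Serialization
    {
        // Adapts a plain delegate to ISerializer<T>, giving callers the
        // "more normal" Func style while keeping the interface.
        public class FuncSerializer<T> : ISerializer<T>
        {
            private readonly Func<T, byte[]> serialize;

            public FuncSerializer(Func<T, byte[]> serialize) { this.serialize = serialize; }

            public byte[] Serialize(T data) => serialize(data);
        }
    }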
diff --git a/src/Confluent.Kafka/Serialization/IntSerializer.cs b/src/Confluent.Kafka/Serialization/IntSerializer.cs
index bac36bbf4..719a0a79b 100644
--- a/src/Confluent.Kafka/Serialization/IntSerializer.cs
+++ b/src/Confluent.Kafka/Serialization/IntSerializer.cs
@@ -9,10 +9,13 @@ public byte[] Serialize(int val)
         {
             var result = new byte[4]; // int is always 32 bits on .NET.
             // network byte order -> big endian -> most significant byte in the smallest address.
+            // Note: At the IL level, the conv.u1 operator is used to cast int to byte which truncates
+            // the high order bits if overflow occurs.
+            // https://msdn.microsoft.com/en-us/library/system.reflection.emit.opcodes.conv_u1.aspx
             result[0] = (byte)(val >> 24);
-            result[1] = (byte)(val >> 16);
-            result[2] = (byte)(val >> 8);
-            result[3] = (byte)val;
+            result[1] = (byte)(val >> 16); // & 0xff;
+            result[2] = (byte)(val >> 8);  // & 0xff;
+            result[3] = (byte)val;         // & 0xff;
             return result;
         }
     }
diff --git a/test/Confluent.Kafka.Tests/IntSerdeTests.cs b/test/Confluent.Kafka.Tests/IntSerdeTests.cs
new file mode 100644
index 000000000..3653a3a30
--- /dev/null
+++ b/test/Confluent.Kafka.Tests/IntSerdeTests.cs
@@ -0,0 +1,64 @@
+using System;
+using Xunit;
+using Confluent.Kafka.Serialization;
+
+
+namespace Confluent.Kafka.Tests
+{
+    public class IntSerdeTests
+    {
+        private static readonly int[] toTest = new int[]
+        {
+            0, 1, -1, 42, -42, 127, 128, 129, -127, -128, -129,
+            254, 255, 256, 257, -254, -255, -256, -257,
+            (int)short.MinValue-1, (int)short.MinValue, (int)short.MinValue+1,
+            (int)short.MaxValue-1, (int)short.MaxValue, (int)short.MaxValue+1,
+            int.MaxValue-1, int.MaxValue, int.MinValue, int.MinValue + 1
+        };
+
+        [Fact]
+        public void IsBigEndian()
+        {
+            var serializer = new IntSerializer();
+            var bytes = serializer.Serialize(42);
+            Assert.Equal(bytes.Length, 4);
+            // most significant byte in smallest address.
+            Assert.Equal(bytes[0], 0);
+            Assert.Equal(bytes[3], 42);
+        }
+
+        [Fact]
+        public void SerializationAgreesWithSystemNetHostToNetworkOrder()
+        {
+            foreach (int theInt in toTest)
+            {
+                int networkOrder = System.Net.IPAddress.HostToNetworkOrder(theInt);
+                var bytes1 = BitConverter.GetBytes(networkOrder);
+
+                var serializer = new IntSerializer();
+                var bytes2 = serializer.Serialize(theInt);
+
+                Assert.Equal(bytes1.Length, bytes2.Length);
+
+                for (int i = 0; i < bytes1.Length; i++)
+                {
+                    Assert.Equal(bytes1[i], bytes2[i]);
+                }
+            }
+        }
+    }
+}
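On the null-handling TODOs left open on StringSerializer/StringDeserializer above: one way to differentiate null from the empty string is to map a null string to a null payload and back, since librdkafka itself distinguishes a null value from a zero-length one. A sketch only, not part of this patch set, assuming the encoding field these classes already hold:

    // Sketch: null string <-> null payload; empty string <-> zero-length payload.
    public byte[] Serialize(string data)
        => data == null ? null : encoding.GetBytes(data);

    public string Deserialize(byte[] data)
        => data == null ? null : encoding.GetString(data);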