diff --git a/examples/AdvancedProducer/Program.cs b/examples/AdvancedProducer/Program.cs
index bf303a29f..20787656c 100644
--- a/examples/AdvancedProducer/Program.cs
+++ b/examples/AdvancedProducer/Program.cs
@@ -1,4 +1,5 @@
 using System;
+using System.IO;
 using System.Text;
 using System.Collections.Generic;
 using System.Threading.Tasks;
@@ -36,14 +37,8 @@ public static void Main(string[] args)
             var config = new Dictionary<string, object> { { "bootstrap.servers", brokerList } };

-            using (var producer = new Producer<string, string>(config))
+            using (var producer = new Producer<string, string>(config, new StringSerializer(Encoding.UTF8), new StringSerializer(Encoding.UTF8)))
             {
-                // TODO: work out why explicit cast is needed here.
-                // TODO: remove need to explicitly specify string serializers - assume Utf8StringSerializer in Producer as default.
-                // TODO: allow be be set only in constructor. make readonly.
-                producer.KeySerializer = (ISerializer<string>)new Confluent.Kafka.Serialization.Utf8StringSerializer();
-                producer.ValueSerializer = producer.KeySerializer;
-
                 Console.WriteLine("\n-----------------------------------------------------------------------");
                 Console.WriteLine($"Producer {producer.Name} producing on topic {topicName}.");
                 Console.WriteLine("-----------------------------------------------------------------------");
@@ -68,7 +63,7 @@ public static void Main(string[] args)
                 {
                     text = Console.ReadLine();
                 }
-                catch
+                catch (IOException)
                 {
                     // IO exception is thrown when ConsoleCancelEventArgs.Cancel == true.
                     break;
                 }
@@ -82,10 +77,10 @@ public static void Main(string[] args)
                 if (index != -1)
                 {
                     key = text.Substring(0, index);
-                    val = text.Substring(index);
+                    val = text.Substring(index + 1);
                 }

-                Task<DeliveryReport> deliveryReport = producer.Produce(topicName, key, val);
+                Task<DeliveryReport> deliveryReport = producer.ProduceAsync(topicName, key, val);
                 var result = deliveryReport.Result;  // synchronously waits for message to be produced.
                 Console.WriteLine($"Partition: {result.Partition}, Offset: {result.Offset}");
             }
diff --git a/examples/Benchmark/Program.cs b/examples/Benchmark/Program.cs
index 7e4dbf6cc..a75c6456d 100644
--- a/examples/Benchmark/Program.cs
+++ b/examples/Benchmark/Program.cs
@@ -1,50 +1,56 @@
 using System;
+using System.Linq;
 using System.Collections.Generic;
-using System.Diagnostics;
 using System.Threading;
 using System.Threading.Tasks;
-using Confluent.Kafka.Serialization;

 namespace Confluent.Kafka.Benchmark
 {
     public class Program
     {
-        public class DeliveryHandler : IDeliveryHandler
+        public class BenchmarkProducer
         {
-            public void SetException(Exception exception)
+            public static void Run(string broker, string topicName, int numberOfMessagesToProduce, int numberOfTests)
             {
-                throw exception;
-            }
-
-            public void SetResult(DeliveryReport deliveryReport)
-            {
-            }
-        }
-
-        public static void Produce(string broker, string topicName, long numMessages)
-        {
-            var deliveryHandler = new DeliveryHandler();
-
-            var config = new Dictionary<string, object> { { "bootstrap.servers", broker } };
-
-            using (var producer = new Producer<byte[], byte[]>(config))
-            {
-                // TODO: remove need to explicitly specify this serializer.
-                producer.ValueSerializer = (ISerializer<byte[]>)new ByteArraySerializer();
+                // mirrors the librdkafka performance test example.
+                var config = new Dictionary<string, object>
+                {
+                    { "bootstrap.servers", broker },
+                    { "queue.buffering.max.messages", 500000 },
+                    { "message.send.max.retries", 3 },
+                    { "retry.backoff.ms", 500 },
+                    { "session.timeout.ms", 6000 }
+                };

-                Console.WriteLine($"{producer.Name} producing on {topicName}");
-                // TODO: think more about exactly what we want to benchmark.
-                var payload = new byte[] { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 };
-                for (int i = 0; i < numMessages; i++)
+                using (var producer = new Producer(config))
                 {
-                    producer.ProduceWithDeliveryReport(topicName, payload, deliveryHandler);
+                    for (var j = 0; j < numberOfTests; ++j)
+                    {
+                        byte cnt = 0;
+                        var val = new byte[100].Select(a => ++cnt).ToArray();
+
+                        var startTime = DateTime.Now.Ticks;
+                        var tasks = new Task[numberOfMessagesToProduce];
+                        for (int i = 0; i < numberOfMessagesToProduce; i++)
+                        {
+                            tasks[i] = producer.ProduceAsync(topicName, null, val);
+                        }
+                        Task.WaitAll(tasks);
+                        var duration = DateTime.Now.Ticks - startTime;
+
+                        Console.WriteLine($"Produced {numberOfMessagesToProduce} in {duration/10000.0:F0}ms");
+                        Console.WriteLine($"{numberOfMessagesToProduce / (duration/10000.0):F0} messages/ms");
+                    }
+
+                    Console.WriteLine("Disposing producer");
                 }
-
-                Console.WriteLine("Shutting down");
             }
         }

+        // TODO: Update Consumer benchmark for new Consumer when it's written.
         public static async Task<long> Consume(string broker, string topic)
         {
             long n = 0;
@@ -86,24 +92,7 @@ public static void Main(string[] args)
             string brokerList = args[0];
             string topic = args[1];

-            long numMessages = 1000000;
-
-            var stopwatch = new Stopwatch();
-
-            // TODO: we really want time from first ack. as it is, this includes producer startup time.
-            stopwatch.Start();
-            Produce(brokerList, topic, numMessages);
-            stopwatch.Stop();
-
-            Console.WriteLine($"Sent {numMessages} messages in {stopwatch.Elapsed}");
-            Console.WriteLine($"{numMessages / stopwatch.Elapsed.TotalSeconds:F0} messages/second");
-
-            stopwatch.Restart();
-            long n = Consume(brokerList, topic).Result;
-            stopwatch.Stop();
-
-            Console.WriteLine($"Received {n} messages in {stopwatch.Elapsed}");
-            Console.WriteLine($"{n / stopwatch.Elapsed.TotalSeconds:F0} messages/second");
+            BenchmarkProducer.Run(brokerList, topic, 5000000, 4);
         }
     }
 }
diff --git a/examples/Misc/Program.cs b/examples/Misc/Program.cs
index 9f86612d8..ca0ac7049 100644
--- a/examples/Misc/Program.cs
+++ b/examples/Misc/Program.cs
@@ -13,7 +13,7 @@ static async Task ListGroups(string brokerList)
         {
             var config = new Dictionary<string, object> { { "bootstrap.servers", brokerList } };

-            using (var producer = new Producer<Null, Null>(config))
+            using (var producer = new Producer(config))
             {
                 var groups = await producer.ListGroups(TimeSpan.FromSeconds(10));
                 Console.WriteLine($"Consumer Groups:");
@@ -27,9 +27,7 @@ static async Task ListGroups(string brokerList)
                     {
                         Console.WriteLine($"      {m.MemberId} {m.ClientId} {m.ClientHost}");
                         Console.WriteLine($"      Metadata: {m.MemberMetadata.Length} bytes");
-                        //Console.WriteLine(System.Text.Encoding.UTF8.GetString(m.MemberMetadata));
                         Console.WriteLine($"      Assignment: {m.MemberAssignment.Length} bytes");
-                        //Console.WriteLine(System.Text.Encoding.UTF8.GetString(m.MemberAssignment));
                     }
                 }
             }
@@ -38,7 +36,7 @@ static async Task ListGroups(string brokerList)
         static async Task PrintMetadata(string brokerList)
         {
             var config = new Dictionary<string, object> { { "bootstrap.servers", brokerList } };
-            using (var producer = new Producer<Null, Null>(config))
+            using (var producer = new Producer(config))
             {
                 var meta = await producer.Metadata();
                 Console.WriteLine($"{meta.OriginatingBrokerId} {meta.OriginatingBrokerName}");
diff --git a/examples/SimpleProducer/Program.cs b/examples/SimpleProducer/Program.cs
index 9aa122554..ed18cb2af 100644
--- a/examples/SimpleProducer/Program.cs
+++ b/examples/SimpleProducer/Program.cs
@@ -1,4 +1,5 @@
 using System;
+using System.Text;
 using System.Collections.Generic;
 using System.Threading.Tasks;
 using Confluent.Kafka.Serialization;
@@ -14,23 +15,22 @@ public static void Main(string[] args)
             var config = new Dictionary<string, object> { { "bootstrap.servers", brokerList } };

-            using (var producer = new Producer<Null, string>(config))
+            using (var producer = new Producer<Null, string>(config, new NullSerializer(), new StringSerializer(Encoding.UTF8)))
             {
-                // TODO: figure out why the cast below is necessary and how to avoid it.
-                // TODO: There should be no need to specify a serializer for common types like string - I think it should default to the UTF8 serializer.
-                producer.ValueSerializer = (ISerializer<string>)new Confluent.Kafka.Serialization.Utf8StringSerializer();
-
                 Console.WriteLine($"{producer.Name} producing on {topicName}. q to exit.");

                 string text;
                 while ((text = Console.ReadLine()) != "q")
                 {
-                    Task<DeliveryReport> deliveryReport = producer.Produce(topicName, text);
+                    Task<DeliveryReport> deliveryReport = producer.ProduceAsync(topicName, null, text);
                     var unused = deliveryReport.ContinueWith(task =>
                     {
                         Console.WriteLine($"Partition: {task.Result.Partition}, Offset: {task.Result.Offset}");
                     });
                 }
+
+                // Tasks are not waited on - it's possible some may still be in progress here.
+                producer.Flush();
             }
         }
     }
 }
diff --git a/examples/Wrapped/Program.cs b/examples/Wrapped/Program.cs
new file mode 100644
index 000000000..748af0f8e
--- /dev/null
+++ b/examples/Wrapped/Program.cs
@@ -0,0 +1,50 @@
+using System.Text;
+using System.Collections.Generic;
+using Confluent.Kafka.Serialization;
+
+namespace Confluent.Kafka.Wrapped
+{
+    /// <summary>
+    ///     An example showing how to wrap a single Producer to produce messages using
+    ///     different serializers.
+    /// </summary>
+    /// <remarks>
+    ///     If you only want to use a single pair of serializers in your application,
+    ///     you should use the Producer&lt;TKey, TValue&gt; constructor instead.
+    /// </remarks>
+    public class Program
+    {
+        public static void Main(string[] args)
+        {
+            var config = new Dictionary<string, object> { { "bootstrap.servers", args[0] } };
+
+            using (var producer = new Producer(config))
+            {
+                // sProducer1 is a lightweight wrapper around a Producer instance that adds
+                // (string, string) serialization. Note that sProducer1 does not need to be
+                // (and cannot be) disposed.
+                var sProducer1 = producer.Wrap(new StringSerializer(Encoding.UTF8), new StringSerializer(Encoding.UTF8));
+
+                // sProducer2 is another lightweight wrapper around the same producer that adds
+                // (null, int) serialization. When you do not wish to write any data to a key
+                // or value, the Null type should be used.
+                var sProducer2 = producer.Wrap(new NullSerializer(), new IntSerializer());
+
+                // write (string, string) data to topic "first-topic", statically type checked.
+                sProducer1.ProduceAsync("first-topic", "my-key-value", "my-value");
+
+                // write (null, int) data to topic "second-topic", statically type checked, using
+                // the same underlying producer as sProducer1.
+                sProducer2.ProduceAsync("second-topic", null, 42);
+
+                // producers are NOT tied to topics. Although it's unusual that you might want to
+                // do so, you can use different serializing producers to write to the same topic.
+                sProducer2.ProduceAsync("first-topic", null, 107);
+
+                // ProduceAsync tasks are not waited on - there is a good chance they are still
+                // in flight.
+                producer.Flush();
+            }
+        }
+    }
+}
diff --git a/examples/Wrapped/project.json b/examples/Wrapped/project.json
new file mode 100644
index 000000000..08578640a
--- /dev/null
+++ b/examples/Wrapped/project.json
@@ -0,0 +1,25 @@
+{
+  "version": "1.0.0",
+  "authors": ["Confluent Inc"],
+
+  "buildOptions": {
+    "emitEntryPoint": true
+  },
+
+  "dependencies": {
+    "Confluent.Kafka": {
+      "target": "project"
+    }
+  },
+
+  "frameworks": {
+    "netcoreapp1.0": {
+      "dependencies": {
+        "Microsoft.NETCore.App": {
+          "type": "platform",
+          "version": "1.0.0"
+        }
+      }
+    }
+  }
+}
diff --git a/src/Confluent.Kafka/DeliveryReport.cs b/src/Confluent.Kafka/DeliveryReport.cs
new file mode 100644
index 000000000..80e751cd5
--- /dev/null
+++ b/src/Confluent.Kafka/DeliveryReport.cs
@@ -0,0 +1,17 @@
+namespace Confluent.Kafka
+{
+    /*
+        TODO: (via ewencp): The equivalent in the Python client fills in more information --
+        the callbacks accept (err, msg) parameters, where the latter is
+        http://docs.confluent.io/3.1.0/clients/confluent-kafka-python/index.html#confluent_kafka.Message
+        Same deal with Go where the DR channel gets one of these:
+        http://docs.confluent.io/3.1.0/clients/confluent-kafka-go/index.html#Message Is this being
+        kept more minimal intentionally?
+    */
+
+    public struct DeliveryReport
+    {
+        public int Partition;
+        public long Offset;
+    }
+}
diff --git a/src/Confluent.Kafka/Handle.cs b/src/Confluent.Kafka/Handle.cs
index e616274f6..849c73a25 100644
--- a/src/Confluent.Kafka/Handle.cs
+++ b/src/Confluent.Kafka/Handle.cs
@@ -69,6 +69,16 @@ internal void Init(RdKafkaType type, IntPtr config, Config.LogCallback logger)
             callbackTask = StartCallbackTask(callbackCts.Token);
         }

+        // TODO: Add timeout parameter (with default option == block indefinitely) when we use rd_kafka_flush.
+        public void Flush()
+        {
+            // TODO: use rd_kafka_flush here instead.
+            while (OutQueueLength > 0)
+            {
+                handle.Poll((IntPtr) 100);
+            }
+        }
+
         public void Dispose()
         {
             Dispose(true);
@@ -83,12 +93,6 @@ protected virtual void Dispose(bool disposing)

             if (disposing)
             {
-                // Wait until all outstanding sends have completed.
-                while (OutQueueLength > 0)
-                {
-                    handle.Poll((IntPtr) 100);
-                }
-
                 handle.Dispose();
             }
         }
diff --git a/src/Confluent.Kafka/ISerializingProducer.cs b/src/Confluent.Kafka/ISerializingProducer.cs
new file mode 100644
index 000000000..006d4975e
--- /dev/null
+++ b/src/Confluent.Kafka/ISerializingProducer.cs
@@ -0,0 +1,17 @@
+using System.Threading.Tasks;
+using Confluent.Kafka.Serialization;
+
+
+namespace Confluent.Kafka
+{
+    public interface ISerializingProducer<TKey, TValue>
+    {
+        string Name { get; }
+
+        ISerializer<TKey> KeySerializer { get; }
+
+        ISerializer<TValue> ValueSerializer { get; }
+
+        Task<DeliveryReport> ProduceAsync(string topic, TKey key, TValue val, int? partition = null, bool blockIfQueueFull = true);
+    }
+}
diff --git a/src/Confluent.Kafka/Impl/LibRdKafka.cs b/src/Confluent.Kafka/Impl/LibRdKafka.cs
index fd683c7a4..4ab495957 100644
--- a/src/Confluent.Kafka/Impl/LibRdKafka.cs
+++ b/src/Confluent.Kafka/Impl/LibRdKafka.cs
@@ -341,14 +341,14 @@ internal static ErrorCode committed(IntPtr rk, IntPtr partitions, IntPtr timeout
         internal static ErrorCode position(IntPtr rk, IntPtr partitions)
             => _position(rk, partitions);

-        private static Func<IntPtr, int, IntPtr, byte[], UIntPtr, byte[], UIntPtr, IntPtr, IntPtr> _produce;
+        private static Func<IntPtr, int, IntPtr, IntPtr, UIntPtr, IntPtr, UIntPtr, IntPtr, IntPtr> _produce;
         internal static IntPtr produce(
                 IntPtr rkt,
                 int partition,
                 IntPtr msgflags,
-                byte[] val, UIntPtr len,
-                byte[] key, UIntPtr keylen,
+                IntPtr val, UIntPtr len,
+                IntPtr key, UIntPtr keylen,
                 IntPtr msg_opaque)
             => _produce(rkt, partition, msgflags, val, len, key, keylen, msg_opaque);

@@ -607,8 +607,8 @@ internal static extern IntPtr rd_kafka_produce(
                 IntPtr rkt,
                 int partition,
                 IntPtr msgflags,
-                byte[] val, UIntPtr len,
-                byte[] key, UIntPtr keylen,
+                IntPtr val, UIntPtr len,
+                IntPtr key, UIntPtr keylen,
                 IntPtr msg_opaque);

         [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
@@ -644,5 +644,6 @@ internal static extern IntPtr rd_kafka_brokers_add(IntPtr rk,
         [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
         internal static extern IntPtr rd_kafka_wait_destroyed(IntPtr timeout_ms);
         }
+
     }
 }
diff --git a/src/Confluent.Kafka/Impl/SafeTopicHandle.cs b/src/Confluent.Kafka/Impl/SafeTopicHandle.cs
index fc2227046..120737a87 100644
--- a/src/Confluent.Kafka/Impl/SafeTopicHandle.cs
+++ b/src/Confluent.Kafka/Impl/SafeTopicHandle.cs
@@ -28,14 +28,52 @@ protected override bool ReleaseHandle()
         internal string GetName()
             => Marshal.PtrToStringAnsi(LibRdKafka.topic_name(handle));

-        internal long Produce(byte[] val, int valLength, byte[] key, int keyCount, int partition, IntPtr opaque, bool blockIfQueueFull)
-            => (long) LibRdKafka.produce(
-                handle,
-                partition,
-                (IntPtr) (MsgFlags.MSG_F_COPY | (blockIfQueueFull ? MsgFlags.MSG_F_BLOCK : 0)),
-                val, (UIntPtr) valLength,
-                key, (UIntPtr) keyCount,
-                opaque);
+        internal long Produce(byte[] val, int valOffset, int valLength, byte[] key, int keyOffset, int keyLength, int partition, IntPtr opaque, bool blockIfQueueFull)
+        {
+            var pValue = IntPtr.Zero;
+            var pKey = IntPtr.Zero;
+
+            var gchValue = default(GCHandle);
+            var gchKey = default(GCHandle);
+
+            if (val != null)
+            {
+                gchValue = GCHandle.Alloc(val, GCHandleType.Pinned);
+                pValue = Marshal.UnsafeAddrOfPinnedArrayElement(val, valOffset);
+            }
+
+            if (key != null)
+            {
+                gchKey = GCHandle.Alloc(key, GCHandleType.Pinned);
+                pKey = Marshal.UnsafeAddrOfPinnedArrayElement(key, keyOffset);
+            }
+
+            try
+            {
+                // TODO: when refactor complete, reassess the below note.
+                // Note: since the message queue threshold limit also includes delivery reports, it is important that another
+                // thread of the application calls poll() for a blocking produce() to ever unblock.
+                return (long) LibRdKafka.produce(
+                    handle,
+                    partition,
+                    (IntPtr) (MsgFlags.MSG_F_COPY | (blockIfQueueFull ? MsgFlags.MSG_F_BLOCK : 0)),
+                    pValue, (UIntPtr) valLength,
+                    pKey, (UIntPtr) keyLength,
+                    opaque);
+            }
+            finally
+            {
+                if (val != null)
+                {
+                    gchValue.Free();
+                }
+
+                if (key != null)
+                {
+                    gchKey.Free();
+                }
+            }
+        }

         internal bool PartitionAvailable(int partition)
             => LibRdKafka.topic_partition_available(handle, partition);
     }
diff --git a/src/Confluent.Kafka/Producer.cs b/src/Confluent.Kafka/Producer.cs
index a83180c5d..5a41415cc 100644
--- a/src/Confluent.Kafka/Producer.cs
+++ b/src/Confluent.Kafka/Producer.cs
@@ -9,34 +9,28 @@
 namespace Confluent.Kafka
 {
-    /// <summary>
-    ///     High-level, asynchronous message producer.
-    /// </summary>
-    public class Producer<TKey, TValue> : Handle
+    public class Producer : Handle
     {
-        // TODO: allow be be set only in constructor. make readonly.
-        // TODO: These should arguably be left as properties here.
-        // TODO: And/or these could be in config + use reflection to find the class.
-        public ISerializer<TKey> KeySerializer { get; set; }
-
-        // TODO: allow be be set only in constructor. make readonly.
-        public ISerializer<TValue> ValueSerializer { get; set; }
+        private const int RD_KAFKA_PARTITION_UA = -1;

         private IEnumerable<KeyValuePair<string, object>> topicConfig;

+        // TODO: get rid of the Topic class.
         private SafeDictionary<string, Topic> topics = new SafeDictionary<string, Topic>();

         public Producer(IEnumerable<KeyValuePair<string, object>> config)
         {
             this.topicConfig = (IEnumerable<KeyValuePair<string, object>>)config.FirstOrDefault(prop => prop.Key == "default.topic.config").Value;
+
+            // setup the rdkafka config object, including pointer to the delivery report callback.
             var rdKafkaConfig = new Config(config.Where(prop => prop.Key != "default.topic.config"));
+            IntPtr rdKafkaConfigPtr = rdKafkaConfig.handle.Dup();

-            // TODO: If serializers aren't specified in config, then we could use defaults associated with TKey, TValue if
-            // we have matching Confluent.Kafka.Serialization serializers available.
+            // TODO: some use cases may never need DeliveryReport callbacks. In this case, we may
+            // be able to avoid setting this callback entirely.
+            LibRdKafka.conf_set_dr_msg_cb(rdKafkaConfigPtr, DeliveryReportCallback);

-            IntPtr cfgPtr = rdKafkaConfig.handle.Dup();
-            LibRdKafka.conf_set_dr_msg_cb(cfgPtr, DeliveryReportDelegate);
-            Init(RdKafkaType.Producer, cfgPtr, rdKafkaConfig.Logger);
+            Init(RdKafkaType.Producer, rdKafkaConfigPtr, rdKafkaConfig.Logger);
         }

         private Topic getKafkaTopic(string topic)
@@ -52,27 +46,9 @@ private Topic getKafkaTopic(string topic)
             return kafkaTopic;
         }

+        private static readonly LibRdKafka.DeliveryReportCallback DeliveryReportCallback = DeliveryReportCallbackImpl;

-        // TODO: Support the other function overloads in Topic.
-        // TODO: I'd like a way to produce as (byte[], offset, length) as well if possible all the way down to librdkafka (need to investigate).
-        // TODO: should we append the Produce methods with Async? Seems to be a convention.
-
-        public Task<DeliveryReport> Produce(string topic, TValue val)
-            => getKafkaTopic(topic).Produce(ValueSerializer.Serialize(val));
-
-        public Task<DeliveryReport> Produce(string topic, TKey key, TValue val)
-            => getKafkaTopic(topic).Produce(ValueSerializer.Serialize(val), KeySerializer.Serialize(key));
-
-        // TODO: do we need both the callback way of doing this and the Task way?
-        // i think this was added late to rdkafka-dotnet, so there is probably a need.
-        // What about Task.ContinueWith? I belive this can even handle exceptions?
-        public void ProduceWithDeliveryReport(string topic, TValue val, IDeliveryHandler deliveryHandler)
-            => getKafkaTopic(topic).Produce(ValueSerializer.Serialize(val), deliveryHandler);
-
-        // Explicitly keep reference to delegate so it stays alive
-        private static readonly LibRdKafka.DeliveryReportCallback DeliveryReportDelegate = DeliveryReportCallback;
-
-        private static void DeliveryReportCallback(IntPtr rk, ref rd_kafka_message rkmessage, IntPtr opaque)
+        private static void DeliveryReportCallbackImpl(IntPtr rk, ref rd_kafka_message rkmessage, IntPtr opaque)
         {
             // msg_opaque was set by Topic.Produce
             var gch = GCHandle.FromIntPtr(rkmessage._private);
@@ -88,10 +64,94 @@ private static void DeliveryReportCallback(IntPtr rk, ref rd_kafka_message rkmes
                 return;
             }

-            deliveryHandler.SetResult(new DeliveryReport {
-                Offset = rkmessage.offset,
-                Partition = rkmessage.partition
-            });
+            deliveryHandler.SetResult(
+                new DeliveryReport
+                {
+                    Offset = rkmessage.offset,
+                    Partition = rkmessage.partition
+                }
+            );
+        }
+
+        public ISerializingProducer<TKey, TValue> Wrap<TKey, TValue>(ISerializer<TKey> keySerializer, ISerializer<TValue> valueSerializer)
+        {
+            return new SerializingProducer<TKey, TValue>(this, keySerializer, valueSerializer);
+        }
+
+        public Task<DeliveryReport> ProduceAsync(string topic, byte[] key, byte[] val, int? partition = null, bool blockIfQueueFull = true)
+            => getKafkaTopic(topic).Produce(val, 0, val?.Length ?? 0, key, 0, key?.Length ?? 0, partition ?? RD_KAFKA_PARTITION_UA, blockIfQueueFull);
+
+        public Task<DeliveryReport> ProduceAsync(string topic, ArraySegment<byte> key, ArraySegment<byte> val, int? partition = null, bool blockIfQueueFull = true)
+            => getKafkaTopic(topic).Produce(val.Array, val.Offset, val.Count, key.Array, key.Offset, key.Count, partition ?? RD_KAFKA_PARTITION_UA, blockIfQueueFull);
+    }
+
+
+    internal class SerializingProducer<TKey, TValue> : ISerializingProducer<TKey, TValue>
+    {
+        protected readonly Producer producer;
+
+        public ISerializer<TKey> KeySerializer { get; }
+
+        public ISerializer<TValue> ValueSerializer { get; }
+
+        public SerializingProducer(Producer producer, ISerializer<TKey> keySerializer, ISerializer<TValue> valueSerializer)
+        {
+            this.producer = producer;
+            KeySerializer = keySerializer;
+            ValueSerializer = valueSerializer;
+
+            // TODO: allow serializers to be set in the producer config IEnumerable<KeyValuePair<string, object>>.
+
+            if (KeySerializer == null)
+            {
+                throw new ArgumentNullException("Key serializer must be specified.");
+            }
+
+            if (ValueSerializer == null)
+            {
+                throw new ArgumentNullException("Value serializer must be specified.");
+            }
+        }
+
+        public Task<DeliveryReport> ProduceAsync(string topic, TKey key, TValue val, int? partition = null, bool blockIfQueueFull = true)
+            => producer.ProduceAsync(topic, KeySerializer?.Serialize(key), ValueSerializer?.Serialize(val), partition, blockIfQueueFull);
+
+        public string Name => producer.Name;
+    }
+
+
+    public class Producer<TKey, TValue> : ISerializingProducer<TKey, TValue>, IDisposable
+    {
+        private readonly Producer producer;
+        private readonly ISerializingProducer<TKey, TValue> serializingProducer;
+
+
+        public ISerializer<TKey> KeySerializer => serializingProducer.KeySerializer;
+
+        public ISerializer<TValue> ValueSerializer => serializingProducer.ValueSerializer;
+
+        public string Name => serializingProducer.Name;
+
+        // TODO: In the future, we can introduce a new overload that takes a Message
+        // rather than T, V and add timestamp, headers etc into that.
+        // TODO: put partition in there also and implement in this version.
+
+        public Task<DeliveryReport> ProduceAsync(string topic, TKey key, TValue val, int? partition = null, bool blockIfQueueFull = true)
+            => serializingProducer.ProduceAsync(topic, key, val, partition, blockIfQueueFull);
+
+        public Producer(
+            IEnumerable<KeyValuePair<string, object>> config,
+            ISerializer<TKey> keySerializer,
+            ISerializer<TValue> valueSerializer)
+        {
+            producer = new Producer(config);
+            serializingProducer = producer.Wrap(keySerializer, valueSerializer);
+        }
+
+        public void Flush() => producer.Flush();
+
+        public void Dispose() => producer.Dispose();
+    }
 }
diff --git a/src/Confluent.Kafka/Serialization/ByteArraySerializer.cs b/src/Confluent.Kafka/Serialization/ByteArraySerializer.cs
deleted file mode 100644
index 3fb1d23b6..000000000
--- a/src/Confluent.Kafka/Serialization/ByteArraySerializer.cs
+++ /dev/null
@@ -1,21 +0,0 @@
-using System.Text;
-
-namespace Confluent.Kafka.Serialization
-{
-    /// <summary>
-    ///     A byte[] pass-through serializer.
-    /// </summary>
-    /// <param name="val">
-    ///     The byte[] to pass through.
-    /// </param>
-    /// <returns>
-    ///     A reference to <paramref name="val" />.
-    /// </returns>
-    public class ByteArraySerializer : ISerializer<byte[]>
-    {
-        public byte[] Serialize(byte[] val)
-        {
-            return val;
-        }
-    }
-}
diff --git a/src/Confluent.Kafka/Serialization/ISerializer.cs b/src/Confluent.Kafka/Serialization/ISerializer.cs
index 63fb8e7e4..e672f7c8a 100644
--- a/src/Confluent.Kafka/Serialization/ISerializer.cs
+++ b/src/Confluent.Kafka/Serialization/ISerializer.cs
@@ -1,5 +1,6 @@
 namespace Confluent.Kafka.Serialization
 {
+    // TODO: Consider not having this. Consider replacing with Func<T, byte[]>. This would be more normal, and I don't think we need the flexibility provided by having an interface.
     public interface ISerializer<T>
     {
         byte[] Serialize(T data);
diff --git a/src/Confluent.Kafka/Serialization/IntDeserializer.cs b/src/Confluent.Kafka/Serialization/IntDeserializer.cs
new file mode 100644
index 000000000..822786ebf
--- /dev/null
+++ b/src/Confluent.Kafka/Serialization/IntDeserializer.cs
@@ -0,0 +1,16 @@
+
+namespace Confluent.Kafka.Serialization
+{
+    public class IntDeserializer : IDeserializer<int>
+    {
+        public int Deserialize(byte[] data)
+        {
+            // network byte order -> big endian -> most significant byte in the smallest address.
+            return
+                (((int)data[0]) << 24) |
+                (((int)data[1]) << 16) |
+                (((int)data[2]) << 8) |
+                (int)data[3];
+        }
+    }
+}
diff --git a/src/Confluent.Kafka/Serialization/IntSerializer.cs b/src/Confluent.Kafka/Serialization/IntSerializer.cs
new file mode 100644
index 000000000..719a0a79b
--- /dev/null
+++ b/src/Confluent.Kafka/Serialization/IntSerializer.cs
@@ -0,0 +1,22 @@
+
+// TODO: Consider whether we want to allow this serializer to handle nulls. Perhaps a separate serializer that can?
+
+namespace Confluent.Kafka.Serialization
+{
+    public class IntSerializer : ISerializer<int>
+    {
+        public byte[] Serialize(int val)
+        {
+            var result = new byte[4]; // int is always 32 bits on .NET.
+            // network byte order -> big endian -> most significant byte in the smallest address.
+            // Note: At the IL level, the conv.u1 operator is used to cast int to byte, which truncates
+            // the high order bits if overflow occurs.
+            // https://msdn.microsoft.com/en-us/library/system.reflection.emit.opcodes.conv_u1.aspx
+            result[0] = (byte)(val >> 24);
+            result[1] = (byte)(val >> 16); // & 0xff;
+            result[2] = (byte)(val >> 8); // & 0xff;
+            result[3] = (byte)val; // & 0xff;
+            return result;
+        }
+    }
+}
diff --git a/src/Confluent.Kafka/Serialization/NullDeserializer.cs b/src/Confluent.Kafka/Serialization/NullDeserializer.cs
new file mode 100644
index 000000000..cea351d43
--- /dev/null
+++ b/src/Confluent.Kafka/Serialization/NullDeserializer.cs
@@ -0,0 +1,10 @@
+namespace Confluent.Kafka.Serialization
+{
+    public class NullDeserializer : IDeserializer<Null>
+    {
+        public Null Deserialize(byte[] data)
+        {
+            return null;
+        }
+    }
+}
diff --git a/src/Confluent.Kafka/Serialization/NullSerializer.cs b/src/Confluent.Kafka/Serialization/NullSerializer.cs
index 7b478c261..1ac6a9243 100644
--- a/src/Confluent.Kafka/Serialization/NullSerializer.cs
+++ b/src/Confluent.Kafka/Serialization/NullSerializer.cs
@@ -7,4 +7,4 @@ public byte[] Serialize(Null val)
         return null;
     }
 }
-}
+}
\ No newline at end of file
diff --git a/src/Confluent.Kafka/Serialization/StringDeserializer.cs b/src/Confluent.Kafka/Serialization/StringDeserializer.cs
new file mode 100644
index 000000000..fa77cd06e
--- /dev/null
+++ b/src/Confluent.Kafka/Serialization/StringDeserializer.cs
@@ -0,0 +1,22 @@
+using System.Text;
+
+// TODO: Deserializer needs to be able to handle nulls and differentiate between null and empty string.
+
+
+namespace Confluent.Kafka.Serialization
+{
+    public class StringDeserializer : IDeserializer<string>
+    {
+        Encoding encoding;
+
+        public StringDeserializer(Encoding encoding)
+        {
+            this.encoding = encoding;
+        }
+
+        public string Deserialize(byte[] data)
+        {
+            return encoding.GetString(data);
+        }
+    }
+}
diff --git a/src/Confluent.Kafka/Serialization/StringSerializer.cs b/src/Confluent.Kafka/Serialization/StringSerializer.cs
new file mode 100644
index 000000000..98acb1512
--- /dev/null
+++ b/src/Confluent.Kafka/Serialization/StringSerializer.cs
@@ -0,0 +1,34 @@
+using System.Text;
+
+// TODO: Serializer needs to be able to handle nulls and differentiate between null and empty string.
+
+namespace Confluent.Kafka.Serialization
+{
+    /// <summary>
+    ///     String serializer.
+    /// </summary>
+    /// <param name="val">
+    ///     The string value to serialize.
+    /// </param>
+    /// <returns>
+    ///     <paramref name="val" /> encoded in a byte array.
+    /// </returns>
+    /// <remarks>
+    ///     <paramref name="val" /> cannot be null.
+    ///     TODO: well it shouldn't be, otherwise there is ambiguity on deserialization. check this.
+    /// </remarks>
+    public class StringSerializer : ISerializer<string>
+    {
+        private Encoding encoding;
+
+        public StringSerializer(Encoding encoding)
+        {
+            this.encoding = encoding;
+        }
+
+        public byte[] Serialize(string val)
+        {
+            return encoding.GetBytes(val);
+        }
+    }
+}
diff --git a/src/Confluent.Kafka/Serialization/Utf8StringDeserializer.cs b/src/Confluent.Kafka/Serialization/Utf8StringDeserializer.cs
deleted file mode 100644
index 1104dcfa9..000000000
--- a/src/Confluent.Kafka/Serialization/Utf8StringDeserializer.cs
+++ /dev/null
@@ -1,12 +0,0 @@
-using System.Text;
-
-namespace Confluent.Kafka.Serialization
-{
-    public class Utf8StringDeserializer : IDeserializer<string>
-    {
-        public string Deserialize(byte[] data)
-        {
-            return Encoding.UTF8.GetString(data);
-        }
-    }
-}
diff --git a/src/Confluent.Kafka/Serialization/Utf8StringSerializer.cs b/src/Confluent.Kafka/Serialization/Utf8StringSerializer.cs
deleted file mode 100644
index 969736e40..000000000
--- a/src/Confluent.Kafka/Serialization/Utf8StringSerializer.cs
+++ /dev/null
@@ -1,26 +0,0 @@
-using System.Text;
-
-namespace Confluent.Kafka.Serialization
-{
-    // TODO: Perhaps have a general string serializer + encoding as a type parameter
-
-    /// <summary>
-    ///     A UTF-8 encoding string serializer.
-    /// </summary>
-    /// <param name="val">
-    ///     The string value to serialize.
-    /// </param>
-    /// <returns>
-    ///     <paramref name="val" /> encoded in a UTF-8 byte array.
-    /// </returns>
-    /// <remarks>
-    ///     <paramref name="val" /> cannot be null.
-    /// </remarks>
-    public class Utf8StringSerializer : ISerializer<string>
-    {
-        public byte[] Serialize(string val)
-        {
-            return Encoding.UTF8.GetBytes(val);
-        }
-    }
-}
diff --git a/src/Confluent.Kafka/Topic.cs b/src/Confluent.Kafka/Topic.cs
index 76cd39785..946b045ae 100644
--- a/src/Confluent.Kafka/Topic.cs
+++ b/src/Confluent.Kafka/Topic.cs
@@ -6,12 +6,6 @@
 namespace Confluent.Kafka
 {
-    public struct DeliveryReport
-    {
-        public int Partition;
-        public long Offset;
-    }
-
     // TODO: We want to get rid of this class entirely.

     /// <summary>
@@ -62,96 +56,23 @@ public void Dispose()

         public string Name => handle.GetName();

-        public Task<DeliveryReport> Produce(byte[] val, byte[] key = null, Int32 partition = RD_KAFKA_PARTITION_UA, bool blockIfQueueFull = true)
+        public Task<DeliveryReport> Produce(byte[] val, int valOffset, int valLength, byte[] key = null, int keyOffset = 0, int keyLength = 0, Int32 partition = RD_KAFKA_PARTITION_UA, bool blockIfQueueFull = true)
         {
-            return Produce(val, val?.Length ?? 0, key, key?.Length ?? 0, partition, blockIfQueueFull);
-        }
+            // Passes the TaskCompletionSource to the delivery report callback via the msg_opaque pointer.

-        public Task<DeliveryReport> Produce(byte[] val, int valLength, byte[] key = null, int keyCount = 0, Int32 partition = RD_KAFKA_PARTITION_UA, bool blockIfQueueFull = true)
-        {
-            // Passes the TaskCompletionSource to the delivery report callback
-            // via the msg_opaque pointer
             var deliveryCompletionSource = new TaskDeliveryHandler();
-            Produce(val, valLength, key, keyCount, partition, deliveryCompletionSource, blockIfQueueFull);
-            return deliveryCompletionSource.Task;
-        }
-
-        /// <summary>
-        ///     Produces a keyed message to a partition of the current Topic and notifies the caller of progress via a callback interface.
-        /// </summary>
-        /// <param name="val">
-        ///     Value to send to Kafka. Can be null.
-        /// </param>
-        /// <param name="deliveryHandler">
-        ///     IDeliveryHandler implementation used to notify the caller when the given produce request completes or an error occurs.
-        /// </param>
-        /// <param name="key">
-        ///     (Optional) The key associated with <paramref name="val" /> (or null if no key is specified).
-        /// </param>
-        /// <param name="partition">
-        ///     (Optional) The topic partition to which <paramref name="val" /> will be sent (or -1 if no partition is specified).
-        /// </param>
-        /// <exception cref="ArgumentNullException">
-        ///     Thrown if <paramref name="deliveryHandler" /> is null.
-        /// </exception>
-        /// <remarks>
-        ///     Methods of <paramref name="deliveryHandler" /> will be executed in an RdKafka internal thread and will block other operations
-        ///     - consider this when implementing IDeliveryHandler.
-        ///     Use this overload for high-performance use cases as it does not use TPL and reduces the number of allocations.
-        /// </remarks>
-        public void Produce(byte[] val, IDeliveryHandler deliveryHandler, byte[] key = null, Int32 partition = RD_KAFKA_PARTITION_UA, bool blockIfQueueFull = true)
-        {
-            Produce(val, val?.Length ?? 0, deliveryHandler, key, key?.Length ?? 0, partition, blockIfQueueFull);
-        }
-
-        /// <summary>
-        ///     Produces a keyed message to a partition of the current Topic and notifies the caller of progress via a callback interface.
-        /// </summary>
-        /// <param name="val">
-        ///     Value to send to Kafka. Can be null.
-        /// </param>
-        /// <param name="valLength">
-        ///     Number of bytes to use from val buffer.
-        /// </param>
-        /// <param name="deliveryHandler">
-        ///     IDeliveryHandler implementation used to notify the caller when the given produce request completes or an error occurs.
-        /// </param>
-        /// <param name="key">
-        ///     (Optional) The key associated with <paramref name="val" /> (or null if no key is specified).
-        /// </param>
-        /// <param name="keyCount">
-        ///     Number of bytes to use from key buffer.
-        /// </param>
-        /// <param name="partition">
-        ///     (Optional) The topic partition to which <paramref name="val" /> will be sent (or -1 if no partition is specified).
-        /// </param>
-        /// <exception cref="ArgumentNullException">
-        ///     Thrown if <paramref name="deliveryHandler" /> is null.
-        /// </exception>
-        /// <remarks>
-        ///     Methods of <paramref name="deliveryHandler" /> will be executed in an RdKafka-internal thread and will block other operations
-        ///     - consider this when implementing IDeliveryHandler.
-        ///     Use this overload for high-performance use cases as it does not use TPL and reduces the number of allocations.
-        /// </remarks>
-        public void Produce(byte[] value, int valLength, IDeliveryHandler deliveryHandler, byte[] key = null, int keyCount = 0, Int32 partition = RD_KAFKA_PARTITION_UA, bool blockIfQueueFull = true)
-        {
-            if (deliveryHandler == null)
-                throw new ArgumentNullException(nameof(deliveryHandler));
-            Produce(value, valLength, key, keyCount, partition, deliveryHandler, blockIfQueueFull);
-        }
-
-
-        private void Produce(byte[] val, int valLength, byte[] key, int keyCount, Int32 partition, object deliveryHandler, bool blockIfQueueFull)
-        {
-            var gch = GCHandle.Alloc(deliveryHandler);
+            var gch = GCHandle.Alloc(deliveryCompletionSource);
             var ptr = GCHandle.ToIntPtr(gch);

-            if (handle.Produce(val, valLength, key, keyCount, partition, ptr, blockIfQueueFull) != 0)
+            if (handle.Produce(val, valOffset, valLength, key, keyOffset, keyLength, partition, ptr, blockIfQueueFull) != 0)
             {
                 var err = LibRdKafka.last_error();
                 gch.Free();
+                // TODO: Use proper error string (rd_kafka_err2str(..last_error)).
                 throw RdKafkaException.FromErr(err, "Could not produce message");
             }
+
+            return deliveryCompletionSource.Task;
         }

         /// <summary>
@@ -162,5 +83,6 @@ private void Produce(byte[] val, int valLength, byte[] key, int keyCount, Int32
         ///     This function must only be called from inside a partitioner function.
         /// </remarks>
         public bool PartitionAvailable(int partition) => handle.PartitionAvailable(partition);
+
     }
 }
diff --git a/src/Confluent.Kafka/project.json b/src/Confluent.Kafka/project.json
index e44a12723..5a951b93b 100644
--- a/src/Confluent.Kafka/project.json
+++ b/src/Confluent.Kafka/project.json
@@ -18,6 +18,7 @@
     "System.Console": "4.0.0",
     "System.Linq": "4.1.0",
     "System.Runtime.InteropServices": "4.1.0",
+    "System.Runtime.Extensions": "4.1.0",
     "System.Threading": "4.0.11"
   }
 }
diff --git a/test/Confluent.Kafka.Tests/IntSerdeTests.cs b/test/Confluent.Kafka.Tests/IntSerdeTests.cs
new file mode 100644
index 000000000..3653a3a30
--- /dev/null
+++ b/test/Confluent.Kafka.Tests/IntSerdeTests.cs
@@ -0,0 +1,64 @@
+using System;
+using Xunit;
+using Confluent.Kafka.Serialization;
+
+
+namespace Confluent.Kafka.Tests
+{
+    public class IntSerdeTests
+    {
+        private static readonly int[] toTest = new int[]
+        {
+            0, 1, -1, 42, -42, 127, 128, 128, -127, -128, -129,
+            254, 255, 256, 257, -254, -255, -256, -257,
+            (int)short.MinValue-1, (int)short.MinValue, (int)short.MinValue+1,
+            (int)short.MaxValue-1, (int)short.MaxValue, (int)short.MaxValue+1,
+            int.MaxValue-1, int.MaxValue, int.MinValue, int.MinValue + 1
+        };
+
+        [Fact]
+        public void IsBigEndian()
+        {
+            var serializer = new IntSerializer();
+            var bytes = serializer.Serialize(42);
+            Assert.Equal(bytes.Length, 4);
+            // most significant byte in smallest address.
+            Assert.Equal(bytes[0], 0);
+            Assert.Equal(bytes[3], 42);
+        }
+
+        [Fact]
+        public void SerializationAgreesWithSystemNetHostToNetworkOrder()
+        {
+            foreach (int theInt in toTest)
+            {
+                int networkOrder = System.Net.IPAddress.HostToNetworkOrder(theInt);
+                var bytes1 = BitConverter.GetBytes(networkOrder);
+
+                var serializer = new IntSerializer();
+                var bytes2 = serializer.Serialize(theInt);
+
+                Assert.Equal(bytes1.Length, bytes2.Length);
+
+                for (int i = 0; i < bytes1.Length; ++i)
+                {
+                    Assert.Equal(bytes1[i], bytes2[i]);
+                }
+            }
+        }
+    }
+}
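
Usage sketch (not part of the diff): a minimal end-to-end produce call against the reworked API above. The broker address and topic name are placeholders; everything else uses only constructors and methods introduced in this change set.

    using System;
    using System.Collections.Generic;
    using System.Text;
    using Confluent.Kafka;
    using Confluent.Kafka.Serialization;

    class ProduceExample
    {
        static void Main()
        {
            // Placeholder broker address.
            var config = new Dictionary<string, object> { { "bootstrap.servers", "localhost:9092" } };

            // Serializers are now fixed at construction time rather than set via properties.
            using (var producer = new Producer<string, string>(
                config, new StringSerializer(Encoding.UTF8), new StringSerializer(Encoding.UTF8)))
            {
                // ProduceAsync returns a Task<DeliveryReport>; block on .Result for a synchronous send.
                var report = producer.ProduceAsync("my-topic", "my-key", "my-value").Result;
                Console.WriteLine($"Partition: {report.Partition}, Offset: {report.Offset}");

                // Dispose no longer drains the out-queue, so Flush before the producer is disposed.
                producer.Flush();
            }
        }
    }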