From 442e11cbca7209bcb90bbc81881f58d503105a1a Mon Sep 17 00:00:00 2001 From: "g.verdier" Date: Fri, 13 Sep 2024 17:19:29 -0400 Subject: [PATCH 01/13] Natively support Memory --- src/Confluent.Kafka/Impl/SafeKafkaHandle.cs | 60 ++++----- src/Confluent.Kafka/Producer.cs | 118 +++++++++++++----- .../Tests/Producer_Produce_Async.cs | 31 +++++ 3 files changed, 141 insertions(+), 68 deletions(-) diff --git a/src/Confluent.Kafka/Impl/SafeKafkaHandle.cs b/src/Confluent.Kafka/Impl/SafeKafkaHandle.cs index 60d0e34ba..ce9b98bec 100644 --- a/src/Confluent.Kafka/Impl/SafeKafkaHandle.cs +++ b/src/Confluent.Kafka/Impl/SafeKafkaHandle.cs @@ -18,6 +18,7 @@ // Refer to LICENSE for more information. using System; +using System.Buffers; using System.Collections.Generic; using System.Linq; using System.Runtime.InteropServices; @@ -357,45 +358,35 @@ private IntPtr marshalHeaders(IReadOnlyList headers) return headersPtr; } - internal ErrorCode Produce( + internal unsafe ErrorCode Produce( string topic, - byte[] val, int valOffset, int valLength, - byte[] key, int keyOffset, int keyLength, + ReadOnlyMemory? val, + ReadOnlyMemory? key, int partition, long timestamp, IReadOnlyList headers, IntPtr opaque) { - var pValue = IntPtr.Zero; - var pKey = IntPtr.Zero; + MemoryHandle? valueHandle = null; + IntPtr valuePtr = IntPtr.Zero; + UIntPtr valueLength = UIntPtr.Zero; - var gchValue = default(GCHandle); - var gchKey = default(GCHandle); + MemoryHandle? keyHandle = null; + IntPtr keyPtr = IntPtr.Zero; + UIntPtr keyLength = UIntPtr.Zero; - if (val == null) + if (val != null) { - if (valOffset != 0 || valLength != 0) - { - throw new ArgumentException("valOffset and valLength parameters must be 0 when producing null values."); - } - } - else - { - gchValue = GCHandle.Alloc(val, GCHandleType.Pinned); - pValue = Marshal.UnsafeAddrOfPinnedArrayElement(val, valOffset); + valueHandle = val.Value.Pin(); + valuePtr = (IntPtr)valueHandle.Value.Pointer; + valueLength = (UIntPtr)val.Value.Length; } - if (key == null) + if (key != null) { - if (keyOffset != 0 || keyLength != 0) - { - throw new ArgumentException("keyOffset and keyLength parameters must be 0 when producing null key values."); - } - } - else - { - gchKey = GCHandle.Alloc(key, GCHandleType.Pinned); - pKey = Marshal.UnsafeAddrOfPinnedArrayElement(key, keyOffset); + keyHandle = key.Value.Pin(); + keyPtr = (IntPtr)keyHandle.Value.Pointer; + keyLength = (UIntPtr)key.Value.Length; } IntPtr headersPtr = marshalHeaders(headers); @@ -407,8 +398,8 @@ internal ErrorCode Produce( topic, partition, (IntPtr)MsgFlags.MSG_F_COPY, - pValue, (UIntPtr)valLength, - pKey, (UIntPtr)keyLength, + valuePtr, valueLength, + keyPtr, keyLength, timestamp, headersPtr, opaque); @@ -433,15 +424,8 @@ internal ErrorCode Produce( } finally { - if (val != null) - { - gchValue.Free(); - } - - if (key != null) - { - gchKey.Free(); - } + valueHandle?.Dispose(); + keyHandle?.Dispose(); } } diff --git a/src/Confluent.Kafka/Producer.cs b/src/Confluent.Kafka/Producer.cs index 79867514b..2ba61f8b6 100644 --- a/src/Confluent.Kafka/Producer.cs +++ b/src/Confluent.Kafka/Producer.cs @@ -279,8 +279,8 @@ private void DeliveryReportCallbackImpl(IntPtr rk, IntPtr rkmessage, IntPtr opaq private void ProduceImpl( string topic, - byte[] val, int valOffset, int valLength, - byte[] key, int keyOffset, int keyLength, + ReadOnlyMemory val, + ReadOnlyMemory key, Timestamp timestamp, Partition partition, IReadOnlyList headers, @@ -308,8 +308,8 @@ private void ProduceImpl( err = KafkaHandle.Produce( topic, - val, valOffset, valLength, - key, keyOffset, keyLength, + val, + key, partition.Value, timestamp.UnixTimestampMs, headers, @@ -325,8 +325,8 @@ private void ProduceImpl( { err = KafkaHandle.Produce( topic, - val, valOffset, valLength, - key, keyOffset, keyLength, + val, + key, partition.Value, timestamp.UnixTimestampMs, headers, @@ -506,7 +506,14 @@ private void InitializeSerializers( IAsyncSerializer asyncValueSerializer) { // setup key serializer. - if (keySerializer == null && asyncKeySerializer == null) + if (typeof(TKey) == typeof(Memory) || typeof(TKey) == typeof(ReadOnlyMemory)) + { + if (keySerializer != null || asyncKeySerializer != null) + { + throw new ArgumentNullException(null, "Key serializer should not be specified for Memory"); + } + } + else if (keySerializer == null && asyncKeySerializer == null) { if (!defaultSerializers.TryGetValue(typeof(TKey), out object serializer)) { @@ -529,7 +536,14 @@ private void InitializeSerializers( } // setup value serializer. - if (valueSerializer == null && asyncValueSerializer == null) + if (typeof(TValue) == typeof(Memory) || typeof(TValue) == typeof(ReadOnlyMemory)) + { + if (valueSerializer != null || asyncValueSerializer != null) + { + throw new ArgumentNullException(null, "Value serializer should not be specified for Memory"); + } + } + else if (valueSerializer == null && asyncValueSerializer == null) { if (!defaultSerializers.TryGetValue(typeof(TValue), out object serializer)) { @@ -750,12 +764,23 @@ public async Task> ProduceAsync( { Headers headers = message.Headers ?? new Headers(); - byte[] keyBytes; + ReadOnlyMemory keyBytes; try { - keyBytes = (keySerializer != null) - ? keySerializer.Serialize(message.Key, new SerializationContext(MessageComponentType.Key, topicPartition.Topic, headers)) - : await asyncKeySerializer.SerializeAsync(message.Key, new SerializationContext(MessageComponentType.Key, topicPartition.Topic, headers)).ConfigureAwait(false); + if (message.Key is Memory memory) + { + keyBytes = memory; + } + else if (message.Key is ReadOnlyMemory readOnlyMemory) + { + keyBytes = readOnlyMemory; + } + else + { + keyBytes = (keySerializer != null) + ? keySerializer.Serialize(message.Key, new SerializationContext(MessageComponentType.Key, topicPartition.Topic, headers)) + : await asyncKeySerializer.SerializeAsync(message.Key, new SerializationContext(MessageComponentType.Key, topicPartition.Topic, headers)).ConfigureAwait(false); + } } catch (Exception ex) { @@ -769,12 +794,23 @@ public async Task> ProduceAsync( ex); } - byte[] valBytes; + ReadOnlyMemory valBytes; try { - valBytes = (valueSerializer != null) - ? valueSerializer.Serialize(message.Value, new SerializationContext(MessageComponentType.Value, topicPartition.Topic, headers)) - : await asyncValueSerializer.SerializeAsync(message.Value, new SerializationContext(MessageComponentType.Value, topicPartition.Topic, headers)).ConfigureAwait(false); + if (message.Value is Memory memory) + { + valBytes = memory; + } + else if (message.Value is ReadOnlyMemory readOnlyMemory) + { + valBytes = readOnlyMemory; + } + else + { + valBytes = (valueSerializer != null) + ? valueSerializer.Serialize(message.Value, new SerializationContext(MessageComponentType.Value, topicPartition.Topic, headers)) + : await asyncValueSerializer.SerializeAsync(message.Value, new SerializationContext(MessageComponentType.Value, topicPartition.Topic, headers)).ConfigureAwait(false); + } } catch (Exception ex) { @@ -805,8 +841,8 @@ public async Task> ProduceAsync( ProduceImpl( topicPartition.Topic, - valBytes, 0, valBytes == null ? 0 : valBytes.Length, - keyBytes, 0, keyBytes == null ? 0 : keyBytes.Length, + valBytes, + keyBytes, message.Timestamp, topicPartition.Partition, headers.BackingList, handler); @@ -816,8 +852,8 @@ public async Task> ProduceAsync( { ProduceImpl( topicPartition.Topic, - valBytes, 0, valBytes == null ? 0 : valBytes.Length, - keyBytes, 0, keyBytes == null ? 0 : keyBytes.Length, + valBytes, + keyBytes, message.Timestamp, topicPartition.Partition, headers.BackingList, null); @@ -873,12 +909,23 @@ public void Produce( Headers headers = message.Headers ?? new Headers(); - byte[] keyBytes; + ReadOnlyMemory keyBytes; try { - keyBytes = (keySerializer != null) - ? keySerializer.Serialize(message.Key, new SerializationContext(MessageComponentType.Key, topicPartition.Topic, headers)) - : throw new InvalidOperationException("Produce called with an IAsyncSerializer key serializer configured but an ISerializer is required."); + if (message.Key is Memory memory) + { + keyBytes = memory; + } + else if (message.Key is ReadOnlyMemory readOnlyMemory) + { + keyBytes = readOnlyMemory; + } + else + { + keyBytes = (keySerializer != null) + ? keySerializer.Serialize(message.Key, new SerializationContext(MessageComponentType.Key, topicPartition.Topic, headers)) + : throw new InvalidOperationException("Produce called with an IAsyncSerializer key serializer configured but an ISerializer is required."); + } } catch (Exception ex) { @@ -892,12 +939,23 @@ public void Produce( ex); } - byte[] valBytes; + ReadOnlyMemory valBytes; try { - valBytes = (valueSerializer != null) - ? valueSerializer.Serialize(message.Value, new SerializationContext(MessageComponentType.Value, topicPartition.Topic, headers)) - : throw new InvalidOperationException("Produce called with an IAsyncSerializer value serializer configured but an ISerializer is required."); + if (message.Value is Memory memory) + { + valBytes = memory; + } + else if (message.Value is ReadOnlyMemory readOnlyMemory) + { + valBytes = readOnlyMemory; + } + else + { + valBytes = (valueSerializer != null) + ? valueSerializer.Serialize(message.Value, new SerializationContext(MessageComponentType.Value, topicPartition.Topic, headers)) + : throw new InvalidOperationException("Produce called with an IAsyncSerializer value serializer configured but an ISerializer is required."); + } } catch (Exception ex) { @@ -915,8 +973,8 @@ public void Produce( { ProduceImpl( topicPartition.Topic, - valBytes, 0, valBytes == null ? 0 : valBytes.Length, - keyBytes, 0, keyBytes == null ? 0 : keyBytes.Length, + valBytes, + keyBytes, message.Timestamp, topicPartition.Partition, headers.BackingList, deliveryHandler == null diff --git a/test/Confluent.Kafka.IntegrationTests/Tests/Producer_Produce_Async.cs b/test/Confluent.Kafka.IntegrationTests/Tests/Producer_Produce_Async.cs index eba94683b..06c0ecf40 100644 --- a/test/Confluent.Kafka.IntegrationTests/Tests/Producer_Produce_Async.cs +++ b/test/Confluent.Kafka.IntegrationTests/Tests/Producer_Produce_Async.cs @@ -60,5 +60,36 @@ public void Producer_Produce_Async(string bootstrapServers) Assert.Equal(0, Library.HandleCount); LogToFile("end Producer_Produce_Async"); } + + [Theory, MemberData(nameof(KafkaParameters))] + public void Producer_Produce_Memory_Async(string bootstrapServers) + { + LogToFile("start Producer_Produce_Memory_Async"); + + var producerConfig = new ProducerConfig { BootstrapServers = bootstrapServers }; + + using (var testTopic = new TemporaryTopic(bootstrapServers, 1)) + using (var producer = new TestProducerBuilder>(producerConfig) + .Build()) + using (var dProducer = new DependentProducerBuilder, Null>(producer.Handle) + .Build()) + { + Memory data = new byte[] { 1, 2, 3, 4 }; + Assert.Throws>>( + () => producer.Produce(testTopic.Name, new Message> { Value = data })); + + Assert.Throws>>( + () => producer.Produce(testTopic.Name, new Message> { Value = data }, dr => { Assert.True(false); })); + + Assert.Throws, Null>>( + () => dProducer.Produce(testTopic.Name, new Message, Null> { Key = data })); + + Assert.Throws, Null>>( + () => dProducer.Produce(testTopic.Name, new Message, Null> { Key = data }, dr => { Assert.True(false); })); + } + + Assert.Equal(0, Library.HandleCount); + LogToFile("end Producer_Produce_Memory_Async"); + } } } From ae63cfaaddd16bcc5e6f4437e8a7573d58aff93b Mon Sep 17 00:00:00 2001 From: "g.verdier" Date: Fri, 13 Sep 2024 18:18:33 -0400 Subject: [PATCH 02/13] Allow having a serializer with Memory --- src/Confluent.Kafka/Producer.cs | 33 +++++++++++++++++---------------- 1 file changed, 17 insertions(+), 16 deletions(-) diff --git a/src/Confluent.Kafka/Producer.cs b/src/Confluent.Kafka/Producer.cs index 2ba61f8b6..d6a67c5e5 100644 --- a/src/Confluent.Kafka/Producer.cs +++ b/src/Confluent.Kafka/Producer.cs @@ -506,21 +506,21 @@ private void InitializeSerializers( IAsyncSerializer asyncValueSerializer) { // setup key serializer. - if (typeof(TKey) == typeof(Memory) || typeof(TKey) == typeof(ReadOnlyMemory)) + if (keySerializer == null && asyncKeySerializer == null) { - if (keySerializer != null || asyncKeySerializer != null) + if (defaultSerializers.TryGetValue(typeof(TKey), out object serializer)) { - throw new ArgumentNullException(null, "Key serializer should not be specified for Memory"); + this.keySerializer = (ISerializer)serializer; } - } - else if (keySerializer == null && asyncKeySerializer == null) - { - if (!defaultSerializers.TryGetValue(typeof(TKey), out object serializer)) + else if (typeof(TValue) == typeof(Memory) || typeof(TValue) == typeof(ReadOnlyMemory)) + { + // Serializers are not used for Memory. + } + else { throw new ArgumentNullException( $"Key serializer not specified and there is no default serializer defined for type {typeof(TKey).Name}."); } - this.keySerializer = (ISerializer)serializer; } else if (keySerializer == null && asyncKeySerializer != null) { @@ -536,21 +536,22 @@ private void InitializeSerializers( } // setup value serializer. - if (typeof(TValue) == typeof(Memory) || typeof(TValue) == typeof(ReadOnlyMemory)) + if (valueSerializer == null && asyncValueSerializer == null) { - if (valueSerializer != null || asyncValueSerializer != null) + if (defaultSerializers.TryGetValue(typeof(TValue), out object serializer)) { - throw new ArgumentNullException(null, "Value serializer should not be specified for Memory"); + this.valueSerializer = (ISerializer)serializer; } - } - else if (valueSerializer == null && asyncValueSerializer == null) - { - if (!defaultSerializers.TryGetValue(typeof(TValue), out object serializer)) + else if (typeof(TValue) == typeof(Memory) || typeof(TValue) == typeof(ReadOnlyMemory)) + { + // Serializers are not used for Memory. + } + else { throw new ArgumentNullException( $"Value serializer not specified and there is no default serializer defined for type {typeof(TValue).Name}."); } - this.valueSerializer = (ISerializer)serializer; + } else if (valueSerializer == null && asyncValueSerializer != null) { From 62aa07492a3facb8e618817660a7991d69f63eec Mon Sep 17 00:00:00 2001 From: "g.verdier" Date: Fri, 13 Sep 2024 18:39:08 -0400 Subject: [PATCH 03/13] Fix null values beingconverted to empty array --- src/Confluent.Kafka/Producer.cs | 24 ++++++++++++++---------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/src/Confluent.Kafka/Producer.cs b/src/Confluent.Kafka/Producer.cs index d6a67c5e5..97fad5d44 100644 --- a/src/Confluent.Kafka/Producer.cs +++ b/src/Confluent.Kafka/Producer.cs @@ -279,8 +279,8 @@ private void DeliveryReportCallbackImpl(IntPtr rk, IntPtr rkmessage, IntPtr opaq private void ProduceImpl( string topic, - ReadOnlyMemory val, - ReadOnlyMemory key, + ReadOnlyMemory? val, + ReadOnlyMemory? key, Timestamp timestamp, Partition partition, IReadOnlyList headers, @@ -765,7 +765,7 @@ public async Task> ProduceAsync( { Headers headers = message.Headers ?? new Headers(); - ReadOnlyMemory keyBytes; + ReadOnlyMemory? keyBytes; try { if (message.Key is Memory memory) @@ -778,9 +778,10 @@ public async Task> ProduceAsync( } else { - keyBytes = (keySerializer != null) + byte[] keyBytesArray = keySerializer != null ? keySerializer.Serialize(message.Key, new SerializationContext(MessageComponentType.Key, topicPartition.Topic, headers)) : await asyncKeySerializer.SerializeAsync(message.Key, new SerializationContext(MessageComponentType.Key, topicPartition.Topic, headers)).ConfigureAwait(false); + keyBytes = keyBytesArray == null ? (ReadOnlyMemory?)null : keyBytesArray; } } catch (Exception ex) @@ -795,7 +796,7 @@ public async Task> ProduceAsync( ex); } - ReadOnlyMemory valBytes; + ReadOnlyMemory? valBytes; try { if (message.Value is Memory memory) @@ -808,9 +809,10 @@ public async Task> ProduceAsync( } else { - valBytes = (valueSerializer != null) + byte[] valBytesArray = valueSerializer != null ? valueSerializer.Serialize(message.Value, new SerializationContext(MessageComponentType.Value, topicPartition.Topic, headers)) : await asyncValueSerializer.SerializeAsync(message.Value, new SerializationContext(MessageComponentType.Value, topicPartition.Topic, headers)).ConfigureAwait(false); + valBytes = valBytesArray == null ? (ReadOnlyMemory?)null : valBytesArray; } } catch (Exception ex) @@ -910,7 +912,7 @@ public void Produce( Headers headers = message.Headers ?? new Headers(); - ReadOnlyMemory keyBytes; + ReadOnlyMemory? keyBytes; try { if (message.Key is Memory memory) @@ -923,9 +925,10 @@ public void Produce( } else { - keyBytes = (keySerializer != null) + byte[] keyBytesArray = keySerializer != null ? keySerializer.Serialize(message.Key, new SerializationContext(MessageComponentType.Key, topicPartition.Topic, headers)) : throw new InvalidOperationException("Produce called with an IAsyncSerializer key serializer configured but an ISerializer is required."); + keyBytes = keyBytesArray == null ? (ReadOnlyMemory?)null : keyBytesArray; } } catch (Exception ex) @@ -940,7 +943,7 @@ public void Produce( ex); } - ReadOnlyMemory valBytes; + ReadOnlyMemory? valBytes; try { if (message.Value is Memory memory) @@ -953,9 +956,10 @@ public void Produce( } else { - valBytes = (valueSerializer != null) + byte[] valBytesArray = valueSerializer != null ? valueSerializer.Serialize(message.Value, new SerializationContext(MessageComponentType.Value, topicPartition.Topic, headers)) : throw new InvalidOperationException("Produce called with an IAsyncSerializer value serializer configured but an ISerializer is required."); + valBytes = valBytesArray == null ? (ReadOnlyMemory?)null : valBytesArray; } } catch (Exception ex) From 53db4d92065beaf7b4ddd2a28244102908c7ede6 Mon Sep 17 00:00:00 2001 From: "g.verdier" Date: Fri, 13 Sep 2024 19:10:53 -0400 Subject: [PATCH 04/13] Support nullable memory too --- src/Confluent.Kafka/Producer.cs | 92 +++++++++++++++++++-------------- 1 file changed, 54 insertions(+), 38 deletions(-) diff --git a/src/Confluent.Kafka/Producer.cs b/src/Confluent.Kafka/Producer.cs index 97fad5d44..83fa635a1 100644 --- a/src/Confluent.Kafka/Producer.cs +++ b/src/Confluent.Kafka/Producer.cs @@ -512,7 +512,10 @@ private void InitializeSerializers( { this.keySerializer = (ISerializer)serializer; } - else if (typeof(TValue) == typeof(Memory) || typeof(TValue) == typeof(ReadOnlyMemory)) + else if (typeof(TKey) == typeof(Memory) + || typeof(TKey) == typeof(ReadOnlyMemory) + || typeof(TKey) == typeof(Memory?) + || typeof(TKey) == typeof(ReadOnlyMemory?)) { // Serializers are not used for Memory. } @@ -542,7 +545,10 @@ private void InitializeSerializers( { this.valueSerializer = (ISerializer)serializer; } - else if (typeof(TValue) == typeof(Memory) || typeof(TValue) == typeof(ReadOnlyMemory)) + else if (typeof(TValue) == typeof(Memory) + || typeof(TValue) == typeof(ReadOnlyMemory) + || typeof(TValue) == typeof(Memory?) + || typeof(TValue) == typeof(ReadOnlyMemory?)) { // Serializers are not used for Memory. } @@ -765,10 +771,20 @@ public async Task> ProduceAsync( { Headers headers = message.Headers ?? new Headers(); - ReadOnlyMemory? keyBytes; + ReadOnlyMemory? keyBytes = null; try { - if (message.Key is Memory memory) + if (keySerializer != null) + { + byte[] keyBytesArray = keySerializer.Serialize(message.Key, new SerializationContext(MessageComponentType.Key, topicPartition.Topic, headers)); + keyBytes = keyBytesArray == null ? (ReadOnlyMemory?)null : keyBytesArray; + } + else if (asyncKeySerializer != null) + { + byte[] keyBytesArray = await asyncKeySerializer.SerializeAsync(message.Key, new SerializationContext(MessageComponentType.Key, topicPartition.Topic, headers)).ConfigureAwait(false); + keyBytes = keyBytesArray == null ? (ReadOnlyMemory?)null : keyBytesArray; + } + else if (message.Key is Memory memory) { keyBytes = memory; } @@ -776,13 +792,6 @@ public async Task> ProduceAsync( { keyBytes = readOnlyMemory; } - else - { - byte[] keyBytesArray = keySerializer != null - ? keySerializer.Serialize(message.Key, new SerializationContext(MessageComponentType.Key, topicPartition.Topic, headers)) - : await asyncKeySerializer.SerializeAsync(message.Key, new SerializationContext(MessageComponentType.Key, topicPartition.Topic, headers)).ConfigureAwait(false); - keyBytes = keyBytesArray == null ? (ReadOnlyMemory?)null : keyBytesArray; - } } catch (Exception ex) { @@ -796,10 +805,20 @@ public async Task> ProduceAsync( ex); } - ReadOnlyMemory? valBytes; + ReadOnlyMemory? valBytes = null; try { - if (message.Value is Memory memory) + if (valueSerializer != null) + { + byte[] valueBytesArray = valueSerializer.Serialize(message.Value, new SerializationContext(MessageComponentType.Value, topicPartition.Topic, headers)); + valBytes = valueBytesArray == null ? (ReadOnlyMemory?)null : valueBytesArray; + } + else if (asyncValueSerializer != null) + { + byte[] valueBytesArray = await asyncValueSerializer.SerializeAsync(message.Value, new SerializationContext(MessageComponentType.Value, topicPartition.Topic, headers)).ConfigureAwait(false); + valBytes = valueBytesArray == null ? (ReadOnlyMemory?)null : valueBytesArray; + } + else if (message.Value is Memory memory) { valBytes = memory; } @@ -807,13 +826,6 @@ public async Task> ProduceAsync( { valBytes = readOnlyMemory; } - else - { - byte[] valBytesArray = valueSerializer != null - ? valueSerializer.Serialize(message.Value, new SerializationContext(MessageComponentType.Value, topicPartition.Topic, headers)) - : await asyncValueSerializer.SerializeAsync(message.Value, new SerializationContext(MessageComponentType.Value, topicPartition.Topic, headers)).ConfigureAwait(false); - valBytes = valBytesArray == null ? (ReadOnlyMemory?)null : valBytesArray; - } } catch (Exception ex) { @@ -912,10 +924,19 @@ public void Produce( Headers headers = message.Headers ?? new Headers(); - ReadOnlyMemory? keyBytes; + ReadOnlyMemory? keyBytes = null; try { - if (message.Key is Memory memory) + if (keySerializer != null) + { + byte[] keyBytesArray = keySerializer.Serialize(message.Key, new SerializationContext(MessageComponentType.Key, topicPartition.Topic, headers)); + keyBytes = keyBytesArray == null ? (ReadOnlyMemory?)null : keyBytesArray; + } + else if (asyncKeySerializer != null) + { + throw new InvalidOperationException("Produce called with an IAsyncSerializer key serializer configured but an ISerializer is required."); + } + else if (message.Key is Memory memory) { keyBytes = memory; } @@ -923,13 +944,6 @@ public void Produce( { keyBytes = readOnlyMemory; } - else - { - byte[] keyBytesArray = keySerializer != null - ? keySerializer.Serialize(message.Key, new SerializationContext(MessageComponentType.Key, topicPartition.Topic, headers)) - : throw new InvalidOperationException("Produce called with an IAsyncSerializer key serializer configured but an ISerializer is required."); - keyBytes = keyBytesArray == null ? (ReadOnlyMemory?)null : keyBytesArray; - } } catch (Exception ex) { @@ -943,10 +957,19 @@ public void Produce( ex); } - ReadOnlyMemory? valBytes; + ReadOnlyMemory? valBytes = null; try { - if (message.Value is Memory memory) + if (valueSerializer != null) + { + byte[] valueBytesArray = valueSerializer.Serialize(message.Value, new SerializationContext(MessageComponentType.Value, topicPartition.Topic, headers)); + valBytes = valueBytesArray == null ? (ReadOnlyMemory?)null : valueBytesArray; + } + else if (asyncValueSerializer != null) + { + throw new InvalidOperationException("Produce called with an IAsyncSerializer value serializer configured but an ISerializer is required."); + } + else if (message.Value is Memory memory) { valBytes = memory; } @@ -954,13 +977,6 @@ public void Produce( { valBytes = readOnlyMemory; } - else - { - byte[] valBytesArray = valueSerializer != null - ? valueSerializer.Serialize(message.Value, new SerializationContext(MessageComponentType.Value, topicPartition.Topic, headers)) - : throw new InvalidOperationException("Produce called with an IAsyncSerializer value serializer configured but an ISerializer is required."); - valBytes = valBytesArray == null ? (ReadOnlyMemory?)null : valBytesArray; - } } catch (Exception ex) { From 8e55eaad987994e5af0a5b8bbc27183537143ab8 Mon Sep 17 00:00:00 2001 From: "g.verdier" Date: Fri, 13 Sep 2024 21:00:47 -0400 Subject: [PATCH 05/13] Use AsMemory --- src/Confluent.Kafka/Producer.cs | 23 ++++++++++------------- 1 file changed, 10 insertions(+), 13 deletions(-) diff --git a/src/Confluent.Kafka/Producer.cs b/src/Confluent.Kafka/Producer.cs index 83fa635a1..641946296 100644 --- a/src/Confluent.Kafka/Producer.cs +++ b/src/Confluent.Kafka/Producer.cs @@ -557,7 +557,6 @@ private void InitializeSerializers( throw new ArgumentNullException( $"Value serializer not specified and there is no default serializer defined for type {typeof(TValue).Name}."); } - } else if (valueSerializer == null && asyncValueSerializer != null) { @@ -776,13 +775,13 @@ public async Task> ProduceAsync( { if (keySerializer != null) { - byte[] keyBytesArray = keySerializer.Serialize(message.Key, new SerializationContext(MessageComponentType.Key, topicPartition.Topic, headers)); - keyBytes = keyBytesArray == null ? (ReadOnlyMemory?)null : keyBytesArray; + keyBytes = keySerializer.Serialize(message.Key, new SerializationContext(MessageComponentType.Key, topicPartition.Topic, headers))?.AsMemory(); } else if (asyncKeySerializer != null) { - byte[] keyBytesArray = await asyncKeySerializer.SerializeAsync(message.Key, new SerializationContext(MessageComponentType.Key, topicPartition.Topic, headers)).ConfigureAwait(false); - keyBytes = keyBytesArray == null ? (ReadOnlyMemory?)null : keyBytesArray; + keyBytes = (await asyncKeySerializer.SerializeAsync(message.Key, + new SerializationContext(MessageComponentType.Key, topicPartition.Topic, headers)) + .ConfigureAwait(false))?.AsMemory(); } else if (message.Key is Memory memory) { @@ -810,13 +809,13 @@ public async Task> ProduceAsync( { if (valueSerializer != null) { - byte[] valueBytesArray = valueSerializer.Serialize(message.Value, new SerializationContext(MessageComponentType.Value, topicPartition.Topic, headers)); - valBytes = valueBytesArray == null ? (ReadOnlyMemory?)null : valueBytesArray; + valBytes = valueSerializer.Serialize(message.Value, new SerializationContext(MessageComponentType.Value, topicPartition.Topic, headers))?.AsMemory(); } else if (asyncValueSerializer != null) { - byte[] valueBytesArray = await asyncValueSerializer.SerializeAsync(message.Value, new SerializationContext(MessageComponentType.Value, topicPartition.Topic, headers)).ConfigureAwait(false); - valBytes = valueBytesArray == null ? (ReadOnlyMemory?)null : valueBytesArray; + valBytes = (await asyncValueSerializer.SerializeAsync(message.Value, + new SerializationContext(MessageComponentType.Value, topicPartition.Topic, headers)) + .ConfigureAwait(false))?.AsMemory(); } else if (message.Value is Memory memory) { @@ -929,8 +928,7 @@ public void Produce( { if (keySerializer != null) { - byte[] keyBytesArray = keySerializer.Serialize(message.Key, new SerializationContext(MessageComponentType.Key, topicPartition.Topic, headers)); - keyBytes = keyBytesArray == null ? (ReadOnlyMemory?)null : keyBytesArray; + keyBytes = keySerializer.Serialize(message.Key, new SerializationContext(MessageComponentType.Key, topicPartition.Topic, headers))?.AsMemory(); } else if (asyncKeySerializer != null) { @@ -962,8 +960,7 @@ public void Produce( { if (valueSerializer != null) { - byte[] valueBytesArray = valueSerializer.Serialize(message.Value, new SerializationContext(MessageComponentType.Value, topicPartition.Topic, headers)); - valBytes = valueBytesArray == null ? (ReadOnlyMemory?)null : valueBytesArray; + valBytes = valueSerializer.Serialize(message.Value, new SerializationContext(MessageComponentType.Value, topicPartition.Topic, headers))?.AsMemory(); } else if (asyncValueSerializer != null) { From 70e6dfde8ee21a0b54509905b6c29a7155f1c965 Mon Sep 17 00:00:00 2001 From: "g.verdier" Date: Fri, 13 Sep 2024 21:06:31 -0400 Subject: [PATCH 06/13] Create variable for SerializationContext --- src/Confluent.Kafka/Producer.cs | 22 ++++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/src/Confluent.Kafka/Producer.cs b/src/Confluent.Kafka/Producer.cs index 641946296..389ab2035 100644 --- a/src/Confluent.Kafka/Producer.cs +++ b/src/Confluent.Kafka/Producer.cs @@ -775,13 +775,13 @@ public async Task> ProduceAsync( { if (keySerializer != null) { - keyBytes = keySerializer.Serialize(message.Key, new SerializationContext(MessageComponentType.Key, topicPartition.Topic, headers))?.AsMemory(); + SerializationContext ctx = new(MessageComponentType.Key, topicPartition.Topic, headers); + keyBytes = keySerializer.Serialize(message.Key, ctx)?.AsMemory(); } else if (asyncKeySerializer != null) { - keyBytes = (await asyncKeySerializer.SerializeAsync(message.Key, - new SerializationContext(MessageComponentType.Key, topicPartition.Topic, headers)) - .ConfigureAwait(false))?.AsMemory(); + SerializationContext ctx = new(MessageComponentType.Key, topicPartition.Topic, headers); + keyBytes = (await asyncKeySerializer.SerializeAsync(message.Key, ctx).ConfigureAwait(false))?.AsMemory(); } else if (message.Key is Memory memory) { @@ -809,13 +809,13 @@ public async Task> ProduceAsync( { if (valueSerializer != null) { - valBytes = valueSerializer.Serialize(message.Value, new SerializationContext(MessageComponentType.Value, topicPartition.Topic, headers))?.AsMemory(); + SerializationContext ctx = new(MessageComponentType.Value, topicPartition.Topic, headers); + valBytes = valueSerializer.Serialize(message.Value, ctx)?.AsMemory(); } else if (asyncValueSerializer != null) { - valBytes = (await asyncValueSerializer.SerializeAsync(message.Value, - new SerializationContext(MessageComponentType.Value, topicPartition.Topic, headers)) - .ConfigureAwait(false))?.AsMemory(); + SerializationContext ctx = new(MessageComponentType.Value, topicPartition.Topic, headers); + valBytes = (await asyncValueSerializer.SerializeAsync(message.Value, ctx).ConfigureAwait(false))?.AsMemory(); } else if (message.Value is Memory memory) { @@ -928,7 +928,8 @@ public void Produce( { if (keySerializer != null) { - keyBytes = keySerializer.Serialize(message.Key, new SerializationContext(MessageComponentType.Key, topicPartition.Topic, headers))?.AsMemory(); + SerializationContext ctx = new(MessageComponentType.Key, topicPartition.Topic, headers); + keyBytes = keySerializer.Serialize(message.Key, ctx)?.AsMemory(); } else if (asyncKeySerializer != null) { @@ -960,7 +961,8 @@ public void Produce( { if (valueSerializer != null) { - valBytes = valueSerializer.Serialize(message.Value, new SerializationContext(MessageComponentType.Value, topicPartition.Topic, headers))?.AsMemory(); + SerializationContext ctx = new(MessageComponentType.Value, topicPartition.Topic, headers); + valBytes = valueSerializer.Serialize(message.Value, ctx)?.AsMemory(); } else if (asyncValueSerializer != null) { From 2615d178d58f40549cd46962805374e545f461c2 Mon Sep 17 00:00:00 2001 From: "g.verdier" Date: Fri, 13 Sep 2024 22:16:39 -0400 Subject: [PATCH 07/13] Explicit the null behavior --- src/Confluent.Kafka/Producer.cs | 24 ++++++++++++++++++++---- 1 file changed, 20 insertions(+), 4 deletions(-) diff --git a/src/Confluent.Kafka/Producer.cs b/src/Confluent.Kafka/Producer.cs index 389ab2035..7feaf51de 100644 --- a/src/Confluent.Kafka/Producer.cs +++ b/src/Confluent.Kafka/Producer.cs @@ -770,7 +770,7 @@ public async Task> ProduceAsync( { Headers headers = message.Headers ?? new Headers(); - ReadOnlyMemory? keyBytes = null; + ReadOnlyMemory? keyBytes; try { if (keySerializer != null) @@ -791,6 +791,10 @@ public async Task> ProduceAsync( { keyBytes = readOnlyMemory; } + else // Nullable Memory + { + keyBytes = null; + } } catch (Exception ex) { @@ -804,7 +808,7 @@ public async Task> ProduceAsync( ex); } - ReadOnlyMemory? valBytes = null; + ReadOnlyMemory? valBytes; try { if (valueSerializer != null) @@ -825,6 +829,10 @@ public async Task> ProduceAsync( { valBytes = readOnlyMemory; } + else // Nullable Memory + { + valBytes = null; + } } catch (Exception ex) { @@ -923,7 +931,7 @@ public void Produce( Headers headers = message.Headers ?? new Headers(); - ReadOnlyMemory? keyBytes = null; + ReadOnlyMemory? keyBytes; try { if (keySerializer != null) @@ -943,6 +951,10 @@ public void Produce( { keyBytes = readOnlyMemory; } + else // Nullable Memory + { + keyBytes = null; + } } catch (Exception ex) { @@ -956,7 +968,7 @@ public void Produce( ex); } - ReadOnlyMemory? valBytes = null; + ReadOnlyMemory? valBytes; try { if (valueSerializer != null) @@ -976,6 +988,10 @@ public void Produce( { valBytes = readOnlyMemory; } + else // Nullable Memory + { + valBytes = null; + } } catch (Exception ex) { From 8d666d9b4b8da75f858d0314a61fbd796c686828 Mon Sep 17 00:00:00 2001 From: "g.verdier" Date: Sun, 15 Sep 2024 17:43:41 -0400 Subject: [PATCH 08/13] Add tests for Memory? --- src/Confluent.Kafka/Producer.cs | 8 ++++---- .../Tests/Producer_Produce_Async.cs | 20 +++++++++---------- 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/src/Confluent.Kafka/Producer.cs b/src/Confluent.Kafka/Producer.cs index 7feaf51de..d49e701d1 100644 --- a/src/Confluent.Kafka/Producer.cs +++ b/src/Confluent.Kafka/Producer.cs @@ -791,7 +791,7 @@ public async Task> ProduceAsync( { keyBytes = readOnlyMemory; } - else // Nullable Memory + else // Null Memory? { keyBytes = null; } @@ -829,7 +829,7 @@ public async Task> ProduceAsync( { valBytes = readOnlyMemory; } - else // Nullable Memory + else // Null Memory? { valBytes = null; } @@ -951,7 +951,7 @@ public void Produce( { keyBytes = readOnlyMemory; } - else // Nullable Memory + else // Null Memory? { keyBytes = null; } @@ -988,7 +988,7 @@ public void Produce( { valBytes = readOnlyMemory; } - else // Nullable Memory + else // Null Memory? { valBytes = null; } diff --git a/test/Confluent.Kafka.IntegrationTests/Tests/Producer_Produce_Async.cs b/test/Confluent.Kafka.IntegrationTests/Tests/Producer_Produce_Async.cs index 06c0ecf40..14bd07c06 100644 --- a/test/Confluent.Kafka.IntegrationTests/Tests/Producer_Produce_Async.cs +++ b/test/Confluent.Kafka.IntegrationTests/Tests/Producer_Produce_Async.cs @@ -69,23 +69,23 @@ public void Producer_Produce_Memory_Async(string bootstrapServers) var producerConfig = new ProducerConfig { BootstrapServers = bootstrapServers }; using (var testTopic = new TemporaryTopic(bootstrapServers, 1)) - using (var producer = new TestProducerBuilder>(producerConfig) + using (var producer = new TestProducerBuilder?, Memory>(producerConfig) .Build()) - using (var dProducer = new DependentProducerBuilder, Null>(producer.Handle) + using (var dProducer = new DependentProducerBuilder, ReadOnlyMemory?>(producer.Handle) .Build()) { Memory data = new byte[] { 1, 2, 3, 4 }; - Assert.Throws>>( - () => producer.Produce(testTopic.Name, new Message> { Value = data })); + Assert.Throws?, Memory>>( + () => producer.Produce(testTopic.Name, new Message?, Memory> { Value = data })); - Assert.Throws>>( - () => producer.Produce(testTopic.Name, new Message> { Value = data }, dr => { Assert.True(false); })); + Assert.Throws?, Memory>>( + () => producer.Produce(testTopic.Name, new Message?, Memory> { Value = data }, dr => { Assert.True(false); })); - Assert.Throws, Null>>( - () => dProducer.Produce(testTopic.Name, new Message, Null> { Key = data })); + Assert.Throws, ReadOnlyMemory?>>( + () => dProducer.Produce(testTopic.Name, new Message, ReadOnlyMemory?> { Key = data })); - Assert.Throws, Null>>( - () => dProducer.Produce(testTopic.Name, new Message, Null> { Key = data }, dr => { Assert.True(false); })); + Assert.Throws, ReadOnlyMemory?>>( + () => dProducer.Produce(testTopic.Name, new Message, ReadOnlyMemory?> { Key = data }, dr => { Assert.True(false); })); } Assert.Equal(0, Library.HandleCount); From 22f46d4c1a03cfafbc4d3fada107140e1197ac66 Mon Sep 17 00:00:00 2001 From: "g.verdier" Date: Mon, 18 Nov 2024 10:35:46 +0100 Subject: [PATCH 09/13] Improve tests --- .../Tests/Producer_Produce.cs | 65 +++++++++++++++++++ .../Tests/Producer_ProduceAsync_Error.cs | 34 ++++++++++ .../Tests/Producer_ProduceAsync_Null_Task.cs | 23 +++++++ .../Tests/Producer_ProduceAsync_Task.cs | 58 +++++++++++++++++ .../Tests/Producer_Produce_Async.cs | 31 --------- .../Tests/Producer_Produce_Error.cs | 23 +++++++ .../Tests/Producer_Produce_Null.cs | 25 +++++++ 7 files changed, 228 insertions(+), 31 deletions(-) diff --git a/test/Confluent.Kafka.IntegrationTests/Tests/Producer_Produce.cs b/test/Confluent.Kafka.IntegrationTests/Tests/Producer_Produce.cs index e21ceffa8..2361b572b 100644 --- a/test/Confluent.Kafka.IntegrationTests/Tests/Producer_Produce.cs +++ b/test/Confluent.Kafka.IntegrationTests/Tests/Producer_Produce.cs @@ -107,6 +107,71 @@ public void Producer_Produce(string bootstrapServers) Assert.Equal(2, count); + // Memory case. + + count = 0; + Action, ReadOnlyMemory>> dh3 = dr => + { + Assert.Equal(ErrorCode.NoError, dr.Error.Code); + Assert.Equal(PersistenceStatus.Persisted, dr.Status); + Assert.Equal((Partition)0, dr.Partition); + Assert.Equal(singlePartitionTopic, dr.Topic); + Assert.True(dr.Offset >= 0); + Assert.Equal($"test key {count + 42}", Encoding.UTF8.GetString(dr.Message.Key.Span)); + Assert.Equal($"test val {count + 42}", Encoding.UTF8.GetString(dr.Message.Value.Span)); + Assert.Equal(TimestampType.CreateTime, dr.Message.Timestamp.Type); + Assert.True(Math.Abs((DateTime.UtcNow - dr.Message.Timestamp.UtcDateTime).TotalMinutes) < 1.0); + count += 1; + }; + + using (var producer = new TestProducerBuilder, ReadOnlyMemory>(producerConfig).Build()) + { + producer.Produce( + new TopicPartition(singlePartitionTopic, 0), + new Message, ReadOnlyMemory> { Key = Encoding.UTF8.GetBytes("test key 42"), Value = Encoding.UTF8.GetBytes("test val 42") }, dh3); + + producer.Produce( + singlePartitionTopic, + new Message, ReadOnlyMemory> { Key = Encoding.UTF8.GetBytes("test key 43"), Value = Encoding.UTF8.GetBytes("test val 43") }, dh3); + + producer.Flush(TimeSpan.FromSeconds(10)); + } + + Assert.Equal(2, count); + + // Memory? case. + + count = 0; + Action?, Memory?>> dh4 = dr => + { + Assert.Equal(ErrorCode.NoError, dr.Error.Code); + Assert.Equal(PersistenceStatus.Persisted, dr.Status); + Assert.Equal((Partition)0, dr.Partition); + Assert.Equal(singlePartitionTopic, dr.Topic); + Assert.True(dr.Offset >= 0); + Assert.True(dr.Message.Key.HasValue); + Assert.Equal($"test key {count + 42}", Encoding.UTF8.GetString(dr.Message.Key.Value.Span)); + Assert.True(dr.Message.Value.HasValue); + Assert.Equal($"test val {count + 42}", Encoding.UTF8.GetString(dr.Message.Value.Value.Span)); + Assert.Equal(TimestampType.CreateTime, dr.Message.Timestamp.Type); + Assert.True(Math.Abs((DateTime.UtcNow - dr.Message.Timestamp.UtcDateTime).TotalMinutes) < 1.0); + count += 1; + }; + + using (var producer = new TestProducerBuilder?, Memory?>(producerConfig).Build()) + { + producer.Produce( + new TopicPartition(singlePartitionTopic, 0), + new Message?, Memory?> { Key = Encoding.UTF8.GetBytes("test key 42"), Value = Encoding.UTF8.GetBytes("test val 42") }, dh4); + + producer.Produce( + singlePartitionTopic, + new Message?, Memory?> { Key = Encoding.UTF8.GetBytes("test key 43"), Value = Encoding.UTF8.GetBytes("test val 43") }, dh4); + + producer.Flush(TimeSpan.FromSeconds(10)); + } + + Assert.Equal(2, count); Assert.Equal(0, Library.HandleCount); LogToFile("end Producer_Produce"); diff --git a/test/Confluent.Kafka.IntegrationTests/Tests/Producer_ProduceAsync_Error.cs b/test/Confluent.Kafka.IntegrationTests/Tests/Producer_ProduceAsync_Error.cs index 8bc1df92b..de69a3490 100644 --- a/test/Confluent.Kafka.IntegrationTests/Tests/Producer_ProduceAsync_Error.cs +++ b/test/Confluent.Kafka.IntegrationTests/Tests/Producer_ProduceAsync_Error.cs @@ -106,6 +106,40 @@ public void Producer_ProduceAsync_Error(string bootstrapServers) Assert.Equal(TimestampType.NotAvailable, dr.Message.Timestamp.Type); } + // byte[] case + + Task, Memory>> drt3; + using (var producer = new TestProducerBuilder, Memory>(producerConfig).Build()) + { + drt3 = producer.ProduceAsync( + new TopicPartition(partitionedTopic, 42), + new Message, Memory> { Key = new byte[] { 100 }, Value = new byte[] { 101 } }); + Assert.Equal(0, producer.Flush(TimeSpan.FromSeconds(10))); + } + + Assert.Throws(() => { drt.Wait(); }); + + try + { + _ = drt3.Result; + } + catch (AggregateException e) + { + var inner = e.InnerException; + Assert.IsType, Memory>>(inner); + var dr = ((ProduceException, Memory>)inner).DeliveryResult; + var err = ((ProduceException, Memory>)inner).Error; + + Assert.True(err.IsError); + Assert.False(err.IsFatal); + Assert.Equal(partitionedTopic, dr.Topic); + Assert.Equal(Offset.Unset, dr.Offset); + Assert.True(dr.Partition == 42); + Assert.Equal(new byte[] { 100 }, dr.Message.Key.ToArray()); + Assert.Equal(new byte[] { 101 }, dr.Message.Value.ToArray()); + Assert.Equal(TimestampType.NotAvailable, dr.Message.Timestamp.Type); + } + Assert.Equal(0, Library.HandleCount); LogToFile("end Producer_ProduceAsync_Error"); } diff --git a/test/Confluent.Kafka.IntegrationTests/Tests/Producer_ProduceAsync_Null_Task.cs b/test/Confluent.Kafka.IntegrationTests/Tests/Producer_ProduceAsync_Null_Task.cs index b5463cc37..973f49ad2 100644 --- a/test/Confluent.Kafka.IntegrationTests/Tests/Producer_ProduceAsync_Null_Task.cs +++ b/test/Confluent.Kafka.IntegrationTests/Tests/Producer_ProduceAsync_Null_Task.cs @@ -90,6 +90,29 @@ public void Producer_ProduceAsync_Null_Task(string bootstrapServers) Assert.Equal((Partition)1, drs2[0].Result.Partition); + // Memory? case + + var drs3 = new List?, Memory?>>>(); + using (var producer = new TestProducerBuilder?, Memory?>(producerConfig).Build()) + { + drs3.Add(producer.ProduceAsync(new TopicPartition(partitionedTopic, 1), new Message?, Memory?>())); + drs3.Add(producer.ProduceAsync(partitionedTopic, new Message?, Memory?>())); + Assert.Equal(0, producer.Flush(TimeSpan.FromSeconds(10))); + } + + for (int i = 0; i < 2; ++i) + { + var dr = drs3[i].Result; + Assert.True(dr.Partition == 0 || dr.Partition == 1); + Assert.Equal(partitionedTopic, dr.Topic); + Assert.True(dr.Offset >= 0); + Assert.Null(dr.Message.Key); + Assert.Null(dr.Message.Value); + Assert.Equal(TimestampType.CreateTime, dr.Message.Timestamp.Type); + Assert.True(Math.Abs((DateTime.UtcNow - dr.Message.Timestamp.UtcDateTime).TotalMinutes) < 1.0); + } + + Assert.Equal((Partition)1, drs3[0].Result.Partition); Assert.Equal(0, Library.HandleCount); LogToFile("end Producer_ProduceAsync_Null_Task"); diff --git a/test/Confluent.Kafka.IntegrationTests/Tests/Producer_ProduceAsync_Task.cs b/test/Confluent.Kafka.IntegrationTests/Tests/Producer_ProduceAsync_Task.cs index 98edbcb09..4e6b095b6 100644 --- a/test/Confluent.Kafka.IntegrationTests/Tests/Producer_ProduceAsync_Task.cs +++ b/test/Confluent.Kafka.IntegrationTests/Tests/Producer_ProduceAsync_Task.cs @@ -98,6 +98,64 @@ public void Producer_ProduceAsync_Task(string bootstrapServers) Assert.Equal((Partition)1, drs2[0].Result.Partition); + // Memory case + + var drs3 = new List, ReadOnlyMemory>>>(); + using (var producer = new TestProducerBuilder, ReadOnlyMemory>(producerConfig).Build()) + { + drs3.Add(producer.ProduceAsync( + new TopicPartition(partitionedTopic, 1), + new Message, ReadOnlyMemory> { Key = Encoding.UTF8.GetBytes("test key 2"), Value = Encoding.UTF8.GetBytes("test val 2") })); + drs3.Add(producer.ProduceAsync( + partitionedTopic, + new Message, ReadOnlyMemory> { Key = Encoding.UTF8.GetBytes("test key 3"), Value = Encoding.UTF8.GetBytes("test val 3") })); + Assert.Equal(0, producer.Flush(TimeSpan.FromSeconds(10))); + } + + for (int i = 0; i < 2; ++i) + { + var dr = drs3[i].Result; + Assert.Equal(partitionedTopic, dr.Topic); + Assert.True(dr.Offset >= 0); + Assert.True(dr.Partition == 0 || dr.Partition == 1); + Assert.Equal($"test key {i+2}", Encoding.UTF8.GetString(dr.Message.Key.Span)); + Assert.Equal($"test val {i+2}", Encoding.UTF8.GetString(dr.Message.Value.Span)); + Assert.Equal(TimestampType.CreateTime, dr.Message.Timestamp.Type); + Assert.True(Math.Abs((DateTime.UtcNow - dr.Message.Timestamp.UtcDateTime).TotalMinutes) < 1.0); + } + + Assert.Equal((Partition)1, drs3[0].Result.Partition); + + // Memory? case + + var drs4 = new List?, Memory?>>>(); + using (var producer = new TestProducerBuilder?, Memory?>(producerConfig).Build()) + { + drs4.Add(producer.ProduceAsync( + new TopicPartition(partitionedTopic, 1), + new Message?, Memory?> { Key = Encoding.UTF8.GetBytes("test key 2"), Value = Encoding.UTF8.GetBytes("test val 2") })); + drs4.Add(producer.ProduceAsync( + partitionedTopic, + new Message?, Memory?> { Key = Encoding.UTF8.GetBytes("test key 3"), Value = Encoding.UTF8.GetBytes("test val 3") })); + Assert.Equal(0, producer.Flush(TimeSpan.FromSeconds(10))); + } + + for (int i = 0; i < 2; ++i) + { + var dr = drs4[i].Result; + Assert.Equal(partitionedTopic, dr.Topic); + Assert.True(dr.Offset >= 0); + Assert.True(dr.Partition == 0 || dr.Partition == 1); + Assert.True(dr.Message.Key.HasValue); + Assert.Equal($"test key {i+2}", Encoding.UTF8.GetString(dr.Message.Key.Value.Span)); + Assert.True(dr.Message.Value.HasValue); + Assert.Equal($"test val {i+2}", Encoding.UTF8.GetString(dr.Message.Value.Value.Span)); + Assert.Equal(TimestampType.CreateTime, dr.Message.Timestamp.Type); + Assert.True(Math.Abs((DateTime.UtcNow - dr.Message.Timestamp.UtcDateTime).TotalMinutes) < 1.0); + } + + Assert.Equal((Partition)1, drs4[0].Result.Partition); + Assert.Equal(0, Library.HandleCount); LogToFile("end Producer_ProduceAsync_Task"); } diff --git a/test/Confluent.Kafka.IntegrationTests/Tests/Producer_Produce_Async.cs b/test/Confluent.Kafka.IntegrationTests/Tests/Producer_Produce_Async.cs index 14bd07c06..eba94683b 100644 --- a/test/Confluent.Kafka.IntegrationTests/Tests/Producer_Produce_Async.cs +++ b/test/Confluent.Kafka.IntegrationTests/Tests/Producer_Produce_Async.cs @@ -60,36 +60,5 @@ public void Producer_Produce_Async(string bootstrapServers) Assert.Equal(0, Library.HandleCount); LogToFile("end Producer_Produce_Async"); } - - [Theory, MemberData(nameof(KafkaParameters))] - public void Producer_Produce_Memory_Async(string bootstrapServers) - { - LogToFile("start Producer_Produce_Memory_Async"); - - var producerConfig = new ProducerConfig { BootstrapServers = bootstrapServers }; - - using (var testTopic = new TemporaryTopic(bootstrapServers, 1)) - using (var producer = new TestProducerBuilder?, Memory>(producerConfig) - .Build()) - using (var dProducer = new DependentProducerBuilder, ReadOnlyMemory?>(producer.Handle) - .Build()) - { - Memory data = new byte[] { 1, 2, 3, 4 }; - Assert.Throws?, Memory>>( - () => producer.Produce(testTopic.Name, new Message?, Memory> { Value = data })); - - Assert.Throws?, Memory>>( - () => producer.Produce(testTopic.Name, new Message?, Memory> { Value = data }, dr => { Assert.True(false); })); - - Assert.Throws, ReadOnlyMemory?>>( - () => dProducer.Produce(testTopic.Name, new Message, ReadOnlyMemory?> { Key = data })); - - Assert.Throws, ReadOnlyMemory?>>( - () => dProducer.Produce(testTopic.Name, new Message, ReadOnlyMemory?> { Key = data }, dr => { Assert.True(false); })); - } - - Assert.Equal(0, Library.HandleCount); - LogToFile("end Producer_Produce_Memory_Async"); - } } } diff --git a/test/Confluent.Kafka.IntegrationTests/Tests/Producer_Produce_Error.cs b/test/Confluent.Kafka.IntegrationTests/Tests/Producer_Produce_Error.cs index 6b5818b59..40c136147 100644 --- a/test/Confluent.Kafka.IntegrationTests/Tests/Producer_Produce_Error.cs +++ b/test/Confluent.Kafka.IntegrationTests/Tests/Producer_Produce_Error.cs @@ -89,6 +89,29 @@ public void Producer_Produce_Error(string bootstrapServers) Assert.Equal(1, count); + // Memory case. + + count = 0; + Action, ReadOnlyMemory?>> dh3 = dr => + { + Assert.Equal(ErrorCode.Local_UnknownPartition, dr.Error.Code); + Assert.Equal((Partition)42, dr.Partition); + Assert.Equal(singlePartitionTopic, dr.Topic); + Assert.Equal(Offset.Unset, dr.Offset); + Assert.Equal(new byte[] { 11 }, dr.Message.Key.ToArray()); + Assert.Null(dr.Message.Value); + Assert.Equal(TimestampType.NotAvailable, dr.Message.Timestamp.Type); + count += 1; + }; + + using (var producer = new TestProducerBuilder, ReadOnlyMemory?>(producerConfig).Build()) + { + producer.Produce(new TopicPartition(singlePartitionTopic, 42), new Message, ReadOnlyMemory?> { Key = new byte[] { 11 } }, dh3); + producer.Flush(TimeSpan.FromSeconds(10)); + } + + Assert.Equal(1, count); + Assert.Equal(0, Library.HandleCount); LogToFile("end Producer_Produce_Error"); } diff --git a/test/Confluent.Kafka.IntegrationTests/Tests/Producer_Produce_Null.cs b/test/Confluent.Kafka.IntegrationTests/Tests/Producer_Produce_Null.cs index fffc515d0..300c30fb2 100644 --- a/test/Confluent.Kafka.IntegrationTests/Tests/Producer_Produce_Null.cs +++ b/test/Confluent.Kafka.IntegrationTests/Tests/Producer_Produce_Null.cs @@ -89,6 +89,31 @@ public void Producer_Produce_Null(string bootstrapServers) Assert.Equal(2, count); + // Memory? case. + + count = 0; + Action?, Memory?>> dh3 = dr => + { + Assert.Equal(ErrorCode.NoError, dr.Error.Code); + Assert.Equal((Partition)0, dr.Partition); + Assert.Equal(singlePartitionTopic, dr.Topic); + Assert.True(dr.Offset >= 0); + Assert.Null(dr.Message.Key); + Assert.Null(dr.Message.Value); + Assert.Equal(TimestampType.CreateTime, dr.Message.Timestamp.Type); + Assert.True(Math.Abs((DateTime.UtcNow - dr.Message.Timestamp.UtcDateTime).TotalMinutes) < 1.0); + count += 1; + }; + + using (var producer = new TestProducerBuilder?, Memory?>(producerConfig).Build()) + { + producer.Produce(new TopicPartition(singlePartitionTopic, 0), new Message?, Memory?>(), dh3); + producer.Produce(singlePartitionTopic, new Message?, Memory?>(), dh3); + producer.Flush(TimeSpan.FromSeconds(10)); + } + + Assert.Equal(2, count); + Assert.Equal(0, Library.HandleCount); LogToFile("end Producer_Produce_Null"); } From b7acc222c592ad90475756813c894cf29008e6d6 Mon Sep 17 00:00:00 2001 From: "g.verdier" Date: Mon, 18 Nov 2024 11:02:56 +0100 Subject: [PATCH 10/13] Add a produce/consume test --- .../Tests/SimpleProduceConsume.cs | 40 ++++++++++++++++++- 1 file changed, 39 insertions(+), 1 deletion(-) diff --git a/test/Confluent.Kafka.IntegrationTests/Tests/SimpleProduceConsume.cs b/test/Confluent.Kafka.IntegrationTests/Tests/SimpleProduceConsume.cs index cbab5abd8..37774de38 100644 --- a/test/Confluent.Kafka.IntegrationTests/Tests/SimpleProduceConsume.cs +++ b/test/Confluent.Kafka.IntegrationTests/Tests/SimpleProduceConsume.cs @@ -50,21 +50,33 @@ public void SimpleProduceConsume(string bootstrapServers) string testString1 = "hello world"; string testString2 = null; + string testString3 = "dlrow olleh"; + string testString4 = null; DeliveryResult produceResult1; DeliveryResult produceResult2; + DeliveryResult?> produceResult3; + DeliveryResult?> produceResult4; using (var producer = new TestProducerBuilder(producerConfig).Build()) { produceResult1 = ProduceMessage(singlePartitionTopic, producer, testString1); produceResult2 = ProduceMessage(singlePartitionTopic, producer, testString2); } + using (var producer = new TestProducerBuilder?>(producerConfig).Build()) + { + produceResult3 = ProduceMessage(singlePartitionTopic, producer, testString3); + produceResult4 = ProduceMessage(singlePartitionTopic, producer, testString4); + } + using (var consumer = new TestConsumerBuilder(consumerConfig).Build()) { ConsumeMessage(consumer, produceResult1, testString1); ConsumeMessage(consumer, produceResult2, testString2); + ConsumeMessage(consumer, produceResult3, testString3); + ConsumeMessage(consumer, produceResult4, testString4); } - + Assert.Equal(0, Library.HandleCount); LogToFile("end SimpleProduceConsume"); } @@ -80,6 +92,17 @@ private static void ConsumeMessage(IConsumer consumer, DeliveryR Assert.Equal(r.Message.Timestamp.UnixTimestampMs, dr.Message.Timestamp.UnixTimestampMs); } + private static void ConsumeMessage(IConsumer consumer, DeliveryResult?> dr, string testString) + { + consumer.Assign(new List { dr.TopicPartitionOffset }); + var r = consumer.Consume(TimeSpan.FromSeconds(10)); + Assert.NotNull(r?.Message); + Assert.Equal(testString, r.Message.Value == null ? null : Encoding.UTF8.GetString(r.Message.Value)); + Assert.Null(r.Message.Key); + Assert.Equal(r.Message.Timestamp.Type, dr.Message.Timestamp.Type); + Assert.Equal(r.Message.Timestamp.UnixTimestampMs, dr.Message.Timestamp.UnixTimestampMs); + } + private static DeliveryResult ProduceMessage(string topic, IProducer producer, string testString) { var result = producer.ProduceAsync(topic, new Message { Value = testString }).Result; @@ -91,5 +114,20 @@ private static DeliveryResult ProduceMessage(string topic, IProduc Assert.Equal(0, producer.Flush(TimeSpan.FromSeconds(10))); return result; } + + private static DeliveryResult?> ProduceMessage(string topic, IProducer?> producer, string testString) + { + var result = producer .ProduceAsync(topic, new Message?> + { + Value = testString == null ? null : Encoding.UTF8.GetBytes(testString), + }).Result; + Assert.NotNull(result?.Message); + Assert.Equal(topic, result.Topic); + Assert.NotEqual(result.Offset, Offset.Unset); + Assert.Equal(TimestampType.CreateTime, result.Message.Timestamp.Type); + Assert.True(Math.Abs((DateTime.UtcNow - result.Message.Timestamp.UtcDateTime).TotalMinutes) < 1.0); + Assert.Equal(0, producer.Flush(TimeSpan.FromSeconds(10))); + return result; + } } } From d4e728b25ce3c7390270d1d88ed8966ef023a590 Mon Sep 17 00:00:00 2001 From: "g.verdier" Date: Mon, 14 Apr 2025 13:12:28 -0400 Subject: [PATCH 11/13] Greatly simplify code --- src/Confluent.Kafka/Producer.cs | 134 ++++++++------------------------ 1 file changed, 32 insertions(+), 102 deletions(-) diff --git a/src/Confluent.Kafka/Producer.cs b/src/Confluent.Kafka/Producer.cs index d49e701d1..e227fd378 100644 --- a/src/Confluent.Kafka/Producer.cs +++ b/src/Confluent.Kafka/Producer.cs @@ -42,8 +42,8 @@ internal class Config public PartitionerDelegate defaultPartitioner; } - private ISerializer keySerializer; - private ISerializer valueSerializer; + private Func?> serializeKey; + private Func?> serializeValue; private IAsyncSerializer asyncKeySerializer; private IAsyncSerializer asyncValueSerializer; @@ -58,6 +58,14 @@ internal class Config { typeof(byte[]), Serializers.ByteArray } }; + private static readonly Dictionary memorySerializeFuncs = new Dictionary + { + [typeof(Memory)] = (Memory x, SerializationContext _) => (ReadOnlyMemory?)x, + [typeof(Memory?)] = (Memory? x, SerializationContext _) => (ReadOnlyMemory?)x, + [typeof(ReadOnlyMemory)] = (ReadOnlyMemory x, SerializationContext _) => (ReadOnlyMemory?)x, + [typeof(ReadOnlyMemory?)] = (ReadOnlyMemory? x, SerializationContext _) => x, + }; + private int cancellationDelayMaxMs; private bool disposeHasBeenCalled = false; private object disposeHasBeenCalledLockObj = new object(); @@ -510,14 +518,12 @@ private void InitializeSerializers( { if (defaultSerializers.TryGetValue(typeof(TKey), out object serializer)) { - this.keySerializer = (ISerializer)serializer; + keySerializer = (ISerializer)serializer; + this.serializeKey = (k, ctx) => keySerializer.Serialize(k, ctx)?.AsMemory(); } - else if (typeof(TKey) == typeof(Memory) - || typeof(TKey) == typeof(ReadOnlyMemory) - || typeof(TKey) == typeof(Memory?) - || typeof(TKey) == typeof(ReadOnlyMemory?)) + else if (memorySerializeFuncs.TryGetValue(typeof(TKey), out object serialize)) { - // Serializers are not used for Memory. + this.serializeKey = (Func?>)serialize; } else { @@ -531,7 +537,7 @@ private void InitializeSerializers( } else if (keySerializer != null && asyncKeySerializer == null) { - this.keySerializer = keySerializer; + this.serializeKey = (k, ctx) => keySerializer.Serialize(k, ctx)?.AsMemory(); } else { @@ -543,14 +549,12 @@ private void InitializeSerializers( { if (defaultSerializers.TryGetValue(typeof(TValue), out object serializer)) { - this.valueSerializer = (ISerializer)serializer; + valueSerializer = (ISerializer)serializer; + this.serializeValue = (k, ctx) => valueSerializer.Serialize(k, ctx)?.AsMemory(); } - else if (typeof(TValue) == typeof(Memory) - || typeof(TValue) == typeof(ReadOnlyMemory) - || typeof(TValue) == typeof(Memory?) - || typeof(TValue) == typeof(ReadOnlyMemory?)) + else if (memorySerializeFuncs.TryGetValue(typeof(TKey), out object serialize)) { - // Serializers are not used for Memory. + this.serializeValue = (Func?>)serialize; } else { @@ -564,7 +568,7 @@ private void InitializeSerializers( } else if (valueSerializer != null && asyncValueSerializer == null) { - this.valueSerializer = valueSerializer; + this.serializeValue = (k, ctx) => valueSerializer.Serialize(k, ctx)?.AsMemory(); } else { @@ -773,28 +777,9 @@ public async Task> ProduceAsync( ReadOnlyMemory? keyBytes; try { - if (keySerializer != null) - { - SerializationContext ctx = new(MessageComponentType.Key, topicPartition.Topic, headers); - keyBytes = keySerializer.Serialize(message.Key, ctx)?.AsMemory(); - } - else if (asyncKeySerializer != null) - { - SerializationContext ctx = new(MessageComponentType.Key, topicPartition.Topic, headers); - keyBytes = (await asyncKeySerializer.SerializeAsync(message.Key, ctx).ConfigureAwait(false))?.AsMemory(); - } - else if (message.Key is Memory memory) - { - keyBytes = memory; - } - else if (message.Key is ReadOnlyMemory readOnlyMemory) - { - keyBytes = readOnlyMemory; - } - else // Null Memory? - { - keyBytes = null; - } + keyBytes = (serializeKey != null) + ? serializeKey(message.Key, new SerializationContext(MessageComponentType.Key, topicPartition.Topic, headers)) + : await asyncKeySerializer.SerializeAsync(message.Key, new SerializationContext(MessageComponentType.Key, topicPartition.Topic, headers)).ConfigureAwait(false); } catch (Exception ex) { @@ -811,28 +796,9 @@ public async Task> ProduceAsync( ReadOnlyMemory? valBytes; try { - if (valueSerializer != null) - { - SerializationContext ctx = new(MessageComponentType.Value, topicPartition.Topic, headers); - valBytes = valueSerializer.Serialize(message.Value, ctx)?.AsMemory(); - } - else if (asyncValueSerializer != null) - { - SerializationContext ctx = new(MessageComponentType.Value, topicPartition.Topic, headers); - valBytes = (await asyncValueSerializer.SerializeAsync(message.Value, ctx).ConfigureAwait(false))?.AsMemory(); - } - else if (message.Value is Memory memory) - { - valBytes = memory; - } - else if (message.Value is ReadOnlyMemory readOnlyMemory) - { - valBytes = readOnlyMemory; - } - else // Null Memory? - { - valBytes = null; - } + valBytes = (serializeValue != null) + ? serializeValue(message.Value, new SerializationContext(MessageComponentType.Value, topicPartition.Topic, headers)) + : await asyncValueSerializer.SerializeAsync(message.Value, new SerializationContext(MessageComponentType.Value, topicPartition.Topic, headers)).ConfigureAwait(false); } catch (Exception ex) { @@ -934,27 +900,9 @@ public void Produce( ReadOnlyMemory? keyBytes; try { - if (keySerializer != null) - { - SerializationContext ctx = new(MessageComponentType.Key, topicPartition.Topic, headers); - keyBytes = keySerializer.Serialize(message.Key, ctx)?.AsMemory(); - } - else if (asyncKeySerializer != null) - { - throw new InvalidOperationException("Produce called with an IAsyncSerializer key serializer configured but an ISerializer is required."); - } - else if (message.Key is Memory memory) - { - keyBytes = memory; - } - else if (message.Key is ReadOnlyMemory readOnlyMemory) - { - keyBytes = readOnlyMemory; - } - else // Null Memory? - { - keyBytes = null; - } + keyBytes = (serializeKey != null) + ? serializeKey(message.Key, new SerializationContext(MessageComponentType.Key, topicPartition.Topic, headers)) + : throw new InvalidOperationException("Produce called with an IAsyncSerializer key serializer configured but an ISerializer is required."); } catch (Exception ex) { @@ -971,27 +919,9 @@ public void Produce( ReadOnlyMemory? valBytes; try { - if (valueSerializer != null) - { - SerializationContext ctx = new(MessageComponentType.Value, topicPartition.Topic, headers); - valBytes = valueSerializer.Serialize(message.Value, ctx)?.AsMemory(); - } - else if (asyncValueSerializer != null) - { - throw new InvalidOperationException("Produce called with an IAsyncSerializer value serializer configured but an ISerializer is required."); - } - else if (message.Value is Memory memory) - { - valBytes = memory; - } - else if (message.Value is ReadOnlyMemory readOnlyMemory) - { - valBytes = readOnlyMemory; - } - else // Null Memory? - { - valBytes = null; - } + valBytes = (serializeValue != null) + ? serializeValue(message.Value, new SerializationContext(MessageComponentType.Value, topicPartition.Topic, headers)) + : throw new InvalidOperationException("Produce called with an IAsyncSerializer value serializer configured but an ISerializer is required."); } catch (Exception ex) { From 12adab1bc4d07f598990dd1bf6adc284092edc5f Mon Sep 17 00:00:00 2001 From: "g.verdier" Date: Mon, 14 Apr 2025 13:26:51 -0400 Subject: [PATCH 12/13] Fix bug --- src/Confluent.Kafka/Producer.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Confluent.Kafka/Producer.cs b/src/Confluent.Kafka/Producer.cs index e227fd378..a8c507e66 100644 --- a/src/Confluent.Kafka/Producer.cs +++ b/src/Confluent.Kafka/Producer.cs @@ -552,7 +552,7 @@ private void InitializeSerializers( valueSerializer = (ISerializer)serializer; this.serializeValue = (k, ctx) => valueSerializer.Serialize(k, ctx)?.AsMemory(); } - else if (memorySerializeFuncs.TryGetValue(typeof(TKey), out object serialize)) + else if (memorySerializeFuncs.TryGetValue(typeof(TValue), out object serialize)) { this.serializeValue = (Func?>)serialize; } From 30146df09c2d7a19f28cca4ce36e9d3584ede3da Mon Sep 17 00:00:00 2001 From: verdie-g Date: Mon, 26 May 2025 15:44:43 -0400 Subject: [PATCH 13/13] nit --- .../Tests/Producer_ProduceAsync_Error.cs | 2 +- .../Tests/SimpleProduceConsume.cs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/test/Confluent.Kafka.IntegrationTests/Tests/Producer_ProduceAsync_Error.cs b/test/Confluent.Kafka.IntegrationTests/Tests/Producer_ProduceAsync_Error.cs index de69a3490..40f398b67 100644 --- a/test/Confluent.Kafka.IntegrationTests/Tests/Producer_ProduceAsync_Error.cs +++ b/test/Confluent.Kafka.IntegrationTests/Tests/Producer_ProduceAsync_Error.cs @@ -106,7 +106,7 @@ public void Producer_ProduceAsync_Error(string bootstrapServers) Assert.Equal(TimestampType.NotAvailable, dr.Message.Timestamp.Type); } - // byte[] case + // Memory case Task, Memory>> drt3; using (var producer = new TestProducerBuilder, Memory>(producerConfig).Build()) diff --git a/test/Confluent.Kafka.IntegrationTests/Tests/SimpleProduceConsume.cs b/test/Confluent.Kafka.IntegrationTests/Tests/SimpleProduceConsume.cs index 37774de38..191c79ba2 100644 --- a/test/Confluent.Kafka.IntegrationTests/Tests/SimpleProduceConsume.cs +++ b/test/Confluent.Kafka.IntegrationTests/Tests/SimpleProduceConsume.cs @@ -117,7 +117,7 @@ private static DeliveryResult ProduceMessage(string topic, IProduc private static DeliveryResult?> ProduceMessage(string topic, IProducer?> producer, string testString) { - var result = producer .ProduceAsync(topic, new Message?> + var result = producer.ProduceAsync(topic, new Message?> { Value = testString == null ? null : Encoding.UTF8.GetBytes(testString), }).Result;