From 28b16be6ac57e136e6396afdf1c04852996e196b Mon Sep 17 00:00:00 2001 From: Christopher de Haas Date: Mon, 22 Nov 2021 09:39:57 +0100 Subject: [PATCH] Change serialization to be use an IBufferWriter instead of relying on a correctly size un-poolable byte array --- src/Confluent.Kafka/ArrayPoolBufferWriter.cs | 135 ++++++++ src/Confluent.Kafka/BufferWriterExtensions.cs | 45 +++ src/Confluent.Kafka/BufferWriterStream.cs | 158 +++++++++ src/Confluent.Kafka/Confluent.Kafka.csproj | 1 + .../DefaultSerializationBufferProvider.cs | 43 +++ .../DependentProducerBuilder.cs | 13 + src/Confluent.Kafka/IAsyncSerializer.cs | 6 +- src/Confluent.Kafka/ISerializationBuffer.cs | 38 +++ .../ISerializationBufferProvider.cs | 36 ++ src/Confluent.Kafka/ISerializer.cs | 10 +- src/Confluent.Kafka/Producer.cs | 319 ++++++++++-------- src/Confluent.Kafka/ProducerBuilder.cs | 20 ++ src/Confluent.Kafka/Serializers.cs | 87 +++-- .../SyncOverAsyncSerializer.cs | 11 +- .../AvroSerializer.cs | 8 +- .../GenericSerializerImpl.cs | 10 +- .../IAvroSerializerImpl.cs | 3 +- .../SpecificSerializerImpl.cs | 8 +- .../JsonSerializer.cs | 33 +- .../ProtobufSerializer.cs | 68 ++-- .../Serdes.cs | 9 +- .../Tests/AssignPastEnd.cs | 2 +- .../Tests/Builder_CustomDefaults.cs | 8 +- .../Tests/Consumer_DisableHeaders.cs | 2 +- .../Tests/Consumer_DisableTimestamps.cs | 2 +- .../Tests/Consumer_OffsetsForTimes.cs | 4 +- .../Tests/Consumer_Pause_Resume.cs | 4 +- .../Tests/Consumer_Seek.cs | 6 +- .../Tests/Consumer_StoreOffset.cs | 2 +- .../Tests/DuplicateConsumerAssign.cs | 2 +- .../Tests/GarbageCollect.cs | 2 +- .../Tests/Headers_SerializationContext.cs | 11 +- .../Tests/LogDelegate.cs | 2 +- .../Tests/OnPartitionsAssignedNotSet.cs | 2 +- .../Tests/SerializationExtensions.cs | 33 ++ test/Confluent.Kafka.SyncOverAsync/Program.cs | 9 +- .../Serialization/ByteArray.cs | 4 +- .../Serialization/Double.cs | 4 +- .../Serialization/Float.cs | 4 +- .../Serialization/Int.cs | 6 +- .../Serialization/Long.cs | 4 +- .../Serialization/SerializationExtensions.cs | 33 ++ .../Serialization/String.cs | 8 +- .../JsonSerializeDeserialize.cs | 43 ++- .../ProtoSerializeDeserialize.cs | 4 +- .../SerializeDeserialize.cs | 26 +- 46 files changed, 1010 insertions(+), 278 deletions(-) create mode 100644 src/Confluent.Kafka/ArrayPoolBufferWriter.cs create mode 100644 src/Confluent.Kafka/BufferWriterExtensions.cs create mode 100644 src/Confluent.Kafka/BufferWriterStream.cs create mode 100644 src/Confluent.Kafka/DefaultSerializationBufferProvider.cs create mode 100644 src/Confluent.Kafka/ISerializationBuffer.cs create mode 100644 src/Confluent.Kafka/ISerializationBufferProvider.cs create mode 100644 test/Confluent.Kafka.IntegrationTests/Tests/SerializationExtensions.cs create mode 100644 test/Confluent.Kafka.UnitTests/Serialization/SerializationExtensions.cs diff --git a/src/Confluent.Kafka/ArrayPoolBufferWriter.cs b/src/Confluent.Kafka/ArrayPoolBufferWriter.cs new file mode 100644 index 000000000..1e1369343 --- /dev/null +++ b/src/Confluent.Kafka/ArrayPoolBufferWriter.cs @@ -0,0 +1,135 @@ +// Copyright 2016-2018 Confluent Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Refer to LICENSE for more information. + +using System; +using System.Buffers; + + +namespace Confluent.Kafka +{ + /// + /// Implements a using for memory allocation. + /// + internal sealed class ArrayPoolBufferWriter : ISerializationBuffer + { + /// + /// The default buffer size to use to expand empty arrays. + /// + private const int DefaultInitialBufferSize = 256; + + private readonly ArrayPool pool; + private int index; + private byte[] array; + + /// + /// Initializes a new instance of the class. + /// + public ArrayPoolBufferWriter() + { + this.pool = ArrayPool.Shared; + this.array = pool.Rent(DefaultInitialBufferSize); + this.index = 0; + } + + /// + /// Returns the memory allocation on finialization if not explicitly disposed in user code. + /// + ~ArrayPoolBufferWriter() => Dispose(); + + /// + public void Advance(int count) + { + byte[] array = this.array; + + if (array is null) + { + throw new ObjectDisposedException(nameof(ArrayPoolBufferWriter)); + } + + if (count < 0) + { + throw new ArgumentOutOfRangeException("Count must be greater than 0"); + } + + if (this.index > array.Length - count) + { + throw new ArgumentOutOfRangeException("Cannot advance further than current capacity"); + } + + this.index += count; + } + + /// + public void Dispose() + { + if (this.array != null) + { + this.pool.Return(this.array, true); + this.array = null; + } + } + + + public ArraySegment GetComitted(int offset = 0) + { + return new ArraySegment(this.array, offset, this.index); + } + + /// + public Memory GetMemory(int sizeHint = 0) + { + EnsureCapacity(sizeHint); + return this.array.AsMemory(this.index); + } + + /// + public Span GetSpan(int sizeHint = 0) + { + EnsureCapacity(sizeHint); + return this.array.AsSpan(this.index); + } + + private void EnsureCapacity(int sizeHint) + { + var array = this.array; + + if (array is null) + { + throw new ObjectDisposedException(nameof(ArrayPoolBufferWriter)); + } + + if (sizeHint < 0) + { + throw new ArgumentOutOfRangeException("Cannot advance further than current capacity"); + } + + if (sizeHint == 0) + { + sizeHint = 1; + } + + if (sizeHint > array.Length - this.index) + { + int minimumSize = this.index + sizeHint; + var newArray = pool.Rent(minimumSize); + + Array.Copy(array, 0, newArray, 0, this.index); + pool.Return(array, true); + array = newArray; + } + } + } +} \ No newline at end of file diff --git a/src/Confluent.Kafka/BufferWriterExtensions.cs b/src/Confluent.Kafka/BufferWriterExtensions.cs new file mode 100644 index 000000000..f7f1b9450 --- /dev/null +++ b/src/Confluent.Kafka/BufferWriterExtensions.cs @@ -0,0 +1,45 @@ +// Copyright 2016-2018 Confluent Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Refer to LICENSE for more information. + +using System; +using System.Buffers; +using System.IO; + + +namespace Confluent.Kafka +{ + /// + /// Extends a with a adapter. + /// + public static class BufferWriterExtensions + { + /// + /// Gets a adapting implementation working on a as underlying memory. + /// + /// The used for underlying memory. + /// + /// Thrown if the provises is null. + public static Stream AsStream(this IBufferWriter bufferWriter) + { + if (bufferWriter is null) + { + throw new ArgumentNullException(nameof(bufferWriter)); + } + + return new BufferWriterStream(bufferWriter); + } + } +} diff --git a/src/Confluent.Kafka/BufferWriterStream.cs b/src/Confluent.Kafka/BufferWriterStream.cs new file mode 100644 index 000000000..52bf29774 --- /dev/null +++ b/src/Confluent.Kafka/BufferWriterStream.cs @@ -0,0 +1,158 @@ +// Copyright 2016-2018 Confluent Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Refer to LICENSE for more information. + +using System; +using System.Buffers; +using System.IO; +using System.Threading; +using System.Threading.Tasks; + + +namespace Confluent.Kafka +{ + /// + /// A implementation wrapping an instance. + /// + internal sealed class BufferWriterStream : Stream + { + private readonly IBufferWriter bufferWriter; + private bool disposed; + + /// + /// Initializes a new instance of the class. + /// + /// The target instance to use. + public BufferWriterStream(IBufferWriter bufferWriter) + { + this.bufferWriter = bufferWriter ?? throw new ArgumentNullException(nameof(bufferWriter)); + } + + /// + public override bool CanRead => false; + + /// + public override bool CanSeek => false; + + /// + public override bool CanWrite + { + get => !this.disposed; + } + + /// + public override long Length => throw new NotSupportedException(); + + /// + public override long Position + { + get => throw new NotSupportedException(); + set => throw new NotSupportedException(); + } + + /// + public override Task CopyToAsync(Stream destination, int bufferSize, CancellationToken cancellationToken) + { + throw new NotSupportedException(); + } + + /// + public override void Flush() + { + } + + /// + public override Task FlushAsync(CancellationToken cancellationToken) + { + cancellationToken.ThrowIfCancellationRequested(); + return Task.FromResult(true); + } + + /// + public override Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + { + throw new NotSupportedException(); + } + + /// + public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + { + cancellationToken.ThrowIfCancellationRequested(); + Write(buffer, offset, count); + cancellationToken.ThrowIfCancellationRequested(); + + return Task.FromResult(true); + } + + /// + public override long Seek(long offset, SeekOrigin origin) + { + throw new NotSupportedException(); + } + + /// + public override void SetLength(long value) + { + throw new NotSupportedException(); + } + + /// + public override int Read(byte[] buffer, int offset, int count) + { + throw new NotSupportedException(); + } + + /// + public override int ReadByte() + { + throw new NotSupportedException(); + } + + /// + public override void Write(byte[] buffer, int offset, int count) + { + if (this.disposed) + { + throw new ObjectDisposedException(nameof(BufferWriterStream)); + } + + var source = buffer.AsSpan(offset, count); + var destination = this.bufferWriter.GetSpan(count); + + source.CopyTo(destination); + + this.bufferWriter.Advance(count); + } + + /// + public override void WriteByte(byte value) + { + if (this.disposed) + { + throw new ObjectDisposedException(nameof(BufferWriterStream)); + } + + this.bufferWriter.GetSpan(1)[0] = value; + + this.bufferWriter.Advance(1); + } + + /// + protected override void Dispose(bool disposing) + { + this.disposed = true; + } + } +} diff --git a/src/Confluent.Kafka/Confluent.Kafka.csproj b/src/Confluent.Kafka/Confluent.Kafka.csproj index 78cbf20fd..9bf6faec8 100755 --- a/src/Confluent.Kafka/Confluent.Kafka.csproj +++ b/src/Confluent.Kafka/Confluent.Kafka.csproj @@ -24,6 +24,7 @@ None + diff --git a/src/Confluent.Kafka/DefaultSerializationBufferProvider.cs b/src/Confluent.Kafka/DefaultSerializationBufferProvider.cs new file mode 100644 index 000000000..e6e057292 --- /dev/null +++ b/src/Confluent.Kafka/DefaultSerializationBufferProvider.cs @@ -0,0 +1,43 @@ +// Copyright 2018 Confluent Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Refer to LICENSE for more information. + +using System.Buffers; + + +namespace Confluent.Kafka +{ + /// + /// The default . Creates buffers using internal memory. + /// + public sealed class DefaultSerializationBufferProvider : ISerializationBufferProvider + { + /// + /// Gets a singleton instance of the + /// + public static DefaultSerializationBufferProvider Instance { get; } = new DefaultSerializationBufferProvider(); + + private DefaultSerializationBufferProvider() + { + + } + + /// + public ISerializationBuffer Create() + { + return new ArrayPoolBufferWriter(); + } + } +} diff --git a/src/Confluent.Kafka/DependentProducerBuilder.cs b/src/Confluent.Kafka/DependentProducerBuilder.cs index d152ca652..ef0aa723f 100644 --- a/src/Confluent.Kafka/DependentProducerBuilder.cs +++ b/src/Confluent.Kafka/DependentProducerBuilder.cs @@ -54,6 +54,10 @@ public class DependentProducerBuilder /// public IAsyncSerializer AsyncValueSerializer { get; set; } + /// + /// The configured serialization buffer provider. + /// + public ISerializationBufferProvider SerializationBufferProvider { get; set; } /// /// An underlying librdkafka client handle that the Producer will use to @@ -101,6 +105,15 @@ public DependentProducerBuilder SetValueSerializer(IAsyncSerialize return this; } + /// + /// The serialization buffer provider. + /// + public DependentProducerBuilder SetSerializationBufferProvider(ISerializationBufferProvider serializationBufferProvider) + { + this.SerializationBufferProvider = serializationBufferProvider; + return this; + } + /// /// Build a new IProducer implementation instance. /// diff --git a/src/Confluent.Kafka/IAsyncSerializer.cs b/src/Confluent.Kafka/IAsyncSerializer.cs index 8be1431a7..8cee1c94a 100644 --- a/src/Confluent.Kafka/IAsyncSerializer.cs +++ b/src/Confluent.Kafka/IAsyncSerializer.cs @@ -15,6 +15,7 @@ // Refer to LICENSE for more information. using System; +using System.Buffers; using System.Threading.Tasks; @@ -35,10 +36,13 @@ public interface IAsyncSerializer /// /// Context relevant to the serialize operation. /// + /// + /// The to serialize the binary representation to. + /// /// /// A that /// completes with the serialized data. /// - Task SerializeAsync(T data, SerializationContext context); + Task SerializeAsync(T data, SerializationContext context, IBufferWriter bufferWriter); } } diff --git a/src/Confluent.Kafka/ISerializationBuffer.cs b/src/Confluent.Kafka/ISerializationBuffer.cs new file mode 100644 index 000000000..8abb07e37 --- /dev/null +++ b/src/Confluent.Kafka/ISerializationBuffer.cs @@ -0,0 +1,38 @@ +// Copyright 2018 Confluent Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Refer to LICENSE for more information. + +using System; +using System.Buffers; + +namespace Confluent.Kafka +{ + /// + /// Defines a buffer used to serialize keys and value. This buffer is disposed when no longer used. + /// + public interface ISerializationBuffer : IBufferWriter, IDisposable + { + /// + /// Gets an representing the currently comitted memory. + /// + /// + /// An optional offset into the comitted memory. + /// + /// + /// Returns a representing the commited memory with an initial offset as requested. + /// + ArraySegment GetComitted(int offset = 0); + } +} diff --git a/src/Confluent.Kafka/ISerializationBufferProvider.cs b/src/Confluent.Kafka/ISerializationBufferProvider.cs new file mode 100644 index 000000000..cc16a1a0a --- /dev/null +++ b/src/Confluent.Kafka/ISerializationBufferProvider.cs @@ -0,0 +1,36 @@ +// Copyright 2018 Confluent Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Refer to LICENSE for more information. + +using System; +using System.Buffers; +using System.Collections.Generic; + +namespace Confluent.Kafka +{ + /// + /// Defines a factory for creating a used for key and value serialization. + /// + public interface ISerializationBufferProvider + { + /// + /// Creates a new . + /// + /// + /// Returns a . + /// + ISerializationBuffer Create(); + } +} diff --git a/src/Confluent.Kafka/ISerializer.cs b/src/Confluent.Kafka/ISerializer.cs index e404536d8..b23be4113 100644 --- a/src/Confluent.Kafka/ISerializer.cs +++ b/src/Confluent.Kafka/ISerializer.cs @@ -15,6 +15,8 @@ // Refer to LICENSE for more information. +using System.Buffers; + namespace Confluent.Kafka { /// @@ -32,9 +34,9 @@ public interface ISerializer /// /// Context relevant to the serialize operation. /// - /// - /// The serialized value. - /// - byte[] Serialize(T data, SerializationContext context); + /// + /// The to serialize the binary representation to. + /// + void Serialize(T data, SerializationContext context, IBufferWriter bufferWriter); } } diff --git a/src/Confluent.Kafka/Producer.cs b/src/Confluent.Kafka/Producer.cs index 3d605a774..da3209628 100644 --- a/src/Confluent.Kafka/Producer.cs +++ b/src/Confluent.Kafka/Producer.cs @@ -15,7 +15,9 @@ // Refer to LICENSE for more information. using System; +using System.Buffers; using System.Collections.Generic; +using System.IO; using System.Linq; using System.Runtime.InteropServices; using System.Threading; @@ -46,6 +48,7 @@ internal class Config private ISerializer valueSerializer; private IAsyncSerializer asyncKeySerializer; private IAsyncSerializer asyncValueSerializer; + private ISerializationBufferProvider serializationBufferProvider; private static readonly Dictionary defaultSerializers = new Dictionary { @@ -279,8 +282,8 @@ private void DeliveryReportCallbackImpl(IntPtr rk, IntPtr rkmessage, IntPtr opaq private void ProduceImpl( string topic, - byte[] val, int valOffset, int valLength, - byte[] key, int keyOffset, int keyLength, + ArraySegment valueBuffer, + ArraySegment keyBuffer, Timestamp timestamp, Partition partition, IEnumerable headers, @@ -308,8 +311,8 @@ private void ProduceImpl( err = KafkaHandle.Produce( topic, - val, valOffset, valLength, - key, keyOffset, keyLength, + valueBuffer.Array, valueBuffer.Offset, valueBuffer.Count, + keyBuffer.Array, keyBuffer.Offset, keyBuffer.Count, partition.Value, timestamp.UnixTimestampMs, headers, @@ -325,8 +328,8 @@ private void ProduceImpl( { err = KafkaHandle.Produce( topic, - val, valOffset, valLength, - key, keyOffset, keyLength, + valueBuffer.Array, valueBuffer.Offset, valueBuffer.Count, + keyBuffer.Array, keyBuffer.Offset, keyBuffer.Count, partition.Value, timestamp.UnixTimestampMs, headers, @@ -495,6 +498,18 @@ public Handle Handle } } + private void InitializeSerializationBufferProvider(ISerializationBufferProvider serializationBufferProvider) + { + if (serializationBufferProvider == null) + { + this.serializationBufferProvider = DefaultSerializationBufferProvider.Instance; + } + else + { + this.serializationBufferProvider = serializationBufferProvider; + } + } + private void InitializeSerializers( ISerializer keySerializer, ISerializer valueSerializer, @@ -560,6 +575,8 @@ internal Producer(DependentProducerBuilder builder) InitializeSerializers( builder.KeySerializer, builder.ValueSerializer, builder.AsyncKeySerializer, builder.AsyncValueSerializer); + + InitializeSerializationBufferProvider(builder.SerializationBufferProvider); } internal Producer(ProducerBuilder builder) @@ -735,6 +752,8 @@ internal Producer(ProducerBuilder builder) InitializeSerializers( builder.KeySerializer, builder.ValueSerializer, builder.AsyncKeySerializer, builder.AsyncValueSerializer); + + InitializeSerializationBufferProvider(builder.SerializationBufferProvider); } @@ -746,95 +765,113 @@ public async Task> ProduceAsync( { Headers headers = message.Headers ?? new Headers(); - byte[] keyBytes; - try - { - keyBytes = (keySerializer != null) - ? keySerializer.Serialize(message.Key, new SerializationContext(MessageComponentType.Key, topicPartition.Topic, headers)) - : await asyncKeySerializer.SerializeAsync(message.Key, new SerializationContext(MessageComponentType.Key, topicPartition.Topic, headers)).ConfigureAwait(false); - } - catch (Exception ex) + using (var buffer = this.serializationBufferProvider.Create()) { - throw new ProduceException( - new Error(ErrorCode.Local_KeySerialization), - new DeliveryResult - { - Message = message, - TopicPartitionOffset = new TopicPartitionOffset(topicPartition, Offset.Unset) - }, - ex); - } + ArraySegment keySegment; + ArraySegment valueSegment; - byte[] valBytes; - try - { - valBytes = (valueSerializer != null) - ? valueSerializer.Serialize(message.Value, new SerializationContext(MessageComponentType.Value, topicPartition.Topic, headers)) - : await asyncValueSerializer.SerializeAsync(message.Value, new SerializationContext(MessageComponentType.Value, topicPartition.Topic, headers)).ConfigureAwait(false); - } - catch (Exception ex) - { - throw new ProduceException( - new Error(ErrorCode.Local_ValueSerialization), - new DeliveryResult + try + { + if (keySerializer != null) { - Message = message, - TopicPartitionOffset = new TopicPartitionOffset(topicPartition, Offset.Unset) - }, - ex); - } - - try - { - if (enableDeliveryReports) + keySerializer.Serialize(message.Key, new SerializationContext(MessageComponentType.Key, topicPartition.Topic, headers), buffer); + } + else + { + await asyncKeySerializer.SerializeAsync(message.Key, new SerializationContext(MessageComponentType.Key, topicPartition.Topic, headers), buffer).ConfigureAwait(false); + } + } + catch (Exception ex) { - var handler = new TypedTaskDeliveryHandlerShim( - topicPartition.Topic, - enableDeliveryReportKey ? message.Key : default(TKey), - enableDeliveryReportValue ? message.Value : default(TValue)); + throw new ProduceException( + new Error(ErrorCode.Local_KeySerialization), + new DeliveryResult + { + Message = message, + TopicPartitionOffset = new TopicPartitionOffset(topicPartition, Offset.Unset) + }, + ex); + } - if (cancellationToken != null && cancellationToken.CanBeCanceled) + keySegment = buffer.GetComitted(); + + try + { + if (valueSerializer != null) + { + valueSerializer.Serialize(message.Value, new SerializationContext(MessageComponentType.Value, topicPartition.Topic, headers), buffer); + } + else { - handler.CancellationTokenRegistration - = cancellationToken.Register(() => handler.TrySetCanceled()); + await asyncValueSerializer.SerializeAsync(message.Value, new SerializationContext(MessageComponentType.Value, topicPartition.Topic, headers), buffer).ConfigureAwait(false); } + } + catch (Exception ex) + { + throw new ProduceException( + new Error(ErrorCode.Local_ValueSerialization), + new DeliveryResult + { + Message = message, + TopicPartitionOffset = new TopicPartitionOffset(topicPartition, Offset.Unset) + }, + ex); + } - ProduceImpl( - topicPartition.Topic, - valBytes, 0, valBytes == null ? 0 : valBytes.Length, - keyBytes, 0, keyBytes == null ? 0 : keyBytes.Length, - message.Timestamp, topicPartition.Partition, headers, - handler); + valueSegment = buffer.GetComitted(offset: keySegment.Count); - return await handler.Task.ConfigureAwait(false); - } - else + try { - ProduceImpl( - topicPartition.Topic, - valBytes, 0, valBytes == null ? 0 : valBytes.Length, - keyBytes, 0, keyBytes == null ? 0 : keyBytes.Length, - message.Timestamp, topicPartition.Partition, headers, - null); + if (enableDeliveryReports) + { + var handler = new TypedTaskDeliveryHandlerShim( + topicPartition.Topic, + enableDeliveryReportKey ? message.Key : default(TKey), + enableDeliveryReportValue ? message.Value : default(TValue)); + + if (cancellationToken != null && cancellationToken.CanBeCanceled) + { + handler.CancellationTokenRegistration + = cancellationToken.Register(() => handler.TrySetCanceled()); + } - var result = new DeliveryResult + ProduceImpl( + topicPartition.Topic, + valueSegment, + keySegment, + message.Timestamp, topicPartition.Partition, headers, + handler); + + return await handler.Task.ConfigureAwait(false); + } + else { - TopicPartitionOffset = new TopicPartitionOffset(topicPartition, Offset.Unset), - Message = message - }; + ProduceImpl( + topicPartition.Topic, + valueSegment, + keySegment, + message.Timestamp, topicPartition.Partition, headers, + null); + + var result = new DeliveryResult + { + TopicPartitionOffset = new TopicPartitionOffset(topicPartition, Offset.Unset), + Message = message + }; - return result; + return result; + } + } + catch (KafkaException ex) + { + throw new ProduceException( + ex.Error, + new DeliveryResult + { + Message = message, + TopicPartitionOffset = new TopicPartitionOffset(topicPartition, Offset.Unset) + }); } - } - catch (KafkaException ex) - { - throw new ProduceException( - ex.Error, - new DeliveryResult - { - Message = message, - TopicPartitionOffset = new TopicPartitionOffset(topicPartition, Offset.Unset) - }); } } @@ -869,67 +906,85 @@ public void Produce( Headers headers = message.Headers ?? new Headers(); - byte[] keyBytes; - try - { - keyBytes = (keySerializer != null) - ? keySerializer.Serialize(message.Key, new SerializationContext(MessageComponentType.Key, topicPartition.Topic, headers)) - : throw new InvalidOperationException("Produce called with an IAsyncSerializer key serializer configured but an ISerializer is required."); - } - catch (Exception ex) + using (var buffer = this.serializationBufferProvider.Create()) { - throw new ProduceException( - new Error(ErrorCode.Local_KeySerialization, ex.ToString()), - new DeliveryResult + ArraySegment keySegment; + ArraySegment valueSegment; + + try + { + if (keySerializer != null) { - Message = message, - TopicPartitionOffset = new TopicPartitionOffset(topicPartition, Offset.Unset), - }, - ex); - } + keySerializer.Serialize(message.Key, new SerializationContext(MessageComponentType.Key, topicPartition.Topic, headers), buffer); + } + else + { + throw new InvalidOperationException("Produce called with an IAsyncSerializer key serializer configured but an ISerializer is required."); + } + } + catch (Exception ex) + { + throw new ProduceException( + new Error(ErrorCode.Local_KeySerialization, ex.ToString()), + new DeliveryResult + { + Message = message, + TopicPartitionOffset = new TopicPartitionOffset(topicPartition, Offset.Unset), + }, + ex); + } - byte[] valBytes; - try - { - valBytes = (valueSerializer != null) - ? valueSerializer.Serialize(message.Value, new SerializationContext(MessageComponentType.Value, topicPartition.Topic, headers)) - : throw new InvalidOperationException("Produce called with an IAsyncSerializer value serializer configured but an ISerializer is required."); - } - catch (Exception ex) - { - throw new ProduceException( - new Error(ErrorCode.Local_ValueSerialization, ex.ToString()), - new DeliveryResult + keySegment = buffer.GetComitted(); + + try + { + if (valueSerializer != null) { - Message = message, - TopicPartitionOffset = new TopicPartitionOffset(topicPartition, Offset.Unset), - }, - ex); - } + valueSerializer.Serialize(message.Value, new SerializationContext(MessageComponentType.Value, topicPartition.Topic, headers), buffer); + } + else + { + throw new InvalidOperationException("Produce called with an IAsyncSerializer value serializer configured but an ISerializer is required."); + } + } + catch (Exception ex) + { + throw new ProduceException( + new Error(ErrorCode.Local_ValueSerialization, ex.ToString()), + new DeliveryResult + { + Message = message, + TopicPartitionOffset = new TopicPartitionOffset(topicPartition, Offset.Unset), + }, + ex); + } - try - { - ProduceImpl( - topicPartition.Topic, - valBytes, 0, valBytes == null ? 0 : valBytes.Length, - keyBytes, 0, keyBytes == null ? 0 : keyBytes.Length, - message.Timestamp, topicPartition.Partition, - headers, - new TypedDeliveryHandlerShim_Action( + valueSegment = buffer.GetComitted(offset: keySegment.Count); + + try + { + ProduceImpl( topicPartition.Topic, - enableDeliveryReportKey ? message.Key : default(TKey), - enableDeliveryReportValue ? message.Value : default(TValue), - deliveryHandler)); - } - catch (KafkaException ex) - { - throw new ProduceException( - ex.Error, - new DeliveryReport + valueSegment, + keySegment, + message.Timestamp, topicPartition.Partition, + headers, + new TypedDeliveryHandlerShim_Action( + topicPartition.Topic, + enableDeliveryReportKey ? message.Key : default(TKey), + enableDeliveryReportValue ? message.Value : default(TValue), + deliveryHandler)); + } + catch (KafkaException ex) + { + throw new ProduceException( + ex.Error, + new DeliveryReport { Message = message, TopicPartitionOffset = new TopicPartitionOffset(topicPartition, Offset.Unset) }); + } } } diff --git a/src/Confluent.Kafka/ProducerBuilder.cs b/src/Confluent.Kafka/ProducerBuilder.cs index bb24db3a2..0a9526114 100644 --- a/src/Confluent.Kafka/ProducerBuilder.cs +++ b/src/Confluent.Kafka/ProducerBuilder.cs @@ -15,6 +15,7 @@ // Refer to LICENSE for more information. using System; +using System.Buffers; using System.Collections.Generic; @@ -114,6 +115,11 @@ public class ProducerBuilder /// internal protected IAsyncSerializer AsyncValueSerializer { get; set; } + /// + /// The configured serialization buffer provider. + /// + internal protected ISerializationBufferProvider SerializationBufferProvider { get; set; } + internal Producer.Config ConstructBaseConfig(Producer producer) { return new Producer.Config @@ -367,6 +373,20 @@ public ProducerBuilder SetValueSerializer(IAsyncSerializer return this; } + /// + /// The to use to when serializing keys and values. + /// + public ProducerBuilder SetSerializationBufferProvider(ISerializationBufferProvider serializationBufferProvider) + { + if (this.SerializationBufferProvider != null) + { + throw new InvalidOperationException("Serialization buffer provider may not be specified more than once."); + } + + this.SerializationBufferProvider = serializationBufferProvider ?? throw new ArgumentNullException(nameof(serializationBufferProvider)); + return this; + } + /// /// Build a new IProducer implementation instance. /// diff --git a/src/Confluent.Kafka/Serializers.cs b/src/Confluent.Kafka/Serializers.cs index b9e9ac2eb..2556c89b4 100644 --- a/src/Confluent.Kafka/Serializers.cs +++ b/src/Confluent.Kafka/Serializers.cs @@ -15,6 +15,8 @@ // Refer to LICENSE for more information. using System; +using System.Buffers; +using System.Runtime.InteropServices; using System.Text; @@ -32,14 +34,18 @@ public static class Serializers private class Utf8Serializer : ISerializer { - public byte[] Serialize(string data, SerializationContext context) + public void Serialize(string data, SerializationContext context, IBufferWriter bufferWriter) { - if (data == null) + if (data != null) { - return null; - } + var size = Encoding.UTF8.GetByteCount(data); + + var buffer = bufferWriter.GetMemory(size); + MemoryMarshal.TryGetArray(buffer, out var arraySegment); - return Encoding.UTF8.GetBytes(data); + Encoding.UTF8.GetBytes(data, 0, data.Length, arraySegment.Array, arraySegment.Offset); + bufferWriter.Advance(size); + } } } @@ -51,8 +57,10 @@ public byte[] Serialize(string data, SerializationContext context) private class NullSerializer : ISerializer { - public byte[] Serialize(Null data, SerializationContext context) - => null; + public void Serialize(Null data, SerializationContext context, IBufferWriter bufferWriter) + { + + } } @@ -63,9 +71,9 @@ public byte[] Serialize(Null data, SerializationContext context) private class Int64Serializer : ISerializer { - public byte[] Serialize(long data, SerializationContext context) + public void Serialize(long data, SerializationContext context, IBufferWriter bufferWriter) { - var result = new byte[8]; + var result = bufferWriter.GetSpan(sizeHint: 8); result[0] = (byte)(data >> 56); result[1] = (byte)(data >> 48); result[2] = (byte)(data >> 40); @@ -74,7 +82,8 @@ public byte[] Serialize(long data, SerializationContext context) result[5] = (byte)(data >> 16); result[6] = (byte)(data >> 8); result[7] = (byte)data; - return result; + + bufferWriter.Advance(8); } } @@ -86,9 +95,9 @@ public byte[] Serialize(long data, SerializationContext context) private class Int32Serializer : ISerializer { - public byte[] Serialize(int data, SerializationContext context) + public void Serialize(int data, SerializationContext context, IBufferWriter bufferWriter) { - var result = new byte[4]; // int is always 32 bits on .NET. + var result = bufferWriter.GetSpan(sizeHint: 4); // int is always 32 bits on .NET. // network byte order -> big endian -> most significant byte in the smallest address. // Note: At the IL level, the conv.u1 operator is used to cast int to byte which truncates // the high order bits if overflow occurs. @@ -97,7 +106,8 @@ public byte[] Serialize(int data, SerializationContext context) result[1] = (byte)(data >> 16); // & 0xff; result[2] = (byte)(data >> 8); // & 0xff; result[3] = (byte)data; // & 0xff; - return result; + + bufferWriter.Advance(count: 4); } } @@ -109,24 +119,35 @@ public byte[] Serialize(int data, SerializationContext context) private class SingleSerializer : ISerializer { - public byte[] Serialize(float data, SerializationContext context) + public void Serialize(float data, SerializationContext context, IBufferWriter bufferWriter) { if (BitConverter.IsLittleEndian) { unsafe { - byte[] result = new byte[4]; + var result = bufferWriter.GetSpan(4); byte* p = (byte*)(&data); result[3] = *p++; result[2] = *p++; result[1] = *p++; result[0] = *p++; - return result; + + bufferWriter.Advance(4); } } else { - return BitConverter.GetBytes(data); + unsafe + { + var result = bufferWriter.GetSpan(4); + byte* p = (byte*)(&data); + result[0] = *p++; + result[1] = *p++; + result[2] = *p++; + result[3] = *p++; + + bufferWriter.Advance(4); + } } } } @@ -139,13 +160,13 @@ public byte[] Serialize(float data, SerializationContext context) private class DoubleSerializer : ISerializer { - public byte[] Serialize(double data, SerializationContext context) + public void Serialize(double data, SerializationContext context, IBufferWriter bufferWriter) { if (BitConverter.IsLittleEndian) { unsafe { - byte[] result = new byte[8]; + var result = bufferWriter.GetSpan(8); byte* p = (byte*)(&data); result[7] = *p++; result[6] = *p++; @@ -155,13 +176,26 @@ public byte[] Serialize(double data, SerializationContext context) result[2] = *p++; result[1] = *p++; result[0] = *p++; - return result; } } else { - return BitConverter.GetBytes(data); + unsafe + { + var result = bufferWriter.GetSpan(8); + byte* p = (byte*)(&data); + result[0] = *p++; + result[1] = *p++; + result[2] = *p++; + result[3] = *p++; + result[4] = *p++; + result[5] = *p++; + result[6] = *p++; + result[7] = *p++; + } } + + bufferWriter.Advance(8); } } @@ -176,8 +210,15 @@ public byte[] Serialize(double data, SerializationContext context) private class ByteArraySerializer : ISerializer { - public byte[] Serialize(byte[] data, SerializationContext context) - => data; + public void Serialize(byte[] data, SerializationContext context, IBufferWriter bufferWriter) + { + if (data != null) + { + var buffer = bufferWriter.GetSpan(data.Length); + data.CopyTo(buffer); + bufferWriter.Advance(data.Length); + } + } } } } diff --git a/src/Confluent.Kafka/SyncOverAsyncSerializer.cs b/src/Confluent.Kafka/SyncOverAsyncSerializer.cs index 19a160351..87f9e7241 100644 --- a/src/Confluent.Kafka/SyncOverAsyncSerializer.cs +++ b/src/Confluent.Kafka/SyncOverAsyncSerializer.cs @@ -1,4 +1,5 @@ using System; +using System.Buffers; namespace Confluent.Kafka.SyncOverAsync { @@ -66,11 +67,11 @@ public SyncOverAsyncSerializer(IAsyncSerializer asyncSerializer) /// /// Context relevant to the serialize operation. /// - /// - /// the serialized data. - /// - public byte[] Serialize(T data, SerializationContext context) - => asyncSerializer.SerializeAsync(data, context) + /// + /// The to serialize the binary representation to. + /// + public void Serialize(T data, SerializationContext context, IBufferWriter bufferWriter) + => asyncSerializer.SerializeAsync(data, context, bufferWriter) .ConfigureAwait(continueOnCapturedContext: false) .GetAwaiter() .GetResult(); diff --git a/src/Confluent.SchemaRegistry.Serdes.Avro/AvroSerializer.cs b/src/Confluent.SchemaRegistry.Serdes.Avro/AvroSerializer.cs index b1d4c5058..5b36ebb33 100644 --- a/src/Confluent.SchemaRegistry.Serdes.Avro/AvroSerializer.cs +++ b/src/Confluent.SchemaRegistry.Serdes.Avro/AvroSerializer.cs @@ -15,6 +15,7 @@ // Refer to LICENSE for more information. using System; +using System.Buffers; using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; @@ -133,11 +134,14 @@ public AvroSerializer(ISchemaRegistryClient schemaRegistryClient, AvroSerializer /// /// Context relevant to the serialize operation. /// + /// + /// The to serialize the binary representation to. + /// /// /// A that completes with /// serialized as a byte array. /// - public async Task SerializeAsync(T value, SerializationContext context) + public async Task SerializeAsync(T value, SerializationContext context, IBufferWriter bufferWriter) { try { @@ -148,7 +152,7 @@ public async Task SerializeAsync(T value, SerializationContext context) : new SpecificSerializerImpl(schemaRegistryClient, autoRegisterSchema, useLatestVersion, initialBufferSize, subjectNameStrategy); } - return await serializerImpl.Serialize(context.Topic, value, context.Component == MessageComponentType.Key).ConfigureAwait(continueOnCapturedContext: false); + await serializerImpl.Serialize(context.Topic, value, context.Component == MessageComponentType.Key, bufferWriter).ConfigureAwait(continueOnCapturedContext: false); } catch (AggregateException e) { diff --git a/src/Confluent.SchemaRegistry.Serdes.Avro/GenericSerializerImpl.cs b/src/Confluent.SchemaRegistry.Serdes.Avro/GenericSerializerImpl.cs index d94e1e9a2..b73695230 100644 --- a/src/Confluent.SchemaRegistry.Serdes.Avro/GenericSerializerImpl.cs +++ b/src/Confluent.SchemaRegistry.Serdes.Avro/GenericSerializerImpl.cs @@ -26,7 +26,7 @@ using Confluent.Kafka; using Avro.Generic; using Avro.IO; - +using System.Buffers; namespace Confluent.SchemaRegistry.Serdes { @@ -73,10 +73,13 @@ public GenericSerializerImpl( /// /// whether or not the data represents a message key. /// + /// + /// The to serialize the binary representation to. + /// /// /// serialized as a byte array. /// - public async Task Serialize(string topic, GenericRecord data, bool isKey) + public async Task Serialize(string topic, GenericRecord data, bool isKey, IBufferWriter bufferWriter) { try { @@ -176,14 +179,13 @@ public async Task Serialize(string topic, GenericRecord data, bool isKey serializeMutex.Release(); } - using (var stream = new MemoryStream(initialBufferSize)) + using (var stream = bufferWriter.AsStream()) using (var writer = new BinaryWriter(stream)) { stream.WriteByte(Constants.MagicByte); writer.Write(IPAddress.HostToNetworkOrder(schemaId)); new GenericWriter(writerSchema) .Write(data, new BinaryEncoder(stream)); - return stream.ToArray(); } } catch (AggregateException e) diff --git a/src/Confluent.SchemaRegistry.Serdes.Avro/IAvroSerializerImpl.cs b/src/Confluent.SchemaRegistry.Serdes.Avro/IAvroSerializerImpl.cs index cd0c22279..83f98fba4 100644 --- a/src/Confluent.SchemaRegistry.Serdes.Avro/IAvroSerializerImpl.cs +++ b/src/Confluent.SchemaRegistry.Serdes.Avro/IAvroSerializerImpl.cs @@ -14,6 +14,7 @@ // // Refer to LICENSE for more information. +using System.Buffers; using System.Threading.Tasks; @@ -21,6 +22,6 @@ namespace Confluent.SchemaRegistry.Serdes { internal interface IAvroSerializerImpl { - Task Serialize(string topic, T data, bool isKey); + Task Serialize(string topic, T data, bool isKey, IBufferWriter bufferWriter); } } diff --git a/src/Confluent.SchemaRegistry.Serdes.Avro/SpecificSerializerImpl.cs b/src/Confluent.SchemaRegistry.Serdes.Avro/SpecificSerializerImpl.cs index cd44c30f5..8dc4f2717 100644 --- a/src/Confluent.SchemaRegistry.Serdes.Avro/SpecificSerializerImpl.cs +++ b/src/Confluent.SchemaRegistry.Serdes.Avro/SpecificSerializerImpl.cs @@ -18,6 +18,7 @@ #pragma warning disable CS0618 using System; +using System.Buffers; using System.Collections.Generic; using System.IO; using System.Linq; @@ -173,7 +174,7 @@ private static SerializerSchemaData ExtractSchemaData(Type writerType) return serializerSchemaData; } - public async Task Serialize(string topic, T data, bool isKey) + public async Task Serialize(string topic, T data, bool isKey, IBufferWriter bufferWriter) { try { @@ -237,16 +238,13 @@ public async Task Serialize(string topic, T data, bool isKey) serializeMutex.Release(); } - using (var stream = new MemoryStream(initialBufferSize)) + using (var stream = bufferWriter.AsStream()) using (var writer = new BinaryWriter(stream)) { stream.WriteByte(Constants.MagicByte); writer.Write(IPAddress.HostToNetworkOrder(currentSchemaData.WriterSchemaId.Value)); currentSchemaData.AvroWriter.Write(data, new BinaryEncoder(stream)); - - // TODO: maybe change the ISerializer interface so that this copy isn't necessary. - return stream.ToArray(); } } catch (AggregateException e) diff --git a/src/Confluent.SchemaRegistry.Serdes.Json/JsonSerializer.cs b/src/Confluent.SchemaRegistry.Serdes.Json/JsonSerializer.cs index 88a91e685..e216e8953 100644 --- a/src/Confluent.SchemaRegistry.Serdes.Json/JsonSerializer.cs +++ b/src/Confluent.SchemaRegistry.Serdes.Json/JsonSerializer.cs @@ -28,7 +28,9 @@ using NJsonSchema.Generation; using NJsonSchema.Validation; using Confluent.Kafka; - +using System.Buffers; +using System.Text; +using System.Runtime.InteropServices; namespace Confluent.SchemaRegistry.Serdes { @@ -139,13 +141,16 @@ public JsonSerializer(ISchemaRegistryClient schemaRegistryClient, JsonSerializer /// /// Context relevant to the serialize operation. /// + /// + /// The to serialize the binary representation to. + /// /// /// A that completes with /// serialized as a byte array. /// - public async Task SerializeAsync(T value, SerializationContext context) + public async Task SerializeAsync(T value, SerializationContext context, IBufferWriter bufferWriter) { - if (value == null) { return null; } + if (value == null) { return; } var serializedString = Newtonsoft.Json.JsonConvert.SerializeObject(value, this.jsonSchemaGeneratorSettings?.ActualSerializerSettings); var validationResult = validator.Validate(serializedString, this.schema); @@ -196,15 +201,19 @@ public async Task SerializeAsync(T value, SerializationContext context) { serializeMutex.Release(); } - - using (var stream = new MemoryStream(initialBufferSize)) - using (var writer = new BinaryWriter(stream)) - { - stream.WriteByte(Constants.MagicByte); - writer.Write(IPAddress.HostToNetworkOrder(schemaId.Value)); - writer.Write(System.Text.Encoding.UTF8.GetBytes(serializedString)); - return stream.ToArray(); - } + + var targetSize = 1 + 4 + Encoding.UTF8.GetByteCount(serializedString); + var buffer = bufferWriter.GetMemory(targetSize); + buffer.Span[0] = Constants.MagicByte; + + var schemaIdNumber = IPAddress.HostToNetworkOrder(schemaId.Value); + buffer.Span[1] = (byte)schemaIdNumber; + buffer.Span[2] = (byte)(schemaIdNumber >> 8); + buffer.Span[3] = (byte)(schemaIdNumber >> 16); + buffer.Span[4] = (byte)(schemaIdNumber >> 24); + + bufferWriter.Advance(5); + Serializers.Utf8.Serialize(serializedString, SerializationContext.Empty, bufferWriter); } catch (AggregateException e) { diff --git a/src/Confluent.SchemaRegistry.Serdes.Protobuf/ProtobufSerializer.cs b/src/Confluent.SchemaRegistry.Serdes.Protobuf/ProtobufSerializer.cs index e535ad8df..47be46b0c 100644 --- a/src/Confluent.SchemaRegistry.Serdes.Protobuf/ProtobufSerializer.cs +++ b/src/Confluent.SchemaRegistry.Serdes.Protobuf/ProtobufSerializer.cs @@ -18,6 +18,7 @@ #pragma warning disable CS0618 using System; +using System.Buffers; using System.Collections.Generic; using System.IO; using System.Linq; @@ -31,25 +32,26 @@ namespace Confluent.SchemaRegistry.Serdes { - /// - /// Protobuf Serializer. - /// - /// - /// Serialization format: - /// byte 0: A magic byte that identifies this as a message with - /// Confluent Platform framing. - /// bytes 1-4: Unique global id of the Protobuf schema that was used - /// for encoding (as registered in Confluent Schema Registry), - /// big endian. - /// following bytes: 1. A size-prefixed array of indices that identify the - /// specific message type in the schema (a given schema - /// can contain many message types and they can be nested). - /// Size and indices are unsigned varints. The common case - /// where the message type is the first message in the - /// schema (i.e. index data would be [1,0]) is encoded as - /// a single 0 byte as an optimization. - /// 2. The protobuf serialized data. - /// + +/// +/// Protobuf Serializer. +/// +/// +/// Serialization format: +/// byte 0: A magic byte that identifies this as a message with +/// Confluent Platform framing. +/// bytes 1-4: Unique global id of the Protobuf schema that was used +/// for encoding (as registered in Confluent Schema Registry), +/// big endian. +/// following bytes: 1. A size-prefixed array of indices that identify the +/// specific message type in the schema (a given schema +/// can contain many message types and they can be nested). +/// Size and indices are unsigned varints. The common case +/// where the message type is the first message in the +/// schema (i.e. index data would be [1,0]) is encoded as +/// a single 0 byte as an optimization. +/// 2. The protobuf serialized data. +/// public class ProtobufSerializer : IAsyncSerializer where T : IMessage, new() { private const int DefaultInitialBufferSize = 1024; @@ -242,13 +244,16 @@ private async Task> RegisterOrGetReferences(FileDescriptor /// /// Context relevant to the serialize operation. /// + /// + /// The to serialize the binary representation to. + /// /// /// A that completes with /// serialized as a byte array. /// - public async Task SerializeAsync(T value, SerializationContext context) + public async Task SerializeAsync(T value, SerializationContext context, IBufferWriter bufferWriter) { - if (value == null) { return null; } + if (value == null) { return; } try { @@ -307,14 +312,23 @@ await RegisterOrGetReferences(value.Descriptor.File, context, autoRegisterSchema serializeMutex.Release(); } - using (var stream = new MemoryStream(initialBufferSize)) - using (var writer = new BinaryWriter(stream)) + + var targetSize = 1 + 4 + this.indexArray.Length; + var buffer = bufferWriter.GetMemory(targetSize); + buffer.Span[0] = Constants.MagicByte; + + var schemaIdNumber = IPAddress.HostToNetworkOrder(schemaId.Value); + buffer.Span[1] = (byte)schemaIdNumber; + buffer.Span[2] = (byte)(schemaIdNumber >> 8); + buffer.Span[3] = (byte)(schemaIdNumber >> 16); + buffer.Span[4] = (byte)(schemaIdNumber >> 24); + this.indexArray.CopyTo(buffer.Span.Slice(start: 5)); + + bufferWriter.Advance(5 + this.indexArray.Length); + + using (var stream = bufferWriter.AsStream()) { - stream.WriteByte(Constants.MagicByte); - writer.Write(IPAddress.HostToNetworkOrder(schemaId.Value)); - writer.Write(this.indexArray); value.WriteTo(stream); - return stream.ToArray(); } } catch (AggregateException e) diff --git a/test/Confluent.Kafka.IntegrationTests/Serdes.cs b/test/Confluent.Kafka.IntegrationTests/Serdes.cs index e5d61c95b..3ebd130b3 100644 --- a/test/Confluent.Kafka.IntegrationTests/Serdes.cs +++ b/test/Confluent.Kafka.IntegrationTests/Serdes.cs @@ -14,6 +14,7 @@ // // Refer to LICENSE for more information. +using System.Buffers; using System.Threading; using System.Threading.Tasks; using Confluent.Kafka; @@ -23,10 +24,10 @@ namespace Confluent.Kafka.IntegrationTests { class SimpleAsyncSerializer : IAsyncSerializer { - public async Task SerializeAsync(string data, SerializationContext context) + public async Task SerializeAsync(string data, SerializationContext context, IBufferWriter bufferWriter) { await Task.Delay(500).ConfigureAwait(false); - return Serializers.Utf8.Serialize(data, context); + Serializers.Utf8.Serialize(data, context, bufferWriter); } public ISerializer SyncOverAsync() @@ -37,10 +38,10 @@ public ISerializer SyncOverAsync() class SimpleSyncSerializer : ISerializer { - public byte[] Serialize(string data, SerializationContext context) + public void Serialize(string data, SerializationContext context, IBufferWriter bufferWriter) { Thread.Sleep(500); - return Serializers.Utf8.Serialize(data, context); + Serializers.Utf8.Serialize(data, context, bufferWriter); } } } diff --git a/test/Confluent.Kafka.IntegrationTests/Tests/AssignPastEnd.cs b/test/Confluent.Kafka.IntegrationTests/Tests/AssignPastEnd.cs index a2dea1084..1a058c2ed 100644 --- a/test/Confluent.Kafka.IntegrationTests/Tests/AssignPastEnd.cs +++ b/test/Confluent.Kafka.IntegrationTests/Tests/AssignPastEnd.cs @@ -47,7 +47,7 @@ public void AssignPastEnd(string bootstrapServers) DeliveryResult dr; using (var producer = new ProducerBuilder(producerConfig).Build()) { - dr = producer.ProduceAsync(singlePartitionTopic, new Message { Value = Serializers.Utf8.Serialize(testString, SerializationContext.Empty) }).Result; + dr = producer.ProduceAsync(singlePartitionTopic, new Message { Value = Serializers.Utf8.ToByteArray(testString, SerializationContext.Empty) }).Result; Assert.True(dr.Offset >= 0); producer.Flush(TimeSpan.FromSeconds(10)); } diff --git a/test/Confluent.Kafka.IntegrationTests/Tests/Builder_CustomDefaults.cs b/test/Confluent.Kafka.IntegrationTests/Tests/Builder_CustomDefaults.cs index 7ba394eff..74f665219 100644 --- a/test/Confluent.Kafka.IntegrationTests/Tests/Builder_CustomDefaults.cs +++ b/test/Confluent.Kafka.IntegrationTests/Tests/Builder_CustomDefaults.cs @@ -20,7 +20,7 @@ using System.Text; using System.Collections.Generic; using Xunit; - +using System.Buffers; namespace Confluent.Kafka.IntegrationTests { @@ -35,9 +35,11 @@ class MyProducerBuilder : ProducerBuilder { class Utf32Serializer : ISerializer { - public byte[] Serialize(string data, SerializationContext context) + public void Serialize(string data, SerializationContext context, IBufferWriter bufferWriter) { - return Encoding.UTF32.GetBytes(data); + var size = Encoding.UTF32.GetByteCount(data); + var buffer = bufferWriter.GetSpan(size); + Encoding.UTF32.GetBytes(data).CopyTo(buffer); } } diff --git a/test/Confluent.Kafka.IntegrationTests/Tests/Consumer_DisableHeaders.cs b/test/Confluent.Kafka.IntegrationTests/Tests/Consumer_DisableHeaders.cs index 3c33cf62c..52db0e812 100644 --- a/test/Confluent.Kafka.IntegrationTests/Tests/Consumer_DisableHeaders.cs +++ b/test/Confluent.Kafka.IntegrationTests/Tests/Consumer_DisableHeaders.cs @@ -48,7 +48,7 @@ public void Consumer_DisableHeaders(string bootstrapServers) singlePartitionTopic, new Message { - Value = Serializers.Utf8.Serialize("my-value", SerializationContext.Empty), + Value = Serializers.Utf8.ToByteArray("my-value", SerializationContext.Empty), Headers = new Headers() { new Header("my-header", new byte[] { 42 }) } } ).Result; diff --git a/test/Confluent.Kafka.IntegrationTests/Tests/Consumer_DisableTimestamps.cs b/test/Confluent.Kafka.IntegrationTests/Tests/Consumer_DisableTimestamps.cs index 4aaafd7b3..09476fb6f 100644 --- a/test/Confluent.Kafka.IntegrationTests/Tests/Consumer_DisableTimestamps.cs +++ b/test/Confluent.Kafka.IntegrationTests/Tests/Consumer_DisableTimestamps.cs @@ -48,7 +48,7 @@ public void Consumer_DisableTimestamps(string bootstrapServers) singlePartitionTopic, new Message { - Value = Serializers.Utf8.Serialize("my-value", SerializationContext.Empty), + Value = Serializers.Utf8.ToByteArray("my-value", SerializationContext.Empty), Headers = new Headers() { new Header("my-header", new byte[] { 42 }) } } ).Result; diff --git a/test/Confluent.Kafka.IntegrationTests/Tests/Consumer_OffsetsForTimes.cs b/test/Confluent.Kafka.IntegrationTests/Tests/Consumer_OffsetsForTimes.cs index 3166b1170..23f0fe5e7 100644 --- a/test/Confluent.Kafka.IntegrationTests/Tests/Consumer_OffsetsForTimes.cs +++ b/test/Confluent.Kafka.IntegrationTests/Tests/Consumer_OffsetsForTimes.cs @@ -130,8 +130,8 @@ private static DeliveryResult[] ProduceMessages(string bootstrap new TopicPartition(topic, partition), new Message { - Key = Serializers.Utf8.Serialize($"test key {index}", SerializationContext.Empty), - Value = Serializers.Utf8.Serialize($"test val {index}", SerializationContext.Empty), + Key = Serializers.Utf8.ToByteArray($"test key {index}", SerializationContext.Empty), + Value = Serializers.Utf8.ToByteArray($"test val {index}", SerializationContext.Empty), Timestamp = new Timestamp(baseTime + index*1000, TimestampType.CreateTime), Headers = null } diff --git a/test/Confluent.Kafka.IntegrationTests/Tests/Consumer_Pause_Resume.cs b/test/Confluent.Kafka.IntegrationTests/Tests/Consumer_Pause_Resume.cs index 45015d5eb..012748060 100644 --- a/test/Confluent.Kafka.IntegrationTests/Tests/Consumer_Pause_Resume.cs +++ b/test/Confluent.Kafka.IntegrationTests/Tests/Consumer_Pause_Resume.cs @@ -61,13 +61,13 @@ public void Consumer_Pause_Resume(string bootstrapServers) ConsumeResult record = consumer.Consume(TimeSpan.FromSeconds(2)); Assert.Null(record); - producer.ProduceAsync(topic.Name, new Message { Value = Serializers.Utf8.Serialize("test value", SerializationContext.Empty) }).Wait(); + producer.ProduceAsync(topic.Name, new Message { Value = Serializers.Utf8.ToByteArray("test value", SerializationContext.Empty) }).Wait(); record = consumer.Consume(TimeSpan.FromSeconds(10)); Assert.NotNull(record?.Message); Assert.Equal(0, record?.Offset); consumer.Pause(assignment); - producer.ProduceAsync(topic.Name, new Message { Value = Serializers.Utf8.Serialize("test value 2", SerializationContext.Empty) }).Wait(); + producer.ProduceAsync(topic.Name, new Message { Value = Serializers.Utf8.ToByteArray("test value 2", SerializationContext.Empty) }).Wait(); record = consumer.Consume(TimeSpan.FromSeconds(2)); Assert.Null(record); consumer.Resume(assignment); diff --git a/test/Confluent.Kafka.IntegrationTests/Tests/Consumer_Seek.cs b/test/Confluent.Kafka.IntegrationTests/Tests/Consumer_Seek.cs index bbd556cdb..3fada93d9 100644 --- a/test/Confluent.Kafka.IntegrationTests/Tests/Consumer_Seek.cs +++ b/test/Confluent.Kafka.IntegrationTests/Tests/Consumer_Seek.cs @@ -49,9 +49,9 @@ public void Consumer_Seek(string bootstrapServers) .Build()) { const string checkValue = "check value"; - var dr = producer.ProduceAsync(singlePartitionTopic, new Message { Value = Serializers.Utf8.Serialize(checkValue, SerializationContext.Empty) }).Result; - var dr2 = producer.ProduceAsync(singlePartitionTopic, new Message { Value = Serializers.Utf8.Serialize("second value", SerializationContext.Empty) }).Result; - var dr3 = producer.ProduceAsync(singlePartitionTopic, new Message { Value = Serializers.Utf8.Serialize("third value", SerializationContext.Empty) }).Result; + var dr = producer.ProduceAsync(singlePartitionTopic, new Message { Value = Serializers.Utf8.ToByteArray(checkValue, SerializationContext.Empty) }).Result; + var dr2 = producer.ProduceAsync(singlePartitionTopic, new Message { Value = Serializers.Utf8.ToByteArray("second value", SerializationContext.Empty) }).Result; + var dr3 = producer.ProduceAsync(singlePartitionTopic, new Message { Value = Serializers.Utf8.ToByteArray("third value", SerializationContext.Empty) }).Result; consumer.Assign(new TopicPartitionOffset[] { new TopicPartitionOffset(singlePartitionTopic, 0, dr.Offset) }); diff --git a/test/Confluent.Kafka.IntegrationTests/Tests/Consumer_StoreOffset.cs b/test/Confluent.Kafka.IntegrationTests/Tests/Consumer_StoreOffset.cs index 00f328731..098544490 100644 --- a/test/Confluent.Kafka.IntegrationTests/Tests/Consumer_StoreOffset.cs +++ b/test/Confluent.Kafka.IntegrationTests/Tests/Consumer_StoreOffset.cs @@ -64,7 +64,7 @@ public void Consumer_StoreOffsets(string bootstrapServers) ConsumeResult record = consumer.Consume(TimeSpan.FromSeconds(10)); Assert.Null(record); - producer.ProduceAsync(singlePartitionTopic, new Message { Value = Serializers.Utf8.Serialize("test store offset value", SerializationContext.Empty) }).Wait(); + producer.ProduceAsync(singlePartitionTopic, new Message { Value = Serializers.Utf8.ToByteArray("test store offset value", SerializationContext.Empty) }).Wait(); record = consumer.Consume(TimeSpan.FromSeconds(10)); Assert.NotNull(record?.Message); diff --git a/test/Confluent.Kafka.IntegrationTests/Tests/DuplicateConsumerAssign.cs b/test/Confluent.Kafka.IntegrationTests/Tests/DuplicateConsumerAssign.cs index 61c097da2..0295c8af2 100644 --- a/test/Confluent.Kafka.IntegrationTests/Tests/DuplicateConsumerAssign.cs +++ b/test/Confluent.Kafka.IntegrationTests/Tests/DuplicateConsumerAssign.cs @@ -52,7 +52,7 @@ public void DuplicateConsumerAssign(string bootstrapServers) DeliveryResult dr; using (var producer = new ProducerBuilder(producerConfig).Build()) { - dr = producer.ProduceAsync(topic.Name, new Message { Value = Serializers.Utf8.Serialize(testString, SerializationContext.Empty) }).Result; + dr = producer.ProduceAsync(topic.Name, new Message { Value = Serializers.Utf8.ToByteArray(testString, SerializationContext.Empty) }).Result; Assert.NotNull(dr); producer.Flush(TimeSpan.FromSeconds(10)); } diff --git a/test/Confluent.Kafka.IntegrationTests/Tests/GarbageCollect.cs b/test/Confluent.Kafka.IntegrationTests/Tests/GarbageCollect.cs index cb0264bcf..091a6d1f0 100644 --- a/test/Confluent.Kafka.IntegrationTests/Tests/GarbageCollect.cs +++ b/test/Confluent.Kafka.IntegrationTests/Tests/GarbageCollect.cs @@ -40,7 +40,7 @@ public void GarbageCollect(string bootstrapServers) using (var producer = new ProducerBuilder(producerConfig).Build()) { - producer.ProduceAsync(singlePartitionTopic, new Message { Value = Serializers.Utf8.Serialize("test string", SerializationContext.Empty) }).Wait(); + producer.ProduceAsync(singlePartitionTopic, new Message { Value = Serializers.Utf8.ToByteArray("test string", SerializationContext.Empty) }).Wait(); } using (var consumer = new ConsumerBuilder(consumerConfig).Build()) diff --git a/test/Confluent.Kafka.IntegrationTests/Tests/Headers_SerializationContext.cs b/test/Confluent.Kafka.IntegrationTests/Tests/Headers_SerializationContext.cs index 0bc8c9960..afd017b6d 100644 --- a/test/Confluent.Kafka.IntegrationTests/Tests/Headers_SerializationContext.cs +++ b/test/Confluent.Kafka.IntegrationTests/Tests/Headers_SerializationContext.cs @@ -17,6 +17,7 @@ #pragma warning disable xUnit1026 using System; +using System.Buffers; using System.Text; using System.Threading.Tasks; using Xunit; @@ -29,11 +30,17 @@ public partial class Tests class TestSerializer : IAsyncSerializer { - public Task SerializeAsync(string data, SerializationContext context) + public Task SerializeAsync(string data, SerializationContext context, IBufferWriter bufferWriter) { Assert.NotNull(context.Headers); context.Headers.Add("test_header", new byte[] { 100, 42 }); - return Task.FromResult(Encoding.UTF8.GetBytes("test_value")); + + var bytes = Encoding.UTF8.GetBytes("test_value"); + var buffer = bufferWriter.GetSpan(bytes.Length); + bytes.CopyTo(buffer); + bufferWriter.Advance(bytes.Length); + + return Task.CompletedTask; } } diff --git a/test/Confluent.Kafka.IntegrationTests/Tests/LogDelegate.cs b/test/Confluent.Kafka.IntegrationTests/Tests/LogDelegate.cs index 055baf8ff..639e2e5a8 100644 --- a/test/Confluent.Kafka.IntegrationTests/Tests/LogDelegate.cs +++ b/test/Confluent.Kafka.IntegrationTests/Tests/LogDelegate.cs @@ -60,7 +60,7 @@ public void LogDelegate(string bootstrapServers) .SetLogHandler((_, m) => logCount += 1) .Build()) { - dr = producer.ProduceAsync(singlePartitionTopic, new Message { Value = Serializers.Utf8.Serialize("test value", SerializationContext.Empty) }).Result; + dr = producer.ProduceAsync(singlePartitionTopic, new Message { Value = Serializers.Utf8.ToByteArray("test value", SerializationContext.Empty) }).Result; producer.Flush(TimeSpan.FromSeconds(10)); } Assert.True(logCount > 0); diff --git a/test/Confluent.Kafka.IntegrationTests/Tests/OnPartitionsAssignedNotSet.cs b/test/Confluent.Kafka.IntegrationTests/Tests/OnPartitionsAssignedNotSet.cs index bea52fa0d..252d32c73 100644 --- a/test/Confluent.Kafka.IntegrationTests/Tests/OnPartitionsAssignedNotSet.cs +++ b/test/Confluent.Kafka.IntegrationTests/Tests/OnPartitionsAssignedNotSet.cs @@ -46,7 +46,7 @@ public void OnPartitionsAssignedNotSet(string bootstrapServers) // Producing onto the topic to make sure it exists. using (var producer = new ProducerBuilder(producerConfig).Build()) { - var dr = producer.ProduceAsync(singlePartitionTopic, new Message { Value = Serializers.Utf8.Serialize("test string", SerializationContext.Empty) }).Result; + var dr = producer.ProduceAsync(singlePartitionTopic, new Message { Value = Serializers.Utf8.ToByteArray("test string", SerializationContext.Empty) }).Result; Assert.NotEqual(Offset.Unset, dr.Offset); producer.Flush(TimeSpan.FromSeconds(10)); } diff --git a/test/Confluent.Kafka.IntegrationTests/Tests/SerializationExtensions.cs b/test/Confluent.Kafka.IntegrationTests/Tests/SerializationExtensions.cs new file mode 100644 index 000000000..a52aea71b --- /dev/null +++ b/test/Confluent.Kafka.IntegrationTests/Tests/SerializationExtensions.cs @@ -0,0 +1,33 @@ +// Copyright 2018 Confluent Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Refer to LICENSE for more information. + +#pragma warning disable xUnit1026 + + +namespace Confluent.Kafka.IntegrationTests +{ + public static class SerializationExtensions + { + public static byte[] ToByteArray(this ISerializer serializer, T value, SerializationContext serializationContext) + { + using (var buffer = DefaultSerializationBufferProvider.Instance.Create()) + { + serializer.Serialize(value, serializationContext, buffer); + return buffer.GetComitted().ToArray(); + } + } + } +} diff --git a/test/Confluent.Kafka.SyncOverAsync/Program.cs b/test/Confluent.Kafka.SyncOverAsync/Program.cs index 7898afe31..4a69a0008 100644 --- a/test/Confluent.Kafka.SyncOverAsync/Program.cs +++ b/test/Confluent.Kafka.SyncOverAsync/Program.cs @@ -15,6 +15,7 @@ // Refer to LICENSE for more information. using System; +using System.Buffers; using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; @@ -33,10 +34,10 @@ namespace Confluent.Kafka.SyncOverAsync { class SimpleAsyncSerializer : IAsyncSerializer { - public async Task SerializeAsync(string data, SerializationContext context) + public async Task SerializeAsync(string data, SerializationContext context, IBufferWriter bufferWriter) { await Task.Delay(500); - return Serializers.Utf8.Serialize(data, context); + Serializers.Utf8.Serialize(data, context, bufferWriter); } public ISerializer SyncOverAsync() @@ -47,10 +48,10 @@ public ISerializer SyncOverAsync() class SimpleSyncSerializer : ISerializer { - public byte[] Serialize(string data, SerializationContext context) + public void Serialize(string data, SerializationContext context, IBufferWriter bufferWriter) { Thread.Sleep(500); - return Serializers.Utf8.Serialize(data, context); + Serializers.Utf8.Serialize(data, context, bufferWriter); } } diff --git a/test/Confluent.Kafka.UnitTests/Serialization/ByteArray.cs b/test/Confluent.Kafka.UnitTests/Serialization/ByteArray.cs index 3acf442b9..69c8aaaa0 100644 --- a/test/Confluent.Kafka.UnitTests/Serialization/ByteArray.cs +++ b/test/Confluent.Kafka.UnitTests/Serialization/ByteArray.cs @@ -27,13 +27,13 @@ public class ByteArrayTests [InlineData(new byte[] { 1, 2, 3, 4, 5 })] public void CanReconstructByteArray(byte[] values) { - Assert.Equal(values, Deserializers.ByteArray.Deserialize(Serializers.ByteArray.Serialize(values, SerializationContext.Empty), false, SerializationContext.Empty)); + Assert.Equal(values, Deserializers.ByteArray.Deserialize(Serializers.ByteArray.ToByteArray(values, SerializationContext.Empty), false, SerializationContext.Empty)); } [Fact] public void CanReconstructByteArrayNull() { - Assert.Null(Deserializers.ByteArray.Deserialize(Serializers.ByteArray.Serialize(null, SerializationContext.Empty), true, SerializationContext.Empty)); + Assert.Null(Deserializers.ByteArray.Deserialize(Serializers.ByteArray.ToByteArray(null, SerializationContext.Empty), true, SerializationContext.Empty)); } } } diff --git a/test/Confluent.Kafka.UnitTests/Serialization/Double.cs b/test/Confluent.Kafka.UnitTests/Serialization/Double.cs index 6581154a1..e2820d2f1 100644 --- a/test/Confluent.Kafka.UnitTests/Serialization/Double.cs +++ b/test/Confluent.Kafka.UnitTests/Serialization/Double.cs @@ -28,7 +28,7 @@ public void CanReconstructDouble() { foreach (var value in TestData) { - Assert.Equal(value, Deserializers.Double.Deserialize(Serializers.Double.Serialize(value, SerializationContext.Empty), false, SerializationContext.Empty)); + Assert.Equal(value, Deserializers.Double.Deserialize(Serializers.Double.ToByteArray(value, SerializationContext.Empty), false, SerializationContext.Empty)); } } @@ -37,7 +37,7 @@ public void IsBigEndian() { var buffer = new byte[] { 23, 0, 0, 0, 0, 0, 0, 0 }; var value = BitConverter.ToDouble(buffer, 0); - var data = Serializers.Double.Serialize(value, SerializationContext.Empty); + var data = Serializers.Double.ToByteArray(value, SerializationContext.Empty); Assert.Equal(23, data[7]); Assert.Equal(0, data[0]); } diff --git a/test/Confluent.Kafka.UnitTests/Serialization/Float.cs b/test/Confluent.Kafka.UnitTests/Serialization/Float.cs index 00168a5db..401f4df67 100644 --- a/test/Confluent.Kafka.UnitTests/Serialization/Float.cs +++ b/test/Confluent.Kafka.UnitTests/Serialization/Float.cs @@ -28,7 +28,7 @@ public void CanReconstructFloat() { foreach (var value in TestData) { - Assert.Equal(value, Deserializers.Single.Deserialize(Serializers.Single.Serialize(value, SerializationContext.Empty), false, SerializationContext.Empty)); + Assert.Equal(value, Deserializers.Single.Deserialize(Serializers.Single.ToByteArray(value, SerializationContext.Empty), false, SerializationContext.Empty)); } } @@ -37,7 +37,7 @@ public void IsBigEndian() { var buffer = new byte[] { 23, 0, 0, 0 }; var value = BitConverter.ToSingle(buffer, 0); - var data = Serializers.Single.Serialize(value, SerializationContext.Empty); + var data = Serializers.Single.ToByteArray(value, SerializationContext.Empty); Assert.Equal(23, data[3]); Assert.Equal(0, data[0]); } diff --git a/test/Confluent.Kafka.UnitTests/Serialization/Int.cs b/test/Confluent.Kafka.UnitTests/Serialization/Int.cs index a76e85538..b1ad4d214 100644 --- a/test/Confluent.Kafka.UnitTests/Serialization/Int.cs +++ b/test/Confluent.Kafka.UnitTests/Serialization/Int.cs @@ -34,7 +34,7 @@ public class IntTests [Fact] public void IsBigEndian() { - var bytes = Serializers.Int32.Serialize(42, SerializationContext.Empty); + var bytes = Serializers.Int32.ToByteArray(42, SerializationContext.Empty); Assert.Equal(4, bytes.Length); // most significant byte in smallest address. Assert.Equal(0, bytes[0]); @@ -49,7 +49,7 @@ public void SerializationAgreesWithSystemNetHostToNetworkOrder() int networkOrder = System.Net.IPAddress.HostToNetworkOrder(theInt); var bytes1 = BitConverter.GetBytes(networkOrder); - var bytes2 = Serializers.Int32.Serialize(theInt, SerializationContext.Empty); + var bytes2 = Serializers.Int32.ToByteArray(theInt, SerializationContext.Empty); Assert.Equal(bytes1.Length, bytes2.Length); @@ -65,7 +65,7 @@ public void CanReconstructInt() { foreach (int theInt in toTest) { - var reconstructed = Deserializers.Int32.Deserialize(Serializers.Int32.Serialize(theInt, SerializationContext.Empty), false, SerializationContext.Empty); + var reconstructed = Deserializers.Int32.Deserialize(Serializers.Int32.ToByteArray(theInt, SerializationContext.Empty), false, SerializationContext.Empty); Assert.Equal(theInt, reconstructed); } } diff --git a/test/Confluent.Kafka.UnitTests/Serialization/Long.cs b/test/Confluent.Kafka.UnitTests/Serialization/Long.cs index 755105c71..b291a35c6 100644 --- a/test/Confluent.Kafka.UnitTests/Serialization/Long.cs +++ b/test/Confluent.Kafka.UnitTests/Serialization/Long.cs @@ -26,13 +26,13 @@ public class LongTests [MemberData(nameof(TestData))] public void CanReconstructLong(long value) { - Assert.Equal(value, Deserializers.Int64.Deserialize(Serializers.Int64.Serialize(value, SerializationContext.Empty), false, SerializationContext.Empty)); + Assert.Equal(value, Deserializers.Int64.Deserialize(Serializers.Int64.ToByteArray(value, SerializationContext.Empty), false, SerializationContext.Empty)); } [Fact] public void IsBigEndian() { - var data = Serializers.Int64.Serialize(23L, SerializationContext.Empty); + var data = Serializers.Int64.ToByteArray(23L, SerializationContext.Empty); Assert.Equal(23, data[7]); Assert.Equal(0, data[0]); } diff --git a/test/Confluent.Kafka.UnitTests/Serialization/SerializationExtensions.cs b/test/Confluent.Kafka.UnitTests/Serialization/SerializationExtensions.cs new file mode 100644 index 000000000..06d852cde --- /dev/null +++ b/test/Confluent.Kafka.UnitTests/Serialization/SerializationExtensions.cs @@ -0,0 +1,33 @@ +// Copyright 2016-2017 Confluent Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Refer to LICENSE for more information. + +using System.Linq; + + +namespace Confluent.Kafka.UnitTests.Serialization +{ + public static class SerializationExtensions + { + public static byte[] ToByteArray(this ISerializer serializer, T value, SerializationContext serializationContext) + { + using (var buffer = DefaultSerializationBufferProvider.Instance.Create()) + { + serializer.Serialize(value, serializationContext, buffer); + return buffer.GetComitted().ToArray(); + } + } + } +} diff --git a/test/Confluent.Kafka.UnitTests/Serialization/String.cs b/test/Confluent.Kafka.UnitTests/Serialization/String.cs index f1f8ff381..9ba76c370 100644 --- a/test/Confluent.Kafka.UnitTests/Serialization/String.cs +++ b/test/Confluent.Kafka.UnitTests/Serialization/String.cs @@ -28,10 +28,10 @@ public class StringTests [Fact] public void SerializeDeserialize() { - Assert.Equal("hello world", Deserializers.Utf8.Deserialize(Serializers.Utf8.Serialize("hello world", SerializationContext.Empty), false, SerializationContext.Empty)); - Assert.Equal("ឆ្មាត្រូវបានហែលទឹក", Deserializers.Utf8.Deserialize(Serializers.Utf8.Serialize("ឆ្មាត្រូវបានហែលទឹក", SerializationContext.Empty), false, SerializationContext.Empty)); - Assert.Equal("вы не банан", Deserializers.Utf8.Deserialize(Serializers.Utf8.Serialize("вы не банан", SerializationContext.Empty), false, SerializationContext.Empty)); - Assert.Null(Deserializers.Utf8.Deserialize(Serializers.Utf8.Serialize(null, SerializationContext.Empty), true, SerializationContext.Empty)); + Assert.Equal("hello world", Deserializers.Utf8.Deserialize(Serializers.Utf8.ToByteArray("hello world", SerializationContext.Empty), false, SerializationContext.Empty)); + Assert.Equal("ឆ្មាត្រូវបានហែលទឹក", Deserializers.Utf8.Deserialize(Serializers.Utf8.ToByteArray("ឆ្មាត្រូវបានហែលទឹក", SerializationContext.Empty), false, SerializationContext.Empty)); + Assert.Equal("вы не банан", Deserializers.Utf8.Deserialize(Serializers.Utf8.ToByteArray("вы не банан", SerializationContext.Empty), false, SerializationContext.Empty)); + Assert.Null(Deserializers.Utf8.Deserialize(Serializers.Utf8.ToByteArray(null, SerializationContext.Empty), true, SerializationContext.Empty)); // TODO: check some serialize / deserialize operations that are not expected to work, including some // cases where Deserialize can be expected to throw an exception. diff --git a/test/Confluent.SchemaRegistry.Serdes.UnitTests/JsonSerializeDeserialize.cs b/test/Confluent.SchemaRegistry.Serdes.UnitTests/JsonSerializeDeserialize.cs index afb3db938..3273ba06d 100644 --- a/test/Confluent.SchemaRegistry.Serdes.UnitTests/JsonSerializeDeserialize.cs +++ b/test/Confluent.SchemaRegistry.Serdes.UnitTests/JsonSerializeDeserialize.cs @@ -32,6 +32,41 @@ namespace Confluent.SchemaRegistry.Serdes.UnitTests { + public static class SerializationExtensions + { + public static byte[] ToByteArray(this ISerializer serializer, T value, SerializationContext serializationContext) + { + using (var buffer = DefaultSerializationBufferProvider.Instance.Create()) + { + serializer.Serialize(value, serializationContext, buffer); + var comitted = buffer.GetComitted(); + + if (comitted.Count == 0) + { + return null; + } + + return comitted.ToArray(); + } + } + + public static async Task ToByteArray(this IAsyncSerializer serializer, T value, SerializationContext serializationContext) + { + using (var buffer = DefaultSerializationBufferProvider.Instance.Create()) + { + await serializer.SerializeAsync(value, serializationContext, buffer); + var comitted = buffer.GetComitted(); + + if (comitted.Count == 0) + { + return null; + } + + return comitted.ToArray(); + } + } + } + public class JsonSerializeDeserialzeTests { public class UInt32Value @@ -104,7 +139,7 @@ public void Null() var jsonSerializer = new JsonSerializer(schemaRegistryClient); var jsonDeserializer = new JsonDeserializer(); - var bytes = jsonSerializer.SerializeAsync(null, new SerializationContext(MessageComponentType.Value, testTopic)).Result; + var bytes = jsonSerializer.ToByteArray(null, new SerializationContext(MessageComponentType.Value, testTopic)).Result; Assert.Null(bytes); Assert.Null(jsonDeserializer.DeserializeAsync(bytes, true, new SerializationContext(MessageComponentType.Value, testTopic)).Result); } @@ -117,7 +152,7 @@ public void UInt32SerDe() var jsonDeserializer = new JsonDeserializer(); var v = new UInt32Value { Value = 1234 }; - var bytes = jsonSerializer.SerializeAsync(v, new SerializationContext(MessageComponentType.Value, testTopic)).Result; + var bytes = jsonSerializer.ToByteArray(v, new SerializationContext(MessageComponentType.Value, testTopic)).Result; Assert.Equal(v.Value, jsonDeserializer.DeserializeAsync(bytes, false, new SerializationContext(MessageComponentType.Value, testTopic)).Result.Value); } @@ -142,7 +177,7 @@ public async Task WithJsonSerializerSettingsSerDe() var jsonDeserializer = new JsonDeserializer(jsonSchemaGeneratorSettings: jsonSchemaGeneratorSettings); var v = new UInt32Value { Value = value }; - var bytes = await jsonSerializer.SerializeAsync(v, new SerializationContext(MessageComponentType.Value, testTopic)); + var bytes = await jsonSerializer.ToByteArray(v, new SerializationContext(MessageComponentType.Value, testTopic)); Assert.NotNull(bytes); Assert.Equal(expectedJson, Encoding.UTF8.GetString(bytes.AsSpan().Slice(5))); @@ -166,7 +201,7 @@ public async Task WithJsonSchemaGeneratorSettingsSerDe(EnumHandling enumHandling var jsonDeserializer = new JsonDeserializer(jsonSchemaGeneratorSettings: jsonSchemaGeneratorSettings); var v = new EnumObject { Value = value }; - var bytes = await jsonSerializer.SerializeAsync(v, new SerializationContext(MessageComponentType.Value, testTopic)); + var bytes = await jsonSerializer.ToByteArray(v, new SerializationContext(MessageComponentType.Value, testTopic)); Assert.NotNull(bytes); Assert.Equal(expectedJson, Encoding.UTF8.GetString(bytes.AsSpan().Slice(5))); diff --git a/test/Confluent.SchemaRegistry.Serdes.UnitTests/ProtoSerializeDeserialize.cs b/test/Confluent.SchemaRegistry.Serdes.UnitTests/ProtoSerializeDeserialize.cs index ff3c68c55..724efd1b2 100644 --- a/test/Confluent.SchemaRegistry.Serdes.UnitTests/ProtoSerializeDeserialize.cs +++ b/test/Confluent.SchemaRegistry.Serdes.UnitTests/ProtoSerializeDeserialize.cs @@ -52,7 +52,7 @@ public void Null() var protoSerializer = new ProtobufSerializer(schemaRegistryClient); var protoDeserializer = new ProtobufDeserializer(); - var bytes = protoSerializer.SerializeAsync(null, new SerializationContext(MessageComponentType.Value, testTopic)).Result; + var bytes = protoSerializer.ToByteArray(null, new SerializationContext(MessageComponentType.Value, testTopic)).Result; Assert.Null(bytes); Assert.Null(protoDeserializer.DeserializeAsync(bytes, true, new SerializationContext(MessageComponentType.Value, testTopic)).Result); } @@ -64,7 +64,7 @@ public void UInt32SerDe() var protoDeserializer = new ProtobufDeserializer(); var v = new UInt32Value { Value = 1234 }; - var bytes = protoSerializer.SerializeAsync(v, new SerializationContext(MessageComponentType.Value, testTopic)).Result; + var bytes = protoSerializer.ToByteArray(v, new SerializationContext(MessageComponentType.Value, testTopic)).Result; Assert.Equal(v.Value, protoDeserializer.DeserializeAsync(bytes, false, new SerializationContext(MessageComponentType.Value, testTopic)).Result.Value); } diff --git a/test/Confluent.SchemaRegistry.Serdes.UnitTests/SerializeDeserialize.cs b/test/Confluent.SchemaRegistry.Serdes.UnitTests/SerializeDeserialize.cs index 9f083a4b4..836a6334d 100644 --- a/test/Confluent.SchemaRegistry.Serdes.UnitTests/SerializeDeserialize.cs +++ b/test/Confluent.SchemaRegistry.Serdes.UnitTests/SerializeDeserialize.cs @@ -54,7 +54,7 @@ public void IntSerDe() var avroSerializer = new AvroSerializer(schemaRegistryClient); var avroDeserializer = new AvroDeserializer(schemaRegistryClient); byte[] bytes; - bytes = avroSerializer.SerializeAsync(1234, new SerializationContext(MessageComponentType.Value, testTopic)).Result; + bytes = avroSerializer.ToByteArray(1234, new SerializationContext(MessageComponentType.Value, testTopic)).Result; Assert.Equal(1234, avroDeserializer.DeserializeAsync(bytes, false, new SerializationContext(MessageComponentType.Value, testTopic)).Result); } @@ -64,7 +64,7 @@ public void LongSerDe() var avroSerializer = new AvroSerializer(schemaRegistryClient); var avroDeserializer = new AvroDeserializer(schemaRegistryClient); byte[] bytes; - bytes = avroSerializer.SerializeAsync(123, new SerializationContext(MessageComponentType.Value, testTopic)).Result; + bytes = avroSerializer.ToByteArray(123, new SerializationContext(MessageComponentType.Value, testTopic)).Result; Assert.Equal(123, avroDeserializer.DeserializeAsync(bytes, false, new SerializationContext(MessageComponentType.Value, testTopic)).Result); } @@ -74,7 +74,7 @@ public void BoolSerDe() var avroSerializer = new AvroSerializer(schemaRegistryClient); var avroDeserializer = new AvroDeserializer(schemaRegistryClient); byte[] bytes; - bytes = avroSerializer.SerializeAsync(true, new SerializationContext(MessageComponentType.Value, testTopic)).Result; + bytes = avroSerializer.ToByteArray(true, new SerializationContext(MessageComponentType.Value, testTopic)).Result; Assert.Equal(true, avroDeserializer.DeserializeAsync(bytes, false, new SerializationContext(MessageComponentType.Value, testTopic)).Result); } @@ -84,7 +84,7 @@ public void StringSerDe() var avroSerializer = new AvroSerializer(schemaRegistryClient); var avroDeserializer = new AvroDeserializer(schemaRegistryClient); byte[] bytes; - bytes = avroSerializer.SerializeAsync("abc", new SerializationContext(MessageComponentType.Value, testTopic)).Result; + bytes = avroSerializer.ToByteArray("abc", new SerializationContext(MessageComponentType.Value, testTopic)).Result; Assert.Equal("abc", avroDeserializer.DeserializeAsync(bytes, false, new SerializationContext(MessageComponentType.Value, testTopic)).Result); } @@ -94,7 +94,7 @@ public void DoubleSerDe() var avroSerializer = new AvroSerializer(schemaRegistryClient); var avroDeserializer = new AvroDeserializer(schemaRegistryClient); byte[] bytes; - bytes = avroSerializer.SerializeAsync(123d, new SerializationContext(MessageComponentType.Value, testTopic)).Result; + bytes = avroSerializer.ToByteArray(123d, new SerializationContext(MessageComponentType.Value, testTopic)).Result; Assert.Equal(123d, avroDeserializer.DeserializeAsync(bytes, false, new SerializationContext(MessageComponentType.Value, testTopic)).Result); } @@ -104,7 +104,7 @@ public void FloatSerDe() var avroSerializer = new AvroSerializer(schemaRegistryClient); var avroDeserializer = new AvroDeserializer(schemaRegistryClient); byte[] bytes; - bytes = avroSerializer.SerializeAsync(123f, new SerializationContext(MessageComponentType.Value, testTopic)).Result; + bytes = avroSerializer.ToByteArray(123f, new SerializationContext(MessageComponentType.Value, testTopic)).Result; Assert.Equal(123f, avroDeserializer.DeserializeAsync(bytes, false, new SerializationContext(MessageComponentType.Value, testTopic)).Result); } @@ -114,7 +114,7 @@ public void BytesSerDe() var avroSerializer = new AvroSerializer(schemaRegistryClient); var avroDeserializer = new AvroDeserializer(schemaRegistryClient); byte[] bytes; - bytes = avroSerializer.SerializeAsync(new byte[] { 2, 3, 4 }, new SerializationContext(MessageComponentType.Value, testTopic)).Result; + bytes = avroSerializer.ToByteArray(new byte[] { 2, 3, 4 }, new SerializationContext(MessageComponentType.Value, testTopic)).Result; Assert.Equal(new byte[] { 2, 3, 4 }, avroDeserializer.DeserializeAsync(bytes, false, new SerializationContext(MessageComponentType.Value, testTopic)).Result); } @@ -124,7 +124,7 @@ public void NullSerDe() var avroSerializer = new AvroSerializer(schemaRegistryClient); var avroDeserializer = new AvroDeserializer(schemaRegistryClient); byte[] bytes; - bytes = avroSerializer.SerializeAsync(null, new SerializationContext(MessageComponentType.Value, testTopic)).Result; + bytes = avroSerializer.ToByteArray(null, new SerializationContext(MessageComponentType.Value, testTopic)).Result; Assert.Equal(null, avroDeserializer.DeserializeAsync(bytes, false, new SerializationContext(MessageComponentType.Value, testTopic)).Result); } @@ -141,7 +141,7 @@ public void ISpecificRecord() name = "awesome" }; - var bytes = serializer.SerializeAsync(user, new SerializationContext(MessageComponentType.Value, testTopic)).Result; + var bytes = serializer.ToByteArray(user, new SerializationContext(MessageComponentType.Value, testTopic)).Result; var result = deserializer.DeserializeAsync(bytes, false, new SerializationContext(MessageComponentType.Value, testTopic)).Result; Assert.Equal(user.name, result.name); @@ -169,7 +169,7 @@ public void Multiple_ISpecificRecords() name = "great_brand" }; - var bytesUser = serializer.SerializeAsync(user, new SerializationContext(MessageComponentType.Value, testTopic)).Result; + var bytesUser = serializer.ToByteArray(user, new SerializationContext(MessageComponentType.Value, testTopic)).Result; var resultUser = deserializerUser.DeserializeAsync(bytesUser, false, new SerializationContext(MessageComponentType.Value, testTopic)).Result as User; Assert.NotNull(resultUser); @@ -177,7 +177,7 @@ public void Multiple_ISpecificRecords() Assert.Equal(user.favorite_color, resultUser.favorite_color); Assert.Equal(user.favorite_number, resultUser.favorite_number); - var bytesCar = serializer.SerializeAsync(car, new SerializationContext(MessageComponentType.Value, testTopic)).Result; + var bytesCar = serializer.ToByteArray(car, new SerializationContext(MessageComponentType.Value, testTopic)).Result; var resultCar = deserializerCar.DeserializeAsync(bytesCar, false, new SerializationContext(MessageComponentType.Value, testTopic)).Result as Car; Assert.NotNull(resultCar); @@ -189,7 +189,7 @@ public void Multiple_ISpecificRecords() public void Poco_Serialize() { var serializer = new AvroSerializer>(schemaRegistryClient); - Assert.Throws(() => serializer.SerializeAsync(new Dictionary { { "cat", "dog" } }, new SerializationContext(MessageComponentType.Key, testTopic)).GetAwaiter().GetResult()); + Assert.Throws(() => serializer.ToByteArray(new Dictionary { { "cat", "dog" } }, new SerializationContext(MessageComponentType.Key, testTopic)).GetAwaiter().GetResult()); } [Fact] @@ -204,7 +204,7 @@ public void Incompatible() { var avroSerializer = new AvroSerializer(schemaRegistryClient); var avroDeserializer = new AvroDeserializer(schemaRegistryClient); - var bytes = avroSerializer.SerializeAsync("hello world", new SerializationContext(MessageComponentType.Value, testTopic)).Result; + var bytes = avroSerializer.ToByteArray("hello world", new SerializationContext(MessageComponentType.Value, testTopic)).Result; Assert.Throws(() => avroDeserializer.DeserializeAsync(bytes, false, new SerializationContext(MessageComponentType.Value, testTopic)).Result); } }