diff --git a/src/Confluent.Kafka/ArrayPoolBufferWriter.cs b/src/Confluent.Kafka/ArrayPoolBufferWriter.cs
new file mode 100644
index 000000000..1e1369343
--- /dev/null
+++ b/src/Confluent.Kafka/ArrayPoolBufferWriter.cs
@@ -0,0 +1,135 @@
+// Copyright 2016-2018 Confluent Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+// Refer to LICENSE for more information.
+
+using System;
+using System.Buffers;
+
+
+namespace Confluent.Kafka
+{
+ /// <summary>
+ /// Implements an <see cref="ISerializationBuffer"/> using an <see cref="ArrayPool{T}"/> for memory allocation.
+ /// </summary>
+ internal sealed class ArrayPoolBufferWriter : ISerializationBuffer
+ {
+ /// <summary>
+ /// The default buffer size used when renting the initial array.
+ /// </summary>
+ private const int DefaultInitialBufferSize = 256;
+
+ private readonly ArrayPool<byte> pool;
+ private int index;
+ private byte[] array;
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="ArrayPoolBufferWriter"/> class.
+ /// </summary>
+ public ArrayPoolBufferWriter()
+ {
+ this.pool = ArrayPool<byte>.Shared;
+ this.array = pool.Rent(DefaultInitialBufferSize);
+ this.index = 0;
+ }
+
+ /// <summary>
+ /// Returns the rented array to the pool on finalization if not explicitly disposed in user code.
+ /// </summary>
+ ~ArrayPoolBufferWriter() => Dispose();
+
+ /// <inheritdoc/>
+ public void Advance(int count)
+ {
+ byte[] array = this.array;
+
+ if (array is null)
+ {
+ throw new ObjectDisposedException(nameof(ArrayPoolBufferWriter));
+ }
+
+ if (count < 0)
+ {
+ throw new ArgumentOutOfRangeException("Count must be greater than 0");
+ }
+
+ if (this.index > array.Length - count)
+ {
+ throw new ArgumentOutOfRangeException("Cannot advance further than current capacity");
+ }
+
+ this.index += count;
+ }
+
+ /// <inheritdoc/>
+ public void Dispose()
+ {
+ if (this.array != null)
+ {
+ this.pool.Return(this.array, clearArray: true);
+ this.array = null;
+ }
+ }
+
+ /// <inheritdoc/>
+ public ArraySegment<byte> GetCommitted(int offset = 0)
+ {
+ return new ArraySegment<byte>(this.array, offset, this.index - offset);
+ }
+
+ /// <inheritdoc/>
+ public Memory<byte> GetMemory(int sizeHint = 0)
+ {
+ EnsureCapacity(sizeHint);
+ return this.array.AsMemory(this.index);
+ }
+
+ /// <inheritdoc/>
+ public Span<byte> GetSpan(int sizeHint = 0)
+ {
+ EnsureCapacity(sizeHint);
+ return this.array.AsSpan(this.index);
+ }
+
+ private void EnsureCapacity(int sizeHint)
+ {
+ var array = this.array;
+
+ if (array is null)
+ {
+ throw new ObjectDisposedException(nameof(ArrayPoolBufferWriter));
+ }
+
+ if (sizeHint < 0)
+ {
+ throw new ArgumentOutOfRangeException("Cannot advance further than current capacity");
+ }
+
+ if (sizeHint == 0)
+ {
+ sizeHint = 1;
+ }
+
+ if (sizeHint > array.Length - this.index)
+ {
+ int minimumSize = this.index + sizeHint;
+ var newArray = pool.Rent(minimumSize);
+
+ Array.Copy(array, 0, newArray, 0, this.index);
+ pool.Return(array, true);
+ this.array = newArray;
+ }
+ }
+ }
+}
\ No newline at end of file
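For orientation while reviewing: a minimal sketch of how a caller drives this writer (illustrative only, since ArrayPoolBufferWriter is internal, and assuming the fixes above):

    using (var writer = new ArrayPoolBufferWriter())
    {
        // Reserve at least 3 bytes, write into them, then commit the count actually written.
        Span<byte> span = writer.GetSpan(sizeHint: 3);
        span[0] = 0x01; span[1] = 0x02; span[2] = 0x03;
        writer.Advance(3);

        // Only committed bytes are visible: committed.Count == 3 here.
        ArraySegment<byte> committed = writer.GetCommitted();
    } // Dispose returns the rented array to the pool.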
diff --git a/src/Confluent.Kafka/BufferWriterExtensions.cs b/src/Confluent.Kafka/BufferWriterExtensions.cs
new file mode 100644
index 000000000..f7f1b9450
--- /dev/null
+++ b/src/Confluent.Kafka/BufferWriterExtensions.cs
@@ -0,0 +1,45 @@
+// Copyright 2016-2018 Confluent Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+// Refer to LICENSE for more information.
+
+using System;
+using System.Buffers;
+using System.IO;
+
+
+namespace Confluent.Kafka
+{
+ /// <summary>
+ /// Extends an <see cref="IBufferWriter{T}"/> with a <see cref="Stream"/> adapter.
+ /// </summary>
+ public static class BufferWriterExtensions
+ {
+ /// <summary>
+ /// Gets a <see cref="Stream"/> adapting implementation working on an <see cref="IBufferWriter{T}"/> as underlying memory.
+ /// </summary>
+ /// <param name="bufferWriter">The <see cref="IBufferWriter{T}"/> used for underlying memory.</param>
+ /// <exception cref="ArgumentNullException">
+ /// Thrown if the provided <paramref name="bufferWriter"/> is null.
+ /// </exception>
+ public static Stream AsStream(this IBufferWriter<byte> bufferWriter)
+ {
+ if (bufferWriter is null)
+ {
+ throw new ArgumentNullException(nameof(bufferWriter));
+ }
+
+ return new BufferWriterStream(bufferWriter);
+ }
+ }
+}
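A quick sketch of the adapter in use. Any IBufferWriter<byte> works as the target; here .NET's ArrayBufferWriter<byte> (available on .NET Core 3.0+ / .NET Standard 2.1) stands in, with the usual System.Buffers and Confluent.Kafka usings assumed:

    var bufferWriter = new ArrayBufferWriter<byte>();
    using (var stream = bufferWriter.AsStream())
    {
        stream.WriteByte(0x00);
        stream.Write(new byte[] { 1, 2, 3 }, 0, 3);
    }
    // bufferWriter.WrittenMemory now holds { 0x00, 0x01, 0x02, 0x03 }.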
diff --git a/src/Confluent.Kafka/BufferWriterStream.cs b/src/Confluent.Kafka/BufferWriterStream.cs
new file mode 100644
index 000000000..52bf29774
--- /dev/null
+++ b/src/Confluent.Kafka/BufferWriterStream.cs
@@ -0,0 +1,158 @@
+// Copyright 2016-2018 Confluent Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+// Refer to LICENSE for more information.
+
+using System;
+using System.Buffers;
+using System.IO;
+using System.Threading;
+using System.Threading.Tasks;
+
+
+namespace Confluent.Kafka
+{
+ /// <summary>
+ /// A <see cref="Stream"/> implementation wrapping an <see cref="IBufferWriter{T}"/> instance.
+ /// </summary>
+ internal sealed class BufferWriterStream : Stream
+ {
+ private readonly IBufferWriter<byte> bufferWriter;
+ private bool disposed;
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="BufferWriterStream"/> class.
+ /// </summary>
+ /// <param name="bufferWriter">The target <see cref="IBufferWriter{T}"/> instance to use.</param>
+ public BufferWriterStream(IBufferWriter<byte> bufferWriter)
+ {
+ this.bufferWriter = bufferWriter ?? throw new ArgumentNullException(nameof(bufferWriter));
+ }
+
+ /// <inheritdoc/>
+ public override bool CanRead => false;
+
+ /// <inheritdoc/>
+ public override bool CanSeek => false;
+
+ /// <inheritdoc/>
+ public override bool CanWrite
+ {
+ get => !this.disposed;
+ }
+
+ /// <inheritdoc/>
+ public override long Length => throw new NotSupportedException();
+
+ /// <inheritdoc/>
+ public override long Position
+ {
+ get => throw new NotSupportedException();
+ set => throw new NotSupportedException();
+ }
+
+ /// <inheritdoc/>
+ public override Task CopyToAsync(Stream destination, int bufferSize, CancellationToken cancellationToken)
+ {
+ throw new NotSupportedException();
+ }
+
+ /// <inheritdoc/>
+ public override void Flush()
+ {
+ }
+
+ /// <inheritdoc/>
+ public override Task FlushAsync(CancellationToken cancellationToken)
+ {
+ cancellationToken.ThrowIfCancellationRequested();
+ return Task.FromResult(true);
+ }
+
+ /// <inheritdoc/>
+ public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
+ {
+ throw new NotSupportedException();
+ }
+
+ /// <inheritdoc/>
+ public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
+ {
+ cancellationToken.ThrowIfCancellationRequested();
+ Write(buffer, offset, count);
+ cancellationToken.ThrowIfCancellationRequested();
+
+ return Task.FromResult(true);
+ }
+
+ /// <inheritdoc/>
+ public override long Seek(long offset, SeekOrigin origin)
+ {
+ throw new NotSupportedException();
+ }
+
+ /// <inheritdoc/>
+ public override void SetLength(long value)
+ {
+ throw new NotSupportedException();
+ }
+
+ /// <inheritdoc/>
+ public override int Read(byte[] buffer, int offset, int count)
+ {
+ throw new NotSupportedException();
+ }
+
+ /// <inheritdoc/>
+ public override int ReadByte()
+ {
+ throw new NotSupportedException();
+ }
+
+ /// <inheritdoc/>
+ public override void Write(byte[] buffer, int offset, int count)
+ {
+ if (this.disposed)
+ {
+ throw new ObjectDisposedException(nameof(BufferWriterStream));
+ }
+
+ var source = buffer.AsSpan(offset, count);
+ var destination = this.bufferWriter.GetSpan(count);
+
+ source.CopyTo(destination);
+
+ this.bufferWriter.Advance(count);
+ }
+
+ /// <inheritdoc/>
+ public override void WriteByte(byte value)
+ {
+ if (this.disposed)
+ {
+ throw new ObjectDisposedException(nameof(BufferWriterStream));
+ }
+
+ this.bufferWriter.GetSpan(1)[0] = value;
+
+ this.bufferWriter.Advance(1);
+ }
+
+ /// <inheritdoc/>
+ protected override void Dispose(bool disposing)
+ {
+ this.disposed = true;
+ }
+ }
+}
diff --git a/src/Confluent.Kafka/Confluent.Kafka.csproj b/src/Confluent.Kafka/Confluent.Kafka.csproj
index 78cbf20fd..9bf6faec8 100755
--- a/src/Confluent.Kafka/Confluent.Kafka.csproj
+++ b/src/Confluent.Kafka/Confluent.Kafka.csproj
@@ -24,6 +24,7 @@
None
+
diff --git a/src/Confluent.Kafka/DefaultSerializationBufferProvider.cs b/src/Confluent.Kafka/DefaultSerializationBufferProvider.cs
new file mode 100644
index 000000000..e6e057292
--- /dev/null
+++ b/src/Confluent.Kafka/DefaultSerializationBufferProvider.cs
@@ -0,0 +1,43 @@
+// Copyright 2018 Confluent Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+// Refer to LICENSE for more information.
+
+using System.Buffers;
+
+
+namespace Confluent.Kafka
+{
+ /// <summary>
+ /// The default <see cref="ISerializationBufferProvider"/>. Creates buffers using pooled internal memory.
+ /// </summary>
+ public sealed class DefaultSerializationBufferProvider : ISerializationBufferProvider
+ {
+ /// <summary>
+ /// Gets a singleton instance of the <see cref="DefaultSerializationBufferProvider"/>.
+ /// </summary>
+ public static DefaultSerializationBufferProvider Instance { get; } = new DefaultSerializationBufferProvider();
+
+ private DefaultSerializationBufferProvider()
+ {
+ }
+
+ /// <inheritdoc/>
+ public ISerializationBuffer Create()
+ {
+ return new ArrayPoolBufferWriter();
+ }
+ }
+}
diff --git a/src/Confluent.Kafka/DependentProducerBuilder.cs b/src/Confluent.Kafka/DependentProducerBuilder.cs
index d152ca652..ef0aa723f 100644
--- a/src/Confluent.Kafka/DependentProducerBuilder.cs
+++ b/src/Confluent.Kafka/DependentProducerBuilder.cs
@@ -54,6 +54,10 @@ public class DependentProducerBuilder<TKey, TValue>
/// </summary>
public IAsyncSerializer<TValue> AsyncValueSerializer { get; set; }
+ /// <summary>
+ /// The configured serialization buffer provider.
+ /// </summary>
+ public ISerializationBufferProvider SerializationBufferProvider { get; set; }
/// <summary>
/// An underlying librdkafka client handle that the Producer will use to
@@ -101,6 +105,15 @@ public DependentProducerBuilder<TKey, TValue> SetValueSerializer(IAsyncSerializer<TValue>
return this;
}
+ /// <summary>
+ /// Set the serialization buffer provider to use when serializing keys and values.
+ /// </summary>
+ public DependentProducerBuilder<TKey, TValue> SetSerializationBufferProvider(ISerializationBufferProvider serializationBufferProvider)
+ {
+ this.SerializationBufferProvider = serializationBufferProvider;
+ return this;
+ }
+
/// <summary>
/// Build a new IProducer implementation instance.
/// </summary>
diff --git a/src/Confluent.Kafka/IAsyncSerializer.cs b/src/Confluent.Kafka/IAsyncSerializer.cs
index 8be1431a7..8cee1c94a 100644
--- a/src/Confluent.Kafka/IAsyncSerializer.cs
+++ b/src/Confluent.Kafka/IAsyncSerializer.cs
@@ -15,6 +15,7 @@
// Refer to LICENSE for more information.
using System;
+using System.Buffers;
using System.Threading.Tasks;
@@ -35,10 +36,13 @@ public interface IAsyncSerializer<T>
/// <param name="context">
/// Context relevant to the serialize operation.
/// </param>
+ /// <param name="bufferWriter">
+ /// The <see cref="IBufferWriter{T}"/> to serialize the binary representation to.
+ /// </param>
- /// <returns>
- /// A <see cref="System.Threading.Tasks.Task"/> that
- /// completes with the serialized data.
- /// </returns>
- Task<byte[]> SerializeAsync(T data, SerializationContext context);
+ /// <returns>
+ /// A <see cref="System.Threading.Tasks.Task"/> that
+ /// completes when the data has been written to <paramref name="bufferWriter"/>.
+ /// </returns>
+ Task SerializeAsync(T data, SerializationContext context, IBufferWriter<byte> bufferWriter);
}
}
diff --git a/src/Confluent.Kafka/ISerializationBuffer.cs b/src/Confluent.Kafka/ISerializationBuffer.cs
new file mode 100644
index 000000000..8abb07e37
--- /dev/null
+++ b/src/Confluent.Kafka/ISerializationBuffer.cs
@@ -0,0 +1,38 @@
+// Copyright 2018 Confluent Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+// Refer to LICENSE for more information.
+
+using System;
+using System.Buffers;
+
+namespace Confluent.Kafka
+{
+ /// <summary>
+ /// Defines a buffer used to serialize keys and values. This buffer is disposed when no longer needed.
+ /// </summary>
+ public interface ISerializationBuffer : IBufferWriter<byte>, IDisposable
+ {
+ /// <summary>
+ /// Gets an <see cref="ArraySegment{T}"/> representing the currently committed memory.
+ /// </summary>
+ /// <param name="offset">
+ /// An optional offset into the committed memory.
+ /// </param>
+ /// <returns>
+ /// Returns an <see cref="ArraySegment{T}"/> representing the committed memory, starting at the requested offset.
+ /// </returns>
+ ArraySegment<byte> GetCommitted(int offset = 0);
+ }
+}
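The producer serializes the key and then the value into one ISerializationBuffer, so the committed region is addressed by offset. A sketch of the intended call pattern (serializeKey/serializeValue are hypothetical stand-ins for the configured serializers), assuming the corrected GetCommitted semantics where the returned count is the committed length minus the offset:

    serializeKey(buffer);                                               // commits k bytes
    ArraySegment<byte> key = buffer.GetCommitted();                     // covers [0, k)
    serializeValue(buffer);                                             // commits v more bytes
    ArraySegment<byte> value = buffer.GetCommitted(offset: key.Count);  // covers [k, k + v)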
diff --git a/src/Confluent.Kafka/ISerializationBufferProvider.cs b/src/Confluent.Kafka/ISerializationBufferProvider.cs
new file mode 100644
index 000000000..cc16a1a0a
--- /dev/null
+++ b/src/Confluent.Kafka/ISerializationBufferProvider.cs
@@ -0,0 +1,36 @@
+// Copyright 2018 Confluent Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+// Refer to LICENSE for more information.
+
+using System;
+using System.Buffers;
+
+namespace Confluent.Kafka
+{
+ /// <summary>
+ /// Defines a factory for creating an <see cref="ISerializationBuffer"/> used for key and value serialization.
+ /// </summary>
+ public interface ISerializationBufferProvider
+ {
+ /// <summary>
+ /// Creates a new <see cref="ISerializationBuffer"/>.
+ /// </summary>
+ /// <returns>
+ /// Returns a new <see cref="ISerializationBuffer"/>.
+ /// </returns>
+ ISerializationBuffer Create();
+ }
+}
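A hypothetical alternative provider, for illustration only (not part of this PR): one backed by plain growable arrays rather than the shared pool. Assumes using System; using System.Buffers;:

    public sealed class SimpleSerializationBufferProvider : ISerializationBufferProvider
    {
        public ISerializationBuffer Create() => new SimpleBuffer();

        private sealed class SimpleBuffer : ISerializationBuffer
        {
            private byte[] array = new byte[256];
            private int index;

            public void Advance(int count) => index += count;

            public Memory<byte> GetMemory(int sizeHint = 0) { Grow(sizeHint); return array.AsMemory(index); }

            public Span<byte> GetSpan(int sizeHint = 0) { Grow(sizeHint); return array.AsSpan(index); }

            public ArraySegment<byte> GetCommitted(int offset = 0)
                => new ArraySegment<byte>(array, offset, index - offset);

            public void Dispose() { } // nothing pooled, nothing to return

            private void Grow(int sizeHint)
            {
                if (sizeHint < 1) sizeHint = 1;
                if (sizeHint > array.Length - index)
                    Array.Resize(ref array, Math.Max(array.Length * 2, index + sizeHint));
            }
        }
    }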
diff --git a/src/Confluent.Kafka/ISerializer.cs b/src/Confluent.Kafka/ISerializer.cs
index e404536d8..b23be4113 100644
--- a/src/Confluent.Kafka/ISerializer.cs
+++ b/src/Confluent.Kafka/ISerializer.cs
@@ -15,6 +15,8 @@
// Refer to LICENSE for more information.
+using System.Buffers;
+
namespace Confluent.Kafka
{
///
@@ -32,9 +34,9 @@ public interface ISerializer<T>
/// <param name="context">
/// Context relevant to the serialize operation.
/// </param>
- /// <returns>
- /// The serialized value.
- /// </returns>
- byte[] Serialize(T data, SerializationContext context);
+ /// <param name="bufferWriter">
+ /// The <see cref="IBufferWriter{T}"/> to serialize the binary representation to.
+ /// </param>
+ void Serialize(T data, SerializationContext context, IBufferWriter<byte> bufferWriter);
}
}
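A sketch of what a custom serializer looks like against the new contract, here a hypothetical 16-byte GUID serializer (not part of this PR):

    using System;
    using System.Buffers;
    using Confluent.Kafka;

    public sealed class GuidSerializer : ISerializer<Guid>
    {
        public void Serialize(Guid data, SerializationContext context, IBufferWriter<byte> bufferWriter)
        {
            // Reserve 16 bytes, copy the GUID's binary form in, then commit.
            Span<byte> span = bufferWriter.GetSpan(16);
            data.ToByteArray().CopyTo(span);
            bufferWriter.Advance(16);
        }
    }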
diff --git a/src/Confluent.Kafka/Producer.cs b/src/Confluent.Kafka/Producer.cs
index 3d605a774..da3209628 100644
--- a/src/Confluent.Kafka/Producer.cs
+++ b/src/Confluent.Kafka/Producer.cs
@@ -15,7 +15,9 @@
// Refer to LICENSE for more information.
using System;
+using System.Buffers;
using System.Collections.Generic;
+using System.IO;
using System.Linq;
using System.Runtime.InteropServices;
using System.Threading;
@@ -46,6 +48,7 @@ internal class Config
private ISerializer<TValue> valueSerializer;
private IAsyncSerializer<TKey> asyncKeySerializer;
private IAsyncSerializer<TValue> asyncValueSerializer;
+ private ISerializationBufferProvider serializationBufferProvider;
private static readonly Dictionary<Type, object> defaultSerializers = new Dictionary<Type, object>
{
@@ -279,8 +282,8 @@ private void DeliveryReportCallbackImpl(IntPtr rk, IntPtr rkmessage, IntPtr opaq
private void ProduceImpl(
string topic,
- byte[] val, int valOffset, int valLength,
- byte[] key, int keyOffset, int keyLength,
+ ArraySegment<byte> valueBuffer,
+ ArraySegment<byte> keyBuffer,
Timestamp timestamp,
Partition partition,
IEnumerable headers,
@@ -308,8 +311,8 @@ private void ProduceImpl(
err = KafkaHandle.Produce(
topic,
- val, valOffset, valLength,
- key, keyOffset, keyLength,
+ valueBuffer.Array, valueBuffer.Offset, valueBuffer.Count,
+ keyBuffer.Array, keyBuffer.Offset, keyBuffer.Count,
partition.Value,
timestamp.UnixTimestampMs,
headers,
@@ -325,8 +328,8 @@ private void ProduceImpl(
{
err = KafkaHandle.Produce(
topic,
- val, valOffset, valLength,
- key, keyOffset, keyLength,
+ valueBuffer.Array, valueBuffer.Offset, valueBuffer.Count,
+ keyBuffer.Array, keyBuffer.Offset, keyBuffer.Count,
partition.Value,
timestamp.UnixTimestampMs,
headers,
@@ -495,6 +498,18 @@ public Handle Handle
}
}
+ private void InitializeSerializationBufferProvider(ISerializationBufferProvider serializationBufferProvider)
+ {
+ if (serializationBufferProvider == null)
+ {
+ this.serializationBufferProvider = DefaultSerializationBufferProvider.Instance;
+ }
+ else
+ {
+ this.serializationBufferProvider = serializationBufferProvider;
+ }
+ }
+
private void InitializeSerializers(
ISerializer<TKey> keySerializer,
ISerializer<TValue> valueSerializer,
@@ -560,6 +575,8 @@ internal Producer(DependentProducerBuilder<TKey, TValue> builder)
InitializeSerializers(
builder.KeySerializer, builder.ValueSerializer,
builder.AsyncKeySerializer, builder.AsyncValueSerializer);
+
+ InitializeSerializationBufferProvider(builder.SerializationBufferProvider);
}
internal Producer(ProducerBuilder<TKey, TValue> builder)
@@ -735,6 +752,8 @@ internal Producer(ProducerBuilder<TKey, TValue> builder)
InitializeSerializers(
builder.KeySerializer, builder.ValueSerializer,
builder.AsyncKeySerializer, builder.AsyncValueSerializer);
+
+ InitializeSerializationBufferProvider(builder.SerializationBufferProvider);
}
@@ -746,95 +765,113 @@ public async Task<DeliveryResult<TKey, TValue>> ProduceAsync(
{
Headers headers = message.Headers ?? new Headers();
- byte[] keyBytes;
- try
- {
- keyBytes = (keySerializer != null)
- ? keySerializer.Serialize(message.Key, new SerializationContext(MessageComponentType.Key, topicPartition.Topic, headers))
- : await asyncKeySerializer.SerializeAsync(message.Key, new SerializationContext(MessageComponentType.Key, topicPartition.Topic, headers)).ConfigureAwait(false);
- }
- catch (Exception ex)
+ using (var buffer = this.serializationBufferProvider.Create())
{
- throw new ProduceException<TKey, TValue>(
- new Error(ErrorCode.Local_KeySerialization),
- new DeliveryResult<TKey, TValue>
- {
- Message = message,
- TopicPartitionOffset = new TopicPartitionOffset(topicPartition, Offset.Unset)
- },
- ex);
- }
+ ArraySegment<byte> keySegment;
+ ArraySegment<byte> valueSegment;
- byte[] valBytes;
- try
- {
- valBytes = (valueSerializer != null)
- ? valueSerializer.Serialize(message.Value, new SerializationContext(MessageComponentType.Value, topicPartition.Topic, headers))
- : await asyncValueSerializer.SerializeAsync(message.Value, new SerializationContext(MessageComponentType.Value, topicPartition.Topic, headers)).ConfigureAwait(false);
- }
- catch (Exception ex)
- {
- throw new ProduceException<TKey, TValue>(
- new Error(ErrorCode.Local_ValueSerialization),
- new DeliveryResult<TKey, TValue>
+ try
+ {
+ if (keySerializer != null)
{
- Message = message,
- TopicPartitionOffset = new TopicPartitionOffset(topicPartition, Offset.Unset)
- },
- ex);
- }
-
- try
- {
- if (enableDeliveryReports)
+ keySerializer.Serialize(message.Key, new SerializationContext(MessageComponentType.Key, topicPartition.Topic, headers), buffer);
+ }
+ else
+ {
+ await asyncKeySerializer.SerializeAsync(message.Key, new SerializationContext(MessageComponentType.Key, topicPartition.Topic, headers), buffer).ConfigureAwait(false);
+ }
+ }
+ catch (Exception ex)
{
- var handler = new TypedTaskDeliveryHandlerShim(
- topicPartition.Topic,
- enableDeliveryReportKey ? message.Key : default(TKey),
- enableDeliveryReportValue ? message.Value : default(TValue));
+ throw new ProduceException<TKey, TValue>(
+ new Error(ErrorCode.Local_KeySerialization),
+ new DeliveryResult<TKey, TValue>
+ {
+ Message = message,
+ TopicPartitionOffset = new TopicPartitionOffset(topicPartition, Offset.Unset)
+ },
+ ex);
+ }
- if (cancellationToken != null && cancellationToken.CanBeCanceled)
+ keySegment = buffer.GetCommitted();
+
+ try
+ {
+ if (valueSerializer != null)
+ {
+ valueSerializer.Serialize(message.Value, new SerializationContext(MessageComponentType.Value, topicPartition.Topic, headers), buffer);
+ }
+ else
{
- handler.CancellationTokenRegistration
- = cancellationToken.Register(() => handler.TrySetCanceled());
+ await asyncValueSerializer.SerializeAsync(message.Value, new SerializationContext(MessageComponentType.Value, topicPartition.Topic, headers), buffer).ConfigureAwait(false);
}
+ }
+ catch (Exception ex)
+ {
+ throw new ProduceException<TKey, TValue>(
+ new Error(ErrorCode.Local_ValueSerialization),
+ new DeliveryResult<TKey, TValue>
+ {
+ Message = message,
+ TopicPartitionOffset = new TopicPartitionOffset(topicPartition, Offset.Unset)
+ },
+ ex);
+ }
- ProduceImpl(
- topicPartition.Topic,
- valBytes, 0, valBytes == null ? 0 : valBytes.Length,
- keyBytes, 0, keyBytes == null ? 0 : keyBytes.Length,
- message.Timestamp, topicPartition.Partition, headers,
- handler);
+ valueSegment = buffer.GetCommitted(offset: keySegment.Count);
+
+ // Serializing the value may have re-rented the underlying array,
+ // so re-slice the key segment against the current array.
+ keySegment = new ArraySegment<byte>(valueSegment.Array, 0, keySegment.Count);
- return await handler.Task.ConfigureAwait(false);
- }
- else
+ try
{
- ProduceImpl(
- topicPartition.Topic,
- valBytes, 0, valBytes == null ? 0 : valBytes.Length,
- keyBytes, 0, keyBytes == null ? 0 : keyBytes.Length,
- message.Timestamp, topicPartition.Partition, headers,
- null);
+ if (enableDeliveryReports)
+ {
+ var handler = new TypedTaskDeliveryHandlerShim(
+ topicPartition.Topic,
+ enableDeliveryReportKey ? message.Key : default(TKey),
+ enableDeliveryReportValue ? message.Value : default(TValue));
+
+ if (cancellationToken != null && cancellationToken.CanBeCanceled)
+ {
+ handler.CancellationTokenRegistration
+ = cancellationToken.Register(() => handler.TrySetCanceled());
+ }
- var result = new DeliveryResult<TKey, TValue>
+ ProduceImpl(
+ topicPartition.Topic,
+ valueSegment,
+ keySegment,
+ message.Timestamp, topicPartition.Partition, headers,
+ handler);
+
+ return await handler.Task.ConfigureAwait(false);
+ }
+ else
{
- TopicPartitionOffset = new TopicPartitionOffset(topicPartition, Offset.Unset),
- Message = message
- };
+ ProduceImpl(
+ topicPartition.Topic,
+ valueSegment,
+ keySegment,
+ message.Timestamp, topicPartition.Partition, headers,
+ null);
+
+ var result = new DeliveryResult<TKey, TValue>
+ {
+ TopicPartitionOffset = new TopicPartitionOffset(topicPartition, Offset.Unset),
+ Message = message
+ };
- return result;
+ return result;
+ }
+ }
+ catch (KafkaException ex)
+ {
+ throw new ProduceException<TKey, TValue>(
+ ex.Error,
+ new DeliveryResult<TKey, TValue>
+ {
+ Message = message,
+ TopicPartitionOffset = new TopicPartitionOffset(topicPartition, Offset.Unset)
+ });
}
- }
- catch (KafkaException ex)
- {
- throw new ProduceException<TKey, TValue>(
- ex.Error,
- new DeliveryResult<TKey, TValue>
- {
- Message = message,
- TopicPartitionOffset = new TopicPartitionOffset(topicPartition, Offset.Unset)
- });
}
}
@@ -869,67 +906,85 @@ public void Produce(
Headers headers = message.Headers ?? new Headers();
- byte[] keyBytes;
- try
- {
- keyBytes = (keySerializer != null)
- ? keySerializer.Serialize(message.Key, new SerializationContext(MessageComponentType.Key, topicPartition.Topic, headers))
- : throw new InvalidOperationException("Produce called with an IAsyncSerializer key serializer configured but an ISerializer is required.");
- }
- catch (Exception ex)
+ using (var buffer = this.serializationBufferProvider.Create())
{
- throw new ProduceException<TKey, TValue>(
- new Error(ErrorCode.Local_KeySerialization, ex.ToString()),
- new DeliveryResult<TKey, TValue>
+ ArraySegment<byte> keySegment;
+ ArraySegment<byte> valueSegment;
+
+ try
+ {
+ if (keySerializer != null)
{
- Message = message,
- TopicPartitionOffset = new TopicPartitionOffset(topicPartition, Offset.Unset),
- },
- ex);
- }
+ keySerializer.Serialize(message.Key, new SerializationContext(MessageComponentType.Key, topicPartition.Topic, headers), buffer);
+ }
+ else
+ {
+ throw new InvalidOperationException("Produce called with an IAsyncSerializer key serializer configured but an ISerializer is required.");
+ }
+ }
+ catch (Exception ex)
+ {
+ throw new ProduceException<TKey, TValue>(
+ new Error(ErrorCode.Local_KeySerialization, ex.ToString()),
+ new DeliveryResult<TKey, TValue>
+ {
+ Message = message,
+ TopicPartitionOffset = new TopicPartitionOffset(topicPartition, Offset.Unset),
+ },
+ ex);
+ }
- byte[] valBytes;
- try
- {
- valBytes = (valueSerializer != null)
- ? valueSerializer.Serialize(message.Value, new SerializationContext(MessageComponentType.Value, topicPartition.Topic, headers))
- : throw new InvalidOperationException("Produce called with an IAsyncSerializer value serializer configured but an ISerializer is required.");
- }
- catch (Exception ex)
- {
- throw new ProduceException<TKey, TValue>(
- new Error(ErrorCode.Local_ValueSerialization, ex.ToString()),
- new DeliveryResult<TKey, TValue>
+ keySegment = buffer.GetCommitted();
+
+ try
+ {
+ if (valueSerializer != null)
{
- Message = message,
- TopicPartitionOffset = new TopicPartitionOffset(topicPartition, Offset.Unset),
- },
- ex);
- }
+ valueSerializer.Serialize(message.Value, new SerializationContext(MessageComponentType.Value, topicPartition.Topic, headers), buffer);
+ }
+ else
+ {
+ throw new InvalidOperationException("Produce called with an IAsyncSerializer value serializer configured but an ISerializer is required.");
+ }
+ }
+ catch (Exception ex)
+ {
+ throw new ProduceException<TKey, TValue>(
+ new Error(ErrorCode.Local_ValueSerialization, ex.ToString()),
+ new DeliveryResult<TKey, TValue>
+ {
+ Message = message,
+ TopicPartitionOffset = new TopicPartitionOffset(topicPartition, Offset.Unset),
+ },
+ ex);
+ }
- try
- {
- ProduceImpl(
- topicPartition.Topic,
- valBytes, 0, valBytes == null ? 0 : valBytes.Length,
- keyBytes, 0, keyBytes == null ? 0 : keyBytes.Length,
- message.Timestamp, topicPartition.Partition,
- headers,
- new TypedDeliveryHandlerShim_Action(
+ valueSegment = buffer.GetCommitted(offset: keySegment.Count);
+
+ // Serializing the value may have re-rented the underlying array,
+ // so re-slice the key segment against the current array.
+ keySegment = new ArraySegment<byte>(valueSegment.Array, 0, keySegment.Count);
+
+ try
+ {
+ ProduceImpl(
topicPartition.Topic,
- enableDeliveryReportKey ? message.Key : default(TKey),
- enableDeliveryReportValue ? message.Value : default(TValue),
- deliveryHandler));
- }
- catch (KafkaException ex)
- {
- throw new ProduceException<TKey, TValue>(
- ex.Error,
- new DeliveryReport<TKey, TValue>
+ valueSegment,
+ keySegment,
+ message.Timestamp, topicPartition.Partition,
+ headers,
+ new TypedDeliveryHandlerShim_Action(
+ topicPartition.Topic,
+ enableDeliveryReportKey ? message.Key : default(TKey),
+ enableDeliveryReportValue ? message.Value : default(TValue),
+ deliveryHandler));
+ }
+ catch (KafkaException ex)
+ {
+ throw new ProduceException<TKey, TValue>(
+ ex.Error,
+ new DeliveryReport<TKey, TValue>
{
Message = message,
TopicPartitionOffset = new TopicPartitionOffset(topicPartition, Offset.Unset)
});
+ }
}
}
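One hazard worth spelling out for reviewers: the key segment is captured before the value is serialized, and serializing the value can force the pooled array to be re-rented (the old array is returned to the pool and cleared). The re-slices added above guard exactly this case; condensed (SerializeLargeValue is a hypothetical stand-in):

    ArraySegment<byte> key = buffer.GetCommitted();   // references array A
    SerializeLargeValue(buffer);                      // growth: A returned to the pool, replaced by B
    // key.Array still points at A (now cleared), so re-slice against the live array:
    key = new ArraySegment<byte>(buffer.GetCommitted().Array, 0, key.Count);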
diff --git a/src/Confluent.Kafka/ProducerBuilder.cs b/src/Confluent.Kafka/ProducerBuilder.cs
index bb24db3a2..0a9526114 100644
--- a/src/Confluent.Kafka/ProducerBuilder.cs
+++ b/src/Confluent.Kafka/ProducerBuilder.cs
@@ -15,6 +15,7 @@
// Refer to LICENSE for more information.
using System;
+using System.Buffers;
using System.Collections.Generic;
@@ -114,6 +115,11 @@ public class ProducerBuilder<TKey, TValue>
///
internal protected IAsyncSerializer<TValue> AsyncValueSerializer { get; set; }
+ /// <summary>
+ /// The configured serialization buffer provider.
+ /// </summary>
+ internal protected ISerializationBufferProvider SerializationBufferProvider { get; set; }
+
internal Producer<TKey, TValue>.Config ConstructBaseConfig(Producer<TKey, TValue> producer)
{
return new Producer<TKey, TValue>.Config
@@ -367,6 +373,20 @@ public ProducerBuilder<TKey, TValue> SetValueSerializer(IAsyncSerializer<TValue>
return this;
}
+ /// <summary>
+ /// The <see cref="ISerializationBufferProvider"/> to use when serializing keys and values.
+ /// </summary>
+ public ProducerBuilder<TKey, TValue> SetSerializationBufferProvider(ISerializationBufferProvider serializationBufferProvider)
+ {
+ if (this.SerializationBufferProvider != null)
+ {
+ throw new InvalidOperationException("Serialization buffer provider may not be specified more than once.");
+ }
+
+ this.SerializationBufferProvider = serializationBufferProvider ?? throw new ArgumentNullException(nameof(serializationBufferProvider));
+ return this;
+ }
+
/// <summary>
/// Build a new IProducer implementation instance.
/// </summary>
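Wiring it up from user code then looks like this (broker address and topic are illustrative; passing the default provider explicitly is redundant, but shows the hook):

    var config = new ProducerConfig { BootstrapServers = "localhost:9092" };

    using (var producer = new ProducerBuilder<string, string>(config)
        .SetSerializationBufferProvider(DefaultSerializationBufferProvider.Instance)
        .Build())
    {
        producer.Produce("my-topic", new Message<string, string> { Key = "k", Value = "v" });
        producer.Flush(TimeSpan.FromSeconds(10));
    }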
diff --git a/src/Confluent.Kafka/Serializers.cs b/src/Confluent.Kafka/Serializers.cs
index b9e9ac2eb..2556c89b4 100644
--- a/src/Confluent.Kafka/Serializers.cs
+++ b/src/Confluent.Kafka/Serializers.cs
@@ -15,6 +15,8 @@
// Refer to LICENSE for more information.
using System;
+using System.Buffers;
+using System.Runtime.InteropServices;
using System.Text;
@@ -32,14 +34,18 @@ public static class Serializers
private class Utf8Serializer : ISerializer<string>
{
- public byte[] Serialize(string data, SerializationContext context)
+ public void Serialize(string data, SerializationContext context, IBufferWriter<byte> bufferWriter)
{
- if (data == null)
+ if (data != null)
{
- return null;
- }
+ var size = Encoding.UTF8.GetByteCount(data);
+
+ // Note: assumes the writer's memory is array-backed, which holds for the pooled buffers used here.
+ var buffer = bufferWriter.GetMemory(size);
+ MemoryMarshal.TryGetArray<byte>(buffer, out var arraySegment);
- return Encoding.UTF8.GetBytes(data);
+ Encoding.UTF8.GetBytes(data, 0, data.Length, arraySegment.Array, arraySegment.Offset);
+ bufferWriter.Advance(size);
+ }
}
}
@@ -51,8 +57,10 @@ public byte[] Serialize(string data, SerializationContext context)
private class NullSerializer : ISerializer<Null>
{
- public byte[] Serialize(Null data, SerializationContext context)
- => null;
+ public void Serialize(Null data, SerializationContext context, IBufferWriter<byte> bufferWriter)
+ {
+ // A null value has no binary representation: write nothing.
+ }
}
@@ -63,9 +71,9 @@ public byte[] Serialize(Null data, SerializationContext context)
private class Int64Serializer : ISerializer<long>
{
- public byte[] Serialize(long data, SerializationContext context)
+ public void Serialize(long data, SerializationContext context, IBufferWriter<byte> bufferWriter)
{
- var result = new byte[8];
+ var result = bufferWriter.GetSpan(sizeHint: 8);
result[0] = (byte)(data >> 56);
result[1] = (byte)(data >> 48);
result[2] = (byte)(data >> 40);
@@ -74,7 +82,8 @@ public byte[] Serialize(long data, SerializationContext context)
result[5] = (byte)(data >> 16);
result[6] = (byte)(data >> 8);
result[7] = (byte)data;
- return result;
+
+ bufferWriter.Advance(8);
}
}
@@ -86,9 +95,9 @@ public byte[] Serialize(long data, SerializationContext context)
private class Int32Serializer : ISerializer<int>
{
- public byte[] Serialize(int data, SerializationContext context)
+ public void Serialize(int data, SerializationContext context, IBufferWriter<byte> bufferWriter)
{
- var result = new byte[4]; // int is always 32 bits on .NET.
+ var result = bufferWriter.GetSpan(sizeHint: 4); // int is always 32 bits on .NET.
// network byte order -> big endian -> most significant byte in the smallest address.
// Note: At the IL level, the conv.u1 operator is used to cast int to byte which truncates
// the high order bits if overflow occurs.
@@ -97,7 +106,8 @@ public byte[] Serialize(int data, SerializationContext context)
result[1] = (byte)(data >> 16); // & 0xff;
result[2] = (byte)(data >> 8); // & 0xff;
result[3] = (byte)data; // & 0xff;
- return result;
+
+ bufferWriter.Advance(count: 4);
}
}
@@ -109,24 +119,35 @@ public byte[] Serialize(int data, SerializationContext context)
private class SingleSerializer : ISerializer<float>
{
- public byte[] Serialize(float data, SerializationContext context)
+ public void Serialize(float data, SerializationContext context, IBufferWriter<byte> bufferWriter)
{
if (BitConverter.IsLittleEndian)
{
unsafe
{
- byte[] result = new byte[4];
+ var result = bufferWriter.GetSpan(4);
byte* p = (byte*)(&data);
result[3] = *p++;
result[2] = *p++;
result[1] = *p++;
result[0] = *p++;
- return result;
+
+ bufferWriter.Advance(4);
}
}
else
{
- return BitConverter.GetBytes(data);
+ unsafe
+ {
+ var result = bufferWriter.GetSpan(4);
+ byte* p = (byte*)(&data);
+ result[0] = *p++;
+ result[1] = *p++;
+ result[2] = *p++;
+ result[3] = *p++;
+
+ bufferWriter.Advance(4);
+ }
}
}
}
@@ -139,13 +160,13 @@ public byte[] Serialize(float data, SerializationContext context)
private class DoubleSerializer : ISerializer<double>
{
- public byte[] Serialize(double data, SerializationContext context)
+ public void Serialize(double data, SerializationContext context, IBufferWriter<byte> bufferWriter)
{
if (BitConverter.IsLittleEndian)
{
unsafe
{
- byte[] result = new byte[8];
+ var result = bufferWriter.GetSpan(8);
byte* p = (byte*)(&data);
result[7] = *p++;
result[6] = *p++;
@@ -155,13 +176,26 @@ public byte[] Serialize(double data, SerializationContext context)
result[2] = *p++;
result[1] = *p++;
result[0] = *p++;
- return result;
}
}
else
{
- return BitConverter.GetBytes(data);
+ unsafe
+ {
+ var result = bufferWriter.GetSpan(8);
+ byte* p = (byte*)(&data);
+ result[0] = *p++;
+ result[1] = *p++;
+ result[2] = *p++;
+ result[3] = *p++;
+ result[4] = *p++;
+ result[5] = *p++;
+ result[6] = *p++;
+ result[7] = *p++;
+ }
}
+
+ bufferWriter.Advance(8);
}
}
@@ -176,8 +210,15 @@ public byte[] Serialize(double data, SerializationContext context)
private class ByteArraySerializer : ISerializer<byte[]>
{
- public byte[] Serialize(byte[] data, SerializationContext context)
- => data;
+ public void Serialize(byte[] data, SerializationContext context, IBufferWriter<byte> bufferWriter)
+ {
+ if (data != null)
+ {
+ var buffer = bufferWriter.GetSpan(data.Length);
+ data.CopyTo(buffer);
+ bufferWriter.Advance(data.Length);
+ }
+ }
}
}
}
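Since the built-in serializers now write into a supplied IBufferWriter<byte>, obtaining raw bytes (for example in tests) goes through a writer; a sketch using .NET's ArrayBufferWriter<byte>:

    var writer = new ArrayBufferWriter<byte>();
    Serializers.Int32.Serialize(1234, SerializationContext.Empty, writer);
    // writer.WrittenSpan now holds { 0x00, 0x00, 0x04, 0xD2 } -- big endian.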
diff --git a/src/Confluent.Kafka/SyncOverAsyncSerializer.cs b/src/Confluent.Kafka/SyncOverAsyncSerializer.cs
index 19a160351..87f9e7241 100644
--- a/src/Confluent.Kafka/SyncOverAsyncSerializer.cs
+++ b/src/Confluent.Kafka/SyncOverAsyncSerializer.cs
@@ -1,4 +1,5 @@
using System;
+using System.Buffers;
namespace Confluent.Kafka.SyncOverAsync
{
@@ -66,11 +67,11 @@ public SyncOverAsyncSerializer(IAsyncSerializer<T> asyncSerializer)
/// <param name="context">
/// Context relevant to the serialize operation.
/// </param>
- /// <returns>
- /// the serialized data.
- /// </returns>
- public byte[] Serialize(T data, SerializationContext context)
- => asyncSerializer.SerializeAsync(data, context)
+ /// <param name="bufferWriter">
+ /// The <see cref="IBufferWriter{T}"/> to serialize the binary representation to.
+ /// </param>
+ public void Serialize(T data, SerializationContext context, IBufferWriter<byte> bufferWriter)
+ => asyncSerializer.SerializeAsync(data, context, bufferWriter)
.ConfigureAwait(continueOnCapturedContext: false)
.GetAwaiter()
.GetResult();
diff --git a/src/Confluent.SchemaRegistry.Serdes.Avro/AvroSerializer.cs b/src/Confluent.SchemaRegistry.Serdes.Avro/AvroSerializer.cs
index b1d4c5058..5b36ebb33 100644
--- a/src/Confluent.SchemaRegistry.Serdes.Avro/AvroSerializer.cs
+++ b/src/Confluent.SchemaRegistry.Serdes.Avro/AvroSerializer.cs
@@ -15,6 +15,7 @@
// Refer to LICENSE for more information.
using System;
+using System.Buffers;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
@@ -133,11 +134,14 @@ public AvroSerializer(ISchemaRegistryClient schemaRegistryClient, AvroSerializer
/// <param name="context">
/// Context relevant to the serialize operation.
/// </param>
+ /// <param name="bufferWriter">
+ /// The <see cref="IBufferWriter{T}"/> to serialize the binary representation to.
+ /// </param>
- /// <returns>
- /// A <see cref="System.Threading.Tasks.Task"/> that completes with
- /// <paramref name="value"/> serialized as a byte array.
- /// </returns>
- public async Task<byte[]> SerializeAsync(T value, SerializationContext context)
+ /// <returns>
+ /// A <see cref="System.Threading.Tasks.Task"/> that completes when
+ /// <paramref name="value"/> has been serialized to <paramref name="bufferWriter"/>.
+ /// </returns>
+ public async Task SerializeAsync(T value, SerializationContext context, IBufferWriter<byte> bufferWriter)
{
try
{
@@ -148,7 +152,7 @@ public async Task<byte[]> SerializeAsync(T value, SerializationContext context)
: new SpecificSerializerImpl<T>(schemaRegistryClient, autoRegisterSchema, useLatestVersion, initialBufferSize, subjectNameStrategy);
}
- return await serializerImpl.Serialize(context.Topic, value, context.Component == MessageComponentType.Key).ConfigureAwait(continueOnCapturedContext: false);
+ await serializerImpl.Serialize(context.Topic, value, context.Component == MessageComponentType.Key, bufferWriter).ConfigureAwait(continueOnCapturedContext: false);
}
catch (AggregateException e)
{
diff --git a/src/Confluent.SchemaRegistry.Serdes.Avro/GenericSerializerImpl.cs b/src/Confluent.SchemaRegistry.Serdes.Avro/GenericSerializerImpl.cs
index d94e1e9a2..b73695230 100644
--- a/src/Confluent.SchemaRegistry.Serdes.Avro/GenericSerializerImpl.cs
+++ b/src/Confluent.SchemaRegistry.Serdes.Avro/GenericSerializerImpl.cs
@@ -26,7 +26,7 @@
using Confluent.Kafka;
using Avro.Generic;
using Avro.IO;
-
+using System.Buffers;
namespace Confluent.SchemaRegistry.Serdes
{
@@ -73,10 +73,13 @@ public GenericSerializerImpl(
///
/// whether or not the data represents a message key.
///
+ /// <param name="bufferWriter">
+ /// The <see cref="IBufferWriter{T}"/> to serialize the binary representation to.
+ /// </param>
- /// <returns>
- /// <paramref name="data"/> serialized as a byte array.
- /// </returns>
- public async Task<byte[]> Serialize(string topic, GenericRecord data, bool isKey)
+ /// <returns>
+ /// A <see cref="System.Threading.Tasks.Task"/> that completes when
+ /// <paramref name="data"/> has been serialized to <paramref name="bufferWriter"/>.
+ /// </returns>
+ public async Task Serialize(string topic, GenericRecord data, bool isKey, IBufferWriter<byte> bufferWriter)
{
try
{
@@ -176,14 +179,13 @@ public async Task Serialize(string topic, GenericRecord data, bool isKey
serializeMutex.Release();
}
- using (var stream = new MemoryStream(initialBufferSize))
+ using (var stream = bufferWriter.AsStream())
using (var writer = new BinaryWriter(stream))
{
stream.WriteByte(Constants.MagicByte);
writer.Write(IPAddress.HostToNetworkOrder(schemaId));
new GenericWriter(writerSchema)
.Write(data, new BinaryEncoder(stream));
- return stream.ToArray();
}
}
catch (AggregateException e)
diff --git a/src/Confluent.SchemaRegistry.Serdes.Avro/IAvroSerializerImpl.cs b/src/Confluent.SchemaRegistry.Serdes.Avro/IAvroSerializerImpl.cs
index cd0c22279..83f98fba4 100644
--- a/src/Confluent.SchemaRegistry.Serdes.Avro/IAvroSerializerImpl.cs
+++ b/src/Confluent.SchemaRegistry.Serdes.Avro/IAvroSerializerImpl.cs
@@ -14,6 +14,7 @@
//
// Refer to LICENSE for more information.
+using System.Buffers;
using System.Threading.Tasks;
@@ -21,6 +22,6 @@ namespace Confluent.SchemaRegistry.Serdes
{
internal interface IAvroSerializerImpl<T>
{
- Task<byte[]> Serialize(string topic, T data, bool isKey);
+ Task Serialize(string topic, T data, bool isKey, IBufferWriter<byte> bufferWriter);
}
}
diff --git a/src/Confluent.SchemaRegistry.Serdes.Avro/SpecificSerializerImpl.cs b/src/Confluent.SchemaRegistry.Serdes.Avro/SpecificSerializerImpl.cs
index cd44c30f5..8dc4f2717 100644
--- a/src/Confluent.SchemaRegistry.Serdes.Avro/SpecificSerializerImpl.cs
+++ b/src/Confluent.SchemaRegistry.Serdes.Avro/SpecificSerializerImpl.cs
@@ -18,6 +18,7 @@
#pragma warning disable CS0618
using System;
+using System.Buffers;
using System.Collections.Generic;
using System.IO;
using System.Linq;
@@ -173,7 +174,7 @@ private static SerializerSchemaData ExtractSchemaData(Type writerType)
return serializerSchemaData;
}
- public async Task<byte[]> Serialize(string topic, T data, bool isKey)
+ public async Task Serialize(string topic, T data, bool isKey, IBufferWriter<byte> bufferWriter)
{
try
{
@@ -237,16 +238,13 @@ public async Task Serialize(string topic, T data, bool isKey)
serializeMutex.Release();
}
- using (var stream = new MemoryStream(initialBufferSize))
+ using (var stream = bufferWriter.AsStream())
using (var writer = new BinaryWriter(stream))
{
stream.WriteByte(Constants.MagicByte);
writer.Write(IPAddress.HostToNetworkOrder(currentSchemaData.WriterSchemaId.Value));
currentSchemaData.AvroWriter.Write(data, new BinaryEncoder(stream));
-
- // TODO: maybe change the ISerializer interface so that this copy isn't necessary.
- return stream.ToArray();
}
}
catch (AggregateException e)
diff --git a/src/Confluent.SchemaRegistry.Serdes.Json/JsonSerializer.cs b/src/Confluent.SchemaRegistry.Serdes.Json/JsonSerializer.cs
index 88a91e685..e216e8953 100644
--- a/src/Confluent.SchemaRegistry.Serdes.Json/JsonSerializer.cs
+++ b/src/Confluent.SchemaRegistry.Serdes.Json/JsonSerializer.cs
@@ -28,7 +28,9 @@
using NJsonSchema.Generation;
using NJsonSchema.Validation;
using Confluent.Kafka;
-
+using System.Buffers;
+using System.Text;
+using System.Runtime.InteropServices;
namespace Confluent.SchemaRegistry.Serdes
{
@@ -139,13 +141,16 @@ public JsonSerializer(ISchemaRegistryClient schemaRegistryClient, JsonSerializer
/// <param name="context">
/// Context relevant to the serialize operation.
/// </param>
+ /// <param name="bufferWriter">
+ /// The <see cref="IBufferWriter{T}"/> to serialize the binary representation to.
+ /// </param>
- /// <returns>
- /// A <see cref="System.Threading.Tasks.Task"/> that completes with
- /// <paramref name="value"/> serialized as a byte array.
- /// </returns>
- public async Task<byte[]> SerializeAsync(T value, SerializationContext context)
+ /// <returns>
+ /// A <see cref="System.Threading.Tasks.Task"/> that completes when
+ /// <paramref name="value"/> has been serialized to <paramref name="bufferWriter"/>.
+ /// </returns>
+ public async Task SerializeAsync(T value, SerializationContext context, IBufferWriter<byte> bufferWriter)
{
- if (value == null) { return null; }
+ if (value == null) { return; }
var serializedString = Newtonsoft.Json.JsonConvert.SerializeObject(value, this.jsonSchemaGeneratorSettings?.ActualSerializerSettings);
var validationResult = validator.Validate(serializedString, this.schema);
@@ -196,15 +201,19 @@ public async Task SerializeAsync(T value, SerializationContext context)
{
serializeMutex.Release();
}
-
- using (var stream = new MemoryStream(initialBufferSize))
- using (var writer = new BinaryWriter(stream))
- {
- stream.WriteByte(Constants.MagicByte);
- writer.Write(IPAddress.HostToNetworkOrder(schemaId.Value));
- writer.Write(System.Text.Encoding.UTF8.GetBytes(serializedString));
- return stream.ToArray();
- }
+
+ var targetSize = 1 + 4 + Encoding.UTF8.GetByteCount(serializedString);
+ var buffer = bufferWriter.GetMemory(targetSize);
+ buffer.Span[0] = Constants.MagicByte;
+
+ var schemaIdNumber = IPAddress.HostToNetworkOrder(schemaId.Value);
+ buffer.Span[1] = (byte)schemaIdNumber;
+ buffer.Span[2] = (byte)(schemaIdNumber >> 8);
+ buffer.Span[3] = (byte)(schemaIdNumber >> 16);
+ buffer.Span[4] = (byte)(schemaIdNumber >> 24);
+
+ bufferWriter.Advance(5);
+ Serializers.Utf8.Serialize(serializedString, SerializationContext.Empty, bufferWriter);
}
catch (AggregateException e)
{
diff --git a/src/Confluent.SchemaRegistry.Serdes.Protobuf/ProtobufSerializer.cs b/src/Confluent.SchemaRegistry.Serdes.Protobuf/ProtobufSerializer.cs
index e535ad8df..47be46b0c 100644
--- a/src/Confluent.SchemaRegistry.Serdes.Protobuf/ProtobufSerializer.cs
+++ b/src/Confluent.SchemaRegistry.Serdes.Protobuf/ProtobufSerializer.cs
@@ -18,6 +18,7 @@
#pragma warning disable CS0618
using System;
+using System.Buffers;
using System.Collections.Generic;
using System.IO;
using System.Linq;
@@ -31,25 +32,26 @@
namespace Confluent.SchemaRegistry.Serdes
{
/// <summary>
/// Protobuf Serializer.
/// </summary>
/// <remarks>
/// Serialization format:
/// byte 0: A magic byte that identifies this as a message with
/// Confluent Platform framing.
/// bytes 1-4: Unique global id of the Protobuf schema that was used
/// for encoding (as registered in Confluent Schema Registry),
/// big endian.
/// following bytes: 1. A size-prefixed array of indices that identify the
/// specific message type in the schema (a given schema
/// can contain many message types and they can be nested).
/// Size and indices are unsigned varints. The common case
/// where the message type is the first message in the
/// schema (i.e. index data would be [1,0]) is encoded as
/// a single 0 byte as an optimization.
/// 2. The protobuf serialized data.
/// </remarks>
public class ProtobufSerializer<T> : IAsyncSerializer<T> where T : IMessage<T>, new()
{
private const int DefaultInitialBufferSize = 1024;
@@ -242,13 +244,16 @@ private async Task<List<SchemaReference>> RegisterOrGetReferences(FileDescriptor
/// <param name="context">
/// Context relevant to the serialize operation.
/// </param>
+ /// <param name="bufferWriter">
+ /// The <see cref="IBufferWriter{T}"/> to serialize the binary representation to.
+ /// </param>
- /// <returns>
- /// A <see cref="System.Threading.Tasks.Task"/> that completes with
- /// <paramref name="value"/> serialized as a byte array.
- /// </returns>
- public async Task<byte[]> SerializeAsync(T value, SerializationContext context)
+ /// <returns>
+ /// A <see cref="System.Threading.Tasks.Task"/> that completes when
+ /// <paramref name="value"/> has been serialized to <paramref name="bufferWriter"/>.
+ /// </returns>
+ public async Task SerializeAsync(T value, SerializationContext context, IBufferWriter<byte> bufferWriter)
{
- if (value == null) { return null; }
+ if (value == null) { return; }
try
{
@@ -307,14 +312,23 @@ await RegisterOrGetReferences(value.Descriptor.File, context, autoRegisterSchema
serializeMutex.Release();
}
- using (var stream = new MemoryStream(initialBufferSize))
- using (var writer = new BinaryWriter(stream))
+
+ var targetSize = 1 + 4 + this.indexArray.Length;
+ var buffer = bufferWriter.GetMemory(targetSize);
+ buffer.Span[0] = Constants.MagicByte;
+
+ var schemaIdNumber = IPAddress.HostToNetworkOrder(schemaId.Value);
+ buffer.Span[1] = (byte)schemaIdNumber;
+ buffer.Span[2] = (byte)(schemaIdNumber >> 8);
+ buffer.Span[3] = (byte)(schemaIdNumber >> 16);
+ buffer.Span[4] = (byte)(schemaIdNumber >> 24);
+ this.indexArray.CopyTo(buffer.Span.Slice(start: 5));
+
+ bufferWriter.Advance(5 + this.indexArray.Length);
+
+ using (var stream = bufferWriter.AsStream())
{
- stream.WriteByte(Constants.MagicByte);
- writer.Write(IPAddress.HostToNetworkOrder(schemaId.Value));
- writer.Write(this.indexArray);
value.WriteTo(stream);
- return stream.ToArray();
}
}
catch (AggregateException e)
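For review convenience, the resulting wire layout in the common case (schema id 7, first message type in the schema; values illustrative):

    // byte 0     : 0x00                  magic byte (Confluent framing)
    // bytes 1-4  : 0x00 0x00 0x00 0x07   schema id, big endian
    // byte 5     : 0x00                  index array shorthand for the first message type
    // bytes 6... : protobuf-encoded message body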
diff --git a/test/Confluent.Kafka.IntegrationTests/Serdes.cs b/test/Confluent.Kafka.IntegrationTests/Serdes.cs
index e5d61c95b..3ebd130b3 100644
--- a/test/Confluent.Kafka.IntegrationTests/Serdes.cs
+++ b/test/Confluent.Kafka.IntegrationTests/Serdes.cs
@@ -14,6 +14,7 @@
//
// Refer to LICENSE for more information.
+using System.Buffers;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
@@ -23,10 +24,10 @@ namespace Confluent.Kafka.IntegrationTests
{
class SimpleAsyncSerializer : IAsyncSerializer<string>
{
- public async Task<byte[]> SerializeAsync(string data, SerializationContext context)
+ public async Task SerializeAsync(string data, SerializationContext context, IBufferWriter<byte> bufferWriter)
{
await Task.Delay(500).ConfigureAwait(false);
- return Serializers.Utf8.Serialize(data, context);
+ Serializers.Utf8.Serialize(data, context, bufferWriter);
}
public ISerializer<string> SyncOverAsync()
@@ -37,10 +38,10 @@ public ISerializer SyncOverAsync()
class SimpleSyncSerializer : ISerializer<string>
{
- public byte[] Serialize(string data, SerializationContext context)
+ public void Serialize(string data, SerializationContext context, IBufferWriter<byte> bufferWriter)
{
Thread.Sleep(500);
- return Serializers.Utf8.Serialize(data, context);
+ Serializers.Utf8.Serialize(data, context, bufferWriter);
}
}
}
diff --git a/test/Confluent.Kafka.IntegrationTests/Tests/AssignPastEnd.cs b/test/Confluent.Kafka.IntegrationTests/Tests/AssignPastEnd.cs
index a2dea1084..1a058c2ed 100644
--- a/test/Confluent.Kafka.IntegrationTests/Tests/AssignPastEnd.cs
+++ b/test/Confluent.Kafka.IntegrationTests/Tests/AssignPastEnd.cs
@@ -47,7 +47,7 @@ public void AssignPastEnd(string bootstrapServers)
DeliveryResult<Null, byte[]> dr;
using (var producer = new ProducerBuilder<Null, byte[]>(producerConfig).Build())
{
- dr = producer.ProduceAsync(singlePartitionTopic, new Message<Null, byte[]> { Value = Serializers.Utf8.Serialize(testString, SerializationContext.Empty) }).Result;
+ dr = producer.ProduceAsync(singlePartitionTopic, new Message<Null, byte[]> { Value = Serializers.Utf8.ToByteArray(testString, SerializationContext.Empty) }).Result;
Assert.True(dr.Offset >= 0);
producer.Flush(TimeSpan.FromSeconds(10));
}
diff --git a/test/Confluent.Kafka.IntegrationTests/Tests/Builder_CustomDefaults.cs b/test/Confluent.Kafka.IntegrationTests/Tests/Builder_CustomDefaults.cs
index 7ba394eff..74f665219 100644
--- a/test/Confluent.Kafka.IntegrationTests/Tests/Builder_CustomDefaults.cs
+++ b/test/Confluent.Kafka.IntegrationTests/Tests/Builder_CustomDefaults.cs
@@ -20,7 +20,7 @@
using System.Text;
using System.Collections.Generic;
using Xunit;
-
+using System.Buffers;
namespace Confluent.Kafka.IntegrationTests
{
@@ -35,9 +35,11 @@ class MyProducerBuilder : ProducerBuilder<string, string>
{
class Utf32Serializer : ISerializer<string>
{
- public byte[] Serialize(string data, SerializationContext context)
+ public void Serialize(string data, SerializationContext context, IBufferWriter<byte> bufferWriter)
{
- return Encoding.UTF32.GetBytes(data);
+ var size = Encoding.UTF32.GetByteCount(data);
+ var buffer = bufferWriter.GetSpan(size);
+ Encoding.UTF32.GetBytes(data).CopyTo(buffer);
+ bufferWriter.Advance(size);
}
}
diff --git a/test/Confluent.Kafka.IntegrationTests/Tests/Consumer_DisableHeaders.cs b/test/Confluent.Kafka.IntegrationTests/Tests/Consumer_DisableHeaders.cs
index 3c33cf62c..52db0e812 100644
--- a/test/Confluent.Kafka.IntegrationTests/Tests/Consumer_DisableHeaders.cs
+++ b/test/Confluent.Kafka.IntegrationTests/Tests/Consumer_DisableHeaders.cs
@@ -48,7 +48,7 @@ public void Consumer_DisableHeaders(string bootstrapServers)
singlePartitionTopic,
new Message<Null, byte[]>
{
- Value = Serializers.Utf8.Serialize("my-value", SerializationContext.Empty),
+ Value = Serializers.Utf8.ToByteArray("my-value", SerializationContext.Empty),
Headers = new Headers() { new Header("my-header", new byte[] { 42 }) }
}
).Result;
diff --git a/test/Confluent.Kafka.IntegrationTests/Tests/Consumer_DisableTimestamps.cs b/test/Confluent.Kafka.IntegrationTests/Tests/Consumer_DisableTimestamps.cs
index 4aaafd7b3..09476fb6f 100644
--- a/test/Confluent.Kafka.IntegrationTests/Tests/Consumer_DisableTimestamps.cs
+++ b/test/Confluent.Kafka.IntegrationTests/Tests/Consumer_DisableTimestamps.cs
@@ -48,7 +48,7 @@ public void Consumer_DisableTimestamps(string bootstrapServers)
singlePartitionTopic,
new Message<Null, byte[]>
{
- Value = Serializers.Utf8.Serialize("my-value", SerializationContext.Empty),
+ Value = Serializers.Utf8.ToByteArray("my-value", SerializationContext.Empty),
Headers = new Headers() { new Header("my-header", new byte[] { 42 }) }
}
).Result;
diff --git a/test/Confluent.Kafka.IntegrationTests/Tests/Consumer_OffsetsForTimes.cs b/test/Confluent.Kafka.IntegrationTests/Tests/Consumer_OffsetsForTimes.cs
index 3166b1170..23f0fe5e7 100644
--- a/test/Confluent.Kafka.IntegrationTests/Tests/Consumer_OffsetsForTimes.cs
+++ b/test/Confluent.Kafka.IntegrationTests/Tests/Consumer_OffsetsForTimes.cs
@@ -130,8 +130,8 @@ private static DeliveryResult[] ProduceMessages(string bootstrap
new TopicPartition(topic, partition),
new Message<byte[], byte[]>
{
- Key = Serializers.Utf8.Serialize($"test key {index}", SerializationContext.Empty),
- Value = Serializers.Utf8.Serialize($"test val {index}", SerializationContext.Empty),
+ Key = Serializers.Utf8.ToByteArray($"test key {index}", SerializationContext.Empty),
+ Value = Serializers.Utf8.ToByteArray($"test val {index}", SerializationContext.Empty),
Timestamp = new Timestamp(baseTime + index*1000, TimestampType.CreateTime),
Headers = null
}
diff --git a/test/Confluent.Kafka.IntegrationTests/Tests/Consumer_Pause_Resume.cs b/test/Confluent.Kafka.IntegrationTests/Tests/Consumer_Pause_Resume.cs
index 45015d5eb..012748060 100644
--- a/test/Confluent.Kafka.IntegrationTests/Tests/Consumer_Pause_Resume.cs
+++ b/test/Confluent.Kafka.IntegrationTests/Tests/Consumer_Pause_Resume.cs
@@ -61,13 +61,13 @@ public void Consumer_Pause_Resume(string bootstrapServers)
ConsumeResult record = consumer.Consume(TimeSpan.FromSeconds(2));
Assert.Null(record);
- producer.ProduceAsync(topic.Name, new Message { Value = Serializers.Utf8.Serialize("test value", SerializationContext.Empty) }).Wait();
+ producer.ProduceAsync(topic.Name, new Message { Value = Serializers.Utf8.ToByteArray("test value", SerializationContext.Empty) }).Wait();
record = consumer.Consume(TimeSpan.FromSeconds(10));
Assert.NotNull(record?.Message);
Assert.Equal(0, record?.Offset);
consumer.Pause(assignment);
- producer.ProduceAsync(topic.Name, new Message { Value = Serializers.Utf8.Serialize("test value 2", SerializationContext.Empty) }).Wait();
+ producer.ProduceAsync(topic.Name, new Message { Value = Serializers.Utf8.ToByteArray("test value 2", SerializationContext.Empty) }).Wait();
record = consumer.Consume(TimeSpan.FromSeconds(2));
Assert.Null(record);
consumer.Resume(assignment);
diff --git a/test/Confluent.Kafka.IntegrationTests/Tests/Consumer_Seek.cs b/test/Confluent.Kafka.IntegrationTests/Tests/Consumer_Seek.cs
index bbd556cdb..3fada93d9 100644
--- a/test/Confluent.Kafka.IntegrationTests/Tests/Consumer_Seek.cs
+++ b/test/Confluent.Kafka.IntegrationTests/Tests/Consumer_Seek.cs
@@ -49,9 +49,9 @@ public void Consumer_Seek(string bootstrapServers)
.Build())
{
const string checkValue = "check value";
- var dr = producer.ProduceAsync(singlePartitionTopic, new Message<Null, byte[]> { Value = Serializers.Utf8.Serialize(checkValue, SerializationContext.Empty) }).Result;
- var dr2 = producer.ProduceAsync(singlePartitionTopic, new Message<Null, byte[]> { Value = Serializers.Utf8.Serialize("second value", SerializationContext.Empty) }).Result;
- var dr3 = producer.ProduceAsync(singlePartitionTopic, new Message<Null, byte[]> { Value = Serializers.Utf8.Serialize("third value", SerializationContext.Empty) }).Result;
+ var dr = producer.ProduceAsync(singlePartitionTopic, new Message<Null, byte[]> { Value = Serializers.Utf8.ToByteArray(checkValue, SerializationContext.Empty) }).Result;
+ var dr2 = producer.ProduceAsync(singlePartitionTopic, new Message<Null, byte[]> { Value = Serializers.Utf8.ToByteArray("second value", SerializationContext.Empty) }).Result;
+ var dr3 = producer.ProduceAsync(singlePartitionTopic, new Message<Null, byte[]> { Value = Serializers.Utf8.ToByteArray("third value", SerializationContext.Empty) }).Result;
consumer.Assign(new TopicPartitionOffset[] { new TopicPartitionOffset(singlePartitionTopic, 0, dr.Offset) });
diff --git a/test/Confluent.Kafka.IntegrationTests/Tests/Consumer_StoreOffset.cs b/test/Confluent.Kafka.IntegrationTests/Tests/Consumer_StoreOffset.cs
index 00f328731..098544490 100644
--- a/test/Confluent.Kafka.IntegrationTests/Tests/Consumer_StoreOffset.cs
+++ b/test/Confluent.Kafka.IntegrationTests/Tests/Consumer_StoreOffset.cs
@@ -64,7 +64,7 @@ public void Consumer_StoreOffsets(string bootstrapServers)
ConsumeResult record = consumer.Consume(TimeSpan.FromSeconds(10));
Assert.Null(record);
- producer.ProduceAsync(singlePartitionTopic, new Message { Value = Serializers.Utf8.Serialize("test store offset value", SerializationContext.Empty) }).Wait();
+ producer.ProduceAsync(singlePartitionTopic, new Message { Value = Serializers.Utf8.ToByteArray("test store offset value", SerializationContext.Empty) }).Wait();
record = consumer.Consume(TimeSpan.FromSeconds(10));
Assert.NotNull(record?.Message);
diff --git a/test/Confluent.Kafka.IntegrationTests/Tests/DuplicateConsumerAssign.cs b/test/Confluent.Kafka.IntegrationTests/Tests/DuplicateConsumerAssign.cs
index 61c097da2..0295c8af2 100644
--- a/test/Confluent.Kafka.IntegrationTests/Tests/DuplicateConsumerAssign.cs
+++ b/test/Confluent.Kafka.IntegrationTests/Tests/DuplicateConsumerAssign.cs
@@ -52,7 +52,7 @@ public void DuplicateConsumerAssign(string bootstrapServers)
DeliveryResult<Null, byte[]> dr;
using (var producer = new ProducerBuilder<Null, byte[]>(producerConfig).Build())
{
- dr = producer.ProduceAsync(topic.Name, new Message<Null, byte[]> { Value = Serializers.Utf8.Serialize(testString, SerializationContext.Empty) }).Result;
+ dr = producer.ProduceAsync(topic.Name, new Message<Null, byte[]> { Value = Serializers.Utf8.ToByteArray(testString, SerializationContext.Empty) }).Result;
Assert.NotNull(dr);
producer.Flush(TimeSpan.FromSeconds(10));
}
diff --git a/test/Confluent.Kafka.IntegrationTests/Tests/GarbageCollect.cs b/test/Confluent.Kafka.IntegrationTests/Tests/GarbageCollect.cs
index cb0264bcf..091a6d1f0 100644
--- a/test/Confluent.Kafka.IntegrationTests/Tests/GarbageCollect.cs
+++ b/test/Confluent.Kafka.IntegrationTests/Tests/GarbageCollect.cs
@@ -40,7 +40,7 @@ public void GarbageCollect(string bootstrapServers)
using (var producer = new ProducerBuilder<Null, byte[]>(producerConfig).Build())
{
- producer.ProduceAsync(singlePartitionTopic, new Message { Value = Serializers.Utf8.Serialize("test string", SerializationContext.Empty) }).Wait();
+ producer.ProduceAsync(singlePartitionTopic, new Message { Value = Serializers.Utf8.ToByteArray("test string", SerializationContext.Empty) }).Wait();
}
using (var consumer = new ConsumerBuilder(consumerConfig).Build())
diff --git a/test/Confluent.Kafka.IntegrationTests/Tests/Headers_SerializationContext.cs b/test/Confluent.Kafka.IntegrationTests/Tests/Headers_SerializationContext.cs
index 0bc8c9960..afd017b6d 100644
--- a/test/Confluent.Kafka.IntegrationTests/Tests/Headers_SerializationContext.cs
+++ b/test/Confluent.Kafka.IntegrationTests/Tests/Headers_SerializationContext.cs
@@ -17,6 +17,7 @@
#pragma warning disable xUnit1026
using System;
+using System.Buffers;
using System.Text;
using System.Threading.Tasks;
using Xunit;
@@ -29,11 +30,17 @@ public partial class Tests
class TestSerializer : IAsyncSerializer<string>
{
- public Task<byte[]> SerializeAsync(string data, SerializationContext context)
+ public Task SerializeAsync(string data, SerializationContext context, IBufferWriter<byte> bufferWriter)
{
Assert.NotNull(context.Headers);
context.Headers.Add("test_header", new byte[] { 100, 42 });
- return Task.FromResult(Encoding.UTF8.GetBytes("test_value"));
+
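+ // Stage the payload in the writer: reserve a span, copy the bytes in, then commit them with Advance.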
+ var bytes = Encoding.UTF8.GetBytes("test_value");
+ var buffer = bufferWriter.GetSpan(bytes.Length);
+ bytes.CopyTo(buffer);
+ bufferWriter.Advance(bytes.Length);
+
+ return Task.CompletedTask;
}
}
diff --git a/test/Confluent.Kafka.IntegrationTests/Tests/LogDelegate.cs b/test/Confluent.Kafka.IntegrationTests/Tests/LogDelegate.cs
index 055baf8ff..639e2e5a8 100644
--- a/test/Confluent.Kafka.IntegrationTests/Tests/LogDelegate.cs
+++ b/test/Confluent.Kafka.IntegrationTests/Tests/LogDelegate.cs
@@ -60,7 +60,7 @@ public void LogDelegate(string bootstrapServers)
.SetLogHandler((_, m) => logCount += 1)
.Build())
{
- dr = producer.ProduceAsync(singlePartitionTopic, new Message { Value = Serializers.Utf8.Serialize("test value", SerializationContext.Empty) }).Result;
+ dr = producer.ProduceAsync(singlePartitionTopic, new Message { Value = Serializers.Utf8.ToByteArray("test value", SerializationContext.Empty) }).Result;
producer.Flush(TimeSpan.FromSeconds(10));
}
Assert.True(logCount > 0);
diff --git a/test/Confluent.Kafka.IntegrationTests/Tests/OnPartitionsAssignedNotSet.cs b/test/Confluent.Kafka.IntegrationTests/Tests/OnPartitionsAssignedNotSet.cs
index bea52fa0d..252d32c73 100644
--- a/test/Confluent.Kafka.IntegrationTests/Tests/OnPartitionsAssignedNotSet.cs
+++ b/test/Confluent.Kafka.IntegrationTests/Tests/OnPartitionsAssignedNotSet.cs
@@ -46,7 +46,7 @@ public void OnPartitionsAssignedNotSet(string bootstrapServers)
// Producing onto the topic to make sure it exists.
using (var producer = new ProducerBuilder(producerConfig).Build())
{
- var dr = producer.ProduceAsync(singlePartitionTopic, new Message { Value = Serializers.Utf8.Serialize("test string", SerializationContext.Empty) }).Result;
+ var dr = producer.ProduceAsync(singlePartitionTopic, new Message { Value = Serializers.Utf8.ToByteArray("test string", SerializationContext.Empty) }).Result;
Assert.NotEqual(Offset.Unset, dr.Offset);
producer.Flush(TimeSpan.FromSeconds(10));
}
diff --git a/test/Confluent.Kafka.IntegrationTests/Tests/SerializationExtensions.cs b/test/Confluent.Kafka.IntegrationTests/Tests/SerializationExtensions.cs
new file mode 100644
index 000000000..a52aea71b
--- /dev/null
+++ b/test/Confluent.Kafka.IntegrationTests/Tests/SerializationExtensions.cs
@@ -0,0 +1,35 @@
+// Copyright 2018 Confluent Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+// Refer to LICENSE for more information.
+
+#pragma warning disable xUnit1026
+
+using System.Linq;
+
+namespace Confluent.Kafka.IntegrationTests
+{
+ public static class SerializationExtensions
+ {
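+ // Test-only helper: runs the buffer-based serializer and copies the committed bytes back out as a byte[].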
+ public static byte[] ToByteArray<T>(this ISerializer<T> serializer, T value, SerializationContext serializationContext)
+ {
+ using (var buffer = DefaultSerializationBufferProvider.Instance.Create())
+ {
+ serializer.Serialize(value, serializationContext, buffer);
+ return buffer.GetComitted().ToArray();
+ }
+ }
+ }
+}
diff --git a/test/Confluent.Kafka.SyncOverAsync/Program.cs b/test/Confluent.Kafka.SyncOverAsync/Program.cs
index 7898afe31..4a69a0008 100644
--- a/test/Confluent.Kafka.SyncOverAsync/Program.cs
+++ b/test/Confluent.Kafka.SyncOverAsync/Program.cs
@@ -15,6 +15,7 @@
// Refer to LICENSE for more information.
using System;
+using System.Buffers;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
@@ -33,10 +34,11 @@ namespace Confluent.Kafka.SyncOverAsync
{
class SimpleAsyncSerializer : IAsyncSerializer<string>
{
- public async Task<byte[]> SerializeAsync(string data, SerializationContext context)
+ public async Task SerializeAsync(string data, SerializationContext context, IBufferWriter<byte> bufferWriter)
{
await Task.Delay(500);
- return Serializers.Utf8.Serialize(data, context);
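+ // The built-in UTF-8 serializer now writes directly into the supplied buffer, so there is no byte[] to return.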
+ Serializers.Utf8.Serialize(data, context, bufferWriter);
}
public ISerializer<string> SyncOverAsync()
@@ -47,10 +48,11 @@ public ISerializer<string> SyncOverAsync()
class SimpleSyncSerializer : ISerializer<string>
{
- public byte[] Serialize(string data, SerializationContext context)
+ public void Serialize(string data, SerializationContext context, IBufferWriter<byte> bufferWriter)
{
Thread.Sleep(500);
- return Serializers.Utf8.Serialize(data, context);
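+ // Same change for the synchronous variant: serialize into the buffer rather than returning bytes.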
+ Serializers.Utf8.Serialize(data, context, bufferWriter);
}
}
diff --git a/test/Confluent.Kafka.UnitTests/Serialization/ByteArray.cs b/test/Confluent.Kafka.UnitTests/Serialization/ByteArray.cs
index 3acf442b9..69c8aaaa0 100644
--- a/test/Confluent.Kafka.UnitTests/Serialization/ByteArray.cs
+++ b/test/Confluent.Kafka.UnitTests/Serialization/ByteArray.cs
@@ -27,13 +27,13 @@ public class ByteArrayTests
[InlineData(new byte[] { 1, 2, 3, 4, 5 })]
public void CanReconstructByteArray(byte[] values)
{
- Assert.Equal(values, Deserializers.ByteArray.Deserialize(Serializers.ByteArray.Serialize(values, SerializationContext.Empty), false, SerializationContext.Empty));
+ Assert.Equal(values, Deserializers.ByteArray.Deserialize(Serializers.ByteArray.ToByteArray(values, SerializationContext.Empty), false, SerializationContext.Empty));
}
[Fact]
public void CanReconstructByteArrayNull()
{
- Assert.Null(Deserializers.ByteArray.Deserialize(Serializers.ByteArray.Serialize(null, SerializationContext.Empty), true, SerializationContext.Empty));
+ Assert.Null(Deserializers.ByteArray.Deserialize(Serializers.ByteArray.ToByteArray(null, SerializationContext.Empty), true, SerializationContext.Empty));
}
}
}
diff --git a/test/Confluent.Kafka.UnitTests/Serialization/Double.cs b/test/Confluent.Kafka.UnitTests/Serialization/Double.cs
index 6581154a1..e2820d2f1 100644
--- a/test/Confluent.Kafka.UnitTests/Serialization/Double.cs
+++ b/test/Confluent.Kafka.UnitTests/Serialization/Double.cs
@@ -28,7 +28,7 @@ public void CanReconstructDouble()
{
foreach (var value in TestData)
{
- Assert.Equal(value, Deserializers.Double.Deserialize(Serializers.Double.Serialize(value, SerializationContext.Empty), false, SerializationContext.Empty));
+ Assert.Equal(value, Deserializers.Double.Deserialize(Serializers.Double.ToByteArray(value, SerializationContext.Empty), false, SerializationContext.Empty));
}
}
@@ -37,7 +37,7 @@ public void IsBigEndian()
{
var buffer = new byte[] { 23, 0, 0, 0, 0, 0, 0, 0 };
var value = BitConverter.ToDouble(buffer, 0);
- var data = Serializers.Double.Serialize(value, SerializationContext.Empty);
+ var data = Serializers.Double.ToByteArray(value, SerializationContext.Empty);
Assert.Equal(23, data[7]);
Assert.Equal(0, data[0]);
}
diff --git a/test/Confluent.Kafka.UnitTests/Serialization/Float.cs b/test/Confluent.Kafka.UnitTests/Serialization/Float.cs
index 00168a5db..401f4df67 100644
--- a/test/Confluent.Kafka.UnitTests/Serialization/Float.cs
+++ b/test/Confluent.Kafka.UnitTests/Serialization/Float.cs
@@ -28,7 +28,7 @@ public void CanReconstructFloat()
{
foreach (var value in TestData)
{
- Assert.Equal(value, Deserializers.Single.Deserialize(Serializers.Single.Serialize(value, SerializationContext.Empty), false, SerializationContext.Empty));
+ Assert.Equal(value, Deserializers.Single.Deserialize(Serializers.Single.ToByteArray(value, SerializationContext.Empty), false, SerializationContext.Empty));
}
}
@@ -37,7 +37,7 @@ public void IsBigEndian()
{
var buffer = new byte[] { 23, 0, 0, 0 };
var value = BitConverter.ToSingle(buffer, 0);
- var data = Serializers.Single.Serialize(value, SerializationContext.Empty);
+ var data = Serializers.Single.ToByteArray(value, SerializationContext.Empty);
Assert.Equal(23, data[3]);
Assert.Equal(0, data[0]);
}
diff --git a/test/Confluent.Kafka.UnitTests/Serialization/Int.cs b/test/Confluent.Kafka.UnitTests/Serialization/Int.cs
index a76e85538..b1ad4d214 100644
--- a/test/Confluent.Kafka.UnitTests/Serialization/Int.cs
+++ b/test/Confluent.Kafka.UnitTests/Serialization/Int.cs
@@ -34,7 +34,7 @@ public class IntTests
[Fact]
public void IsBigEndian()
{
- var bytes = Serializers.Int32.Serialize(42, SerializationContext.Empty);
+ var bytes = Serializers.Int32.ToByteArray(42, SerializationContext.Empty);
Assert.Equal(4, bytes.Length);
// most significant byte in smallest address.
Assert.Equal(0, bytes[0]);
@@ -49,7 +49,7 @@ public void SerializationAgreesWithSystemNetHostToNetworkOrder()
int networkOrder = System.Net.IPAddress.HostToNetworkOrder(theInt);
var bytes1 = BitConverter.GetBytes(networkOrder);
- var bytes2 = Serializers.Int32.Serialize(theInt, SerializationContext.Empty);
+ var bytes2 = Serializers.Int32.ToByteArray(theInt, SerializationContext.Empty);
Assert.Equal(bytes1.Length, bytes2.Length);
@@ -65,7 +65,7 @@ public void CanReconstructInt()
{
foreach (int theInt in toTest)
{
- var reconstructed = Deserializers.Int32.Deserialize(Serializers.Int32.Serialize(theInt, SerializationContext.Empty), false, SerializationContext.Empty);
+ var reconstructed = Deserializers.Int32.Deserialize(Serializers.Int32.ToByteArray(theInt, SerializationContext.Empty), false, SerializationContext.Empty);
Assert.Equal(theInt, reconstructed);
}
}
diff --git a/test/Confluent.Kafka.UnitTests/Serialization/Long.cs b/test/Confluent.Kafka.UnitTests/Serialization/Long.cs
index 755105c71..b291a35c6 100644
--- a/test/Confluent.Kafka.UnitTests/Serialization/Long.cs
+++ b/test/Confluent.Kafka.UnitTests/Serialization/Long.cs
@@ -26,13 +26,13 @@ public class LongTests
[MemberData(nameof(TestData))]
public void CanReconstructLong(long value)
{
- Assert.Equal(value, Deserializers.Int64.Deserialize(Serializers.Int64.Serialize(value, SerializationContext.Empty), false, SerializationContext.Empty));
+ Assert.Equal(value, Deserializers.Int64.Deserialize(Serializers.Int64.ToByteArray(value, SerializationContext.Empty), false, SerializationContext.Empty));
}
[Fact]
public void IsBigEndian()
{
- var data = Serializers.Int64.Serialize(23L, SerializationContext.Empty);
+ var data = Serializers.Int64.ToByteArray(23L, SerializationContext.Empty);
Assert.Equal(23, data[7]);
Assert.Equal(0, data[0]);
}
diff --git a/test/Confluent.Kafka.UnitTests/Serialization/SerializationExtensions.cs b/test/Confluent.Kafka.UnitTests/Serialization/SerializationExtensions.cs
new file mode 100644
index 000000000..06d852cde
--- /dev/null
+++ b/test/Confluent.Kafka.UnitTests/Serialization/SerializationExtensions.cs
@@ -0,0 +1,34 @@
+// Copyright 2016-2017 Confluent Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+// Refer to LICENSE for more information.
+
+using System.Linq;
+
+
+namespace Confluent.Kafka.UnitTests.Serialization
+{
+ public static class SerializationExtensions
+ {
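+ // Unit-test counterpart of the integration-test helper: bridges the buffer-based ISerializer API back to byte[] for the test assertions.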
+ public static byte[] ToByteArray<T>(this ISerializer<T> serializer, T value, SerializationContext serializationContext)
+ {
+ using (var buffer = DefaultSerializationBufferProvider.Instance.Create())
+ {
+ serializer.Serialize(value, serializationContext, buffer);
+ return buffer.GetComitted().ToArray();
+ }
+ }
+ }
+}
diff --git a/test/Confluent.Kafka.UnitTests/Serialization/String.cs b/test/Confluent.Kafka.UnitTests/Serialization/String.cs
index f1f8ff381..9ba76c370 100644
--- a/test/Confluent.Kafka.UnitTests/Serialization/String.cs
+++ b/test/Confluent.Kafka.UnitTests/Serialization/String.cs
@@ -28,10 +28,10 @@ public class StringTests
[Fact]
public void SerializeDeserialize()
{
- Assert.Equal("hello world", Deserializers.Utf8.Deserialize(Serializers.Utf8.Serialize("hello world", SerializationContext.Empty), false, SerializationContext.Empty));
- Assert.Equal("ឆ្មាត្រូវបានហែលទឹក", Deserializers.Utf8.Deserialize(Serializers.Utf8.Serialize("ឆ្មាត្រូវបានហែលទឹក", SerializationContext.Empty), false, SerializationContext.Empty));
- Assert.Equal("вы не банан", Deserializers.Utf8.Deserialize(Serializers.Utf8.Serialize("вы не банан", SerializationContext.Empty), false, SerializationContext.Empty));
- Assert.Null(Deserializers.Utf8.Deserialize(Serializers.Utf8.Serialize(null, SerializationContext.Empty), true, SerializationContext.Empty));
+ Assert.Equal("hello world", Deserializers.Utf8.Deserialize(Serializers.Utf8.ToByteArray("hello world", SerializationContext.Empty), false, SerializationContext.Empty));
+ Assert.Equal("ឆ្មាត្រូវបានហែលទឹក", Deserializers.Utf8.Deserialize(Serializers.Utf8.ToByteArray("ឆ្មាត្រូវបានហែលទឹក", SerializationContext.Empty), false, SerializationContext.Empty));
+ Assert.Equal("вы не банан", Deserializers.Utf8.Deserialize(Serializers.Utf8.ToByteArray("вы не банан", SerializationContext.Empty), false, SerializationContext.Empty));
+ Assert.Null(Deserializers.Utf8.Deserialize(Serializers.Utf8.ToByteArray(null, SerializationContext.Empty), true, SerializationContext.Empty));
// TODO: check some serialize / deserialize operations that are not expected to work, including some
// cases where Deserialize can be expected to throw an exception.
diff --git a/test/Confluent.SchemaRegistry.Serdes.UnitTests/JsonSerializeDeserialize.cs b/test/Confluent.SchemaRegistry.Serdes.UnitTests/JsonSerializeDeserialize.cs
index afb3db938..3273ba06d 100644
--- a/test/Confluent.SchemaRegistry.Serdes.UnitTests/JsonSerializeDeserialize.cs
+++ b/test/Confluent.SchemaRegistry.Serdes.UnitTests/JsonSerializeDeserialize.cs
@@ -32,6 +32,43 @@
namespace Confluent.SchemaRegistry.Serdes.UnitTests
{
+ public static class SerializationExtensions
+ {
+ public static byte[] ToByteArray<T>(this ISerializer<T> serializer, T value, SerializationContext serializationContext)
+ {
+ using (var buffer = DefaultSerializationBufferProvider.Instance.Create())
+ {
+ serializer.Serialize(value, serializationContext, buffer);
+ var comitted = buffer.GetComitted();
+
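+ // The old SerializeAsync contract returned null for a null value; an empty committed segment maps back to null here.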
+ if (comitted.Count == 0)
+ {
+ return null;
+ }
+
+ return comitted.ToArray();
+ }
+ }
+
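+ // Async counterpart for the Avro, JSON and Protobuf serializers, which implement IAsyncSerializer.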
+ public static async Task<byte[]> ToByteArray<T>(this IAsyncSerializer<T> serializer, T value, SerializationContext serializationContext)
+ {
+ using (var buffer = DefaultSerializationBufferProvider.Instance.Create())
+ {
+ await serializer.SerializeAsync(value, serializationContext, buffer);
+ var comitted = buffer.GetComitted();
+
+ if (comitted.Count == 0)
+ {
+ return null;
+ }
+
+ return comitted.ToArray();
+ }
+ }
+ }
+
public class JsonSerializeDeserialzeTests
{
public class UInt32Value
@@ -104,7 +139,7 @@ public void Null()
var jsonSerializer = new JsonSerializer<UInt32Value>(schemaRegistryClient);
var jsonDeserializer = new JsonDeserializer<UInt32Value>();
- var bytes = jsonSerializer.SerializeAsync(null, new SerializationContext(MessageComponentType.Value, testTopic)).Result;
+ var bytes = jsonSerializer.ToByteArray(null, new SerializationContext(MessageComponentType.Value, testTopic)).Result;
Assert.Null(bytes);
Assert.Null(jsonDeserializer.DeserializeAsync(bytes, true, new SerializationContext(MessageComponentType.Value, testTopic)).Result);
}
@@ -117,7 +152,7 @@ public void UInt32SerDe()
var jsonDeserializer = new JsonDeserializer<UInt32Value>();
var v = new UInt32Value { Value = 1234 };
- var bytes = jsonSerializer.SerializeAsync(v, new SerializationContext(MessageComponentType.Value, testTopic)).Result;
+ var bytes = jsonSerializer.ToByteArray(v, new SerializationContext(MessageComponentType.Value, testTopic)).Result;
Assert.Equal(v.Value, jsonDeserializer.DeserializeAsync(bytes, false, new SerializationContext(MessageComponentType.Value, testTopic)).Result.Value);
}
@@ -142,7 +177,7 @@ public async Task WithJsonSerializerSettingsSerDe()
var jsonDeserializer = new JsonDeserializer<UInt32Value>(jsonSchemaGeneratorSettings: jsonSchemaGeneratorSettings);
var v = new UInt32Value { Value = value };
- var bytes = await jsonSerializer.SerializeAsync(v, new SerializationContext(MessageComponentType.Value, testTopic));
+ var bytes = await jsonSerializer.ToByteArray(v, new SerializationContext(MessageComponentType.Value, testTopic));
Assert.NotNull(bytes);
Assert.Equal(expectedJson, Encoding.UTF8.GetString(bytes.AsSpan().Slice(5)));
@@ -166,7 +201,7 @@ public async Task WithJsonSchemaGeneratorSettingsSerDe(EnumHandling enumHandling
var jsonDeserializer = new JsonDeserializer<EnumObject>(jsonSchemaGeneratorSettings: jsonSchemaGeneratorSettings);
var v = new EnumObject { Value = value };
- var bytes = await jsonSerializer.SerializeAsync(v, new SerializationContext(MessageComponentType.Value, testTopic));
+ var bytes = await jsonSerializer.ToByteArray(v, new SerializationContext(MessageComponentType.Value, testTopic));
Assert.NotNull(bytes);
Assert.Equal(expectedJson, Encoding.UTF8.GetString(bytes.AsSpan().Slice(5)));
diff --git a/test/Confluent.SchemaRegistry.Serdes.UnitTests/ProtoSerializeDeserialize.cs b/test/Confluent.SchemaRegistry.Serdes.UnitTests/ProtoSerializeDeserialize.cs
index ff3c68c55..724efd1b2 100644
--- a/test/Confluent.SchemaRegistry.Serdes.UnitTests/ProtoSerializeDeserialize.cs
+++ b/test/Confluent.SchemaRegistry.Serdes.UnitTests/ProtoSerializeDeserialize.cs
@@ -52,7 +52,7 @@ public void Null()
var protoSerializer = new ProtobufSerializer<UInt32Value>(schemaRegistryClient);
var protoDeserializer = new ProtobufDeserializer<UInt32Value>();
- var bytes = protoSerializer.SerializeAsync(null, new SerializationContext(MessageComponentType.Value, testTopic)).Result;
+ var bytes = protoSerializer.ToByteArray(null, new SerializationContext(MessageComponentType.Value, testTopic)).Result;
Assert.Null(bytes);
Assert.Null(protoDeserializer.DeserializeAsync(bytes, true, new SerializationContext(MessageComponentType.Value, testTopic)).Result);
}
@@ -64,7 +64,7 @@ public void UInt32SerDe()
var protoDeserializer = new ProtobufDeserializer<UInt32Value>();
var v = new UInt32Value { Value = 1234 };
- var bytes = protoSerializer.SerializeAsync(v, new SerializationContext(MessageComponentType.Value, testTopic)).Result;
+ var bytes = protoSerializer.ToByteArray(v, new SerializationContext(MessageComponentType.Value, testTopic)).Result;
Assert.Equal(v.Value, protoDeserializer.DeserializeAsync(bytes, false, new SerializationContext(MessageComponentType.Value, testTopic)).Result.Value);
}
diff --git a/test/Confluent.SchemaRegistry.Serdes.UnitTests/SerializeDeserialize.cs b/test/Confluent.SchemaRegistry.Serdes.UnitTests/SerializeDeserialize.cs
index 9f083a4b4..836a6334d 100644
--- a/test/Confluent.SchemaRegistry.Serdes.UnitTests/SerializeDeserialize.cs
+++ b/test/Confluent.SchemaRegistry.Serdes.UnitTests/SerializeDeserialize.cs
@@ -54,7 +54,7 @@ public void IntSerDe()
var avroSerializer = new AvroSerializer<int>(schemaRegistryClient);
var avroDeserializer = new AvroDeserializer<int>(schemaRegistryClient);
byte[] bytes;
- bytes = avroSerializer.SerializeAsync(1234, new SerializationContext(MessageComponentType.Value, testTopic)).Result;
+ bytes = avroSerializer.ToByteArray(1234, new SerializationContext(MessageComponentType.Value, testTopic)).Result;
Assert.Equal(1234, avroDeserializer.DeserializeAsync(bytes, false, new SerializationContext(MessageComponentType.Value, testTopic)).Result);
}
@@ -64,7 +64,7 @@ public void LongSerDe()
var avroSerializer = new AvroSerializer<long>(schemaRegistryClient);
var avroDeserializer = new AvroDeserializer<long>(schemaRegistryClient);
byte[] bytes;
- bytes = avroSerializer.SerializeAsync(123, new SerializationContext(MessageComponentType.Value, testTopic)).Result;
+ bytes = avroSerializer.ToByteArray(123, new SerializationContext(MessageComponentType.Value, testTopic)).Result;
Assert.Equal(123, avroDeserializer.DeserializeAsync(bytes, false, new SerializationContext(MessageComponentType.Value, testTopic)).Result);
}
@@ -74,7 +74,7 @@ public void BoolSerDe()
var avroSerializer = new AvroSerializer<bool>(schemaRegistryClient);
var avroDeserializer = new AvroDeserializer<bool>(schemaRegistryClient);
byte[] bytes;
- bytes = avroSerializer.SerializeAsync(true, new SerializationContext(MessageComponentType.Value, testTopic)).Result;
+ bytes = avroSerializer.ToByteArray(true, new SerializationContext(MessageComponentType.Value, testTopic)).Result;
Assert.Equal(true, avroDeserializer.DeserializeAsync(bytes, false, new SerializationContext(MessageComponentType.Value, testTopic)).Result);
}
@@ -84,7 +84,7 @@ public void StringSerDe()
var avroSerializer = new AvroSerializer<string>(schemaRegistryClient);
var avroDeserializer = new AvroDeserializer<string>(schemaRegistryClient);
byte[] bytes;
- bytes = avroSerializer.SerializeAsync("abc", new SerializationContext(MessageComponentType.Value, testTopic)).Result;
+ bytes = avroSerializer.ToByteArray("abc", new SerializationContext(MessageComponentType.Value, testTopic)).Result;
Assert.Equal("abc", avroDeserializer.DeserializeAsync(bytes, false, new SerializationContext(MessageComponentType.Value, testTopic)).Result);
}
@@ -94,7 +94,7 @@ public void DoubleSerDe()
var avroSerializer = new AvroSerializer<double>(schemaRegistryClient);
var avroDeserializer = new AvroDeserializer<double>(schemaRegistryClient);
byte[] bytes;
- bytes = avroSerializer.SerializeAsync(123d, new SerializationContext(MessageComponentType.Value, testTopic)).Result;
+ bytes = avroSerializer.ToByteArray(123d, new SerializationContext(MessageComponentType.Value, testTopic)).Result;
Assert.Equal(123d, avroDeserializer.DeserializeAsync(bytes, false, new SerializationContext(MessageComponentType.Value, testTopic)).Result);
}
@@ -104,7 +104,7 @@ public void FloatSerDe()
var avroSerializer = new AvroSerializer<float>(schemaRegistryClient);
var avroDeserializer = new AvroDeserializer<float>(schemaRegistryClient);
byte[] bytes;
- bytes = avroSerializer.SerializeAsync(123f, new SerializationContext(MessageComponentType.Value, testTopic)).Result;
+ bytes = avroSerializer.ToByteArray(123f, new SerializationContext(MessageComponentType.Value, testTopic)).Result;
Assert.Equal(123f, avroDeserializer.DeserializeAsync(bytes, false, new SerializationContext(MessageComponentType.Value, testTopic)).Result);
}
@@ -114,7 +114,7 @@ public void BytesSerDe()
var avroSerializer = new AvroSerializer<byte[]>(schemaRegistryClient);
var avroDeserializer = new AvroDeserializer<byte[]>(schemaRegistryClient);
byte[] bytes;
- bytes = avroSerializer.SerializeAsync(new byte[] { 2, 3, 4 }, new SerializationContext(MessageComponentType.Value, testTopic)).Result;
+ bytes = avroSerializer.ToByteArray(new byte[] { 2, 3, 4 }, new SerializationContext(MessageComponentType.Value, testTopic)).Result;
Assert.Equal(new byte[] { 2, 3, 4 }, avroDeserializer.DeserializeAsync(bytes, false, new SerializationContext(MessageComponentType.Value, testTopic)).Result);
}
@@ -124,7 +124,7 @@ public void NullSerDe()
var avroSerializer = new AvroSerializer<Null>(schemaRegistryClient);
var avroDeserializer = new AvroDeserializer<Null>(schemaRegistryClient);
byte[] bytes;
- bytes = avroSerializer.SerializeAsync(null, new SerializationContext(MessageComponentType.Value, testTopic)).Result;
+ bytes = avroSerializer.ToByteArray(null, new SerializationContext(MessageComponentType.Value, testTopic)).Result;
Assert.Equal(null, avroDeserializer.DeserializeAsync(bytes, false, new SerializationContext(MessageComponentType.Value, testTopic)).Result);
}
@@ -141,7 +141,7 @@ public void ISpecificRecord()
name = "awesome"
};
- var bytes = serializer.SerializeAsync(user, new SerializationContext(MessageComponentType.Value, testTopic)).Result;
+ var bytes = serializer.ToByteArray(user, new SerializationContext(MessageComponentType.Value, testTopic)).Result;
var result = deserializer.DeserializeAsync(bytes, false, new SerializationContext(MessageComponentType.Value, testTopic)).Result;
Assert.Equal(user.name, result.name);
@@ -169,7 +169,7 @@ public void Multiple_ISpecificRecords()
name = "great_brand"
};
- var bytesUser = serializer.SerializeAsync(user, new SerializationContext(MessageComponentType.Value, testTopic)).Result;
+ var bytesUser = serializer.ToByteArray(user, new SerializationContext(MessageComponentType.Value, testTopic)).Result;
var resultUser = deserializerUser.DeserializeAsync(bytesUser, false, new SerializationContext(MessageComponentType.Value, testTopic)).Result as User;
Assert.NotNull(resultUser);
@@ -177,7 +177,7 @@ public void Multiple_ISpecificRecords()
Assert.Equal(user.favorite_color, resultUser.favorite_color);
Assert.Equal(user.favorite_number, resultUser.favorite_number);
- var bytesCar = serializer.SerializeAsync(car, new SerializationContext(MessageComponentType.Value, testTopic)).Result;
+ var bytesCar = serializer.ToByteArray(car, new SerializationContext(MessageComponentType.Value, testTopic)).Result;
var resultCar = deserializerCar.DeserializeAsync(bytesCar, false, new SerializationContext(MessageComponentType.Value, testTopic)).Result as Car;
Assert.NotNull(resultCar);
@@ -189,7 +189,7 @@ public void Multiple_ISpecificRecords()
public void Poco_Serialize()
{
var serializer = new AvroSerializer<Dictionary<string, string>>(schemaRegistryClient);
- Assert.Throws<InvalidOperationException>(() => serializer.SerializeAsync(new Dictionary<string, string> { { "cat", "dog" } }, new SerializationContext(MessageComponentType.Key, testTopic)).GetAwaiter().GetResult());
+ Assert.Throws<InvalidOperationException>(() => serializer.ToByteArray(new Dictionary<string, string> { { "cat", "dog" } }, new SerializationContext(MessageComponentType.Key, testTopic)).GetAwaiter().GetResult());
}
[Fact]
@@ -204,7 +204,7 @@ public void Incompatible()
{
var avroSerializer = new AvroSerializer<string>(schemaRegistryClient);
var avroDeserializer = new AvroDeserializer<int>(schemaRegistryClient);
- var bytes = avroSerializer.SerializeAsync("hello world", new SerializationContext(MessageComponentType.Value, testTopic)).Result;
+ var bytes = avroSerializer.ToByteArray("hello world", new SerializationContext(MessageComponentType.Value, testTopic)).Result;
Assert.Throws<AggregateException>(() => avroDeserializer.DeserializeAsync(bytes, false, new SerializationContext(MessageComponentType.Value, testTopic)).Result);
}
}