diff --git a/src/Confluent.Kafka/DependentProducerBuilder.cs b/src/Confluent.Kafka/DependentProducerBuilder.cs index d152ca652..c40134a83 100644 --- a/src/Confluent.Kafka/DependentProducerBuilder.cs +++ b/src/Confluent.Kafka/DependentProducerBuilder.cs @@ -37,23 +37,23 @@ public class DependentProducerBuilder /// /// The configured key serializer. /// - public ISerializer KeySerializer { get; set; } + public ISegmentSerializer KeySerializer { get; set; } /// /// The configured value serializer. /// - public ISerializer ValueSerializer { get; set; } - + public ISegmentSerializer ValueSerializer { get; set; } + + /// /// The configured async key serializer. /// - public IAsyncSerializer AsyncKeySerializer { get; set; } + public IAsyncSegmentSerializer AsyncKeySerializer { get; set; } /// /// The configured async value serializer. /// - public IAsyncSerializer AsyncValueSerializer { get; set; } - + public IAsyncSegmentSerializer AsyncValueSerializer { get; set; } /// /// An underlying librdkafka client handle that the Producer will use to @@ -70,7 +70,7 @@ public DependentProducerBuilder(Handle handle) /// public DependentProducerBuilder SetKeySerializer(ISerializer serializer) { - this.KeySerializer = serializer; + this.KeySerializer = new WrappedSyncSegmentSerializer(serializer); return this; } @@ -79,7 +79,7 @@ public DependentProducerBuilder SetKeySerializer(ISerializer /// public DependentProducerBuilder SetValueSerializer(ISerializer serializer) { - this.ValueSerializer = serializer; + this.ValueSerializer = new WrappedSyncSegmentSerializer(serializer); return this; } @@ -88,7 +88,7 @@ public DependentProducerBuilder SetValueSerializer(ISerializer public DependentProducerBuilder SetKeySerializer(IAsyncSerializer serializer) { - this.AsyncKeySerializer = serializer; + this.AsyncKeySerializer = new WrappedAsyncSyncSegmentSerializer(serializer); return this; } @@ -96,10 +96,46 @@ public DependentProducerBuilder SetKeySerializer(IAsyncSerializer< /// The async serializer to use to serialize values. /// public DependentProducerBuilder SetValueSerializer(IAsyncSerializer serializer) + { + this.AsyncValueSerializer = new WrappedAsyncSyncSegmentSerializer(serializer); + return this; + } + + /// + /// The async serializer to use to serialize keys. This uses the array segment API + /// + public DependentProducerBuilder SetKeySerializer(IAsyncSegmentSerializer serializer) + { + this.AsyncKeySerializer = serializer; + return this; + } + + /// + /// The async serializer to use to serialize values. This uses the array segment API + /// + public DependentProducerBuilder SetValueSerializer(IAsyncSegmentSerializer serializer) { this.AsyncValueSerializer = serializer; return this; } + + /// + /// The async serializer to use to serialize keys. This uses the array segment API + /// + public DependentProducerBuilder SetKeySerializer(ISegmentSerializer serializer) + { + this.KeySerializer = serializer; + return this; + } + + /// + /// The async serializer to use to serialize values. This uses the array segment API + /// + public DependentProducerBuilder SetValueSerializer(ISegmentSerializer serializer) + { + this.ValueSerializer = serializer; + return this; + } /// /// Build a new IProducer implementation instance. diff --git a/src/Confluent.Kafka/IAsyncSegmentSerializer.cs b/src/Confluent.Kafka/IAsyncSegmentSerializer.cs new file mode 100644 index 000000000..913cee36c --- /dev/null +++ b/src/Confluent.Kafka/IAsyncSegmentSerializer.cs @@ -0,0 +1,49 @@ +// Copyright 2016-2018 Confluent Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Refer to LICENSE for more information. + +using System; +using System.Threading.Tasks; + +namespace Confluent.Kafka +{ + /// + /// Defines a serializer for use with . + /// + public interface IAsyncSegmentSerializer + { + /// + /// Serialize the key or value of a + /// instance. + /// + /// + /// The value to serialize. + /// + /// + /// Context relevant to the serialize operation. + /// + /// + /// A that + /// completes with the serialized data. + /// + Task> SerializeAsync(T data, SerializationContext context); + + /// + /// Release resources associated with the array segment + /// + /// The segment that was created in the call. + void Release(ref ArraySegment segment); + } +} \ No newline at end of file diff --git a/src/Confluent.Kafka/ISegmentSerializer.cs b/src/Confluent.Kafka/ISegmentSerializer.cs new file mode 100644 index 000000000..9ee41fd38 --- /dev/null +++ b/src/Confluent.Kafka/ISegmentSerializer.cs @@ -0,0 +1,48 @@ +// Copyright 2018 Confluent Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Refer to LICENSE for more information. + + +using System; + +namespace Confluent.Kafka +{ + /// + /// Defines a serializer for use with . + /// + public interface ISegmentSerializer + { + /// + /// Serialize the key or value of a + /// instance. + /// + /// + /// The value to serialize. + /// + /// + /// Context relevant to the serialize operation. + /// + /// + /// The serialized value. + /// + ArraySegment Serialize(T data, SerializationContext context); + + /// + /// Release resources associated with the array segment + /// + /// The segment that was created in the call. + void Release(ref ArraySegment segment); + } +} \ No newline at end of file diff --git a/src/Confluent.Kafka/Producer.cs b/src/Confluent.Kafka/Producer.cs index ff0cb7b43..a48f9c83d 100644 --- a/src/Confluent.Kafka/Producer.cs +++ b/src/Confluent.Kafka/Producer.cs @@ -42,10 +42,10 @@ internal class Config public PartitionerDelegate defaultPartitioner; } - private ISerializer keySerializer; - private ISerializer valueSerializer; - private IAsyncSerializer asyncKeySerializer; - private IAsyncSerializer asyncValueSerializer; + private ISegmentSerializer keySerializer; + private ISegmentSerializer valueSerializer; + private IAsyncSegmentSerializer asyncKeySerializer; + private IAsyncSegmentSerializer asyncValueSerializer; private static readonly Dictionary defaultSerializers = new Dictionary { @@ -496,10 +496,11 @@ public Handle Handle } private void InitializeSerializers( - ISerializer keySerializer, - ISerializer valueSerializer, - IAsyncSerializer asyncKeySerializer, - IAsyncSerializer asyncValueSerializer) + ISegmentSerializer keySerializer, + ISegmentSerializer valueSerializer, + IAsyncSegmentSerializer asyncKeySerializer, + IAsyncSegmentSerializer asyncValueSerializer + ) { // setup key serializer. if (keySerializer == null && asyncKeySerializer == null) @@ -509,7 +510,7 @@ private void InitializeSerializers( throw new ArgumentNullException( $"Key serializer not specified and there is no default serializer defined for type {typeof(TKey).Name}."); } - this.keySerializer = (ISerializer)serializer; + this.keySerializer = (ISegmentSerializer)serializer; } else if (keySerializer == null && asyncKeySerializer != null) { @@ -532,7 +533,7 @@ private void InitializeSerializers( throw new ArgumentNullException( $"Value serializer not specified and there is no default serializer defined for type {typeof(TValue).Name}."); } - this.valueSerializer = (ISerializer)serializer; + this.valueSerializer = (ISegmentSerializer)serializer; } else if (valueSerializer == null && asyncValueSerializer != null) { @@ -746,7 +747,7 @@ public async Task> ProduceAsync( { Headers headers = message.Headers ?? new Headers(); - byte[] keyBytes; + ArraySegment keyBytes; try { keyBytes = (keySerializer != null) @@ -765,7 +766,7 @@ public async Task> ProduceAsync( ex); } - byte[] valBytes; + ArraySegment valBytes; try { valBytes = (valueSerializer != null) @@ -801,22 +802,49 @@ public async Task> ProduceAsync( ProduceImpl( topicPartition.Topic, - valBytes, 0, valBytes == null ? 0 : valBytes.Length, - keyBytes, 0, keyBytes == null ? 0 : keyBytes.Length, + valBytes.Array, valBytes.Offset, valBytes.Count, + keyBytes.Array, keyBytes.Offset, keyBytes.Count, message.Timestamp, topicPartition.Partition, headers, handler); - + return await handler.Task.ConfigureAwait(false); } else { ProduceImpl( - topicPartition.Topic, - valBytes, 0, valBytes == null ? 0 : valBytes.Length, - keyBytes, 0, keyBytes == null ? 0 : keyBytes.Length, - message.Timestamp, topicPartition.Partition, headers, + topicPartition.Topic, + valBytes.Array, valBytes.Offset, valBytes.Count, + keyBytes.Array, keyBytes.Offset, keyBytes.Count, + message.Timestamp, topicPartition.Partition, headers, null); + // We can release immediately after calling ProduceImpl which invokes the underlying + // librdkafka method because immediately after that method is invoked it is copied + // to the underlying librdkafka buffer + try + { + if (keySerializer != null) + keySerializer.Release(ref keyBytes); + else + asyncKeySerializer.Release(ref keyBytes); + } + catch (Exception) + { + // If the release fails do not do anything - it was delivered to the hooks as expected + } + + try + { + if (valueSerializer != null) + valueSerializer.Release(ref valBytes); + else + asyncValueSerializer.Release(ref valBytes); + } + catch (Exception) + { + // If the release fails do not do anything - it was delivered to the hooks as expected + } + var result = new DeliveryResult { TopicPartitionOffset = new TopicPartitionOffset(topicPartition, Offset.Unset), @@ -836,6 +864,35 @@ public async Task> ProduceAsync( TopicPartitionOffset = new TopicPartitionOffset(topicPartition, Offset.Unset) }); } + finally + { + // We can release immediately after calling ProduceImpl which invokes the underlying + // librdkafka method because immediately after that method is invoked it is copied + // to the underlying librdkafka buffer + try + { + if (keySerializer != null) + keySerializer.Release(ref keyBytes); + else + asyncKeySerializer.Release(ref keyBytes); + } + catch (Exception) + { + // If the release fails do not do anything - it was delivered to the hooks as expected + } + + try + { + if (valueSerializer != null) + valueSerializer.Release(ref valBytes); + else + asyncValueSerializer.Release(ref valBytes); + } + catch (Exception) + { + // If the release fails do not do anything - it was delivered to the hooks as expected + } + } } @@ -869,7 +926,7 @@ public void Produce( Headers headers = message.Headers ?? new Headers(); - byte[] keyBytes; + ArraySegment keyBytes; try { keyBytes = (keySerializer != null) @@ -888,7 +945,7 @@ public void Produce( ex); } - byte[] valBytes; + ArraySegment valBytes; try { valBytes = (valueSerializer != null) @@ -911,8 +968,8 @@ public void Produce( { ProduceImpl( topicPartition.Topic, - valBytes, 0, valBytes == null ? 0 : valBytes.Length, - keyBytes, 0, keyBytes == null ? 0 : keyBytes.Length, + valBytes.Array, valBytes.Offset, valBytes.Count, + keyBytes.Array, keyBytes.Offset, keyBytes.Count, message.Timestamp, topicPartition.Partition, headers, deliveryHandler == null @@ -928,10 +985,39 @@ public void Produce( throw new ProduceException( ex.Error, new DeliveryReport - { - Message = message, - TopicPartitionOffset = new TopicPartitionOffset(topicPartition, Offset.Unset) - }); + { + Message = message, + TopicPartitionOffset = new TopicPartitionOffset(topicPartition, Offset.Unset) + }); + } + finally + { + // We can release immediately after calling ProduceImpl which invokes the underlying + // librdkafka method because immediately after that method is invoked it is copied + // to the underlying librdkafka buffer + try + { + if (keySerializer != null) + keySerializer.Release(ref keyBytes); + else + asyncKeySerializer.Release(ref keyBytes); + } + catch (Exception) + { + // If the release fails do not do anything - it was delivered to the hooks as expected + } + + try + { + if (valueSerializer != null) + valueSerializer.Release(ref valBytes); + else + asyncValueSerializer.Release(ref valBytes); + } + catch (Exception) + { + // If the release fails do not do anything - it was delivered to the hooks as expected + } } } diff --git a/src/Confluent.Kafka/ProducerBuilder.cs b/src/Confluent.Kafka/ProducerBuilder.cs index bb24db3a2..707134c88 100644 --- a/src/Confluent.Kafka/ProducerBuilder.cs +++ b/src/Confluent.Kafka/ProducerBuilder.cs @@ -97,22 +97,22 @@ public class ProducerBuilder /// /// The configured key serializer. /// - internal protected ISerializer KeySerializer { get; set; } + internal protected ISegmentSerializer KeySerializer { get; set; } /// /// The configured value serializer. /// - internal protected ISerializer ValueSerializer { get; set; } + internal protected ISegmentSerializer ValueSerializer { get; set; } /// /// The configured async key serializer. /// - internal protected IAsyncSerializer AsyncKeySerializer { get; set; } + internal protected IAsyncSegmentSerializer AsyncKeySerializer { get; set; } /// /// The configured async value serializer. /// - internal protected IAsyncSerializer AsyncValueSerializer { get; set; } + internal protected IAsyncSegmentSerializer AsyncValueSerializer { get; set; } internal Producer.Config ConstructBaseConfig(Producer producer) { @@ -306,7 +306,7 @@ public ProducerBuilder SetKeySerializer(ISerializer serializ { throw new InvalidOperationException("Key serializer may not be specified more than once."); } - this.KeySerializer = serializer; + this.KeySerializer = new WrappedSyncSegmentSerializer(serializer); return this; } @@ -325,7 +325,7 @@ public ProducerBuilder SetValueSerializer(ISerializer seri { throw new InvalidOperationException("Value serializer may not be specified more than once."); } - this.ValueSerializer = serializer; + this.ValueSerializer = new WrappedSyncSegmentSerializer(serializer); return this; } @@ -344,7 +344,7 @@ public ProducerBuilder SetKeySerializer(IAsyncSerializer ser { throw new InvalidOperationException("Key serializer may not be specified more than once."); } - this.AsyncKeySerializer = serializer; + this.AsyncKeySerializer = new WrappedAsyncSyncSegmentSerializer(serializer); return this; } @@ -358,6 +358,82 @@ public ProducerBuilder SetKeySerializer(IAsyncSerializer ser /// Produce or ProduceAsync. /// public ProducerBuilder SetValueSerializer(IAsyncSerializer serializer) + { + if (this.ValueSerializer != null || this.AsyncValueSerializer != null) + { + throw new InvalidOperationException("Value serializer may not be specified more than once."); + } + this.AsyncValueSerializer = new WrappedAsyncSyncSegmentSerializer(serializer); + return this; + } + + /// + /// The serializer to use to serialize keys. + /// + /// + /// If your key serializer throws an exception, this will be + /// wrapped in a ProduceException with ErrorCode + /// Local_KeySerialization and thrown by the initiating call to + /// Produce or ProduceAsync. + /// + public ProducerBuilder SetKeySerializer(ISegmentSerializer serializer) + { + if (this.KeySerializer != null || this.AsyncKeySerializer != null) + { + throw new InvalidOperationException("Key serializer may not be specified more than once."); + } + this.KeySerializer = serializer; + return this; + } + + /// + /// The serializer to use to serialize values. + /// + /// + /// If your value serializer throws an exception, this will be + /// wrapped in a ProduceException with ErrorCode + /// Local_ValueSerialization and thrown by the initiating call to + /// Produce or ProduceAsync. + /// + public ProducerBuilder SetValueSerializer(ISegmentSerializer serializer) + { + if (this.ValueSerializer != null || this.AsyncValueSerializer != null) + { + throw new InvalidOperationException("Value serializer may not be specified more than once."); + } + this.ValueSerializer = serializer; + return this; + } + + /// + /// The serializer to use to serialize keys. + /// + /// + /// If your key serializer throws an exception, this will be + /// wrapped in a ProduceException with ErrorCode + /// Local_KeySerialization and thrown by the initiating call to + /// Produce or ProduceAsync. + /// + public ProducerBuilder SetKeySerializer(IAsyncSegmentSerializer serializer) + { + if (this.KeySerializer != null || this.AsyncKeySerializer != null) + { + throw new InvalidOperationException("Key serializer may not be specified more than once."); + } + this.AsyncKeySerializer = serializer; + return this; + } + + /// + /// The serializer to use to serialize values. + /// + /// + /// If your value serializer throws an exception, this will be + /// wrapped in a ProduceException with ErrorCode + /// Local_ValueSerialization and thrown by the initiating call to + /// Produce or ProduceAsync. + /// + public ProducerBuilder SetValueSerializer(IAsyncSegmentSerializer serializer) { if (this.ValueSerializer != null || this.AsyncValueSerializer != null) { diff --git a/src/Confluent.Kafka/WrappedAsyncSegmentSerializer.cs b/src/Confluent.Kafka/WrappedAsyncSegmentSerializer.cs new file mode 100644 index 000000000..b9a0931ef --- /dev/null +++ b/src/Confluent.Kafka/WrappedAsyncSegmentSerializer.cs @@ -0,0 +1,56 @@ +// Copyright 2018 Confluent Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Refer to LICENSE for more information. + + +using System; +using System.Threading.Tasks; + +namespace Confluent.Kafka +{ + + /// + /// A wrapped segment serializer to facilitate up-casting to the segment based API + /// + /// The type of the value that can be serialized by this serializer + public class WrappedAsyncSyncSegmentSerializer : IAsyncSegmentSerializer + { + private readonly IAsyncSerializer serializer; + + /// + /// Created a using an inner . + /// This just decorates the returned byte array in an array segment wrapper using the length of the + /// returned array to establish bounds. This does require awaiting on the returned task. + /// + /// The inner serializer to use + public WrappedAsyncSyncSegmentSerializer(IAsyncSerializer innerSerializer) + { + serializer = innerSerializer; + } + + /// + public async Task> SerializeAsync(T data, SerializationContext context) + { + byte[] result = await serializer.SerializeAsync(data, context).ConfigureAwait(false); + return new ArraySegment(result); + } + + /// + public void Release(ref ArraySegment segment) + { + // Do nothing + } + } +} \ No newline at end of file diff --git a/src/Confluent.Kafka/WrappedSyncSegmentSerializer.cs b/src/Confluent.Kafka/WrappedSyncSegmentSerializer.cs new file mode 100644 index 000000000..8a926e294 --- /dev/null +++ b/src/Confluent.Kafka/WrappedSyncSegmentSerializer.cs @@ -0,0 +1,53 @@ +// Copyright 2018 Confluent Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Refer to LICENSE for more information. + +using System; + +namespace Confluent.Kafka +{ + + /// + /// A wrapped segment serializer to facilitate up-casting to the segment based API + /// + /// The type of the value that can be serialized by this serializer + public class WrappedSyncSegmentSerializer : ISegmentSerializer + { + private readonly ISerializer serializer; + + /// + /// Created a using an inner . + /// This just decorates the returned byte array in an array segment wrapper using the length of the + /// returned array to establish bounds. + /// + /// The inner serializer to use + public WrappedSyncSegmentSerializer(ISerializer innerSerializer) + { + serializer = innerSerializer; + } + + /// + public ArraySegment Serialize(T data, SerializationContext context) + { + return new ArraySegment(serializer.Serialize(data, context)); + } + + /// + public void Release(ref ArraySegment segment) + { + // Do nothing + } + } +} \ No newline at end of file diff --git a/test/Confluent.Kafka.IntegrationTests/Tests/Builder_CustomDefaults.cs b/test/Confluent.Kafka.IntegrationTests/Tests/Builder_CustomDefaults.cs index 7ba394eff..ed8bdb1bf 100644 --- a/test/Confluent.Kafka.IntegrationTests/Tests/Builder_CustomDefaults.cs +++ b/test/Confluent.Kafka.IntegrationTests/Tests/Builder_CustomDefaults.cs @@ -49,7 +49,7 @@ public override IProducer Build() { if (KeySerializer == null && AsyncKeySerializer == null) { - this.KeySerializer = (ISerializer)(new Utf32Serializer()); + this.KeySerializer = new WrappedSyncSegmentSerializer((ISerializer)new Utf32Serializer()); } } @@ -57,7 +57,7 @@ public override IProducer Build() { if (ValueSerializer == null && AsyncValueSerializer == null) { - this.ValueSerializer = (ISerializer)(new Utf32Serializer()); + this.ValueSerializer = new WrappedSyncSegmentSerializer((ISerializer)new Utf32Serializer()); } }