Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 45 additions & 9 deletions src/Confluent.Kafka/DependentProducerBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,23 +37,23 @@ public class DependentProducerBuilder<TKey, TValue>
/// <summary>
/// The configured key serializer.
/// </summary>
public ISerializer<TKey> KeySerializer { get; set; }
public ISegmentSerializer<TKey> KeySerializer { get; set; }

/// <summary>
/// The configured value serializer.
/// </summary>
public ISerializer<TValue> ValueSerializer { get; set; }

public ISegmentSerializer<TValue> ValueSerializer { get; set; }


/// <summary>
/// The configured async key serializer.
/// </summary>
public IAsyncSerializer<TKey> AsyncKeySerializer { get; set; }
public IAsyncSegmentSerializer<TKey> AsyncKeySerializer { get; set; }

/// <summary>
/// The configured async value serializer.
/// </summary>
public IAsyncSerializer<TValue> AsyncValueSerializer { get; set; }

public IAsyncSegmentSerializer<TValue> AsyncValueSerializer { get; set; }

/// <summary>
/// An underlying librdkafka client handle that the Producer will use to
Expand All @@ -70,7 +70,7 @@ public DependentProducerBuilder(Handle handle)
/// </summary>
public DependentProducerBuilder<TKey, TValue> SetKeySerializer(ISerializer<TKey> serializer)
{
this.KeySerializer = serializer;
this.KeySerializer = new WrappedSyncSegmentSerializer<TKey>(serializer);
return this;
}

Expand All @@ -79,7 +79,7 @@ public DependentProducerBuilder<TKey, TValue> SetKeySerializer(ISerializer<TKey>
/// </summary>
public DependentProducerBuilder<TKey, TValue> SetValueSerializer(ISerializer<TValue> serializer)
{
this.ValueSerializer = serializer;
this.ValueSerializer = new WrappedSyncSegmentSerializer<TValue>(serializer);
return this;
}

Expand All @@ -88,18 +88,54 @@ public DependentProducerBuilder<TKey, TValue> SetValueSerializer(ISerializer<TVa
/// </summary>
public DependentProducerBuilder<TKey, TValue> SetKeySerializer(IAsyncSerializer<TKey> serializer)
{
this.AsyncKeySerializer = serializer;
this.AsyncKeySerializer = new WrappedAsyncSyncSegmentSerializer<TKey>(serializer);
return this;
}

/// <summary>
/// The async serializer to use to serialize values.
/// </summary>
public DependentProducerBuilder<TKey, TValue> SetValueSerializer(IAsyncSerializer<TValue> serializer)
{
this.AsyncValueSerializer = new WrappedAsyncSyncSegmentSerializer<TValue>(serializer);
return this;
}

/// <summary>
/// The async serializer to use to serialize keys. This uses the array segment API
/// </summary>
public DependentProducerBuilder<TKey, TValue> SetKeySerializer(IAsyncSegmentSerializer<TKey> serializer)
{
this.AsyncKeySerializer = serializer;
return this;
}

/// <summary>
/// The async serializer to use to serialize values. This uses the array segment API
/// </summary>
public DependentProducerBuilder<TKey, TValue> SetValueSerializer(IAsyncSegmentSerializer<TValue> serializer)
{
this.AsyncValueSerializer = serializer;
return this;
}

/// <summary>
/// The async serializer to use to serialize keys. This uses the array segment API
/// </summary>
public DependentProducerBuilder<TKey, TValue> SetKeySerializer(ISegmentSerializer<TKey> serializer)
{
this.KeySerializer = serializer;
return this;
}

/// <summary>
/// The async serializer to use to serialize values. This uses the array segment API
/// </summary>
public DependentProducerBuilder<TKey, TValue> SetValueSerializer(ISegmentSerializer<TValue> serializer)
{
this.ValueSerializer = serializer;
return this;
}

/// <summary>
/// Build a new IProducer implementation instance.
Expand Down
49 changes: 49 additions & 0 deletions src/Confluent.Kafka/IAsyncSegmentSerializer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Copyright 2016-2018 Confluent Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Refer to LICENSE for more information.

using System;
using System.Threading.Tasks;

namespace Confluent.Kafka
{
/// <summary>
/// Defines a serializer for use with <see cref="Confluent.Kafka.Producer{TKey,TValue}" />.
/// </summary>
public interface IAsyncSegmentSerializer<T>
{
/// <summary>
/// Serialize the key or value of a <see cref="Message{TKey,TValue}" />
/// instance.
/// </summary>
/// <param name="data">
/// The value to serialize.
/// </param>
/// <param name="context">
/// Context relevant to the serialize operation.
/// </param>
/// <returns>
/// A <see cref="System.Threading.Tasks.Task" /> that
/// completes with the serialized data.
/// </returns>
Task<ArraySegment<byte>> SerializeAsync(T data, SerializationContext context);

/// <summary>
/// Release resources associated with the array segment
/// </summary>
/// <param name="segment">The segment that was created in the <see cref="SerializeAsync"/> call.</param>
void Release(ref ArraySegment<byte> segment);
}
}
48 changes: 48 additions & 0 deletions src/Confluent.Kafka/ISegmentSerializer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Copyright 2018 Confluent Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Refer to LICENSE for more information.


using System;

namespace Confluent.Kafka
{
/// <summary>
/// Defines a serializer for use with <see cref="Confluent.Kafka.Producer{TKey,TValue}" />.
/// </summary>
public interface ISegmentSerializer<T>
{
/// <summary>
/// Serialize the key or value of a <see cref="Message{TKey,TValue}" />
/// instance.
/// </summary>
/// <param name="data">
/// The value to serialize.
/// </param>
/// <param name="context">
/// Context relevant to the serialize operation.
/// </param>
/// <returns>
/// The serialized value.
/// </returns>
ArraySegment<byte> Serialize(T data, SerializationContext context);

/// <summary>
/// Release resources associated with the array segment
/// </summary>
/// <param name="segment">The segment that was created in the <see cref="Serialize"/> call.</param>
void Release(ref ArraySegment<byte> segment);
}
}
Loading