Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
135 changes: 135 additions & 0 deletions src/Confluent.Kafka/ArrayPoolBufferWriter.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
// Copyright 2016-2018 Confluent Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Refer to LICENSE for more information.

using System;
using System.Buffers;


namespace Confluent.Kafka
{
/// <summary>
/// Implements a <see cref="IBufferWriter{Byte}"/> using <see cref="ArrayPool{Byte}.Shared"/> for memory allocation.
/// </summary>
internal sealed class ArrayPoolBufferWriter : ISerializationBuffer
{
/// <summary>
/// The default buffer size to use to expand empty arrays.
/// </summary>
private const int DefaultInitialBufferSize = 256;

private readonly ArrayPool<byte> pool;
private int index;
private byte[] array;

/// <summary>
/// Initializes a new instance of the <see cref="ArrayPoolBufferWriter"/> class.
/// </summary>
public ArrayPoolBufferWriter()
{
this.pool = ArrayPool<byte>.Shared;
this.array = pool.Rent(DefaultInitialBufferSize);
this.index = 0;
}

/// <summary>
/// Returns the memory allocation on finialization if not explicitly disposed in user code.
/// </summary>
~ArrayPoolBufferWriter() => Dispose();

/// <inheritdoc />
public void Advance(int count)
{
byte[] array = this.array;

if (array is null)
{
throw new ObjectDisposedException(nameof(ArrayPoolBufferWriter));
}

if (count < 0)
{
throw new ArgumentOutOfRangeException("Count must be greater than 0");
}

if (this.index > array.Length - count)
{
throw new ArgumentOutOfRangeException("Cannot advance further than current capacity");
}

this.index += count;
}

/// <inheritdoc />
public void Dispose()
{
if (this.array != null)
{
this.pool.Return(this.array, true);
this.array = null;
}
}


public ArraySegment<byte> GetComitted(int offset = 0)
{
return new ArraySegment<byte>(this.array, offset, this.index);
}

/// <inheritdoc />
public Memory<byte> GetMemory(int sizeHint = 0)
{
EnsureCapacity(sizeHint);
return this.array.AsMemory(this.index);
}

/// <inheritdoc />
public Span<byte> GetSpan(int sizeHint = 0)
{
EnsureCapacity(sizeHint);
return this.array.AsSpan(this.index);
}

private void EnsureCapacity(int sizeHint)
{
var array = this.array;

if (array is null)
{
throw new ObjectDisposedException(nameof(ArrayPoolBufferWriter));
}

if (sizeHint < 0)
{
throw new ArgumentOutOfRangeException("Cannot advance further than current capacity");
}

if (sizeHint == 0)
{
sizeHint = 1;
}

if (sizeHint > array.Length - this.index)
{
int minimumSize = this.index + sizeHint;
var newArray = pool.Rent(minimumSize);

Array.Copy(array, 0, newArray, 0, this.index);
pool.Return(array, true);
array = newArray;
}
}
}
}
45 changes: 45 additions & 0 deletions src/Confluent.Kafka/BufferWriterExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Copyright 2016-2018 Confluent Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Refer to LICENSE for more information.

using System;
using System.Buffers;
using System.IO;


namespace Confluent.Kafka
{
/// <summary>
/// Extends a <see cref="IBufferWriter{Byte}"/> with a <see cref="Stream"/> adapter.
/// </summary>
public static class BufferWriterExtensions
{
/// <summary>
/// Gets a <see cref="Stream"/> adapting implementation working on a <see cref="IBufferWriter{Byte}"/> as underlying memory.
/// </summary>
/// <param name="bufferWriter">The <see cref="IBufferWriter{Byte}"/> used for underlying memory.</param>
/// <returns></returns>
/// <exception cref="ArgumentNullException">Thrown if the provises <paramref name="bufferWriter"/> is null.</exception>
public static Stream AsStream(this IBufferWriter<byte> bufferWriter)
{
if (bufferWriter is null)
{
throw new ArgumentNullException(nameof(bufferWriter));
}

return new BufferWriterStream(bufferWriter);
}
}
}
158 changes: 158 additions & 0 deletions src/Confluent.Kafka/BufferWriterStream.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
// Copyright 2016-2018 Confluent Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Refer to LICENSE for more information.

using System;
using System.Buffers;
using System.IO;
using System.Threading;
using System.Threading.Tasks;


namespace Confluent.Kafka
{
/// <summary>
/// A <see cref="Stream"/> implementation wrapping an <see cref="IBufferWriter{Byte}"/> instance.
/// </summary>
internal sealed class BufferWriterStream : Stream
{
private readonly IBufferWriter<byte> bufferWriter;
private bool disposed;

/// <summary>
/// Initializes a new instance of the <see cref="BufferWriterStream"/> class.
/// </summary>
/// <param name="bufferWriter">The target <see cref="IBufferWriter{Byte}"/> instance to use.</param>
public BufferWriterStream(IBufferWriter<byte> bufferWriter)
{
this.bufferWriter = bufferWriter ?? throw new ArgumentNullException(nameof(bufferWriter));
}

/// <inheritdoc/>
public override bool CanRead => false;

/// <inheritdoc/>
public override bool CanSeek => false;

/// <inheritdoc/>
public override bool CanWrite
{
get => !this.disposed;
}

/// <inheritdoc/>
public override long Length => throw new NotSupportedException();

/// <inheritdoc/>
public override long Position
{
get => throw new NotSupportedException();
set => throw new NotSupportedException();
}

/// <inheritdoc/>
public override Task CopyToAsync(Stream destination, int bufferSize, CancellationToken cancellationToken)
{
throw new NotSupportedException();
}

/// <inheritdoc/>
public override void Flush()
{
}

/// <inheritdoc/>
public override Task FlushAsync(CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();
return Task.FromResult(true);
}

/// <inheritdoc/>
public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
throw new NotSupportedException();
}

/// <inheritdoc/>
public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();
Write(buffer, offset, count);
cancellationToken.ThrowIfCancellationRequested();

return Task.FromResult(true);
}

/// <inheritdoc/>
public override long Seek(long offset, SeekOrigin origin)
{
throw new NotSupportedException();
}

/// <inheritdoc/>
public override void SetLength(long value)
{
throw new NotSupportedException();
}

/// <inheritdoc/>
public override int Read(byte[] buffer, int offset, int count)
{
throw new NotSupportedException();
}

/// <inheritdoc/>
public override int ReadByte()
{
throw new NotSupportedException();
}

/// <inheritdoc/>
public override void Write(byte[] buffer, int offset, int count)
{
if (this.disposed)
{
throw new ObjectDisposedException(nameof(BufferWriterStream));
}

var source = buffer.AsSpan(offset, count);
var destination = this.bufferWriter.GetSpan(count);

source.CopyTo(destination);

this.bufferWriter.Advance(count);
}

/// <inheritdoc/>
public override void WriteByte(byte value)
{
if (this.disposed)
{
throw new ObjectDisposedException(nameof(BufferWriterStream));
}

this.bufferWriter.GetSpan(1)[0] = value;

this.bufferWriter.Advance(1);
}

/// <inheritdoc/>
protected override void Dispose(bool disposing)
{
this.disposed = true;
}
}
}
1 change: 1 addition & 0 deletions src/Confluent.Kafka/Confluent.Kafka.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
<PackageReference Include="librdkafka.redist" Version="1.8.2">
<PrivateAssets Condition="'$(TargetFrameworkIdentifier)' == '.NETFramework'">None</PrivateAssets>
</PackageReference>
<PackageReference Include="System.Buffers" Version="4.5.1" />
<PackageReference Include="System.Memory" Version="4.5.0" />
</ItemGroup>

Expand Down
43 changes: 43 additions & 0 deletions src/Confluent.Kafka/DefaultSerializationBufferProvider.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Copyright 2018 Confluent Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Refer to LICENSE for more information.

using System.Buffers;


namespace Confluent.Kafka
{
/// <summary>
/// The default <see cref="ISerializationBufferProvider"/>. Creates buffers using <see cref="ArrayPool{Byte}"/> internal memory.
/// </summary>
public sealed class DefaultSerializationBufferProvider : ISerializationBufferProvider
{
/// <summary>
/// Gets a singleton instance of the <see cref="DefaultSerializationBufferProvider "/>
/// </summary>
public static DefaultSerializationBufferProvider Instance { get; } = new DefaultSerializationBufferProvider();

private DefaultSerializationBufferProvider()
{

}

/// <inheritdoc />
public ISerializationBuffer Create()
{
return new ArrayPoolBufferWriter();
}
}
}
Loading