Skip to content

Delay socket receive/send until first read/flush #34458

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Jul 20, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@ public async ValueTask<ConnectionContext> ConnectAsync(EndPoint endpoint, Cancel
_outputOptions,
_options.WaitForDataBeforeAllocatingBuffer);

socketConnection.Start();
return socketConnection;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.

using System.IO.Pipelines;

namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.Internal
{
internal sealed partial class SocketConnection
{
// We could implement this on SocketConnection to remove an extra allocation but this is a
// bit cleaner
private class SocketDuplexPipe : IDuplexPipe
{
public SocketDuplexPipe(SocketConnection connection)
{
Input = new SocketPipeReader(connection);
Output = new SocketPipeWriter(connection);
}

public PipeReader Input { get; }

public PipeWriter Output { get; }
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.

using System.IO.Pipelines;

namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.Internal
{
internal sealed partial class SocketConnection
{
private class SocketPipeReader : PipeReader
{
private readonly SocketConnection _socketConnection;
private readonly PipeReader _reader;

public SocketPipeReader(SocketConnection socketConnection)
{
_socketConnection = socketConnection;
_reader = socketConnection.InnerTransport.Input;
}

public override void AdvanceTo(SequencePosition consumed)
{
_reader.AdvanceTo(consumed);
}

public override void AdvanceTo(SequencePosition consumed, SequencePosition examined)
{
_reader.AdvanceTo(consumed, examined);
}

public override void CancelPendingRead()
{
_reader.CancelPendingRead();
}

public override void Complete(Exception? exception = null)
{
_reader.Complete(exception);
}

public override ValueTask CompleteAsync(Exception? exception = null)
{
return _reader.CompleteAsync(exception);
}

public override Task CopyToAsync(PipeWriter destination, CancellationToken cancellationToken = default)
{
_socketConnection.EnsureStarted();
return _reader.CopyToAsync(destination, cancellationToken);
}

public override Task CopyToAsync(Stream destination, CancellationToken cancellationToken = default)
{
_socketConnection.EnsureStarted();
return _reader.CopyToAsync(destination, cancellationToken);
}

protected override ValueTask<ReadResult> ReadAtLeastAsyncCore(int minimumSize, CancellationToken cancellationToken)
{
_socketConnection.EnsureStarted();
return _reader.ReadAtLeastAsync(minimumSize, cancellationToken);
}

public override ValueTask<ReadResult> ReadAsync(CancellationToken cancellationToken = default)
{
_socketConnection.EnsureStarted();
return _reader.ReadAsync(cancellationToken);
}

public override bool TryRead(out ReadResult result)
{
_socketConnection.EnsureStarted();
return _reader.TryRead(out result);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.

using System.IO.Pipelines;

namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.Internal
{
internal sealed partial class SocketConnection
{
private class SocketPipeWriter : PipeWriter
{
private readonly SocketConnection _socketConnection;
private readonly PipeWriter _writer;

public SocketPipeWriter(SocketConnection socketConnection)
{
_socketConnection = socketConnection;
_writer = socketConnection.InnerTransport.Output;
}

public override bool CanGetUnflushedBytes => _writer.CanGetUnflushedBytes;

public override long UnflushedBytes => _writer.UnflushedBytes;

public override void Advance(int bytes)
{
_writer.Advance(bytes);
}

public override void CancelPendingFlush()
{
_writer.CancelPendingFlush();
}

public override void Complete(Exception? exception = null)
{
_writer.Complete(exception);
}

public override ValueTask CompleteAsync(Exception? exception = null)
{
return _writer.CompleteAsync(exception);
}

public override ValueTask<FlushResult> WriteAsync(ReadOnlyMemory<byte> source, CancellationToken cancellationToken = default)
{
_socketConnection.EnsureStarted();
return _writer.WriteAsync(source, cancellationToken);
}

public override ValueTask<FlushResult> FlushAsync(CancellationToken cancellationToken = default)
{
_socketConnection.EnsureStarted();
return _writer.FlushAsync(cancellationToken);
}

public override Memory<byte> GetMemory(int sizeHint = 0)
{
return _writer.GetMemory(sizeHint);
}

public override Span<byte> GetSpan(int sizeHint = 0)
{
return _writer.GetSpan(sizeHint);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ internal sealed partial class SocketConnection : TransportConnection
private readonly TaskCompletionSource _waitForConnectionClosedTcs = new TaskCompletionSource();
private bool _connectionClosed;
private readonly bool _waitForData;
private int _connectionStarted;

internal SocketConnection(Socket socket,
MemoryPool<byte> memoryPool,
Expand Down Expand Up @@ -67,31 +68,32 @@ internal SocketConnection(Socket socket,

var pair = DuplexPipe.CreateConnectionPair(inputOptions, outputOptions);

// Set the transport and connection id
Transport = _originalTransport = pair.Transport;
_originalTransport = pair.Transport;
Application = pair.Application;

Transport = new SocketDuplexPipe(this);
Copy link
Member

@halter73 halter73 Jul 20, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit:

Suggested change
Transport = new SocketDuplexPipe(this);
Transport = new SocketDuplexPipe(this, pair.Transport);

And remove InnerTransport. Edit: Or just reference _originalTransport directly in SocketDuplexPipe since it's nested.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I kind prefer using properties rather than a private field, even if it's internal. This is purely a stylistic thing though.


InitiaizeFeatures();
}

public IDuplexPipe InnerTransport => _originalTransport;

public PipeWriter Input => Application.Output;

public PipeReader Output => Application.Input;

public override MemoryPool<byte> MemoryPool { get; }

public void Start()
private void EnsureStarted()
{
try
if (_connectionStarted == 1 || Interlocked.CompareExchange(ref _connectionStarted, 1, 0) == 1)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the "fast-check" if (_connectionStarted == 1 should this be a volatile read?

Or from different view: EnsureStarted is only called in DisposeAsync so is it worth to have a fast-check? Just the compare & exchange should be enough?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, it's called in more than 2 places. The fast check is in the hot path (every read/write/flush)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, it's a partial class. Thanks.

But is still safe without a volatile read? Now I believe it's safe, as in the case of _connectionStarted = 0 the CompareExchange is hit which has the proper memory-fences.

{
// Spawn send and receive logic
_receivingTask = DoReceive();
_sendingTask = DoSend();
}
catch (Exception ex)
{
_trace.LogError(0, ex, $"Unexpected exception in {nameof(SocketConnection)}.{nameof(Start)}.");
return;
}

// Offload these to avoid potentially blocking the first read/write/flush
_receivingTask = Task.Run(DoReceive);
_sendingTask = Task.Run(DoSend);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Task.Run causes an extra Task allocation. The compiler generated Task returned by the DoReceive method, and the wrapping Task created by Task.Run. If you just place await Task.Yield().ConfigureAwait(false); as the first statement in DoReceive, you will avoid this extra allocation.

Copy link
Member Author

@davidfowl davidfowl Jul 19, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can live with an extra allocation 😄. There's now 5 more here per connection. These are also long lived so I'm not concerned.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

2 allocations because of DoSend, but fair enough.

}

public override void Abort(ConnectionAbortedException abortReason)
Expand All @@ -106,6 +108,9 @@ public override void Abort(ConnectionAbortedException abortReason)
// Only called after connection middleware is complete which means the ConnectionClosed token has fired.
public override async ValueTask DisposeAsync()
{
// Just in case we haven't started the connection, start it here so we can clean up properly.
EnsureStarted();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess we can remove the _receivingTask != null and _sendingTask != null checks now. Technically these could still be null if some background thread flipped _connectionStarted and hasn't set these yet, but that would indicate some misusage of the ConnectionContext during dispose.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe it wouldn't be a misusage if the read or write was canceled before disposing.


_originalTransport.Input.Complete();
_originalTransport.Output.Complete();

Expand All @@ -125,7 +130,7 @@ public override async ValueTask DisposeAsync()
}
catch (Exception ex)
{
_trace.LogError(0, ex, $"Unexpected exception in {nameof(SocketConnection)}.{nameof(Start)}.");
_trace.LogError(0, ex, $"Unexpected exception in {nameof(SocketConnection)}.");
}
finally
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,6 @@ internal void Bind()
setting.OutputOptions,
waitForData: _options.WaitForDataBeforeAllocatingBuffer);

connection.Start();

_settingsIndex = (_settingsIndex + 1) % _settingsCount;

return connection;
Expand Down
Loading