Skip to content
This repository was archived by the owner on Dec 18, 2018. It is now read-only.

Added initial connection middleware pipeline #2003

Merged
merged 5 commits into from
Aug 21, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 9 additions & 10 deletions benchmarks/Kestrel.Performance/FrameWritingBenchmark.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,13 @@ public class FrameWritingBenchmark
private static readonly Func<object, Task> _psuedoAsyncTaskFunc = (obj) => _psuedoAsyncTask;

private readonly TestFrame<object> _frame;
private readonly IPipe _outputPipe;
private (IPipeConnection Transport, IPipeConnection Application) _pair;

private readonly byte[] _writeData;

public FrameWritingBenchmark()
{
var pipeFactory = new PipeFactory();

_outputPipe = pipeFactory.Create();
_frame = MakeFrame(pipeFactory);
_frame = MakeFrame();
_writeData = Encoding.ASCII.GetBytes("Hello, World!");
}

Expand Down Expand Up @@ -93,9 +90,11 @@ public Task WriteAsync()
return _frame.ResponseBody.WriteAsync(_writeData, 0, _writeData.Length, default(CancellationToken));
}

private TestFrame<object> MakeFrame(PipeFactory pipeFactory)
private TestFrame<object> MakeFrame()
{
var input = pipeFactory.Create();
var pipeFactory = new PipeFactory();
var pair = pipeFactory.CreateConnectionPair();
_pair = pair;

var serviceContext = new ServiceContext
{
Expand All @@ -109,8 +108,8 @@ private TestFrame<object> MakeFrame(PipeFactory pipeFactory)
{
ServiceContext = serviceContext,
PipeFactory = pipeFactory,
Input = input.Reader,
Output = _outputPipe
Application = pair.Application,
Transport = pair.Transport
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you want to use _outputPipe here so the Cleanup method works.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch.

});

frame.Reset();
Expand All @@ -122,7 +121,7 @@ private TestFrame<object> MakeFrame(PipeFactory pipeFactory)
[IterationCleanup]
public void Cleanup()
{
var reader = _outputPipe.Reader;
var reader = _pair.Application.Input;
if (reader.TryRead(out var readResult))
{
reader.Advance(readResult.Buffer.End);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,7 @@ private Task PlaintextChunkedWithCookie()
public void Setup()
{
var pipeFactory = new PipeFactory();
var input = pipeFactory.Create();
var output = pipeFactory.Create();
var pair = pipeFactory.CreateConnectionPair();

var serviceContext = new ServiceContext
{
Expand All @@ -126,8 +125,8 @@ public void Setup()
ServiceContext = serviceContext,
PipeFactory = pipeFactory,
TimeoutControl = new MockTimeoutControl(),
Input = input.Reader,
Output = output
Application = pair.Application,
Transport = pair.Transport
});

frame.Reset();
Expand Down
34 changes: 19 additions & 15 deletions src/Kestrel.Core/Adapter/Internal/AdaptedPipeline.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,36 +5,36 @@
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http;
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure;
using System.IO.Pipelines;

namespace Microsoft.AspNetCore.Server.Kestrel.Core.Adapter.Internal
{
public class AdaptedPipeline
public class AdaptedPipeline : IPipeConnection
{
private const int MinAllocBufferSize = 2048;

private readonly IKestrelTrace _trace;
private readonly IPipe _transportOutputPipe;
private readonly IPipeReader _transportInputPipeReader;
private readonly IPipeConnection _transport;
private readonly IPipeConnection _application;

public AdaptedPipeline(IPipeReader transportInputPipeReader,
IPipe transportOutputPipe,
public AdaptedPipeline(IPipeConnection transport,
IPipeConnection application,
IPipe inputPipe,
IPipe outputPipe,
IKestrelTrace trace)
IPipe outputPipe)
{
_transportInputPipeReader = transportInputPipeReader;
_transportOutputPipe = transportOutputPipe;
_transport = transport;
_application = application;
Input = inputPipe;
Output = outputPipe;
_trace = trace;
}

public IPipe Input { get; }

public IPipe Output { get; }

IPipeReader IPipeConnection.Input => Input.Reader;

IPipeWriter IPipeConnection.Output => Output.Writer;

public async Task RunAsync(Stream stream)
{
var inputTask = ReadInputAsync(stream);
Expand Down Expand Up @@ -65,7 +65,7 @@ private async Task WriteOutputAsync(Stream stream)
if (result.IsCancelled)
{
// Forward the cancellation to the transport pipe
_transportOutputPipe.Reader.CancelPendingRead();
_application.Input.CancelPendingRead();
break;
}

Expand Down Expand Up @@ -104,7 +104,7 @@ private async Task WriteOutputAsync(Stream stream)
finally
{
Output.Reader.Complete();
_transportOutputPipe.Writer.Complete(error);
_transport.Output.Complete();
}
}

Expand Down Expand Up @@ -161,8 +161,12 @@ private async Task ReadInputAsync(Stream stream)
Input.Writer.Complete(error);
// The application could have ended the input pipe so complete
// the transport pipe as well
_transportInputPipeReader.Complete();
_transport.Input.Complete();
}
}

public void Dispose()
{
}
}
}
110 changes: 37 additions & 73 deletions src/Kestrel.Core/Internal/ConnectionHandler.cs
Original file line number Diff line number Diff line change
@@ -1,31 +1,27 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.

using System;
using System.IO.Pipelines;
using System.Net;
using System.Threading;
using Microsoft.AspNetCore.Hosting.Server;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Http.Features;
using Microsoft.AspNetCore.Protocols;
using Microsoft.AspNetCore.Protocols.Features;
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure;
using Microsoft.AspNetCore.Server.Kestrel.Transport.Abstractions.Internal;
using Microsoft.Extensions.Logging;

namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal
{
public class ConnectionHandler<TContext> : IConnectionHandler
public class ConnectionHandler : IConnectionHandler
{
private static long _lastFrameConnectionId = long.MinValue;

private readonly ListenOptions _listenOptions;
private readonly ServiceContext _serviceContext;
private readonly IHttpApplication<TContext> _application;
private readonly ConnectionDelegate _connectionDelegate;

public ConnectionHandler(ListenOptions listenOptions, ServiceContext serviceContext, IHttpApplication<TContext> application)
public ConnectionHandler(ServiceContext serviceContext, ConnectionDelegate connectionDelegate)
{
_listenOptions = listenOptions;
_serviceContext = serviceContext;
_application = application;
_connectionDelegate = connectionDelegate;
}

public void OnConnection(IFeatureCollection features)
Expand All @@ -34,89 +30,57 @@ public void OnConnection(IFeatureCollection features)

var transportFeature = connectionContext.Features.Get<IConnectionTransportFeature>();

var inputPipe = transportFeature.PipeFactory.Create(GetInputPipeOptions(transportFeature.InputWriterScheduler));
var outputPipe = transportFeature.PipeFactory.Create(GetOutputPipeOptions(transportFeature.OutputReaderScheduler));
// REVIEW: Unfortunately, we still need to use the service context to create the pipes since the settings
// for the scheduler and limits are specified here
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you just pass in the KestrelServerOptions? Do you need to pass in a scheduler and the "ThreadPool"?

var inputOptions = GetInputPipeOptions(_serviceContext, transportFeature.InputWriterScheduler);
var outputOptions = GetOutputPipeOptions(_serviceContext, transportFeature.OutputReaderScheduler);

var connectionId = CorrelationIdGenerator.GetNextId();
var frameConnectionId = Interlocked.Increment(ref _lastFrameConnectionId);
var pair = connectionContext.PipeFactory.CreateConnectionPair(inputOptions, outputOptions);

// Set the transport and connection id
connectionContext.ConnectionId = connectionId;
transportFeature.Connection = new PipeConnection(inputPipe.Reader, outputPipe.Writer);
var applicationConnection = new PipeConnection(outputPipe.Reader, inputPipe.Writer);

if (!_serviceContext.ConnectionManager.NormalConnectionCount.TryLockOne())
{
var goAway = new RejectionConnection(inputPipe, outputPipe, connectionId, _serviceContext)
{
Connection = applicationConnection
};

connectionContext.Features.Set<IConnectionApplicationFeature>(goAway);
connectionContext.ConnectionId = CorrelationIdGenerator.GetNextId();
connectionContext.Transport = pair.Transport;

goAway.Reject();
return;
}

var frameConnectionContext = new FrameConnectionContext
{
ConnectionId = connectionId,
FrameConnectionId = frameConnectionId,
ServiceContext = _serviceContext,
PipeFactory = connectionContext.PipeFactory,
ConnectionAdapters = _listenOptions.ConnectionAdapters,
Input = inputPipe,
Output = outputPipe
};
// This *must* be set before returning from OnConnection
transportFeature.Application = pair.Application;

var connectionFeature = connectionContext.Features.Get<IHttpConnectionFeature>();
// REVIEW: This task should be tracked by the server for graceful shutdown
// Today it's handled specifically for http but not for aribitrary middleware
_ = Execute(connectionContext);
}

if (connectionFeature != null)
private async Task Execute(ConnectionContext connectionContext)
{
try
{
if (connectionFeature.LocalIpAddress != null)
{
frameConnectionContext.LocalEndPoint = new IPEndPoint(connectionFeature.LocalIpAddress, connectionFeature.LocalPort);
}

if (connectionFeature.RemoteIpAddress != null)
{
frameConnectionContext.RemoteEndPoint = new IPEndPoint(connectionFeature.RemoteIpAddress, connectionFeature.RemotePort);
}
await _connectionDelegate(connectionContext);
}

var connection = new FrameConnection(frameConnectionContext)
catch (Exception ex)
{
Connection = applicationConnection
};

connectionContext.Features.Set<IConnectionApplicationFeature>(connection);

// Since data cannot be added to the inputPipe by the transport until OnConnection returns,
// Frame.ProcessRequestsAsync is guaranteed to unblock the transport thread before calling
// application code.
connection.StartRequestProcessing(_application);
_serviceContext.Log.LogCritical(0, ex, $"{nameof(ConnectionHandler)}.{nameof(Execute)}() {connectionContext.ConnectionId}");
}
}

// Internal for testing
internal PipeOptions GetInputPipeOptions(IScheduler writerScheduler) => new PipeOptions
internal static PipeOptions GetInputPipeOptions(ServiceContext serviceContext, IScheduler writerScheduler) => new PipeOptions
{
ReaderScheduler = _serviceContext.ThreadPool,
ReaderScheduler = serviceContext.ThreadPool,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we should pass the reader scheduler explicitly like the writer scheduler.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, eventually.

WriterScheduler = writerScheduler,
MaximumSizeHigh = _serviceContext.ServerOptions.Limits.MaxRequestBufferSize ?? 0,
MaximumSizeLow = _serviceContext.ServerOptions.Limits.MaxRequestBufferSize ?? 0
MaximumSizeHigh = serviceContext.ServerOptions.Limits.MaxRequestBufferSize ?? 0,
MaximumSizeLow = serviceContext.ServerOptions.Limits.MaxRequestBufferSize ?? 0
};

internal PipeOptions GetOutputPipeOptions(IScheduler readerScheduler) => new PipeOptions
internal static PipeOptions GetOutputPipeOptions(ServiceContext serviceContext, IScheduler readerScheduler) => new PipeOptions
{
ReaderScheduler = readerScheduler,
WriterScheduler = _serviceContext.ThreadPool,
MaximumSizeHigh = GetOutputResponseBufferSize(),
MaximumSizeLow = GetOutputResponseBufferSize()
WriterScheduler = serviceContext.ThreadPool,
MaximumSizeHigh = GetOutputResponseBufferSize(serviceContext),
MaximumSizeLow = GetOutputResponseBufferSize(serviceContext)
};

private long GetOutputResponseBufferSize()
private static long GetOutputResponseBufferSize(ServiceContext serviceContext)
{
var bufferSize = _serviceContext.ServerOptions.Limits.MaxResponseBufferSize;
var bufferSize = serviceContext.ServerOptions.Limits.MaxResponseBufferSize;
if (bufferSize == 0)
{
// 0 = no buffering so we need to configure the pipe so the the writer waits on the reader directly
Expand Down
16 changes: 16 additions & 0 deletions src/Kestrel.Core/Internal/ConnectionLimitBuilderExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
using Microsoft.AspNetCore.Protocols;

namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal
{
public static class ConnectionLimitBuilderExtensions
{
public static IConnectionBuilder UseConnectionLimit(this IConnectionBuilder builder, ServiceContext serviceContext)
{
return builder.Use(next =>
{
var middleware = new ConnectionLimitMiddleware(next, serviceContext);
return middleware.OnConnectionAsync;
});
}
}
}
32 changes: 32 additions & 0 deletions src/Kestrel.Core/Internal/ConnectionLimitMiddleware.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
using System.Threading.Tasks;
using Microsoft.AspNetCore.Protocols;
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure;

namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal
{
public class ConnectionLimitMiddleware
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

{
private readonly ServiceContext _serviceContext;
private readonly ConnectionDelegate _next;

public ConnectionLimitMiddleware(ConnectionDelegate next, ServiceContext serviceContext)
{
_next = next;
_serviceContext = serviceContext;
}

public Task OnConnectionAsync(ConnectionContext connection)
{
if (!_serviceContext.ConnectionManager.NormalConnectionCount.TryLockOne())
{
KestrelEventSource.Log.ConnectionRejected(connection.ConnectionId);
_serviceContext.Log.ConnectionRejected(connection.ConnectionId);
connection.Transport.Input.Complete();
connection.Transport.Output.Complete();
return Task.CompletedTask;
}

return _next(connection);
}
}
}
Loading