Skip to content
This repository was archived by the owner on Dec 18, 2018. It is now read-only.

Dispose of IDisposable context [Design] #204

Closed
wants to merge 9 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 16 additions & 8 deletions src/Microsoft.AspNet.Server.Kestrel/Http/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.

using System;
using System.Threading;
using Microsoft.AspNet.Server.Kestrel.Infrastructure;
using Microsoft.AspNet.Server.Kestrel.Networking;
using Microsoft.Framework.Logging;
Expand All @@ -13,6 +14,8 @@ public class Connection : ConnectionContext, IConnectionControl
private static readonly Action<UvStreamHandle, int, Exception, object> _readCallback = ReadCallback;
private static readonly Func<UvStreamHandle, int, object, Libuv.uv_buf_t> _allocCallback = AllocCallback;

private static long _lastConnectionId;

private readonly UvStreamHandle _socket;
private Frame _frame;
private long _connectionId = 0;
Expand All @@ -24,14 +27,16 @@ public Connection(ListenerContext context, UvStreamHandle socket) : base(context
{
_socket = socket;
ConnectionControl = this;

_connectionId = Interlocked.Increment(ref _lastConnectionId);
}

public void Start()
{
Log.ConnectionStart(_connectionId);

SocketInput = new SocketInput(Memory);
SocketOutput = new SocketOutput(Thread, _socket, Log);
SocketOutput = new SocketOutput(Thread, _socket, _connectionId, Log);
_frame = new Frame(this);
_socket.ReadStart(_allocCallback, _readCallback, this);
}
Expand Down Expand Up @@ -67,14 +72,17 @@ private void OnRead(UvStreamHandle handle, int status, Exception error)
}
else if (normalDone || errorDone)
{
Log.ConnectionReadFin(_connectionId);
SocketInput.RemoteIntakeFin = true;
_socket.ReadStop();

if (errorDone && error != null)
{
Log.LogError("Connection.OnRead", error);
}
else
{
Log.ConnectionReadFin(_connectionId);
}
}


Expand Down Expand Up @@ -113,19 +121,19 @@ void IConnectionControl.End(ProduceEndType endType)
}
_connectionState = ConnectionState.Shutdown;

Log.ConnectionWriteFin(_connectionId, 0);
Log.ConnectionWriteFin(_connectionId);
Thread.Post(
state =>
{
Log.ConnectionWriteFin(_connectionId, 1);
var self = (Connection)state;
var shutdown = new UvShutdownReq(Log);
var shutdown = new UvShutdownReq(self.Log);
shutdown.Init(self.Thread.Loop);
shutdown.Shutdown(self._socket, (req, status, _) =>
shutdown.Shutdown(self._socket, (req, status, state2) =>
{
Log.ConnectionWriteFin(_connectionId, 1);
var self2 = (Connection)state2;
self2.Log.ConnectionWroteFin(_connectionId, status);
req.Dispose();
}, null);
}, this);
},
this);
break;
Expand Down
4 changes: 3 additions & 1 deletion src/Microsoft.AspNet.Server.Kestrel/Http/Frame.cs
Original file line number Diff line number Diff line change
Expand Up @@ -221,9 +221,10 @@ private void FireOnCompleted()
private async Task ExecuteAsync()
{
Exception error = null;
IDisposable disposable = null;
try
{
await Application.Invoke(this).ConfigureAwait(false);
disposable = await Application.Invoke(this).ConfigureAwait(false);

// Trigger FireOnStarting if ProduceStart hasn't been called yet.
// We call it here, so it can go through our normal error handling
Expand All @@ -241,6 +242,7 @@ private async Task ExecuteAsync()
{
FireOnCompleted();
ProduceEnd(error);
disposable?.Dispose();
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/Microsoft.AspNet.Server.Kestrel/Http/Listener.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public Task StartAsync(
string host,
int port,
KestrelThread thread,
Func<Frame, Task> application)
Func<Frame, Task<IDisposable>> application)
{
Thread = thread;
Application = application;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public ListenerContext(ListenerContext listenerContext)

public KestrelThread Thread { get; set; }

public Func<Frame, Task> Application { get; set; }
public Func<Frame, Task<IDisposable>> Application { get; set; }

public IMemoryPool Memory { get; set; }

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public async Task StartAsync(
string host,
int port,
KestrelThread thread,
Func<Frame, Task> application)
Func<Frame, Task<IDisposable>> application)
{
await StartAsync(scheme, host, port, thread, application).ConfigureAwait(false);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ protected ListenerSecondary(ServiceContext serviceContext) : base(serviceContext
public Task StartAsync(
string pipeName,
KestrelThread thread,
Func<Frame, Task> application)
Func<Frame, Task<IDisposable>> application)
{
Thread = thread;
Application = application;
Expand Down
8 changes: 5 additions & 3 deletions src/Microsoft.AspNet.Server.Kestrel/Http/SocketOutput.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ public class SocketOutput : ISocketOutput

private readonly KestrelThread _thread;
private readonly UvStreamHandle _socket;
private readonly long _connectionId;
private readonly IKestrelTrace _log;

// This locks access to to all of the below fields
Expand All @@ -31,10 +32,11 @@ public class SocketOutput : ISocketOutput
private WriteContext _nextWriteContext;
private readonly Queue<CallbackContext> _callbacksPending;

public SocketOutput(KestrelThread thread, UvStreamHandle socket, IKestrelTrace log)
public SocketOutput(KestrelThread thread, UvStreamHandle socket, long connectionId, IKestrelTrace log)
{
_thread = thread;
_socket = socket;
_connectionId = connectionId;
_log = log;
_callbacksPending = new Queue<CallbackContext>();
}
Expand All @@ -46,7 +48,7 @@ public void Write(ArraySegment<byte> buffer, Action<Exception, object> callback,
Array.Copy(buffer.Array, buffer.Offset, copy, 0, buffer.Count);
buffer = new ArraySegment<byte>(copy);

_log.ConnectionWrite(0, buffer.Count);
_log.ConnectionWrite(_connectionId, buffer.Count);

bool triggerCallbackNow = false;

Expand Down Expand Up @@ -155,7 +157,7 @@ private void WriteAllPending()
// This is called on the libuv event loop
private void OnWriteCompleted(Queue<ArraySegment<byte>> writtenBuffers, UvWriteReq req, int status, Exception error)
{
_log.ConnectionWriteCallback(0, status);
_log.ConnectionWriteCallback(_connectionId, status);

lock (_lockObj)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ public interface IKestrelTrace : ILogger

void ConnectionReadFin(long connectionId);

void ConnectionWriteFin(long connectionId, int step);
void ConnectionWriteFin(long connectionId);

void ConnectionWroteFin(long connectionId, int status);

void ConnectionKeepAlive(long connectionId);

Expand Down
34 changes: 21 additions & 13 deletions src/Microsoft.AspNet.Server.Kestrel/Infrastructure/KestrelTrace.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,57 +21,65 @@ public KestrelTrace(ILogger logger)

public void ConnectionStart(long connectionId)
{
_logger.LogDebug(13, $"{nameof(ConnectionStart)} -> Id: {connectionId}");
_logger.LogDebug(1, @"Connection id ""{ConnectionId}"" started.", connectionId);
}

public void ConnectionStop(long connectionId)
{
_logger.LogDebug(14, $"{nameof(ConnectionStop)} -> Id: {connectionId}");
_logger.LogDebug(2, @"Connection id ""{ConnectionId}"" stopped.", connectionId);
}

public void ConnectionRead(long connectionId, int status)
public void ConnectionRead(long connectionId, int count)
{
_logger.LogDebug(4, $"{nameof(ConnectionRead)} -> Id: {connectionId}, Status: {status}");
// Don't log for now since this could be *too* verbose.
// Reserved: Event ID 3
}

public void ConnectionPause(long connectionId)
{
_logger.LogDebug(5, $"{nameof(ConnectionPause)} -> Id: {connectionId}");
_logger.LogDebug(4, @"Connection id ""{ConnectionId}"" paused.", connectionId);
}

public void ConnectionResume(long connectionId)
{
_logger.LogDebug(6, $"{nameof(ConnectionResume)} -> Id: {connectionId}");
_logger.LogDebug(5, @"Connection id ""{ConnectionId}"" resumed.", connectionId);
}

public void ConnectionReadFin(long connectionId)
{
_logger.LogDebug(7, $"{nameof(ConnectionReadFin)} -> Id: {connectionId}");
_logger.LogDebug(6, @"Connection id ""{ConnectionId}"" received FIN.", connectionId);
}

public void ConnectionWriteFin(long connectionId, int step)
public void ConnectionWriteFin(long connectionId)
{
_logger.LogDebug(8, $"{nameof(ConnectionWriteFin)} -> Id: {connectionId}, Step: {step}");
_logger.LogDebug(7, @"Connection id ""{ConnectionId}"" sending FIN.", connectionId);
}

public void ConnectionWroteFin(long connectionId, int status)
{
_logger.LogDebug(8, @"Connection id ""{ConnectionId}"" sent FIN with status ""{Status}"".", connectionId, status);
}

public void ConnectionKeepAlive(long connectionId)
{
_logger.LogDebug(9, $"{nameof(ConnectionKeepAlive)} -> Id: {connectionId}");
_logger.LogDebug(9, @"Connection id ""{ConnectionId}"" completed keep alive response.", connectionId);
}

public void ConnectionDisconnect(long connectionId)
{
_logger.LogDebug(10, $"{nameof(ConnectionDisconnect)} -> Id: {connectionId}");
_logger.LogDebug(10, @"Connection id ""{ConnectionId}"" disconnected.", connectionId);
}

public void ConnectionWrite(long connectionId, int count)
{
_logger.LogDebug(11, $"{nameof(ConnectionWrite)} -> Id: {connectionId}, Count: {count}");
// Don't log for now since this could be *too* verbose.
// Reserved: Event ID 11
}

public void ConnectionWriteCallback(long connectionId, int status)
{
_logger.LogDebug(12, $"{nameof(ConnectionWriteCallback)} -> Id: {connectionId}, Status: {status}");
// Don't log for now since this could be *too* verbose.
// Reserved: Event ID 12
}

public void Log(LogLevel logLevel, int eventId, object state, Exception exception, Func<object, Exception, string> formatter)
Expand Down
2 changes: 1 addition & 1 deletion src/Microsoft.AspNet.Server.Kestrel/KestrelEngine.cs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ public void Dispose()
Threads.Clear();
}

public IDisposable CreateServer(string scheme, string host, int port, Func<Frame, Task> application)
public IDisposable CreateServer(string scheme, string host, int port, Func<Frame, Task<IDisposable>> application)
{
var listeners = new List<IDisposable>();
var usingPipes = host.StartsWith(Constants.UnixPipeHostPrefix);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.

using System.Reflection;
using System.Resources;
using System.Runtime.CompilerServices;

[assembly: InternalsVisibleTo("Microsoft.AspNet.Server.KestrelTests")]
[assembly: AssemblyMetadata("Serviceable", "True")]
[assembly: AssemblyMetadata("Serviceable", "True")]
[assembly: NeutralResourcesLanguage("en-us")]
5 changes: 3 additions & 2 deletions src/Microsoft.AspNet.Server.Kestrel/ServerFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public IFeatureCollection Initialize(IConfiguration configuration)
return serverFeatures;
}

public IDisposable Start(IFeatureCollection serverFeatures, Func<IFeatureCollection, Task> application)
public IDisposable Start(IFeatureCollection serverFeatures, Func<IFeatureCollection, Task<IDisposable>> application)
{
var disposables = new Stack<IDisposable>();
var disposer = new Disposable(() =>
Expand Down Expand Up @@ -73,7 +73,8 @@ public IDisposable Start(IFeatureCollection serverFeatures, Func<IFeatureCollect
async frame =>
{
var request = new ServerRequest(frame);
await application.Invoke(request.Features).ConfigureAwait(false);
return await application.Invoke(request.Features).ConfigureAwait(false);

}));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ public async Task ResponsesAreChunkedAutomatically()
frame.ResponseHeaders.Clear();
await frame.ResponseBody.WriteAsync(Encoding.ASCII.GetBytes("Hello "), 0, 6);
await frame.ResponseBody.WriteAsync(Encoding.ASCII.GetBytes("World!"), 0, 6);
return null;
}))
{
using (var connection = new TestConnection())
Expand Down Expand Up @@ -50,6 +51,7 @@ public async Task ZeroLengthWritesAreIgnored()
await frame.ResponseBody.WriteAsync(Encoding.ASCII.GetBytes("Hello "), 0, 6);
await frame.ResponseBody.WriteAsync(new byte[0], 0, 0);
await frame.ResponseBody.WriteAsync(Encoding.ASCII.GetBytes("World!"), 0, 6);
return null;
}))
{
using (var connection = new TestConnection())
Expand Down Expand Up @@ -80,6 +82,7 @@ public async Task EmptyResponseBodyHandledCorrectlyWithZeroLengthWrite()
{
frame.ResponseHeaders.Clear();
await frame.ResponseBody.WriteAsync(new byte[0], 0, 0);
return null;
}))
{
using (var connection = new TestConnection())
Expand Down
16 changes: 11 additions & 5 deletions test/Microsoft.AspNet.Server.KestrelTests/EngineTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ namespace Microsoft.AspNet.Server.KestrelTests
/// </summary>
public class EngineTests
{
private async Task App(Frame frame)
private async Task<IDisposable> App(Frame frame)
{
frame.ResponseHeaders.Clear();
for (; ;)
Expand All @@ -33,6 +33,7 @@ private async Task App(Frame frame)
}
await frame.ResponseBody.WriteAsync(buffer, 0, count);
}
return null;
}

ILibraryManager LibraryManager
Expand All @@ -58,7 +59,7 @@ ILibraryManager LibraryManager
}
}

private async Task AppChunked(Frame frame)
private async Task<IDisposable> AppChunked(Frame frame)
{
frame.ResponseHeaders.Clear();
var data = new MemoryStream();
Expand All @@ -75,12 +76,13 @@ private async Task AppChunked(Frame frame)
var bytes = data.ToArray();
frame.ResponseHeaders["Content-Length"] = new[] { bytes.Length.ToString() };
await frame.ResponseBody.WriteAsync(bytes, 0, bytes.Length);
return null;
}

private Task EmptyApp(Frame frame)
private Task<IDisposable> EmptyApp(Frame frame)
{
frame.ResponseHeaders.Clear();
return Task.FromResult<object>(null);
return Task.FromResult<IDisposable>(null);
}

[Fact]
Expand Down Expand Up @@ -472,6 +474,8 @@ public async Task ZeroContentLengthNotSetAutomaticallyForCertainStatusCodes()
var statusString = await reader.ReadLineAsync();
frame.StatusCode = int.Parse(statusString);
}

return null;
}))
{
using (var connection = new TestConnection())
Expand Down Expand Up @@ -715,7 +719,7 @@ public async Task ThrowingInOnStartingResultsIn500Response()

// If we write to the response stream, we will not get a 500.

return Task.FromResult<object>(null);
return Task.FromResult<IDisposable>(null);
}))
{
using (var connection = new TestConnection())
Expand Down Expand Up @@ -770,6 +774,8 @@ public async Task ThrowingInOnStartingResultsInFailedWrites()

// The second write should succeed since the OnStarting callback will not be called again
await frame.ResponseBody.WriteAsync(Encoding.ASCII.GetBytes("Exception!!"), 0, 11);

return null;
}))
{
using (var connection = new TestConnection())
Expand Down
Loading