Skip to content
This repository was archived by the owner on Dec 18, 2018. It is now read-only.

P/Invoke cleanup #60

Closed
wants to merge 76 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
76 commits
Select commit Hold shift + click to select a range
a6cc0a1
Make the loop handle a simple piece of memory
Dec 27, 2014
681eb2a
Move all external methods into UnsafeNativeMethods.
Feb 7, 2015
bfb27f1
Removed the PlatformApis type
Feb 7, 2015
67b1645
Specialized the code for Windows
Feb 7, 2015
501a6e6
Reintroduced the proper error message for Linux without libuv.so.1
Feb 7, 2015
77af9a2
Moved the callback delegates into a separate file
Feb 7, 2015
faa3708
Inlined and simplified getting the error strings
Feb 7, 2015
2decc79
Removed an unused function
Feb 7, 2015
cc88652
The Check functions now are static and don't return back the error code
Feb 7, 2015
4260100
Removed an unused function
Feb 7, 2015
5e796e9
Inlined the pinvokes instead of routing them through Libuv
Feb 7, 2015
a7f80ac
Moved types out of Libuv.cs
Feb 7, 2015
82d7727
Moved the buffer type out of Libuv and inlined the constructor
Feb 7, 2015
ad6e5c6
Libuv is now a static class.
Feb 7, 2015
70adb13
Improved the error checking API
Feb 7, 2015
70d6830
Fixed the tests
Feb 7, 2015
66c1094
Removed superfluous unsafe
Feb 8, 2015
215aaca
Removed an unused member
Feb 8, 2015
83f0ed9
Currently only one byte-buffer is ever uv_write'ten
Feb 8, 2015
72b6863
The Init functions are now constructors
Feb 8, 2015
8f85ae9
Distinguished between Dispose and ReleaseHandle for the loop handle
Feb 8, 2015
c95ffa1
The CreateMemory calls are now part of the constructor
Feb 8, 2015
3aaafdf
Moved a class to a separate file. Fixed a file name
Feb 8, 2015
178f713
Replaced unsafe code by Marshal functions
Feb 8, 2015
39a52d4
Finally removed the unsafe requirement
Feb 8, 2015
344d3c1
Applied some code style
Feb 8, 2015
e9093c8
Used callbacks bound to an object instead of static ones
Feb 8, 2015
0fd0248
Closed the loop on the first try
Feb 8, 2015
f73bc36
Simplified the loop shutdown
Feb 8, 2015
cf71d1b
Cleanup
Feb 8, 2015
90408d1
Moved two functions down the inheritance tree
Feb 8, 2015
091d02e
The UvAsyncHandle now also sets a queueCloseHandle
Feb 8, 2015
2b0b145
Don't swallow exceptions
Feb 8, 2015
db8fc73
UvHandles don't follow the SafeHandle pattern, so that's not a good b…
Feb 9, 2015
3e822f6
Merged the remaining UvMemory implementation into other classes
Feb 9, 2015
0a0a509
Cleanup
Feb 9, 2015
0a677ef
Restored the manual native library loading and method binding
Feb 9, 2015
0ac3ce6
Sanitized the loop closing logic
Feb 9, 2015
3aa3243
Based the UvLoopHandle and Req off of the same base class
Feb 9, 2015
f875d13
Merged the two write request classes
Feb 10, 2015
d219f74
UvWriteReq now follows the SafeHandle protocol
Feb 10, 2015
a304518
Brought back the ArraySegment in the write request
Feb 10, 2015
74e1bef
Disable invalid warnings
Feb 11, 2015
ad47a69
Fixed the aspnetcore50 build
Feb 11, 2015
6ff30ce
Moved the self keepalive into the base class so the shutdown request …
Feb 11, 2015
f486522
Separate listen and stream TCP handles
Feb 20, 2015
644fe82
Made the listener's callback an instance function
Feb 20, 2015
00d2cae
Don't route the listener object through the handle
Feb 20, 2015
fbeccc8
Completely set up the TCP listener in the constructor
Feb 20, 2015
b949b68
Moved the accept function to the stream's constructor
Feb 20, 2015
56c32de
The read and alloc callbacks are mow instance functions
Feb 20, 2015
e15105a
Removed unused parameters from the callback chain
Feb 20, 2015
f210d1e
Merged the connection's Start method in the constructor
Feb 20, 2015
b0fd2af
Extracted a separate read request handle with explicit lifetime manag…
Feb 20, 2015
a04912a
Removed the trampoline callbacks
Feb 20, 2015
d6c82a9
Used the constructor in the shutdown request
Feb 20, 2015
53d0566
The Post method now works with just an Action
Feb 20, 2015
a1edb3b
Added a consumers of PostAsync
Feb 20, 2015
4db6eec
Cleanup
Feb 20, 2015
1560adc
Disposed read handles when closing the connection
Feb 20, 2015
603b9ea
Registered connections on the listener to close them during shutdown.
Feb 20, 2015
9602fd1
Improved the object validation
Feb 20, 2015
7ce00d0
Dispose the socket when the client closes the connection
Feb 20, 2015
74a04c2
Early-exit when the connection is closed
Feb 20, 2015
7245a17
Write is now Async. This fixes a race between closing the socket and …
Feb 21, 2015
7e34f49
Removed the write callback function
Feb 21, 2015
2ba41b7
Active write requests are canceled when the socket is closed
Feb 23, 2015
37e8715
Removed .Net 4.6'isms
Feb 24, 2015
b43b306
Code style
Feb 25, 2015
a93cc80
Propagate the read status code and add proper EOF
Feb 25, 2015
5ea8cc0
Used different ports in the test classes
Feb 25, 2015
0b7b166
Added API to check for active connections
Feb 25, 2015
ebf3c5f
Moved a failing test to an issue class
Feb 25, 2015
65e87b7
Added a few ways to create a leaked connection
Feb 25, 2015
3f9628f
Provide Write errors to upper layers
Feb 27, 2015
05e6404
Activated the (reliable!) engine tests
Feb 27, 2015
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/Kestrel/ServerRequest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ async Task<Stream> IHttpUpgradeFeature.UpgradeAsync()
_frame.ResponseHeaders["Upgrade"] = values;
}
}
_frame.ProduceStart();
await _frame.ProduceStartAsync();
return _frame.DuplexStream;
}
}
Expand Down
126 changes: 69 additions & 57 deletions src/Microsoft.AspNet.Server.Kestrel/Http/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,78 +35,80 @@ public interface IConnectionControl
{
void Pause();
void Resume();
void End(ProduceEndType endType);
Task EndAsync(ProduceEndType endType);
bool IsInKeepAlive { get; }
}

public class Connection : ConnectionContext, IConnectionControl
{
private static readonly Action<UvStreamHandle, int, Exception, object> _readCallback = ReadCallback;
private static readonly Func<UvStreamHandle, int, object, Libuv.uv_buf_t> _allocCallback = AllocCallback;
private readonly Action<int, Exception> _readCallback;
private readonly Func<int, UvBuffer> _allocCallback;
private readonly UvTcpStreamHandle _socket;
private readonly Listener _listener;

private static Libuv.uv_buf_t AllocCallback(UvStreamHandle handle, int suggestedSize, object state)
{
return ((Connection)state).OnAlloc(handle, suggestedSize);
}

private static void ReadCallback(UvStreamHandle handle, int nread, Exception error, object state)
{
((Connection)state).OnRead(handle, nread, error);
}

private readonly UvStreamHandle _socket;
private UvReadHandle _read;
private Frame _frame;
long _connectionId;
private bool _isInKeepAlive;

public Connection(ListenerContext context, UvStreamHandle socket) : base(context)
public Connection(Listener listener, UvTcpStreamHandle socket) : base(listener)
{
_readCallback = OnRead;
_allocCallback = OnAlloc;
_socket = socket;
ConnectionControl = this;
}
_listener = listener;

public void Start()
{
KestrelTrace.Log.ConnectionStart(_connectionId);

SocketInput = new SocketInput(Memory);
SocketOutput = new SocketOutput(Thread, _socket);
_frame = new Frame(this);
_socket.ReadStart(_allocCallback, _readCallback, this);
_read = new UvReadHandle(_socket, _allocCallback, _readCallback);
listener.AddConnection(this);
}

private Libuv.uv_buf_t OnAlloc(UvStreamHandle handle, int suggestedSize)
private UvBuffer OnAlloc(int suggestedSize)
{
return handle.Libuv.buf_init(
SocketInput.Pin(2048),
2048);
const int bufferSize = 2048;
return new UvBuffer(SocketInput.Pin(bufferSize), bufferSize);
}

private void OnRead(UvStreamHandle handle, int status, Exception error)
private void OnRead(int status, Exception error)
{
SocketInput.Unpin(status);

var normalRead = error == null && status > 0;
var normalDone = status == 0 || status == -4077 || status == -4095;
var errorDone = !(normalDone || normalRead);

if (normalRead)
{
KestrelTrace.Log.ConnectionRead(_connectionId, status);
}
else if (normalDone || errorDone)
if (!normalRead)
{
KestrelTrace.Log.ConnectionReadFin(_connectionId);
SocketInput.RemoteIntakeFin = true;
_socket.ReadStop();
if (status != -4095)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you define a descriptive constant for this? The same goes for the ones above.

{
_read.Dispose();
_read = null;
_listener.RemoveConnection(this);
_socket.Dispose();

// Not sure if this is right
// It should be, but there are some interesting code paths
// while reading the message body regarding status == 0 && RemoteIntakeFin
return;
}

if (errorDone && error != null)
if (!normalDone && error != null)
{
Trace.WriteLine("Connection.OnRead " + error.ToString());
}
}

KestrelTrace.Log.ConnectionRead(_connectionId, status);

try
{
_isInKeepAlive = false;
_frame.Consume();
}
catch (Exception ex)
Expand All @@ -115,56 +117,66 @@ private void OnRead(UvStreamHandle handle, int status, Exception error)
}
}

bool IConnectionControl.IsInKeepAlive => _isInKeepAlive;

void IConnectionControl.Pause()
{
KestrelTrace.Log.ConnectionPause(_connectionId);
_socket.ReadStop();
Debug.Assert(_read != null);
_read.Dispose();
_read = null;
}

void IConnectionControl.Resume()
{
KestrelTrace.Log.ConnectionResume(_connectionId);
_socket.ReadStart(_allocCallback, _readCallback, this);
Debug.Assert(_read == null);
_read = new UvReadHandle(_socket, _allocCallback, _readCallback);
}

void IConnectionControl.End(ProduceEndType endType)
async Task IConnectionControl.EndAsync(ProduceEndType endType)
{
switch (endType)
{
case ProduceEndType.SocketShutdownSend:
KestrelTrace.Log.ConnectionWriteFin(_connectionId, 0);
Thread.Post(
x =>
{
KestrelTrace.Log.ConnectionWriteFin(_connectionId, 1);
var self = (Connection)x;
var shutdown = new UvShutdownReq();
shutdown.Init(self.Thread.Loop);
shutdown.Shutdown(self._socket, (req, status, state) =>
await Thread.PostAsync(() =>
{
if (_read == null)
return;

KestrelTrace.Log.ConnectionWriteFin(_connectionId, 1);
new UvShutdownReq(
Thread.Loop,
_socket,
(req, status) =>
{
KestrelTrace.Log.ConnectionWriteFin(_connectionId, 1);
req.Dispose();
}, null);
},
this);
// This connection is now done
});
});
break;
case ProduceEndType.ConnectionKeepAlive:
KestrelTrace.Log.ConnectionKeepAlive(_connectionId);
_frame = new Frame(this);
Thread.Post(
x => ((Frame)x).Consume(),
_frame);
_isInKeepAlive = true;
await Thread.PostAsync(_frame.Consume);
break;
case ProduceEndType.SocketDisconnect:
KestrelTrace.Log.ConnectionDisconnect(_connectionId);
Thread.Post(
x =>
{
KestrelTrace.Log.ConnectionStop(_connectionId);
((UvHandle)x).Dispose();
},
_socket);
await Thread.PostAsync(() =>
{
_listener.RemoveConnection(this);
if (_read == null)
return;

_read.Dispose();
_socket.Dispose();
KestrelTrace.Log.ConnectionStop(_connectionId);
});
break;
default:
throw new ArgumentException(nameof(endType));
}
}
}
Expand Down
72 changes: 35 additions & 37 deletions src/Microsoft.AspNet.Server.Kestrel/Http/Frame.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ public FrameContext(ConnectionContext context) : base(context)

public interface IFrameControl
{
void ProduceContinue();
void Write(ArraySegment<byte> data, Action<Exception, object> callback, object state);
Task ProduceContinueAsync();
Task WriteAsync(ArraySegment<byte> data);
}

public class Frame : FrameContext, IFrameControl
Expand Down Expand Up @@ -230,53 +230,50 @@ private async Task ExecuteAsync()
}
finally
{
ProduceEnd(error);
await ProduceEndAsync(error);
}
}


public void Write(ArraySegment<byte> data, Action<Exception, object> callback, object state)
public async Task WriteAsync(ArraySegment<byte> data)
{
ProduceStart();
SocketOutput.Write(data, callback, state);
await ProduceStartAsync();
await SocketOutput.WriteAsync(data);
}

public void Upgrade(IDictionary<string, object> options, Func<object, Task> callback)
public Task Upgrade(IDictionary<string, object> options, Func<object, Task> callback)
{
_keepAlive = false;
ProduceStart();
return ProduceStartAsync();

// NOTE: needs changes
//_upgradeTask = callback(_callContext);
}

byte[] _continueBytes = Encoding.ASCII.GetBytes("HTTP/1.1 100 Continue\r\n\r\n");

public void ProduceContinue()
public Task ProduceContinueAsync()
{
if (_resultStarted) return;
if (_resultStarted)
return Task.FromResult(0);

string[] expect;
if (HttpVersion.Equals("HTTP/1.1") &&
RequestHeaders.TryGetValue("Expect", out expect) &&
(expect.FirstOrDefault() ?? "").Equals("100-continue", StringComparison.OrdinalIgnoreCase))
{
SocketOutput.Write(
new ArraySegment<byte>(_continueBytes, 0, _continueBytes.Length),
(error, _) =>
{
if (error != null)
{
Trace.WriteLine("ProduceContinue " + error.ToString());
}
},
null);
return SocketOutput.WriteAsync(
new ArraySegment<byte>(_continueBytes, 0, _continueBytes.Length));
}

return Task.FromResult(0);
}

public void ProduceStart()
public async Task ProduceStartAsync()
{
if (_resultStarted) return;
if (_resultStarted)
return;

_resultStarted = true;

FireOnSendingHeaders();
Expand All @@ -286,30 +283,31 @@ public void ProduceStart()
var status = ReasonPhrases.ToStatus(StatusCode, ReasonPhrase);

var responseHeader = CreateResponseHeader(status, ResponseHeaders);
SocketOutput.Write(
responseHeader.Item1,
(error, x) =>
{
if (error != null)
{
Trace.WriteLine("ProduceStart " + error.ToString());
}
((IDisposable)x).Dispose();
},
responseHeader.Item2);
try
{
await SocketOutput.WriteAsync(responseHeader.Item1);
}
finally
{
responseHeader.Item2.Dispose();
}
}

public void ProduceEnd(Exception ex)
public Task ProduceEndAsync(Exception ex)
{
ProduceStart();
var tasks = new List<Task>(3);
tasks.Add(ProduceStartAsync());

if (!_keepAlive)
{
ConnectionControl.End(ProduceEndType.SocketShutdownSend);
tasks.Add(ConnectionControl.EndAsync(ProduceEndType.SocketShutdownSend));
}

//NOTE: must finish reading request body
ConnectionControl.End(_keepAlive ? ProduceEndType.ConnectionKeepAlive : ProduceEndType.SocketDisconnect);
tasks.Add(ConnectionControl.EndAsync(
_keepAlive ? ProduceEndType.ConnectionKeepAlive : ProduceEndType.SocketDisconnect));

return Task.WhenAll(tasks);
}

private Tuple<ArraySegment<byte>, IDisposable> CreateResponseHeader(
Expand Down
36 changes: 2 additions & 34 deletions src/Microsoft.AspNet.Server.Kestrel/Http/FrameResponseStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,23 +24,7 @@ public override void Flush()

public override Task FlushAsync(CancellationToken cancellationToken)
{
var tcs = new TaskCompletionSource<int>();
_context.FrameControl.Write(
new ArraySegment<byte>(new byte[0]),
(error, arg) =>
{
var tcsArg = (TaskCompletionSource<int>)arg;
if (error != null)
{
tcsArg.SetException(error);
}
else
{
tcsArg.SetResult(0);
}
},
tcs);
return tcs.Task;
return _context.FrameControl.WriteAsync(new ArraySegment<byte>(new byte[0]));
}

public override long Seek(long offset, SeekOrigin origin)
Expand All @@ -65,23 +49,7 @@ public override void Write(byte[] buffer, int offset, int count)

public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
var tcs = new TaskCompletionSource<int>();
_context.FrameControl.Write(
new ArraySegment<byte>(buffer, offset, count),
(error, arg) =>
{
var tcsArg = (TaskCompletionSource<int>)arg;
if (error != null)
{
tcsArg.SetException(error);
}
else
{
tcsArg.SetResult(0);
}
},
tcs);
return tcs.Task;
return _context.FrameControl.WriteAsync(new ArraySegment<byte>(buffer, offset, count));
}

public override bool CanRead
Expand Down
Loading