Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1995,7 +1995,8 @@ public SocketError SendFileAsync(SafeFileHandle fileHandle, long offset, long co
// be scheduled instead. It's not functionally incorrect to schedule the release of a synchronous operation, just it may
// lead to thread pool starvation issues if the synchronous operations are blocking thread pool threads (typically not
// advised) and more threads are not immediately available to run work items that would release those operations.
public unsafe Interop.Sys.SocketEvents HandleSyncEventsSpeculatively(Interop.Sys.SocketEvents events)
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public Interop.Sys.SocketEvents HandleSyncEventsSpeculatively(Interop.Sys.SocketEvents events)
{
if ((events & Interop.Sys.SocketEvents.Error) != 0)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,11 @@
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;

namespace System.Net.Sockets
{
Expand Down Expand Up @@ -56,22 +54,40 @@ public bool TryRegister(SafeSocketHandle socket, out Interop.Error error)

private static readonly object s_lock = new object();

// In debug builds, force there to be 2 engines. In release builds, use half the number of processors when
// there are at least 6. The lower bound is to avoid using multiple engines on systems which aren't servers.
#pragma warning disable CA1802 // const works for debug, but needs to be static readonly for release
private static readonly int s_engineCount =
#if DEBUG
2;
#else
Environment.ProcessorCount >= 6 ? Environment.ProcessorCount / 2 : 1;
#endif
#pragma warning restore CA1802
private static readonly int s_maxEngineCount = GetEngineCount();

private static int GetEngineCount()
{
// The responsibility of SocketAsyncEngine is to get notifications from epoll|kqueue
// and schedule corresponding work items to ThreadPool (socket reads and writes).
//
// Using TechEmpower benchmarks that generate a LOT of SMALL socket reads and writes under a VERY HIGH load
// we have observed that a single engine is capable of keeping busy up to thirty x64 and eight ARM64 CPU Cores.
//
// The vast majority of real-life scenarios is never going to generate such a huge load (hundreds of thousands of requests per second)
// and having a single producer should be almost always enough.
//
// We want to be sure that we can handle extreme loads and that's why we have decided to use these values.
//
// It's impossible to predict all possible scenarios so we have added a possibility to configure this value using environment variables.
if (uint.TryParse(Environment.GetEnvironmentVariable("DOTNET_SYSTEM_NET_SOCKETS_THREAD_COUNT"), out uint count))
{
return (int)count;
}

Architecture architecture = RuntimeInformation.ProcessArchitecture;
int coresPerEngine = architecture == Architecture.Arm64 || architecture == Architecture.Arm
? 8
: 30;

return Math.Max(1, (int)Math.Round(Environment.ProcessorCount / (double)coresPerEngine));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and "round" it up, in a way that 29 cores gets 2 epoll threads

So now anyting below 44 cores on x64 will get 1 thread? Then 2 after 76 cores ...

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes and this should be enough for vast majorty of real-life scenarios.

}

//
// The current engines. We replace an engine when it runs out of "handle" values.
// Must be accessed under s_lock.
//
private static readonly SocketAsyncEngine?[] s_currentEngines = new SocketAsyncEngine?[s_engineCount];
private static readonly SocketAsyncEngine?[] s_currentEngines = new SocketAsyncEngine?[s_maxEngineCount];
private static int s_allocateFromEngine = 0;

private readonly IntPtr _port;
Expand Down Expand Up @@ -106,7 +122,7 @@ public bool TryRegister(SafeSocketHandle socket, out Interop.Error error)
//
private static readonly IntPtr MaxHandles = IntPtr.Size == 4 ? (IntPtr)int.MaxValue : (IntPtr)long.MaxValue;
#endif
private static readonly IntPtr MinHandlesForAdditionalEngine = s_engineCount == 1 ? MaxHandles : (IntPtr)32;
private static readonly IntPtr MinHandlesForAdditionalEngine = s_maxEngineCount == 1 ? MaxHandles : (IntPtr)32;

//
// Sentinel handle value to identify events from the "shutdown pipe," used to signal an event loop to stop
Expand All @@ -129,7 +145,7 @@ public bool TryRegister(SafeSocketHandle socket, out Interop.Error error)
//
// Maps handle values to SocketAsyncContext instances.
//
private readonly ConcurrentDictionary<IntPtr, SocketAsyncContext> _handleToContextMap = new ConcurrentDictionary<IntPtr, SocketAsyncContext>();
private readonly ConcurrentDictionary<IntPtr, SocketAsyncContextWrapper> _handleToContextMap = new ConcurrentDictionary<IntPtr, SocketAsyncContextWrapper>();

//
// Queue of events generated by EventLoop() that would be processed by the thread pool
Expand Down Expand Up @@ -197,7 +213,7 @@ private static void AllocateToken(SocketAsyncContext context, out SocketAsyncEng
// Round-robin to the next engine once we have sufficient sockets on this one.
if (!engine.HasLowNumberOfSockets)
{
s_allocateFromEngine = (s_allocateFromEngine + 1) % s_engineCount;
s_allocateFromEngine = (s_allocateFromEngine + 1) % s_maxEngineCount;
}
}
}
Expand All @@ -208,7 +224,8 @@ private IntPtr AllocateHandle(SocketAsyncContext context)
Debug.Assert(!IsFull, "Expected !IsFull");

IntPtr handle = _nextHandle;
_handleToContextMap.TryAdd(handle, context);
Debug.Assert(handle != ShutdownHandle, "ShutdownHandle must not be added to the dictionary");
_handleToContextMap.TryAdd(handle, new SocketAsyncContextWrapper(context));

_nextHandle = IntPtr.Add(_nextHandle, 1);
_outstandingHandles = IntPtr.Add(_outstandingHandles, 1);
Expand Down Expand Up @@ -318,8 +335,10 @@ private void EventLoop()
{
bool shutdown = false;
Interop.Sys.SocketEvent* buffer = _buffer;
ConcurrentDictionary<IntPtr, SocketAsyncContext> handleToContextMap = _handleToContextMap;
ConcurrentDictionary<IntPtr, SocketAsyncContextWrapper> handleToContextMap = _handleToContextMap;
ConcurrentQueue<SocketIOEvent> eventQueue = _eventQueue;
IntPtr shutdownHandle = ShutdownHandle;
SocketAsyncContext? context = null;
while (!shutdown)
{
int numEvents = EventBufferCount;
Expand All @@ -333,38 +352,36 @@ private void EventLoop()
Debug.Assert(numEvents > 0, $"Unexpected numEvents: {numEvents}");

bool enqueuedEvent = false;
for (int i = 0; i < numEvents; i++)
foreach (var socketEvent in new ReadOnlySpan<Interop.Sys.SocketEvent>(buffer, numEvents))
{
IntPtr handle = buffer[i].Data;
if (handle == ShutdownHandle)
{
shutdown = true;
}
else
IntPtr handle = socketEvent.Data;

if (handleToContextMap.TryGetValue(handle, out SocketAsyncContextWrapper contextWrapper) && (context = contextWrapper.Context) != null)
{
Debug.Assert(handle.ToInt64() < MaxHandles.ToInt64(), $"Unexpected values: handle={handle}, MaxHandles={MaxHandles}");
handleToContextMap.TryGetValue(handle, out SocketAsyncContext? context);
if (context != null)

Interop.Sys.SocketEvents events = context.HandleSyncEventsSpeculatively(socketEvent.Events);
if (events != Interop.Sys.SocketEvents.None)
{
Interop.Sys.SocketEvents events = buffer[i].Events;
events = context.HandleSyncEventsSpeculatively(events);
if (events != Interop.Sys.SocketEvents.None)
{
var ev = new SocketIOEvent(context, events);
eventQueue.Enqueue(ev);
enqueuedEvent = true;

// This is necessary when the JIT generates unoptimized code (debug builds, live debugging,
// quick JIT, etc.) to ensure that the context does not remain referenced by this method, as
// such code may keep the stack location live for longer than necessary
ev = default;
}
var ev = new SocketIOEvent(context, events);
eventQueue.Enqueue(ev);
enqueuedEvent = true;

// This is necessary when the JIT generates unoptimized code (debug builds, live debugging,
// quick JIT, etc.) to ensure that the context does not remain referenced by this method, as
// such code may keep the stack location live for longer than necessary
context = null;
ev = default;
}

// This is necessary when the JIT generates unoptimized code (debug builds, live debugging,
// quick JIT, etc.) to ensure that the context does not remain referenced by this method, as
// such code may keep the stack location live for longer than necessary
context = null;
contextWrapper = default;
}
else if (handle == shutdownHandle)
{
shutdown = true;
}
}

Expand Down Expand Up @@ -488,6 +505,18 @@ private bool TryRegister(SafeSocketHandle socket, IntPtr handle, out Interop.Err
return error == Interop.Error.SUCCESS;
}

// struct wrapper is used in order to improve the performance of the epoll thread hot path by up to 3% of some TechEmpower benchmarks
// the goal is to have a dedicated generic instantiation and using:
// System.Collections.Concurrent.ConcurrentDictionary`2[System.IntPtr,System.Net.Sockets.SocketAsyncContextWrapper]::TryGetValueInternal(!0,int32,!1&)
// instead of:
// System.Collections.Concurrent.ConcurrentDictionary`2[System.IntPtr,System.__Canon]::TryGetValueInternal(!0,int32,!1&)
private readonly struct SocketAsyncContextWrapper
{
public SocketAsyncContextWrapper(SocketAsyncContext context) => Context = context;

internal SocketAsyncContext Context { get; }
}

private readonly struct SocketIOEvent
{
public SocketAsyncContext Context { get; }
Expand Down