Skip to content

Add separate activities for Hub methods #55439

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
May 28, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/SignalR/server/Core/src/HubConnectionContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ public partial class HubConnectionContext
// Tracks groups that the connection has been added to
internal HashSet<string> GroupNames { get; } = new HashSet<string>();

internal Activity? OriginalActivity { get; }

/// <summary>
/// Initializes a new instance of the <see cref="HubConnectionContext"/> class.
/// </summary>
Expand All @@ -73,6 +75,7 @@ public HubConnectionContext(ConnectionContext connectionContext, HubConnectionCo
_streamBufferCapacity = contextOptions.StreamBufferCapacity;
_maxMessageSize = contextOptions.MaximumReceiveMessageSize;
_statefulReconnectBufferSize = contextOptions.StatefulReconnectBufferSize;
OriginalActivity = contextOptions.OriginalActivity;

_connectionContext = connectionContext;
_logger = loggerFactory.CreateLogger(typeof(HubConnectionContext));
Expand Down
4 changes: 4 additions & 0 deletions src/SignalR/server/Core/src/HubConnectionContextOptions.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using System.Diagnostics;

namespace Microsoft.AspNetCore.SignalR;

/// <summary>
Expand Down Expand Up @@ -39,4 +41,6 @@ public class HubConnectionContextOptions
/// Gets or sets the maximum bytes to buffer per connection when using stateful reconnect.
/// </summary>
internal long StatefulReconnectBufferSize { get; set; } = 100_000;

internal Activity? OriginalActivity { get; set; }
}
6 changes: 6 additions & 0 deletions src/SignalR/server/Core/src/HubConnectionHandler.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using System.Diagnostics;
using System.Linq;
using Microsoft.AspNetCore.Connections;
using Microsoft.AspNetCore.SignalR.Internal;
Expand Down Expand Up @@ -125,8 +126,13 @@ public override async Task OnConnectedAsync(ConnectionContext connection)
TimeProvider = TimeProvider,
MaximumParallelInvocations = _maxParallelInvokes,
StatefulReconnectBufferSize = _statefulReconnectBufferSize,
OriginalActivity = Activity.Current,
};

// Get off the parent span.
// This is likely the Http Request span and we want Hub method invocations to not be collected under a long running span.
Activity.Current = null;

Log.ConnectedStarting(_logger);

var connectionContext = new HubConnectionContext(connection, contextOptions, _loggerFactory);
Expand Down
74 changes: 72 additions & 2 deletions src/SignalR/server/Core/src/Internal/DefaultHubDispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ namespace Microsoft.AspNetCore.SignalR.Internal;

internal sealed partial class DefaultHubDispatcher<THub> : HubDispatcher<THub> where THub : Hub
{
private static readonly string _fullHubName = typeof(THub).FullName ?? typeof(THub).Name;

private readonly Dictionary<string, HubMethodDescriptor> _methods = new(StringComparer.OrdinalIgnoreCase);
private readonly Utf8HashLookup _cachedMethodNames = new();
private readonly IServiceScopeFactory _serviceScopeFactory;
Expand Down Expand Up @@ -76,11 +78,14 @@ public override async Task OnConnectedAsync(HubConnectionContext connection)

var hubActivator = scope.ServiceProvider.GetRequiredService<IHubActivator<THub>>();
var hub = hubActivator.Create();
Activity? activity = null;
try
{
// OnConnectedAsync won't work with client results (ISingleClientProxy.InvokeAsync)
InitializeHub(hub, connection, invokeAllowed: false);

activity = StartActivity(connection, scope.ServiceProvider, nameof(hub.OnConnectedAsync));

if (_onConnectedMiddleware != null)
{
var context = new HubLifetimeContext(connection.HubCallerContext, scope.ServiceProvider, hub);
Expand All @@ -91,8 +96,14 @@ public override async Task OnConnectedAsync(HubConnectionContext connection)
await hub.OnConnectedAsync();
}
}
catch (Exception ex)
{
SetActivityError(activity, ex);
throw;
}
finally
{
activity?.Stop();
hubActivator.Release(hub);
}
}
Expand All @@ -103,10 +114,13 @@ public override async Task OnDisconnectedAsync(HubConnectionContext connection,

var hubActivator = scope.ServiceProvider.GetRequiredService<IHubActivator<THub>>();
var hub = hubActivator.Create();
Activity? activity = null;
try
{
InitializeHub(hub, connection);

activity = StartActivity(connection, scope.ServiceProvider, nameof(hub.OnDisconnectedAsync));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we make sure this doesn't allocate?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's allocating?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah nothing should be.


if (_onDisconnectedMiddleware != null)
{
var context = new HubLifetimeContext(connection.HubCallerContext, scope.ServiceProvider, hub);
Expand All @@ -117,8 +131,14 @@ public override async Task OnDisconnectedAsync(HubConnectionContext connection,
await hub.OnDisconnectedAsync(exception);
}
}
catch (Exception ex)
{
SetActivityError(activity, ex);
throw;
}
finally
{
activity?.Stop();
hubActivator.Release(hub);
}
}
Expand Down Expand Up @@ -367,6 +387,10 @@ static async Task ExecuteInvocation(DefaultHubDispatcher<THub> dispatcher,
var logger = dispatcher._logger;
var enableDetailedErrors = dispatcher._enableDetailedErrors;

// Use hubMethodInvocationMessage.Target instead of methodExecutor.MethodInfo.Name
// We want to take HubMethodNameAttribute into account which will be the same as what the invocation target is
var activity = StartActivity(connection, scope.ServiceProvider, hubMethodInvocationMessage.Target);

object? result;
try
{
Expand All @@ -375,13 +399,17 @@ static async Task ExecuteInvocation(DefaultHubDispatcher<THub> dispatcher,
}
catch (Exception ex)
{
SetActivityError(activity, ex);

Log.FailedInvokingHubMethod(logger, hubMethodInvocationMessage.Target, ex);
await SendInvocationError(hubMethodInvocationMessage.InvocationId, connection,
ErrorMessageHelper.BuildErrorMessage($"An unexpected error occurred invoking '{hubMethodInvocationMessage.Target}' on the server.", ex, enableDetailedErrors));
return;
}
finally
{
activity?.Stop();

// Stream response handles cleanup in StreamResultsAsync
// And normal invocations handle cleanup below in the finally
if (isStreamCall)
Expand Down Expand Up @@ -467,6 +495,8 @@ private async Task StreamAsync(string invocationId, HubConnectionContext connect

streamCts ??= CancellationTokenSource.CreateLinkedTokenSource(connection.ConnectionAborted);

var activity = StartActivity(connection, scope.ServiceProvider, hubMethodInvocationMessage.Target);

try
{
if (!connection.ActiveRequestCancellationSources.TryAdd(invocationId, streamCts))
Expand All @@ -483,6 +513,8 @@ private async Task StreamAsync(string invocationId, HubConnectionContext connect
}
catch (Exception ex)
{
SetActivityError(activity, ex);

Log.FailedInvokingHubMethod(_logger, hubMethodInvocationMessage.Target, ex);
error = ErrorMessageHelper.BuildErrorMessage($"An unexpected error occurred invoking '{hubMethodInvocationMessage.Target}' on the server.", ex, _enableDetailedErrors);
return;
Expand All @@ -509,20 +541,27 @@ private async Task StreamAsync(string invocationId, HubConnectionContext connect
catch (ChannelClosedException ex)
{
// If the channel closes from an exception in the streaming method, grab the innerException for the error from the streaming method
Log.FailedStreaming(_logger, invocationId, descriptor.MethodExecutor.MethodInfo.Name, ex.InnerException ?? ex);
error = ErrorMessageHelper.BuildErrorMessage("An error occurred on the server while streaming results.", ex.InnerException ?? ex, _enableDetailedErrors);
var exception = ex.InnerException ?? ex;
SetActivityError(activity, exception);

Log.FailedStreaming(_logger, invocationId, descriptor.MethodExecutor.MethodInfo.Name, exception);
error = ErrorMessageHelper.BuildErrorMessage("An error occurred on the server while streaming results.", exception, _enableDetailedErrors);
}
catch (Exception ex)
{
// If the streaming method was canceled we don't want to send a HubException message - this is not an error case
if (!(ex is OperationCanceledException && streamCts.IsCancellationRequested))
{
SetActivityError(activity, ex);

Log.FailedStreaming(_logger, invocationId, descriptor.MethodExecutor.MethodInfo.Name, ex);
error = ErrorMessageHelper.BuildErrorMessage("An error occurred on the server while streaming results.", ex, _enableDetailedErrors);
}
}
finally
{
activity?.Stop();

await CleanupInvocation(connection, hubMethodInvocationMessage, hubActivator, hub, scope);

streamCts.Dispose();
Expand Down Expand Up @@ -749,4 +788,35 @@ public override IReadOnlyList<Type> GetParameterTypes(string methodName)

return null;
}

// Starts an Activity for a Hub method invocation and sets up all the tags and other state.
// Make sure to call Activity.Stop() once the Hub method completes, and consider calling SetActivityError on exception.
private static Activity? StartActivity(HubConnectionContext connectionContext, IServiceProvider serviceProvider, string methodName)
{
if (serviceProvider.GetService<SignalRActivitySource>() is SignalRActivitySource signalRActivitySource
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we do this once? Does this need to be per invocation? (We can do this change as a follow up).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't have a service provider until this point. We could do the whole _activitySource ??= serviceProvider.GetService<SignalRActivitySource>() pattern to only do it once, but that relies on SignalRActivitySource always being a singleton (at least with MEDI, idk how other containers work). This is true today, but if we ever make it public that would be a concern.

&& signalRActivitySource.ActivitySource.HasListeners())
{
var requestContext = connectionContext.OriginalActivity?.Context;

return signalRActivitySource.ActivitySource.StartActivity($"{_fullHubName}/{methodName}", ActivityKind.Server, parentId: null,
// https://github.com/open-telemetry/semantic-conventions/blob/main/docs/rpc/rpc-spans.md#server-attributes
tags: [
new("rpc.method", methodName),
new("rpc.system", "signalr"),
new("rpc.service", _fullHubName),
// See https://github.com/dotnet/aspnetcore/blob/027c60168383421750f01e427e4f749d0684bc02/src/Servers/Kestrel/Core/src/Internal/Infrastructure/KestrelMetrics.cs#L308
// And https://github.com/dotnet/aspnetcore/issues/43786
//new("server.address", ...),
],
links: requestContext.HasValue ? [new ActivityLink(requestContext.Value)] : null);
}

return null;
}

private static void SetActivityError(Activity? activity, Exception ex)
{
activity?.SetTag("error.type", ex.GetType().FullName);
activity?.SetStatus(ActivityStatusCode.Error);
}
}
14 changes: 14 additions & 0 deletions src/SignalR/server/Core/src/Internal/SignalRActivitySource.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using System.Diagnostics;

namespace Microsoft.AspNetCore.SignalR.Internal;

// Internal for now so we don't need API review.
// Just a wrapper for the ActivitySource
// don't want to put ActivitySource directly in DI as hosting already does that and it could get overwritten.
internal sealed class SignalRActivitySource
{
public ActivitySource ActivitySource { get; } = new ActivitySource("Microsoft.AspNetCore.SignalR.Server");
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ public static ISignalRServerBuilder AddSignalRCore(this IServiceCollection servi
services.TryAddScoped(typeof(IHubActivator<>), typeof(DefaultHubActivator<>));
services.AddAuthorization();

services.TryAddSingleton(new SignalRActivitySource());

var builder = new SignalRServerBuilder(services);
builder.AddJsonProtocol();
return builder;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using System.Diagnostics;
using System.Globalization;
using System.Runtime.CompilerServices;
using System.Text;
Expand Down Expand Up @@ -369,6 +370,12 @@ public async IAsyncEnumerable<int> StreamWithClientResult()
var sum = await Clients.Caller.InvokeAsync<int>("Sum", 1, cancellationToken: default);
yield return sum;
}

public void ActivityMethod(TestActivitySource testActivitySource)
{
var activity = testActivitySource.ActivitySource.StartActivity("inner", ActivityKind.Server);
activity.Stop();
}
}

internal class SelfRef
Expand All @@ -381,6 +388,11 @@ public SelfRef()
public SelfRef Self { get; set; }
}

public class TestActivitySource
{
public ActivitySource ActivitySource { get; set; }
}

public abstract class TestHub : Hub
{
public override Task OnConnectedAsync()
Expand Down Expand Up @@ -709,6 +721,13 @@ public async Task<ChannelReader<string>> CounterChannelAsync(int count)
return CounterChannel(count);
}

[HubMethodName("RenamedCounterChannel")]
public async Task<ChannelReader<string>> CounterChannelAsync2(int count)
{
await Task.Yield();
return CounterChannel(count);
}

public async ValueTask<ChannelReader<string>> CounterChannelValueTaskAsync(int count)
{
await Task.Yield();
Expand Down
Loading
Loading