Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 78 additions & 10 deletions src/Aspire.Hosting/Eventing/DistributedApplicationEventing.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// The .NET Foundation licenses this file to you under the MIT license.

using System.Collections.Concurrent;
using System.Threading.Channels;
using Aspire.Hosting.ApplicationModel;

namespace Aspire.Hosting.Eventing;
Expand All @@ -11,6 +12,33 @@ public class DistributedApplicationEventing : IDistributedApplicationEventing
{
private readonly ConcurrentDictionary<Type, List<DistributedApplicationEventSubscription>> _eventSubscriptionListLookup = new();
private readonly ConcurrentDictionary<DistributedApplicationEventSubscription, Type> _subscriptionEventTypeLookup = new();
private readonly Channel<(Exception Exception, Type EventType, IResource? Resource)> _exceptionChannel = Channel.CreateUnbounded<(Exception, Type, IResource?)>();

/// <summary>
/// Initializes a new instance of the <see cref="DistributedApplicationEventing"/> class.
/// </summary>
public DistributedApplicationEventing()
{
// Start a background task to process exceptions from the channel
_ = Task.Run(ProcessExceptionChannelAsync);
}

private async Task ProcessExceptionChannelAsync()
{
await foreach (var (exception, eventType, resource) in _exceptionChannel.Reader.ReadAllAsync().ConfigureAwait(false))
{
try
{
var exceptionEvent = new EventPublishExceptionEvent(exception, eventType, resource);
// Use NonBlockingSequential to avoid potential deadlocks when publishing from within an event handler
await PublishAsync(exceptionEvent, EventDispatchBehavior.NonBlockingSequential).ConfigureAwait(false);
}
catch
{
// If we can't publish the exception event, there's nothing we can do
}
}
}

/// <inheritdoc cref="IDistributedApplicationEventing.PublishAsync{T}(T, CancellationToken)" />
[System.Diagnostics.CodeAnalysis.SuppressMessage("ApiDesign", "RS0026:Do not add multiple public overloads with optional parameters", Justification = "Cancellation token")]
Expand All @@ -25,26 +53,38 @@ public async Task PublishAsync<T>(T @event, EventDispatchBehavior dispatchBehavi
{
if (_eventSubscriptionListLookup.TryGetValue(typeof(T), out var subscriptions))
{
// Determine the resource associated with the event if it's a resource-specific event
var resource = @event is IDistributedApplicationResourceEvent resourceEvent ? resourceEvent.Resource : null;

if (dispatchBehavior == EventDispatchBehavior.BlockingConcurrent || dispatchBehavior == EventDispatchBehavior.NonBlockingConcurrent)
{
var pendingSubscriptionCallbacks = new List<Task>(subscriptions.Count);
foreach (var subscription in subscriptions.ToArray())
{
var pendingSubscriptionCallback = subscription.Callback(@event, cancellationToken);
pendingSubscriptionCallbacks.Add(pendingSubscriptionCallback);
// Wrap each callback to catch exceptions individually
var wrappedCallback = Task.Run(async () =>
{
try
{
await subscription.Callback(@event, cancellationToken).ConfigureAwait(false);
}
catch (Exception ex)
{
PublishExceptionEventAsync(ex, typeof(T), resource);
throw;
}
}, cancellationToken);
pendingSubscriptionCallbacks.Add(wrappedCallback);
}

if (dispatchBehavior == EventDispatchBehavior.NonBlockingConcurrent)
{
// Non-blocking concurrent.
_ = Task.Run(async () =>
{
await Task.WhenAll(pendingSubscriptionCallbacks).ConfigureAwait(false);
}, default);
// Non-blocking concurrent - fire and forget
_ = Task.WhenAll(pendingSubscriptionCallbacks);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This may not be ideal. You don't necessarily get to choose who your fire and forget partners are. If there is an event that is going to hang around waiting for interaction (for example) it would stop you being notified that your event threw an exception.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@copilot this code is not ideal because it means that any reporting of event handlers that throw exceptions is delayed until all event handlers have completed.

Instead of doing this start a Task to publish exceptions which reads from a shared Channel. Inside the exception handler that you added write the exception into the channel.

The Task.WhenAll(...) can stay, but should also include the task for the exception publishing task.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. Changed to use an unbounded Channel with a background task to process exceptions asynchronously in commit 0c4d1a2. This ensures exceptions are reported immediately without waiting for all event handlers to complete.

}
else
{
// Blocking concurrent.
// Blocking concurrent - wait for all to complete
await Task.WhenAll(pendingSubscriptionCallbacks).ConfigureAwait(false);
}
}
Expand All @@ -57,7 +97,15 @@ public async Task PublishAsync<T>(T @event, EventDispatchBehavior dispatchBehavi
{
foreach (var subscription in subscriptions.ToArray())
{
await subscription.Callback(@event, cancellationToken).ConfigureAwait(false);
try
{
await subscription.Callback(@event, cancellationToken).ConfigureAwait(false);
}
catch (Exception ex)
{
PublishExceptionEventAsync(ex, typeof(T), resource);
throw;
}
}
}, default);
}
Expand All @@ -66,13 +114,33 @@ public async Task PublishAsync<T>(T @event, EventDispatchBehavior dispatchBehavi
// Blocking sequential.
foreach (var subscription in subscriptions.ToArray())
{
await subscription.Callback(@event, cancellationToken).ConfigureAwait(false);
try
{
await subscription.Callback(@event, cancellationToken).ConfigureAwait(false);
}
catch (Exception ex)
{
PublishExceptionEventAsync(ex, typeof(T), resource);
throw;
}
}
}
}
}
}

private void PublishExceptionEventAsync(Exception exception, Type eventType, IResource? resource)
{
// Avoid infinite loop if EventPublishExceptionEvent handler throws
if (eventType == typeof(EventPublishExceptionEvent))
{
return;
}

// Write to the channel for async processing
_exceptionChannel.Writer.TryWrite((exception, eventType, resource));
}

/// <inheritdoc cref="IDistributedApplicationEventing.Subscribe{T}(Func{T, CancellationToken, Task})" />
public DistributedApplicationEventSubscription Subscribe<T>(Func<T, CancellationToken, Task> callback) where T : IDistributedApplicationEvent
{
Expand Down
40 changes: 40 additions & 0 deletions src/Aspire.Hosting/Eventing/EventPublishExceptionEvent.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using Aspire.Hosting.ApplicationModel;

namespace Aspire.Hosting.Eventing;

/// <summary>
/// This event is raised when an exception occurs during event publishing.
/// </summary>
public sealed class EventPublishExceptionEvent : IDistributedApplicationEvent
{
/// <summary>
/// Initializes a new instance of the <see cref="EventPublishExceptionEvent"/> class.
/// </summary>
/// <param name="exception">The exception that was thrown.</param>
/// <param name="eventType">The type of the event that was being published.</param>
/// <param name="resource">The resource associated with the event, if it's a resource-specific event.</param>
public EventPublishExceptionEvent(Exception exception, Type eventType, IResource? resource)
{
Exception = exception;
EventType = eventType;
Resource = resource;
}

/// <summary>
/// The exception that was thrown.
/// </summary>
public Exception Exception { get; }

/// <summary>
/// The type of the event that was being published.
/// </summary>
public Type EventType { get; }

/// <summary>
/// The resource associated with the event, if it's a resource-specific event.
/// </summary>
public IResource? Resource { get; }
}
26 changes: 26 additions & 0 deletions src/Aspire.Hosting/Orchestrator/ApplicationOrchestrator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
using Aspire.Hosting.Dcp;
using Aspire.Hosting.Eventing;
using Aspire.Hosting.Lifecycle;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;

namespace Aspire.Hosting.Orchestrator;

Expand Down Expand Up @@ -64,6 +66,7 @@ public ApplicationOrchestrator(DistributedApplicationModel model,
_eventing.Subscribe<ConnectionStringAvailableEvent>(PublishConnectionStringValue);
// Implement WaitFor functionality using BeforeResourceStartedEvent.
_eventing.Subscribe<BeforeResourceStartedEvent>(WaitForInBeforeResourceStartedEvent);
_eventing.Subscribe<EventPublishExceptionEvent>(OnPublishEventException);
}

private async Task PublishConnectionStringValue(ConnectionStringAvailableEvent @event, CancellationToken token)
Expand Down Expand Up @@ -308,6 +311,29 @@ private async Task OnResourceEndpointsAllocated(ResourceEndpointsAllocatedEvent
await PublishResourceEndpointUrls(@event.Resource, cancellationToken).ConfigureAwait(false);
}

private Task OnPublishEventException(EventPublishExceptionEvent @event, CancellationToken cancellationToken)
{
// Log the exception to both the resource-specific logger (if available) and a general logger
if (@event.Resource is not null)
{
var resourceLogger = _loggerService.GetLogger(@event.Resource);
resourceLogger.LogError(@event.Exception, "An exception occurred while publishing event {EventType} for resource {ResourceName}.", @event.EventType.Name, @event.Resource.Name);
}

// Also log to a general logger using IServiceProvider
var logger = _serviceProvider.GetRequiredService<ILogger<ApplicationOrchestrator>>();
if (@event.Resource is not null)
{
logger.LogError(@event.Exception, "An exception occurred while publishing event {EventType} for resource {ResourceName}.", @event.EventType.Name, @event.Resource.Name);
}
else
{
logger.LogError(@event.Exception, "An exception occurred while publishing event {EventType}.", @event.EventType.Name);
}

return Task.CompletedTask;
}

private async Task OnResourceChanged(OnResourceChangedContext context)
{
// Get the previous state before updating to detect transitions to stopped states
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,168 @@ public async Task ResourceStoppedEventFiresWhenResourceStops()
await resourceStoppedTcs.Task.DefaultTimeout();
}

[Fact]
public async Task ExceptionInEventHandlerIsPublishedAsEventPublishExceptionEvent()
{
using var builder = TestDistributedApplicationBuilder.Create();
EventPublishExceptionEvent? capturedExceptionEvent = null;
var exceptionMessage = "Test exception in event handler";
var tcs = new TaskCompletionSource();

builder.Eventing.Subscribe<DummyEvent>((@event, ct) =>
{
throw new InvalidOperationException(exceptionMessage);
});

builder.Eventing.Subscribe<EventPublishExceptionEvent>((@event, ct) =>
{
capturedExceptionEvent = @event;
tcs.TrySetResult();
return Task.CompletedTask;
});

// The exception should be rethrown after being published
var exception = await Assert.ThrowsAsync<InvalidOperationException>(async () =>
{
await builder.Eventing.PublishAsync(new DummyEvent(), EventDispatchBehavior.BlockingSequential);
});

// Wait for the async exception handler to complete
await tcs.Task.DefaultTimeout();

// The exception should have been caught and published
Assert.NotNull(capturedExceptionEvent);
Assert.IsType<InvalidOperationException>(capturedExceptionEvent.Exception);
Assert.Equal(exceptionMessage, capturedExceptionEvent.Exception.Message);
Assert.Equal(typeof(DummyEvent), capturedExceptionEvent.EventType);
Assert.Null(capturedExceptionEvent.Resource);
Assert.Equal(exceptionMessage, exception.Message);
}

[Fact]
public async Task ExceptionInResourceEventHandlerIncludesResource()
{
using var builder = TestDistributedApplicationBuilder.Create();
var testResource = builder.AddResource(new TestResource("test-resource"));
EventPublishExceptionEvent? capturedExceptionEvent = null;
var tcs = new TaskCompletionSource();

builder.Eventing.Subscribe(testResource.Resource, (ResourceReadyEvent @event, CancellationToken ct) =>
{
throw new InvalidOperationException("Test exception");
});

builder.Eventing.Subscribe<EventPublishExceptionEvent>((@event, ct) =>
{
capturedExceptionEvent = @event;
tcs.TrySetResult();
return Task.CompletedTask;
});

using var app = builder.Build();

// The exception should be rethrown after being published
await Assert.ThrowsAsync<InvalidOperationException>(async () =>
{
await builder.Eventing.PublishAsync(new ResourceReadyEvent(testResource.Resource, app.Services), EventDispatchBehavior.BlockingSequential);
});

// Wait for the async exception handler to complete
await tcs.Task.DefaultTimeout();

Assert.NotNull(capturedExceptionEvent);
Assert.Equal(testResource.Resource, capturedExceptionEvent.Resource);
Assert.Equal(typeof(ResourceReadyEvent), capturedExceptionEvent.EventType);
}

[Fact]
public async Task ExceptionInNonBlockingSequentialHandlerIsPublished()
{
using var builder = TestDistributedApplicationBuilder.Create();
EventPublishExceptionEvent? capturedExceptionEvent = null;
var tcs = new TaskCompletionSource();

builder.Eventing.Subscribe<DummyEvent>((@event, ct) =>
{
throw new InvalidOperationException("Test exception");
});

builder.Eventing.Subscribe<EventPublishExceptionEvent>((@event, ct) =>
{
capturedExceptionEvent = @event;
tcs.TrySetResult();
return Task.CompletedTask;
});

await builder.Eventing.PublishAsync(new DummyEvent(), EventDispatchBehavior.NonBlockingSequential);

// Wait for the async handler to complete
await tcs.Task.DefaultTimeout();

Assert.NotNull(capturedExceptionEvent);
Assert.IsType<InvalidOperationException>(capturedExceptionEvent.Exception);
}

[Fact]
public async Task ExceptionInBlockingConcurrentHandlerIsPublished()
{
using var builder = TestDistributedApplicationBuilder.Create();
EventPublishExceptionEvent? capturedExceptionEvent = null;
var tcs = new TaskCompletionSource();

builder.Eventing.Subscribe<DummyEvent>((@event, ct) =>
{
throw new InvalidOperationException("Test exception");
});

builder.Eventing.Subscribe<EventPublishExceptionEvent>((@event, ct) =>
{
capturedExceptionEvent = @event;
tcs.TrySetResult();
return Task.CompletedTask;
});

// The exception should be rethrown after being published
await Assert.ThrowsAsync<InvalidOperationException>(async () =>
{
await builder.Eventing.PublishAsync(new DummyEvent(), EventDispatchBehavior.BlockingConcurrent);
});

// Wait for the async exception handler to complete
await tcs.Task.DefaultTimeout();

Assert.NotNull(capturedExceptionEvent);
Assert.IsType<InvalidOperationException>(capturedExceptionEvent.Exception);
}

[Fact]
public async Task ExceptionInNonBlockingConcurrentHandlerIsPublished()
{
using var builder = TestDistributedApplicationBuilder.Create();
EventPublishExceptionEvent? capturedExceptionEvent = null;
var tcs = new TaskCompletionSource();

builder.Eventing.Subscribe<DummyEvent>((@event, ct) =>
{
throw new InvalidOperationException("Test exception");
});

builder.Eventing.Subscribe<EventPublishExceptionEvent>((@event, ct) =>
{
capturedExceptionEvent = @event;
tcs.TrySetResult();
return Task.CompletedTask;
});

await builder.Eventing.PublishAsync(new DummyEvent(), EventDispatchBehavior.NonBlockingConcurrent);

// Wait for the async handler to complete
await tcs.Task.DefaultTimeout();

Assert.NotNull(capturedExceptionEvent);
Assert.IsType<InvalidOperationException>(capturedExceptionEvent.Exception);
}

public class DummyEvent : IDistributedApplicationEvent
{
}
Expand Down
Loading