Skip to content

Commit 0cf96e6

Browse files
authored
.NET to JS Streaming Interop (#34817)
* .NET to JS Streaming Interop * Refactor WebView Impl * Update TransmitDataStreamToJS.cs * Fix Build * Unit Tests * E2E Tests * E2E test fixes * Remove dotNetToJSReceiveDotNetStreamReference Sync Tests * Cleanup usings * Cleanup Tests * IAsyncEnumerable Based Approach * IAsyncEnumerable Based Approach Cleanup
1 parent e932a8a commit 0cf96e6

24 files changed

+662
-21
lines changed

src/Components/Server/src/Circuits/CircuitHost.cs

Lines changed: 68 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,19 @@
11
// Licensed to the .NET Foundation under one or more agreements.
22
// The .NET Foundation licenses this file to you under the MIT license.
33

4+
using System.Buffers;
45
using System.Globalization;
56
using System.Security.Claims;
67
using System.Text.Json;
8+
using System.Threading;
9+
using System.Threading.Channels;
10+
using System.Threading.Tasks;
711
using Microsoft.AspNetCore.Components.Authorization;
812
using Microsoft.AspNetCore.Components.Web;
913
using Microsoft.AspNetCore.SignalR;
1014
using Microsoft.Extensions.DependencyInjection;
1115
using Microsoft.Extensions.Logging;
16+
using Microsoft.JSInterop;
1217
using Microsoft.JSInterop.Infrastructure;
1318

1419
namespace Microsoft.AspNetCore.Components.Server.Circuits
@@ -440,6 +445,56 @@ internal async Task<bool> ReceiveJSDataChunk(long streamId, long chunkId, byte[]
440445
}
441446
}
442447

448+
public async Task<int> SendDotNetStreamAsync(DotNetStreamReference dotNetStreamReference, long streamId, byte[] buffer)
449+
{
450+
AssertInitialized();
451+
AssertNotDisposed();
452+
453+
try
454+
{
455+
return await Renderer.Dispatcher.InvokeAsync<int>(async () => await dotNetStreamReference.Stream.ReadAsync(buffer));
456+
}
457+
catch (Exception ex)
458+
{
459+
// An error completing stream interop means that the user sent invalid data, a well-behaved
460+
// client won't do this.
461+
Log.SendDotNetStreamException(_logger, streamId, ex);
462+
await TryNotifyClientErrorAsync(Client, GetClientErrorMessage(ex, "Unable to send .NET stream."));
463+
UnhandledException?.Invoke(this, new UnhandledExceptionEventArgs(ex, isTerminating: false));
464+
return 0;
465+
}
466+
}
467+
468+
public async Task<DotNetStreamReference> TryClaimPendingStream(long streamId)
469+
{
470+
AssertInitialized();
471+
AssertNotDisposed();
472+
473+
DotNetStreamReference dotNetStreamReference = null;
474+
475+
try
476+
{
477+
return await Renderer.Dispatcher.InvokeAsync<DotNetStreamReference>(() =>
478+
{
479+
if (!JSRuntime.TryClaimPendingStreamForSending(streamId, out dotNetStreamReference))
480+
{
481+
throw new InvalidOperationException($"The stream with ID {streamId} is not available. It may have timed out.");
482+
}
483+
484+
return dotNetStreamReference;
485+
});
486+
}
487+
catch (Exception ex)
488+
{
489+
// An error completing stream interop means that the user sent invalid data, a well-behaved
490+
// client won't do this.
491+
Log.SendDotNetStreamException(_logger, streamId, ex);
492+
await TryNotifyClientErrorAsync(Client, GetClientErrorMessage(ex, "Unable to locate .NET stream."));
493+
UnhandledException?.Invoke(this, new UnhandledExceptionEventArgs(ex, isTerminating: false));
494+
return default;
495+
}
496+
}
497+
443498
// OnLocationChangedAsync is used in a fire-and-forget context, so it's responsible for its own
444499
// error handling.
445500
public async Task OnLocationChangedAsync(string uri, bool intercepted)
@@ -615,6 +670,7 @@ private static class Log
615670
private static readonly Action<ILogger, long, Exception> _receiveByteArraySuccess;
616671
private static readonly Action<ILogger, long, Exception> _receiveByteArrayException;
617672
private static readonly Action<ILogger, long, Exception> _receiveJSDataChunkException;
673+
private static readonly Action<ILogger, long, Exception> _sendDotNetStreamException;
618674
private static readonly Action<ILogger, Exception> _dispatchEventFailedToParseEventData;
619675
private static readonly Action<ILogger, string, Exception> _dispatchEventFailedToDispatchEvent;
620676
private static readonly Action<ILogger, string, CircuitId, Exception> _locationChange;
@@ -660,6 +716,7 @@ private static class EventIds
660716
public static readonly EventId ReceiveByteArraySucceeded = new EventId(213, "ReceiveByteArraySucceeded");
661717
public static readonly EventId ReceiveByteArrayException = new EventId(214, "ReceiveByteArrayException");
662718
public static readonly EventId ReceiveJSDataChunkException = new EventId(215, "ReceiveJSDataChunkException");
719+
public static readonly EventId SendDotNetStreamException = new EventId(216, "SendDotNetStreamException");
663720
}
664721

665722
static Log()
@@ -790,9 +847,14 @@ static Log()
790847
"The ReceiveByteArray call with id '{id}' failed.");
791848

792849
_receiveJSDataChunkException = LoggerMessage.Define<long>(
850+
LogLevel.Debug,
851+
EventIds.ReceiveJSDataChunkException,
852+
"The ReceiveJSDataChunk call with stream id '{streamId}' failed.");
853+
854+
_sendDotNetStreamException = LoggerMessage.Define<long>(
793855
LogLevel.Debug,
794-
EventIds.ReceiveJSDataChunkException,
795-
"The ReceiveJSDataChunk call with stream id '{streamId}' failed.");
856+
EventIds.SendDotNetStreamException,
857+
"The SendDotNetStreamAsync call with id '{id}' failed.");
796858

797859
_dispatchEventFailedToParseEventData = LoggerMessage.Define(
798860
LogLevel.Debug,
@@ -856,9 +918,10 @@ public static void CircuitHandlerFailed(ILogger logger, CircuitHandler handler,
856918
public static void EndInvokeDispatchException(ILogger logger, Exception ex) => _endInvokeDispatchException(logger, ex);
857919
public static void EndInvokeJSFailed(ILogger logger, long asyncHandle, string arguments) => _endInvokeJSFailed(logger, asyncHandle, arguments, null);
858920
public static void EndInvokeJSSucceeded(ILogger logger, long asyncCall) => _endInvokeJSSucceeded(logger, asyncCall, null);
859-
internal static void ReceiveByteArraySuccess(ILogger logger, long id) => _receiveByteArraySuccess(logger, id, null);
860-
internal static void ReceiveByteArrayException(ILogger logger, long id, Exception ex) => _receiveByteArrayException(logger, id, ex);
861-
internal static void ReceiveJSDataChunkException(ILogger logger, long streamId, Exception ex) => _receiveJSDataChunkException(logger, streamId, ex);
921+
public static void ReceiveByteArraySuccess(ILogger logger, long id) => _receiveByteArraySuccess(logger, id, null);
922+
public static void ReceiveByteArrayException(ILogger logger, long id, Exception ex) => _receiveByteArrayException(logger, id, ex);
923+
public static void ReceiveJSDataChunkException(ILogger logger, long streamId, Exception ex) => _receiveJSDataChunkException(logger, streamId, ex);
924+
public static void SendDotNetStreamException(ILogger logger, long streamId, Exception ex) => _sendDotNetStreamException(logger, streamId, ex);
862925
public static void DispatchEventFailedToParseEventData(ILogger logger, Exception ex) => _dispatchEventFailedToParseEventData(logger, ex);
863926
public static void DispatchEventFailedToDispatchEvent(ILogger logger, string eventHandlerId, Exception ex) => _dispatchEventFailedToDispatchEvent(logger, eventHandlerId ?? "", ex);
864927

src/Components/Server/src/Circuits/RemoteJSRuntime.cs

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
// The .NET Foundation licenses this file to you under the MIT license.
33

44
using System;
5+
using System.Collections.Concurrent;
56
using System.Collections.Generic;
67
using System.IO;
78
using System.Text.Json;
@@ -20,6 +21,7 @@ internal class RemoteJSRuntime : JSRuntime
2021
private readonly CircuitOptions _options;
2122
private readonly ILogger<RemoteJSRuntime> _logger;
2223
private CircuitClientProxy _clientProxy;
24+
private readonly ConcurrentDictionary<long, DotNetStreamReference> _pendingDotNetToJSStreams = new();
2325
private bool _permanentlyDisconnected;
2426
private readonly long _maximumIncomingBytes;
2527
private int _byteArraysToBeRevivedTotalBytes;
@@ -151,6 +153,40 @@ protected override void ReceiveByteArray(int id, byte[] data)
151153
base.ReceiveByteArray(id, data);
152154
}
153155

156+
protected override async Task TransmitStreamAsync(long streamId, DotNetStreamReference dotNetStreamReference)
157+
{
158+
if (!_pendingDotNetToJSStreams.TryAdd(streamId, dotNetStreamReference))
159+
{
160+
throw new ArgumentException($"The stream {streamId} is already pending.");
161+
}
162+
163+
// SignalR only supports streaming being initiated from the JS side, so we have to ask it to
164+
// start the stream. We'll give it a maximum of 10 seconds to do so, after which we give up
165+
// and discard it.
166+
var cancellationToken = new CancellationTokenSource(TimeSpan.FromSeconds(10)).Token;
167+
cancellationToken.Register(() =>
168+
{
169+
// If by now the stream hasn't been claimed for sending, stop tracking it
170+
if (_pendingDotNetToJSStreams.TryRemove(streamId, out var timedOutStream) && !timedOutStream.LeaveOpen)
171+
{
172+
timedOutStream.Stream.Dispose();
173+
}
174+
});
175+
176+
await _clientProxy.SendAsync("JS.BeginTransmitStream", streamId);
177+
}
178+
179+
public bool TryClaimPendingStreamForSending(long streamId, out DotNetStreamReference pendingStream)
180+
{
181+
if (_pendingDotNetToJSStreams.TryRemove(streamId, out pendingStream))
182+
{
183+
return true;
184+
}
185+
186+
pendingStream = default;
187+
return false;
188+
}
189+
154190
public void MarkPermanentlyDisconnected()
155191
{
156192
_permanentlyDisconnected = true;

src/Components/Server/src/ComponentHub.cs

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
using System.Diagnostics;
77
using System.Runtime.CompilerServices;
88
using System.Text.Json;
9+
using System.Threading.Channels;
910
using System.Threading.Tasks;
1011
using Microsoft.AspNetCore.Components.Server.Circuits;
1112
using Microsoft.AspNetCore.DataProtection;
@@ -242,6 +243,41 @@ public async ValueTask<bool> ReceiveJSDataChunk(long streamId, long chunkId, byt
242243
return await circuitHost.ReceiveJSDataChunk(streamId, chunkId, chunk, error);
243244
}
244245

246+
public async IAsyncEnumerable<ArraySegment<byte>> SendDotNetStreamToJS(long streamId)
247+
{
248+
var circuitHost = await GetActiveCircuitAsync();
249+
if (circuitHost == null)
250+
{
251+
yield break;
252+
}
253+
254+
var dotNetStreamReference = await circuitHost.TryClaimPendingStream(streamId);
255+
if (dotNetStreamReference == default)
256+
{
257+
yield break;
258+
}
259+
260+
var buffer = ArrayPool<byte>.Shared.Rent(32 * 1024);
261+
262+
try
263+
{
264+
int bytesRead;
265+
while ((bytesRead = await circuitHost.SendDotNetStreamAsync(dotNetStreamReference, streamId, buffer)) > 0)
266+
{
267+
yield return new ArraySegment<byte>(buffer, 0, bytesRead);
268+
}
269+
}
270+
finally
271+
{
272+
ArrayPool<byte>.Shared.Return(buffer, clearArray: true);
273+
274+
if (!dotNetStreamReference.LeaveOpen)
275+
{
276+
dotNetStreamReference.Stream?.Dispose();
277+
}
278+
}
279+
}
280+
245281
public async ValueTask OnRenderCompleted(long renderId, string errorMessageOrNull)
246282
{
247283
var circuitHost = await GetActiveCircuitAsync();
@@ -325,6 +361,9 @@ private static class Log
325361
private static readonly Action<ILogger, string, Exception> _invalidCircuitId =
326362
LoggerMessage.Define<string>(LogLevel.Debug, new EventId(8, "InvalidCircuitId"), "ConnectAsync received an invalid circuit id '{CircuitIdSecret}'");
327363

364+
private static readonly Action<ILogger, Exception> _sendingDotNetStreamFailed =
365+
LoggerMessage.Define(LogLevel.Debug, new EventId(9, "SendingDotNetStreamFailed"), "Sending the .NET stream data to JS failed");
366+
328367
public static void ReceivedConfirmationForBatch(ILogger logger, long batchId) => _receivedConfirmationForBatch(logger, batchId, null);
329368

330369
public static void CircuitAlreadyInitialized(ILogger logger, CircuitId circuitId) => _circuitAlreadyInitialized(logger, circuitId, null);
@@ -337,6 +376,8 @@ private static class Log
337376

338377
public static void CircuitInitializationFailed(ILogger logger, Exception exception) => _circuitInitializationFailed(logger, exception);
339378

379+
public static void SendingDotNetStreamFailed(ILogger logger, Exception exception) => _sendingDotNetStreamFailed(logger, exception);
380+
340381
public static void CreatedCircuit(ILogger logger, CircuitId circuitId, string circuitSecret, string connectionId)
341382
{
342383
// Redact the secret unless tracing is on.
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
// Licensed to the .NET Foundation under one or more agreements.
2+
// The .NET Foundation licenses this file to you under the MIT license.
3+
4+
using System.Buffers;
5+
using Microsoft.JSInterop;
6+
7+
namespace Microsoft.AspNetCore.Components
8+
{
9+
/// <Summary>
10+
/// A stream that pulls each chunk on demand using JavaScript interop. This implementation is used for
11+
/// WebAssembly and WebView applications.
12+
/// </Summary>
13+
internal static class TransmitDataStreamToJS
14+
{
15+
internal static async Task TransmitStreamAsync(IJSRuntime runtime, long streamId, DotNetStreamReference dotNetStreamReference)
16+
{
17+
var buffer = ArrayPool<byte>.Shared.Rent(32 * 1024);
18+
19+
try
20+
{
21+
int bytesRead;
22+
while ((bytesRead = await dotNetStreamReference.Stream.ReadAsync(buffer)) > 0)
23+
{
24+
await runtime.InvokeVoidAsync("Blazor._internal.receiveDotNetDataStream", streamId, buffer, bytesRead, null);
25+
}
26+
27+
// Notify client that the stream has completed
28+
await runtime.InvokeVoidAsync("Blazor._internal.receiveDotNetDataStream", streamId, Array.Empty<byte>(), 0, null);
29+
}
30+
catch (Exception ex)
31+
{
32+
try
33+
{
34+
// Attempt to notify the client of the error.
35+
await runtime.InvokeVoidAsync("Blazor._internal.receiveDotNetDataStream", streamId, Array.Empty<byte>(), 0, ex.Message);
36+
}
37+
catch
38+
{
39+
// JS Interop encountered an issue, unable to send error message to JS.
40+
}
41+
42+
throw;
43+
}
44+
finally
45+
{
46+
ArrayPool<byte>.Shared.Return(buffer, clearArray: true);
47+
48+
if (!dotNetStreamReference.LeaveOpen)
49+
{
50+
dotNetStreamReference.Stream?.Dispose();
51+
}
52+
}
53+
}
54+
55+
}
56+
}

src/Components/Web.JS/src/Boot.Server.ts

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,20 @@ async function initializeConnection(options: CircuitStartOptions, logger: Logger
101101
connection.on('JS.EndInvokeDotNet', DotNet.jsCallDispatcher.endInvokeDotNetFromJS);
102102
connection.on('JS.ReceiveByteArray', DotNet.jsCallDispatcher.receiveByteArray);
103103

104+
connection.on('JS.BeginTransmitStream', (streamId: number) => {
105+
const readableStream = new ReadableStream({
106+
start(controller) {
107+
connection.stream('SendDotNetStreamToJS', streamId).subscribe({
108+
next: (chunk: Uint8Array) => controller.enqueue(chunk),
109+
complete: () => controller.close(),
110+
error: (err) => controller.error(err),
111+
});
112+
}
113+
});
114+
115+
DotNet.jsCallDispatcher.supplyDotNetStream(streamId, readableStream);
116+
});
117+
104118
const renderQueue = RenderQueue.getOrCreate(logger);
105119
connection.on('JS.RenderBatch', (batchId: number, batchData: Uint8Array) => {
106120
logger.log(LogLevel.Debug, `Received render batch with id ${batchId} and ${batchData.byteLength} bytes.`);

src/Components/Web.JS/src/GlobalExports.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import { DefaultReconnectionHandler } from './Platform/Circuits/DefaultReconnect
99
import { CircuitStartOptions } from './Platform/Circuits/CircuitStartOptions';
1010
import { WebAssemblyStartOptions } from './Platform/WebAssemblyStartOptions';
1111
import { Platform, Pointer, System_String, System_Array, System_Object, System_Boolean, System_Byte, System_Int } from './Platform/Platform';
12-
import { getNextChunk } from './StreamingInterop';
12+
import { getNextChunk, receiveDotNetDataStream } from './StreamingInterop';
1313
import { RootComponentsFunctions } from './Rendering/JSRootComponents';
1414
import { attachWebRendererInterop } from './Rendering/WebRendererInteropMethods';
1515

@@ -56,6 +56,7 @@ interface IBlazor {
5656
getSatelliteAssemblies?: any,
5757
sendJSDataStream?: (data: any, streamId: number, chunkSize: number) => void,
5858
getJSDataStreamChunk?: (data: any, position: number, chunkSize: number) => Promise<Uint8Array>,
59+
receiveDotNetDataStream?: (streamId: number, data: any, bytesRead: number, errorMessage: string) => void,
5960
attachWebRendererInterop?: typeof attachWebRendererInterop,
6061

6162
// APIs invoked by hot reload
@@ -76,6 +77,7 @@ export const Blazor: IBlazor = {
7677
PageTitle,
7778
InputFile,
7879
getJSDataStreamChunk: getNextChunk,
80+
receiveDotNetDataStream: receiveDotNetDataStream,
7981
attachWebRendererInterop,
8082
},
8183
};

src/Components/Web.JS/src/Platform/WebView/WebViewIpcReceiver.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import { attachRootComponentToElement, renderBatch } from '../../Rendering/Rende
55
import { setApplicationIsTerminated, tryDeserializeMessage } from './WebViewIpcCommon';
66
import { sendRenderCompleted } from './WebViewIpcSender';
77
import { internalFunctions as navigationManagerFunctions } from '../../Services/NavigationManager';
8+
import { receiveDotNetDataStream } from '../../StreamingInterop';
89

910
export function startIpcReceiver() {
1011
const messageHandlers = {

src/Components/Web.JS/src/StreamingInterop.ts

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import { DotNet } from '@microsoft/dotnet-js-interop';
2+
13
export async function getNextChunk(data: ArrayBufferView | Blob, position: number, nextChunkSize: number): Promise<Uint8Array> {
24
if (data instanceof Blob) {
35
return await getChunkFromBlob(data, position, nextChunkSize);
@@ -17,3 +19,28 @@ function getChunkFromArrayBufferView(data: ArrayBufferView, position: number, ne
1719
const nextChunkData = new Uint8Array(data.buffer, data.byteOffset + position, nextChunkSize);
1820
return nextChunkData;
1921
}
22+
23+
const transmittingDotNetToJSStreams = new Map<number, ReadableStreamController<any>>();
24+
export function receiveDotNetDataStream(streamId: number, data: Uint8Array, bytesRead: number, errorMessage: string): void {
25+
let streamController = transmittingDotNetToJSStreams.get(streamId);
26+
if (!streamController) {
27+
const readableStream = new ReadableStream({
28+
start(controller) {
29+
transmittingDotNetToJSStreams.set(streamId, controller);
30+
streamController = controller;
31+
}
32+
});
33+
34+
DotNet.jsCallDispatcher.supplyDotNetStream(streamId, readableStream);
35+
}
36+
37+
if (errorMessage) {
38+
streamController!.error(errorMessage);
39+
transmittingDotNetToJSStreams.delete(streamId);
40+
} else if (bytesRead === 0) {
41+
streamController!.close();
42+
transmittingDotNetToJSStreams.delete(streamId);
43+
} else {
44+
streamController!.enqueue(data.length === bytesRead ? data : data.subarray(0, bytesRead));
45+
}
46+
}

0 commit comments

Comments
 (0)