Skip to content

.NET to JS Streaming Interop #34817

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 16 commits into from
Aug 6, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 68 additions & 5 deletions src/Components/Server/src/Circuits/CircuitHost.cs
Original file line number Diff line number Diff line change
@@ -1,14 +1,19 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using System.Buffers;
using System.Globalization;
using System.Security.Claims;
using System.Text.Json;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Components.Authorization;
using Microsoft.AspNetCore.Components.Web;
using Microsoft.AspNetCore.SignalR;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.JSInterop;
using Microsoft.JSInterop.Infrastructure;

namespace Microsoft.AspNetCore.Components.Server.Circuits
Expand Down Expand Up @@ -440,6 +445,56 @@ internal async Task<bool> ReceiveJSDataChunk(long streamId, long chunkId, byte[]
}
}

public async Task<int> SendDotNetStreamAsync(DotNetStreamReference dotNetStreamReference, long streamId, byte[] buffer)
{
AssertInitialized();
AssertNotDisposed();

try
{
return await Renderer.Dispatcher.InvokeAsync<int>(async () => await dotNetStreamReference.Stream.ReadAsync(buffer));
}
catch (Exception ex)
{
// An error completing stream interop means that the user sent invalid data, a well-behaved
// client won't do this.
Log.SendDotNetStreamException(_logger, streamId, ex);
await TryNotifyClientErrorAsync(Client, GetClientErrorMessage(ex, "Unable to send .NET stream."));
UnhandledException?.Invoke(this, new UnhandledExceptionEventArgs(ex, isTerminating: false));
return 0;
}
}

public async Task<DotNetStreamReference> TryClaimPendingStream(long streamId)
{
AssertInitialized();
AssertNotDisposed();

DotNetStreamReference dotNetStreamReference = null;

try
{
return await Renderer.Dispatcher.InvokeAsync<DotNetStreamReference>(() =>
{
if (!JSRuntime.TryClaimPendingStreamForSending(streamId, out dotNetStreamReference))
{
throw new InvalidOperationException($"The stream with ID {streamId} is not available. It may have timed out.");
}

return dotNetStreamReference;
});
}
catch (Exception ex)
{
// An error completing stream interop means that the user sent invalid data, a well-behaved
// client won't do this.
Log.SendDotNetStreamException(_logger, streamId, ex);
await TryNotifyClientErrorAsync(Client, GetClientErrorMessage(ex, "Unable to locate .NET stream."));
UnhandledException?.Invoke(this, new UnhandledExceptionEventArgs(ex, isTerminating: false));
return default;
}
}

// OnLocationChangedAsync is used in a fire-and-forget context, so it's responsible for its own
// error handling.
public async Task OnLocationChangedAsync(string uri, bool intercepted)
Expand Down Expand Up @@ -615,6 +670,7 @@ private static class Log
private static readonly Action<ILogger, long, Exception> _receiveByteArraySuccess;
private static readonly Action<ILogger, long, Exception> _receiveByteArrayException;
private static readonly Action<ILogger, long, Exception> _receiveJSDataChunkException;
private static readonly Action<ILogger, long, Exception> _sendDotNetStreamException;
private static readonly Action<ILogger, Exception> _dispatchEventFailedToParseEventData;
private static readonly Action<ILogger, string, Exception> _dispatchEventFailedToDispatchEvent;
private static readonly Action<ILogger, string, CircuitId, Exception> _locationChange;
Expand Down Expand Up @@ -660,6 +716,7 @@ private static class EventIds
public static readonly EventId ReceiveByteArraySucceeded = new EventId(213, "ReceiveByteArraySucceeded");
public static readonly EventId ReceiveByteArrayException = new EventId(214, "ReceiveByteArrayException");
public static readonly EventId ReceiveJSDataChunkException = new EventId(215, "ReceiveJSDataChunkException");
public static readonly EventId SendDotNetStreamException = new EventId(216, "SendDotNetStreamException");
}

static Log()
Expand Down Expand Up @@ -790,9 +847,14 @@ static Log()
"The ReceiveByteArray call with id '{id}' failed.");

_receiveJSDataChunkException = LoggerMessage.Define<long>(
LogLevel.Debug,
EventIds.ReceiveJSDataChunkException,
"The ReceiveJSDataChunk call with stream id '{streamId}' failed.");

_sendDotNetStreamException = LoggerMessage.Define<long>(
LogLevel.Debug,
EventIds.ReceiveJSDataChunkException,
"The ReceiveJSDataChunk call with stream id '{streamId}' failed.");
EventIds.SendDotNetStreamException,
"The SendDotNetStreamAsync call with id '{id}' failed.");

_dispatchEventFailedToParseEventData = LoggerMessage.Define(
LogLevel.Debug,
Expand Down Expand Up @@ -856,9 +918,10 @@ public static void CircuitHandlerFailed(ILogger logger, CircuitHandler handler,
public static void EndInvokeDispatchException(ILogger logger, Exception ex) => _endInvokeDispatchException(logger, ex);
public static void EndInvokeJSFailed(ILogger logger, long asyncHandle, string arguments) => _endInvokeJSFailed(logger, asyncHandle, arguments, null);
public static void EndInvokeJSSucceeded(ILogger logger, long asyncCall) => _endInvokeJSSucceeded(logger, asyncCall, null);
internal static void ReceiveByteArraySuccess(ILogger logger, long id) => _receiveByteArraySuccess(logger, id, null);
internal static void ReceiveByteArrayException(ILogger logger, long id, Exception ex) => _receiveByteArrayException(logger, id, ex);
internal static void ReceiveJSDataChunkException(ILogger logger, long streamId, Exception ex) => _receiveJSDataChunkException(logger, streamId, ex);
public static void ReceiveByteArraySuccess(ILogger logger, long id) => _receiveByteArraySuccess(logger, id, null);
public static void ReceiveByteArrayException(ILogger logger, long id, Exception ex) => _receiveByteArrayException(logger, id, ex);
public static void ReceiveJSDataChunkException(ILogger logger, long streamId, Exception ex) => _receiveJSDataChunkException(logger, streamId, ex);
public static void SendDotNetStreamException(ILogger logger, long streamId, Exception ex) => _sendDotNetStreamException(logger, streamId, ex);
public static void DispatchEventFailedToParseEventData(ILogger logger, Exception ex) => _dispatchEventFailedToParseEventData(logger, ex);
public static void DispatchEventFailedToDispatchEvent(ILogger logger, string eventHandlerId, Exception ex) => _dispatchEventFailedToDispatchEvent(logger, eventHandlerId ?? "", ex);

Expand Down
36 changes: 36 additions & 0 deletions src/Components/Server/src/Circuits/RemoteJSRuntime.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// The .NET Foundation licenses this file to you under the MIT license.

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Text.Json;
Expand All @@ -20,6 +21,7 @@ internal class RemoteJSRuntime : JSRuntime
private readonly CircuitOptions _options;
private readonly ILogger<RemoteJSRuntime> _logger;
private CircuitClientProxy _clientProxy;
private readonly ConcurrentDictionary<long, DotNetStreamReference> _pendingDotNetToJSStreams = new();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need the concurrent dictionary here? Shouldn't this already be scoped to the circuit?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cancellationToken.Register which is used to evict streams which aren't read from after the timeout, does not capture the Synchronization context.. Hence, without a ConcurrentDictionary we could run into potential issues when the cancellation token expires (and attempts to evict the stream) at the exact time we get a call into TryClaimPendingStreamForSending to evict the stream for consumption.

private bool _permanentlyDisconnected;
private readonly long _maximumIncomingBytes;
private int _byteArraysToBeRevivedTotalBytes;
Expand Down Expand Up @@ -151,6 +153,40 @@ protected override void ReceiveByteArray(int id, byte[] data)
base.ReceiveByteArray(id, data);
}

protected override async Task TransmitStreamAsync(long streamId, DotNetStreamReference dotNetStreamReference)
{
if (!_pendingDotNetToJSStreams.TryAdd(streamId, dotNetStreamReference))
{
throw new ArgumentException($"The stream {streamId} is already pending.");
}

// SignalR only supports streaming being initiated from the JS side, so we have to ask it to
// start the stream. We'll give it a maximum of 10 seconds to do so, after which we give up
// and discard it.
var cancellationToken = new CancellationTokenSource(TimeSpan.FromSeconds(10)).Token;
cancellationToken.Register(() =>
{
// If by now the stream hasn't been claimed for sending, stop tracking it
if (_pendingDotNetToJSStreams.TryRemove(streamId, out var timedOutStream) && !timedOutStream.LeaveOpen)
{
timedOutStream.Stream.Dispose();
}
});

await _clientProxy.SendAsync("JS.BeginTransmitStream", streamId);
}

public bool TryClaimPendingStreamForSending(long streamId, out DotNetStreamReference pendingStream)
{
if (_pendingDotNetToJSStreams.TryRemove(streamId, out pendingStream))
{
return true;
}

pendingStream = default;
return false;
}

public void MarkPermanentlyDisconnected()
{
_permanentlyDisconnected = true;
Expand Down
41 changes: 41 additions & 0 deletions src/Components/Server/src/ComponentHub.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Text.Json;
using System.Threading.Channels;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Components.Server.Circuits;
using Microsoft.AspNetCore.DataProtection;
Expand Down Expand Up @@ -242,6 +243,41 @@ public async ValueTask<bool> ReceiveJSDataChunk(long streamId, long chunkId, byt
return await circuitHost.ReceiveJSDataChunk(streamId, chunkId, chunk, error);
}

public async IAsyncEnumerable<ArraySegment<byte>> SendDotNetStreamToJS(long streamId)
{
var circuitHost = await GetActiveCircuitAsync();
if (circuitHost == null)
{
yield break;
}

var dotNetStreamReference = await circuitHost.TryClaimPendingStream(streamId);
if (dotNetStreamReference == default)
{
yield break;
}

var buffer = ArrayPool<byte>.Shared.Rent(32 * 1024);

try
{
int bytesRead;
while ((bytesRead = await circuitHost.SendDotNetStreamAsync(dotNetStreamReference, streamId, buffer)) > 0)
{
yield return new ArraySegment<byte>(buffer, 0, bytesRead);
}
}
finally
{
ArrayPool<byte>.Shared.Return(buffer, clearArray: true);

if (!dotNetStreamReference.LeaveOpen)
{
dotNetStreamReference.Stream?.Dispose();
}
}
}

public async ValueTask OnRenderCompleted(long renderId, string errorMessageOrNull)
{
var circuitHost = await GetActiveCircuitAsync();
Expand Down Expand Up @@ -325,6 +361,9 @@ private static class Log
private static readonly Action<ILogger, string, Exception> _invalidCircuitId =
LoggerMessage.Define<string>(LogLevel.Debug, new EventId(8, "InvalidCircuitId"), "ConnectAsync received an invalid circuit id '{CircuitIdSecret}'");

private static readonly Action<ILogger, Exception> _sendingDotNetStreamFailed =
LoggerMessage.Define(LogLevel.Debug, new EventId(9, "SendingDotNetStreamFailed"), "Sending the .NET stream data to JS failed");

public static void ReceivedConfirmationForBatch(ILogger logger, long batchId) => _receivedConfirmationForBatch(logger, batchId, null);

public static void CircuitAlreadyInitialized(ILogger logger, CircuitId circuitId) => _circuitAlreadyInitialized(logger, circuitId, null);
Expand All @@ -337,6 +376,8 @@ private static class Log

public static void CircuitInitializationFailed(ILogger logger, Exception exception) => _circuitInitializationFailed(logger, exception);

public static void SendingDotNetStreamFailed(ILogger logger, Exception exception) => _sendingDotNetStreamFailed(logger, exception);

public static void CreatedCircuit(ILogger logger, CircuitId circuitId, string circuitSecret, string connectionId)
{
// Redact the secret unless tracing is on.
Expand Down
56 changes: 56 additions & 0 deletions src/Components/Shared/src/TransmitDataStreamToJS.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using System.Buffers;
using Microsoft.JSInterop;

namespace Microsoft.AspNetCore.Components
{
/// <Summary>
/// A stream that pulls each chunk on demand using JavaScript interop. This implementation is used for
/// WebAssembly and WebView applications.
/// </Summary>
internal static class TransmitDataStreamToJS
{
internal static async Task TransmitStreamAsync(IJSRuntime runtime, long streamId, DotNetStreamReference dotNetStreamReference)
{
var buffer = ArrayPool<byte>.Shared.Rent(32 * 1024);

try
{
int bytesRead;
while ((bytesRead = await dotNetStreamReference.Stream.ReadAsync(buffer)) > 0)
{
await runtime.InvokeVoidAsync("Blazor._internal.receiveDotNetDataStream", streamId, buffer, bytesRead, null);
}

// Notify client that the stream has completed
await runtime.InvokeVoidAsync("Blazor._internal.receiveDotNetDataStream", streamId, Array.Empty<byte>(), 0, null);
}
catch (Exception ex)
{
try
{
// Attempt to notify the client of the error.
await runtime.InvokeVoidAsync("Blazor._internal.receiveDotNetDataStream", streamId, Array.Empty<byte>(), 0, ex.Message);
}
catch
{
// JS Interop encountered an issue, unable to send error message to JS.
}

throw;
}
finally
{
ArrayPool<byte>.Shared.Return(buffer, clearArray: true);

if (!dotNetStreamReference.LeaveOpen)
{
dotNetStreamReference.Stream?.Dispose();
}
}
}

}
}
14 changes: 14 additions & 0 deletions src/Components/Web.JS/src/Boot.Server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,20 @@ async function initializeConnection(options: CircuitStartOptions, logger: Logger
connection.on('JS.EndInvokeDotNet', DotNet.jsCallDispatcher.endInvokeDotNetFromJS);
connection.on('JS.ReceiveByteArray', DotNet.jsCallDispatcher.receiveByteArray);

connection.on('JS.BeginTransmitStream', (streamId: number) => {
const readableStream = new ReadableStream({
start(controller) {
connection.stream('SendDotNetStreamToJS', streamId).subscribe({
next: (chunk: Uint8Array) => controller.enqueue(chunk),
complete: () => controller.close(),
error: (err) => controller.error(err),
});
}
});

DotNet.jsCallDispatcher.supplyDotNetStream(streamId, readableStream);
});

const renderQueue = RenderQueue.getOrCreate(logger);
connection.on('JS.RenderBatch', (batchId: number, batchData: Uint8Array) => {
logger.log(LogLevel.Debug, `Received render batch with id ${batchId} and ${batchData.byteLength} bytes.`);
Expand Down
4 changes: 3 additions & 1 deletion src/Components/Web.JS/src/GlobalExports.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import { DefaultReconnectionHandler } from './Platform/Circuits/DefaultReconnect
import { CircuitStartOptions } from './Platform/Circuits/CircuitStartOptions';
import { WebAssemblyStartOptions } from './Platform/WebAssemblyStartOptions';
import { Platform, Pointer, System_String, System_Array, System_Object, System_Boolean, System_Byte, System_Int } from './Platform/Platform';
import { getNextChunk } from './StreamingInterop';
import { getNextChunk, receiveDotNetDataStream } from './StreamingInterop';
import { RootComponentsFunctions } from './Rendering/JSRootComponents';
import { attachWebRendererInterop } from './Rendering/WebRendererInteropMethods';

Expand Down Expand Up @@ -56,6 +56,7 @@ interface IBlazor {
getSatelliteAssemblies?: any,
sendJSDataStream?: (data: any, streamId: number, chunkSize: number) => void,
getJSDataStreamChunk?: (data: any, position: number, chunkSize: number) => Promise<Uint8Array>,
receiveDotNetDataStream?: (streamId: number, data: any, bytesRead: number, errorMessage: string) => void,
attachWebRendererInterop?: typeof attachWebRendererInterop,

// APIs invoked by hot reload
Expand All @@ -76,6 +77,7 @@ export const Blazor: IBlazor = {
PageTitle,
InputFile,
getJSDataStreamChunk: getNextChunk,
receiveDotNetDataStream: receiveDotNetDataStream,
attachWebRendererInterop,
},
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { attachRootComponentToElement, renderBatch } from '../../Rendering/Rende
import { setApplicationIsTerminated, tryDeserializeMessage } from './WebViewIpcCommon';
import { sendRenderCompleted } from './WebViewIpcSender';
import { internalFunctions as navigationManagerFunctions } from '../../Services/NavigationManager';
import { receiveDotNetDataStream } from '../../StreamingInterop';

export function startIpcReceiver() {
const messageHandlers = {
Expand Down
27 changes: 27 additions & 0 deletions src/Components/Web.JS/src/StreamingInterop.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import { DotNet } from '@microsoft/dotnet-js-interop';

export async function getNextChunk(data: ArrayBufferView | Blob, position: number, nextChunkSize: number): Promise<Uint8Array> {
if (data instanceof Blob) {
return await getChunkFromBlob(data, position, nextChunkSize);
Expand All @@ -17,3 +19,28 @@ function getChunkFromArrayBufferView(data: ArrayBufferView, position: number, ne
const nextChunkData = new Uint8Array(data.buffer, data.byteOffset + position, nextChunkSize);
return nextChunkData;
}

const transmittingDotNetToJSStreams = new Map<number, ReadableStreamController<any>>();
export function receiveDotNetDataStream(streamId: number, data: Uint8Array, bytesRead: number, errorMessage: string): void {
let streamController = transmittingDotNetToJSStreams.get(streamId);
if (!streamController) {
const readableStream = new ReadableStream({
start(controller) {
transmittingDotNetToJSStreams.set(streamId, controller);
streamController = controller;
}
});

DotNet.jsCallDispatcher.supplyDotNetStream(streamId, readableStream);
}

if (errorMessage) {
streamController!.error(errorMessage);
transmittingDotNetToJSStreams.delete(streamId);
} else if (bytesRead === 0) {
streamController!.close();
transmittingDotNetToJSStreams.delete(streamId);
} else {
streamController!.enqueue(data.length === bytesRead ? data : data.subarray(0, bytesRead));
}
}
Loading