diff --git a/src/Components/Samples/BlazorServerApp/Pages/Index.razor b/src/Components/Samples/BlazorServerApp/Pages/Index.razor index 16dac3192520..a8236e75dd68 100644 --- a/src/Components/Samples/BlazorServerApp/Pages/Index.razor +++ b/src/Components/Samples/BlazorServerApp/Pages/Index.razor @@ -1,5 +1,19 @@ @page "/" +@inject IJSRuntime JS

Hello, world!

Welcome to your new app. + + + + + +@code { + async Task SendStream() + { + using var data = new System.IO.MemoryStream(new byte[10 * 1024 * 1024]); + using var streamRef = new DotNetStreamReference(data); + await JS.InvokeVoidAsync("getTheStream", streamRef); + } +} diff --git a/src/Components/Samples/BlazorServerApp/Pages/_Host.cshtml b/src/Components/Samples/BlazorServerApp/Pages/_Host.cshtml index 1c35c6954a47..f5443e6511a8 100644 --- a/src/Components/Samples/BlazorServerApp/Pages/_Host.cshtml +++ b/src/Components/Samples/BlazorServerApp/Pages/_Host.cshtml @@ -24,5 +24,17 @@ }); })() + diff --git a/src/Components/Samples/BlazorServerApp/Program.cs b/src/Components/Samples/BlazorServerApp/Program.cs index 562b3ade5b5b..8a023858cec3 100644 --- a/src/Components/Samples/BlazorServerApp/Program.cs +++ b/src/Components/Samples/BlazorServerApp/Program.cs @@ -8,6 +8,7 @@ using Microsoft.Extensions.Configuration; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; +using Microsoft.JSInterop; namespace BlazorServerApp { @@ -24,5 +25,12 @@ public static IHostBuilder CreateHostBuilder(string[] args) => { webBuilder.UseStartup(); }); + + [JSInvokable] + public static DotNetStreamReference GetFileData() + { + var arbitraryFilename = typeof(Program).Assembly.Location; + return new DotNetStreamReference(File.OpenRead(arbitraryFilename)); + } } } diff --git a/src/Components/Server/src/Circuits/CircuitHost.cs b/src/Components/Server/src/Circuits/CircuitHost.cs index 29dc0bf2f625..c3f74e77cbb4 100644 --- a/src/Components/Server/src/Circuits/CircuitHost.cs +++ b/src/Components/Server/src/Circuits/CircuitHost.cs @@ -4,8 +4,10 @@ using System; using System.Collections.Generic; using System.Globalization; +using System.IO; using System.Security.Claims; using System.Threading; +using System.Threading.Channels; using System.Threading.Tasks; using Microsoft.AspNetCore.Components.Authorization; using Microsoft.AspNetCore.Components.Web; @@ -505,6 +507,39 @@ public void SendPendingBatches() _ = Renderer.Dispatcher.InvokeAsync(() => Renderer.ProcessBufferedRenderBatches()); } + public async Task SendDotNetStreamAsync(long streamId, ChannelWriter> writer) + { + if (!JSRuntime.TryClaimPendingStreamForSending(streamId, out var stream, out var leaveOpen)) + { + throw new InvalidOperationException($"The stream with ID {streamId} is not available. It may have timed out."); + } + + try + { + var buffer = new byte[64 * 1024]; // TODO: Consider optimal chunk size, consider pooling + int bytesRead; + while ((bytesRead = await stream.ReadAsync(buffer, 0, buffer.Length)) > 0) + { + // We have to stop sending if the circuit disconnects. It doesn't stop otherwise. + if (_disposed) + { + break; + } + + // TODO: Is there any risk that, since there's no cancellation token here, the client could + // make us pause indefinitely? + await writer.WriteAsync(new ArraySegment(buffer, 0, bytesRead)); + } + } + finally + { + if (!leaveOpen) + { + stream?.Dispose(); + } + } + } + private void AssertInitialized() { if (!_initialized) diff --git a/src/Components/Server/src/Circuits/RemoteJSRuntime.cs b/src/Components/Server/src/Circuits/RemoteJSRuntime.cs index a11ad560c057..b01a25a4b908 100644 --- a/src/Components/Server/src/Circuits/RemoteJSRuntime.cs +++ b/src/Components/Server/src/Circuits/RemoteJSRuntime.cs @@ -2,7 +2,11 @@ // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. using System; +using System.Collections.Concurrent; +using System.IO; using System.Text.Json; +using System.Threading; +using System.Threading.Tasks; using Microsoft.AspNetCore.SignalR; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; @@ -16,6 +20,7 @@ internal class RemoteJSRuntime : JSRuntime private readonly CircuitOptions _options; private readonly ILogger _logger; private CircuitClientProxy _clientProxy; + private ConcurrentDictionary _pendingStreams = new(); public ElementReferenceContext ElementReferenceContext { get; } @@ -89,6 +94,48 @@ protected override void BeginInvokeJS(long asyncHandle, string identifier, strin _clientProxy.SendAsync("JS.BeginInvokeJS", asyncHandle, identifier, argsJson, (int)resultType, targetInstanceId); } + protected override void BeginTransmittingStream(long streamId, Stream stream, bool leaveOpen) + { + _ = TransmitStreamAsync(streamId, stream, leaveOpen); + } + + private async Task TransmitStreamAsync(long streamId, Stream stream, bool leaveOpen) + { + if (!_pendingStreams.TryAdd(streamId, (stream, leaveOpen))) + { + throw new ArgumentException($"The stream {streamId} is already pending."); + } + + // SignalR only supports streaming being initiated from the JS side, so we have to ask it to + // start the stream. We'll give it a maximum of 10 seconds to do so, after which we give up + // and discard it. + var cancellationToken = new CancellationTokenSource(TimeSpan.FromSeconds(10)).Token; + cancellationToken.Register(() => + { + // If by now the stream hasn't been claimed for sending, stop tracking it + if (_pendingStreams.TryRemove(streamId, out var timedOutStream) && !timedOutStream.LeaveOpen) + { + timedOutStream.Stream.Dispose(); + } + }); + + await _clientProxy.SendAsync("JS.BeginTransmitStream", streamId); + } + + public bool TryClaimPendingStreamForSending(long streamId, out Stream stream, out bool leaveOpen) + { + if (_pendingStreams.TryRemove(streamId, out var pendingStream)) + { + (stream, leaveOpen) = pendingStream; + return true; + } + else + { + (stream, leaveOpen) = (default, default); + return false; + } + } + public static class Log { private static readonly Action _beginInvokeJS = diff --git a/src/Components/Server/src/ComponentHub.cs b/src/Components/Server/src/ComponentHub.cs index a0b4db6accd5..512a18b6775d 100644 --- a/src/Components/Server/src/ComponentHub.cs +++ b/src/Components/Server/src/ComponentHub.cs @@ -2,7 +2,10 @@ // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. using System; +using System.Buffers; +using System.IO; using System.Runtime.CompilerServices; +using System.Threading.Channels; using System.Threading.Tasks; using Microsoft.AspNetCore.Components.Server.Circuits; using Microsoft.AspNetCore.DataProtection; @@ -209,6 +212,42 @@ public async ValueTask EndInvokeJSFromDotNet(long asyncHandle, bool succeeded, s _ = circuitHost.EndInvokeJSFromDotNet(asyncHandle, succeeded, arguments); } + public ChannelReader> SendDotNetStreamToJS(long streamId) + { + var channel = Channel.CreateUnbounded>(); + _ = WriteStreamDataAsync(channel.Writer, streamId); + return channel.Reader; + + async Task WriteStreamDataAsync(ChannelWriter> writer, long streamId) + { + Exception localException = null; + + try + { + var circuitHost = await GetActiveCircuitAsync(); + if (circuitHost == null) + { + return; + } + + await circuitHost.SendDotNetStreamAsync(streamId, writer); + } + catch (Exception ex) + { + localException = ex; + } + finally + { + // TODO: Don't send the exception to the client if detailed errors is off. + // In that case, log detailed error info on the server only. From experiments it + // doesn't seem like SignalR sends the actual exception info anyway, which is a + // bit awkward because in development we would like it to show up. Maybe if we're + // logging the detailed error info on the .NET side anyway that might be enough. + writer.Complete(localException); + } + } + } + public async ValueTask DispatchBrowserEvent(string eventDescriptor, string eventArgs) { var circuitHost = await GetActiveCircuitAsync(); diff --git a/src/Components/Web.JS/src/Boot.Server.ts b/src/Components/Web.JS/src/Boot.Server.ts index 1ddc3f8728c7..ae9e1b770f00 100644 --- a/src/Components/Web.JS/src/Boot.Server.ts +++ b/src/Components/Web.JS/src/Boot.Server.ts @@ -106,6 +106,20 @@ async function initializeConnection(options: CircuitStartOptions, logger: Logger connection.on('JS.BeginInvokeJS', DotNet.jsCallDispatcher.beginInvokeJSFromDotNet); connection.on('JS.EndInvokeDotNet', DotNet.jsCallDispatcher.endInvokeDotNetFromJS); + connection.on('JS.BeginTransmitStream', (streamId: number) => { + const readableStream = new ReadableStream({ + start(controller) { + connection.stream('SendDotNetStreamToJS', streamId).subscribe({ + next: (chunk: Uint8Array) => controller.enqueue(chunk), + complete: () => controller.close(), + error: (err) => controller.error(err), + }); + } + }); + + DotNet.jsCallDispatcher.supplyDotNetStream(streamId, readableStream); + }); + const renderQueue = RenderQueue.getOrCreate(logger); connection.on('JS.RenderBatch', (batchId: number, batchData: Uint8Array) => { logger.log(LogLevel.Debug, `Received render batch with id ${batchId} and ${batchData.byteLength} bytes.`); diff --git a/src/JSInterop/Microsoft.JSInterop.JS/src/src/Microsoft.JSInterop.ts b/src/JSInterop/Microsoft.JSInterop.JS/src/src/Microsoft.JSInterop.ts index d2d564147ba3..c69d4a2dda1a 100644 --- a/src/JSInterop/Microsoft.JSInterop.JS/src/src/Microsoft.JSInterop.ts +++ b/src/JSInterop/Microsoft.JSInterop.JS/src/src/Microsoft.JSInterop.ts @@ -346,7 +346,27 @@ export module DotNet { ? parseJsonWithRevivers(resultJsonOrExceptionMessage) : new Error(resultJsonOrExceptionMessage); completePendingCall(parseInt(asyncCallId), success, resultOrError); - } + }, + + /** + * Supplies a stream of data being sent from .NET. + * + * @param streamId The identifier previously passed to JSRuntime's BeginTransmittingStream in .NET code + * @param streamId The stream data. + */ + supplyDotNetStream: (streamId: number, stream: ReadableStream) => { + if (_pendingStreams.has(streamId)) { + // The receiver is already waiting, so we can resolve the promise now and stop tracking this + const pendingStream = _pendingStreams.get(streamId)!; + _pendingStreams.delete(streamId); + pendingStream.resolve!(stream); + } else { + // The receiver hasn't started waiting yet, so track a pre-completed entry it can attach to later + const pendingStream = new PendingStream(); + pendingStream.resolve!(stream); + _pendingStreams.set(streamId, pendingStream); + } + }, } function formatError(error: any): string { @@ -394,9 +414,17 @@ export module DotNet { } const dotNetObjectRefKey = '__dotNetObject'; + const dotNetStreamRefKey = '__dotNetStream'; attachReviver(function reviveDotNetObject(key: any, value: any) { - if (value && typeof value === 'object' && value.hasOwnProperty(dotNetObjectRefKey)) { - return new DotNetObject(value.__dotNetObject); + if (value && typeof value === 'object') { + // At some point, instead of looking for different keys, we should have some common format + // like { __jsInteropType: 'object' } or { __jsInteropType: 'stream' } so that we only have + // to check a single property to determine whether it's a special thing. + if (value.hasOwnProperty(dotNetObjectRefKey)) { + return new DotNetObject(value.__dotNetObject); + } else if (value.hasOwnProperty(dotNetStreamRefKey)) { + return new DotNetStream(value.__dotNetStream) + } } // Unrecognized - let another reviver handle it @@ -419,6 +447,50 @@ export module DotNet { return value; }); + class DotNetStream { + private _streamPromise: Promise; + + constructor(streamId: number) { + // This constructor runs when we're JSON-deserializing some value from the .NET side. + // At this point we might already have started receiving the stream, or maybe it will come later. + // We have to handle both possible orderings, but we can count on it coming eventually because + // it's not something the developer gets to control, and it would be an error if it doesn't. + if (_pendingStreams.has(streamId)) { + // We've already started receiving the stream, so no longer need to track it as pending + this._streamPromise = _pendingStreams.get(streamId)?.streamPromise!; + _pendingStreams.delete(streamId); + } else { + // We haven't started receiving it yet, so add an entry to track it as pending + const pendingStream = new PendingStream(); + _pendingStreams.set(streamId, pendingStream); + this._streamPromise = pendingStream.streamPromise; + } + } + + stream(): Promise { + return this._streamPromise; + } + + async arrayBuffer(): Promise { + return new Response(await this.stream()).arrayBuffer(); + } + } + + const _pendingStreams = new Map(); + + class PendingStream { + streamPromise: Promise; + resolve?: (value: ReadableStream) => void; + reject?: (reason: any) => void; + + constructor() { + this.streamPromise = new Promise((resolve, reject) => { + this.resolve = resolve; + this.reject = reject; + }); + } + } + function createJSCallResult(returnValue: any, resultType: JSCallResultType) { switch (resultType) { case JSCallResultType.Default: diff --git a/src/JSInterop/Microsoft.JSInterop/src/DotNetStreamReference.cs b/src/JSInterop/Microsoft.JSInterop/src/DotNetStreamReference.cs new file mode 100644 index 000000000000..171caaaf6820 --- /dev/null +++ b/src/JSInterop/Microsoft.JSInterop/src/DotNetStreamReference.cs @@ -0,0 +1,38 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +using System; +using System.IO; + +namespace Microsoft.JSInterop +{ + /// + /// + /// + public sealed class DotNetStreamReference : IDisposable + { + /// + /// + /// + /// + /// + public DotNetStreamReference(Stream stream, bool leaveOpen = false) + { + Stream = stream ?? throw new ArgumentNullException(nameof(stream)); + LeaveOpen = leaveOpen; + } + + internal Stream Stream { get; } + + internal bool LeaveOpen { get; } + + /// + public void Dispose() + { + if (!LeaveOpen) + { + Stream.Dispose(); + } + } + } +} diff --git a/src/JSInterop/Microsoft.JSInterop/src/Infrastructure/DotNetStreamReferenceJsonConverter.cs b/src/JSInterop/Microsoft.JSInterop/src/Infrastructure/DotNetStreamReferenceJsonConverter.cs new file mode 100644 index 000000000000..c3d24b04d2bd --- /dev/null +++ b/src/JSInterop/Microsoft.JSInterop/src/Infrastructure/DotNetStreamReferenceJsonConverter.cs @@ -0,0 +1,37 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +using System; +using System.Text.Json; +using System.Text.Json.Serialization; + +namespace Microsoft.JSInterop.Infrastructure +{ + internal sealed class DotNetStreamReferenceJsonConverter : JsonConverter + { + public DotNetStreamReferenceJsonConverter(JSRuntime jsRuntime) + { + JSRuntime = jsRuntime; + } + + private readonly static JsonEncodedText DotNetStreamRefKey = JsonEncodedText.Encode("__dotNetStream"); + + public JSRuntime JSRuntime { get; } + + public override DotNetStreamReference Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options) + => throw new NotSupportedException($"{nameof(DotNetStreamReference)} cannot be supplied from JavaScript to .NET because the stream is released after being sent."); + + public override void Write(Utf8JsonWriter writer, DotNetStreamReference value, JsonSerializerOptions options) + { + // We only serialize a DotNetStreamReference using this converter when we're supplying that info + // to JS. We want to transmit the stream immediately as part of this process, so that the .NET side + // doesn't have to hold onto the stream waiting for JS to request it. If a developer doesn't really + // want to send the data, they shouldn't include the DotNetStreamReference in the object graph + // they are sending to the JS side. + var id = JSRuntime.BeginTransmittingStream(value); + writer.WriteStartObject(); + writer.WriteNumber(DotNetStreamRefKey, id); + writer.WriteEndObject(); + } + } +} diff --git a/src/JSInterop/Microsoft.JSInterop/src/JSRuntime.cs b/src/JSInterop/Microsoft.JSInterop/src/JSRuntime.cs index d92fd66f3f46..6ec23302ef5d 100644 --- a/src/JSInterop/Microsoft.JSInterop/src/JSRuntime.cs +++ b/src/JSInterop/Microsoft.JSInterop/src/JSRuntime.cs @@ -5,6 +5,7 @@ using System.Collections.Concurrent; using System.Diagnostics; using System.Diagnostics.CodeAnalysis; +using System.IO; using System.Text.Json; using System.Threading; using System.Threading.Tasks; @@ -39,6 +40,7 @@ protected JSRuntime() { new DotNetObjectReferenceJsonConverterFactory(this), new JSObjectReferenceJsonConverter(this), + new DotNetStreamReferenceJsonConverter(this), } }; } @@ -174,6 +176,31 @@ protected internal abstract void EndInvokeDotNet( DotNetInvocationInfo invocationInfo, in DotNetInvocationResult invocationResult); + /// + /// Transmits the stream data from .NET to JS. Subclasses should override this method and provide + /// an implementation that transports the data to JS and calls DotNet.jsCallDispatcher.supplyDotNetStream. + /// + /// An identifier for the stream. + /// The stream. + /// A flag that indicates whether the stream should be left open after transmission. If false, the method should dispose the stream after transmission. + protected internal virtual void BeginTransmittingStream(long streamId, Stream stream, bool leaveOpen) + { + if (!leaveOpen) + { + stream.Dispose(); + } + + throw new NotSupportedException("The current JS runtime does not support sending streams from .NET to JS."); + } + + internal long BeginTransmittingStream(DotNetStreamReference dotNetStreamReference) + { + // It's fine to share the ID sequence + var streamId = Interlocked.Increment(ref _nextObjectReferenceId); + BeginTransmittingStream(streamId, dotNetStreamReference.Stream, dotNetStreamReference.LeaveOpen); + return streamId; + } + [UnconditionalSuppressMessage("ReflectionAnalysis", "IL2072:RequiresUnreferencedCode", Justification = "We enforce trimmer attributes for JSON deserialized types on InvokeAsync.")] internal void EndInvokeJS(long taskId, bool succeeded, ref Utf8JsonReader jsonReader) { diff --git a/src/JSInterop/Microsoft.JSInterop/src/PublicAPI.Unshipped.txt b/src/JSInterop/Microsoft.JSInterop/src/PublicAPI.Unshipped.txt index 1a47a75ae77a..be08ead6c86b 100644 --- a/src/JSInterop/Microsoft.JSInterop/src/PublicAPI.Unshipped.txt +++ b/src/JSInterop/Microsoft.JSInterop/src/PublicAPI.Unshipped.txt @@ -1,4 +1,7 @@ #nullable enable +Microsoft.JSInterop.DotNetStreamReference +Microsoft.JSInterop.DotNetStreamReference.Dispose() -> void +Microsoft.JSInterop.DotNetStreamReference.DotNetStreamReference(System.IO.Stream! stream, bool leaveOpen = false) -> void Microsoft.JSInterop.Implementation.JSObjectReferenceJsonWorker Microsoft.JSInterop.Infrastructure.DotNetInvocationResult.ResultJson.get -> string? static Microsoft.JSInterop.Implementation.JSObjectReferenceJsonWorker.ReadJSObjectReferenceIdentifier(ref System.Text.Json.Utf8JsonReader reader) -> long @@ -28,3 +31,4 @@ static Microsoft.JSInterop.JSRuntimeExtensions.InvokeVoidAsync(this Microsoft.JS static Microsoft.JSInterop.JSRuntimeExtensions.InvokeVoidAsync(this Microsoft.JSInterop.IJSRuntime! jsRuntime, string! identifier, System.TimeSpan timeout, params object?[]? args) -> System.Threading.Tasks.ValueTask *REMOVED*static Microsoft.JSInterop.JSRuntimeExtensions.InvokeVoidAsync(this Microsoft.JSInterop.IJSRuntime! jsRuntime, string! identifier, params object![]! args) -> System.Threading.Tasks.ValueTask static Microsoft.JSInterop.JSRuntimeExtensions.InvokeVoidAsync(this Microsoft.JSInterop.IJSRuntime! jsRuntime, string! identifier, params object?[]? args) -> System.Threading.Tasks.ValueTask +virtual Microsoft.JSInterop.JSRuntime.BeginTransmittingStream(long streamId, System.IO.Stream! stream, bool leaveOpen) -> void