Skip to content

.NET-to-JS stream proof of concept #32848

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions src/Components/Samples/BlazorServerApp/Pages/Index.razor
Original file line number Diff line number Diff line change
@@ -1,5 +1,19 @@
@page "/"
@inject IJSRuntime JS

<h1>Hello, world!</h1>

Welcome to your new app.

<button @onclick="SendStream">Send stream</button>

<button onclick="getFileData()">Get file data (as return value)</button>

@code {
async Task SendStream()
{
using var data = new System.IO.MemoryStream(new byte[10 * 1024 * 1024]);
using var streamRef = new DotNetStreamReference(data);
await JS.InvokeVoidAsync("getTheStream", streamRef);
}
}
12 changes: 12 additions & 0 deletions src/Components/Samples/BlazorServerApp/Pages/_Host.cshtml
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,17 @@
});
})()
</script>
<script>
async function getTheStream(streamRef) {
const data = await streamRef.arrayBuffer();
console.log(data.byteLength);
}
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an example of receiving the stream as a parameter.


async function getFileData() {
const streamRef = await DotNet.invokeMethodAsync('BlazorServerApp', 'GetFileData');
const data = await streamRef.arrayBuffer();
console.log(data.byteLength);
}
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an example of receiving the stream as a return value.

</script>
</body>
</html>
8 changes: 8 additions & 0 deletions src/Components/Samples/BlazorServerApp/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.JSInterop;

namespace BlazorServerApp
{
Expand All @@ -24,5 +25,12 @@ public static IHostBuilder CreateHostBuilder(string[] args) =>
{
webBuilder.UseStartup<Startup>();
});

[JSInvokable]
public static DotNetStreamReference GetFileData()
{
var arbitraryFilename = typeof(Program).Assembly.Location;
return new DotNetStreamReference(File.OpenRead(arbitraryFilename));
}
}
}
35 changes: 35 additions & 0 deletions src/Components/Server/src/Circuits/CircuitHost.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@
using System;
using System.Collections.Generic;
using System.Globalization;
using System.IO;
using System.Security.Claims;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Components.Authorization;
using Microsoft.AspNetCore.Components.Web;
Expand Down Expand Up @@ -505,6 +507,39 @@ public void SendPendingBatches()
_ = Renderer.Dispatcher.InvokeAsync(() => Renderer.ProcessBufferedRenderBatches());
}

public async Task SendDotNetStreamAsync(long streamId, ChannelWriter<ArraySegment<byte>> writer)
{
if (!JSRuntime.TryClaimPendingStreamForSending(streamId, out var stream, out var leaveOpen))
{
throw new InvalidOperationException($"The stream with ID {streamId} is not available. It may have timed out.");
}

try
{
var buffer = new byte[64 * 1024]; // TODO: Consider optimal chunk size, consider pooling
int bytesRead;
while ((bytesRead = await stream.ReadAsync(buffer, 0, buffer.Length)) > 0)
{
// We have to stop sending if the circuit disconnects. It doesn't stop otherwise.
if (_disposed)
{
break;
}

// TODO: Is there any risk that, since there's no cancellation token here, the client could
// make us pause indefinitely?
await writer.WriteAsync(new ArraySegment<byte>(buffer, 0, bytesRead));
}
}
finally
{
if (!leaveOpen)
{
stream?.Dispose();
}
}
}

private void AssertInitialized()
{
if (!_initialized)
Expand Down
47 changes: 47 additions & 0 deletions src/Components/Server/src/Circuits/RemoteJSRuntime.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,11 @@
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.

using System;
using System.Collections.Concurrent;
using System.IO;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.SignalR;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
Expand All @@ -16,6 +20,7 @@ internal class RemoteJSRuntime : JSRuntime
private readonly CircuitOptions _options;
private readonly ILogger<RemoteJSRuntime> _logger;
private CircuitClientProxy _clientProxy;
private ConcurrentDictionary<long, (Stream Stream, bool LeaveOpen)> _pendingStreams = new();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider Pipes instead of Streams? You wouldn't need to manage buffers in SendDotNetStreamAsync

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Stream instance here comes from user code, which is a more familiar and flexible API to offer.

Were you suggesting we should use Pipe as the API in user code (as in, they must supply a Pipe to us), or were you suggesting this for only if it was an internal detail?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

were you suggesting this for only if it was an internal detail?

Right, I don't know the details of what users call, this just looked like a mostly internal thing.


public ElementReferenceContext ElementReferenceContext { get; }

Expand Down Expand Up @@ -89,6 +94,48 @@ protected override void BeginInvokeJS(long asyncHandle, string identifier, strin
_clientProxy.SendAsync("JS.BeginInvokeJS", asyncHandle, identifier, argsJson, (int)resultType, targetInstanceId);
}

protected override void BeginTransmittingStream(long streamId, Stream stream, bool leaveOpen)
{
_ = TransmitStreamAsync(streamId, stream, leaveOpen);
}

private async Task TransmitStreamAsync(long streamId, Stream stream, bool leaveOpen)
{
if (!_pendingStreams.TryAdd(streamId, (stream, leaveOpen)))
{
throw new ArgumentException($"The stream {streamId} is already pending.");
}

// SignalR only supports streaming being initiated from the JS side, so we have to ask it to
// start the stream. We'll give it a maximum of 10 seconds to do so, after which we give up
// and discard it.
var cancellationToken = new CancellationTokenSource(TimeSpan.FromSeconds(10)).Token;
cancellationToken.Register(() =>
{
// If by now the stream hasn't been claimed for sending, stop tracking it
if (_pendingStreams.TryRemove(streamId, out var timedOutStream) && !timedOutStream.LeaveOpen)
{
timedOutStream.Stream.Dispose();
}
});

await _clientProxy.SendAsync("JS.BeginTransmitStream", streamId);
}

public bool TryClaimPendingStreamForSending(long streamId, out Stream stream, out bool leaveOpen)
{
if (_pendingStreams.TryRemove(streamId, out var pendingStream))
{
(stream, leaveOpen) = pendingStream;
return true;
}
else
{
(stream, leaveOpen) = (default, default);
return false;
}
}

public static class Log
{
private static readonly Action<ILogger, long, string, Exception> _beginInvokeJS =
Expand Down
39 changes: 39 additions & 0 deletions src/Components/Server/src/ComponentHub.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.

using System;
using System.Buffers;
using System.IO;
using System.Runtime.CompilerServices;
using System.Threading.Channels;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Components.Server.Circuits;
using Microsoft.AspNetCore.DataProtection;
Expand Down Expand Up @@ -209,6 +212,42 @@ public async ValueTask EndInvokeJSFromDotNet(long asyncHandle, bool succeeded, s
_ = circuitHost.EndInvokeJSFromDotNet(asyncHandle, succeeded, arguments);
}

public ChannelReader<ArraySegment<byte>> SendDotNetStreamToJS(long streamId)
{
var channel = Channel.CreateUnbounded<ArraySegment<byte>>();
_ = WriteStreamDataAsync(channel.Writer, streamId);
return channel.Reader;

async Task WriteStreamDataAsync(ChannelWriter<ArraySegment<byte>> writer, long streamId)
{
Exception localException = null;

try
{
var circuitHost = await GetActiveCircuitAsync();
if (circuitHost == null)
{
return;
}

await circuitHost.SendDotNetStreamAsync(streamId, writer);
}
catch (Exception ex)
{
localException = ex;
}
finally
{
// TODO: Don't send the exception to the client if detailed errors is off.
// In that case, log detailed error info on the server only. From experiments it
// doesn't seem like SignalR sends the actual exception info anyway, which is a
// bit awkward because in development we would like it to show up. Maybe if we're
// logging the detailed error info on the .NET side anyway that might be enough.
writer.Complete(localException);
}
}
}

public async ValueTask DispatchBrowserEvent(string eventDescriptor, string eventArgs)
{
var circuitHost = await GetActiveCircuitAsync();
Expand Down
14 changes: 14 additions & 0 deletions src/Components/Web.JS/src/Boot.Server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,20 @@ async function initializeConnection(options: CircuitStartOptions, logger: Logger
connection.on('JS.BeginInvokeJS', DotNet.jsCallDispatcher.beginInvokeJSFromDotNet);
connection.on('JS.EndInvokeDotNet', DotNet.jsCallDispatcher.endInvokeDotNetFromJS);

connection.on('JS.BeginTransmitStream', (streamId: number) => {
const readableStream = new ReadableStream({
start(controller) {
connection.stream('SendDotNetStreamToJS', streamId).subscribe({
next: (chunk: Uint8Array) => controller.enqueue(chunk),
complete: () => controller.close(),
error: (err) => controller.error(err),
});
}
});

DotNet.jsCallDispatcher.supplyDotNetStream(streamId, readableStream);
});

const renderQueue = RenderQueue.getOrCreate(logger);
connection.on('JS.RenderBatch', (batchId: number, batchData: Uint8Array) => {
logger.log(LogLevel.Debug, `Received render batch with id ${batchId} and ${batchData.byteLength} bytes.`);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,27 @@ export module DotNet {
? parseJsonWithRevivers(resultJsonOrExceptionMessage)
: new Error(resultJsonOrExceptionMessage);
completePendingCall(parseInt(asyncCallId), success, resultOrError);
}
},

/**
* Supplies a stream of data being sent from .NET.
*
* @param streamId The identifier previously passed to JSRuntime's BeginTransmittingStream in .NET code
* @param streamId The stream data.
*/
supplyDotNetStream: (streamId: number, stream: ReadableStream) => {
if (_pendingStreams.has(streamId)) {
// The receiver is already waiting, so we can resolve the promise now and stop tracking this
const pendingStream = _pendingStreams.get(streamId)!;
_pendingStreams.delete(streamId);
pendingStream.resolve!(stream);
} else {
// The receiver hasn't started waiting yet, so track a pre-completed entry it can attach to later
const pendingStream = new PendingStream();
pendingStream.resolve!(stream);
_pendingStreams.set(streamId, pendingStream);
}
},
}

function formatError(error: any): string {
Expand Down Expand Up @@ -394,9 +414,17 @@ export module DotNet {
}

const dotNetObjectRefKey = '__dotNetObject';
const dotNetStreamRefKey = '__dotNetStream';
attachReviver(function reviveDotNetObject(key: any, value: any) {
if (value && typeof value === 'object' && value.hasOwnProperty(dotNetObjectRefKey)) {
return new DotNetObject(value.__dotNetObject);
if (value && typeof value === 'object') {
// At some point, instead of looking for different keys, we should have some common format
// like { __jsInteropType: 'object' } or { __jsInteropType: 'stream' } so that we only have
// to check a single property to determine whether it's a special thing.
if (value.hasOwnProperty(dotNetObjectRefKey)) {
return new DotNetObject(value.__dotNetObject);
} else if (value.hasOwnProperty(dotNetStreamRefKey)) {
return new DotNetStream(value.__dotNetStream)
}
}

// Unrecognized - let another reviver handle it
Expand All @@ -419,6 +447,50 @@ export module DotNet {
return value;
});

class DotNetStream {
private _streamPromise: Promise<ReadableStream>;

constructor(streamId: number) {
// This constructor runs when we're JSON-deserializing some value from the .NET side.
// At this point we might already have started receiving the stream, or maybe it will come later.
// We have to handle both possible orderings, but we can count on it coming eventually because
// it's not something the developer gets to control, and it would be an error if it doesn't.
if (_pendingStreams.has(streamId)) {
// We've already started receiving the stream, so no longer need to track it as pending
this._streamPromise = _pendingStreams.get(streamId)?.streamPromise!;
_pendingStreams.delete(streamId);
} else {
// We haven't started receiving it yet, so add an entry to track it as pending
const pendingStream = new PendingStream();
_pendingStreams.set(streamId, pendingStream);
this._streamPromise = pendingStream.streamPromise;
}
}

stream(): Promise<ReadableStream> {
return this._streamPromise;
}

async arrayBuffer(): Promise<ArrayBuffer> {
return new Response(await this.stream()).arrayBuffer();
}
}

const _pendingStreams = new Map<number, PendingStream>();

class PendingStream {
streamPromise: Promise<ReadableStream>;
resolve?: (value: ReadableStream) => void;
reject?: (reason: any) => void;

constructor() {
this.streamPromise = new Promise((resolve, reject) => {
this.resolve = resolve;
this.reject = reject;
});
}
}

function createJSCallResult(returnValue: any, resultType: JSCallResultType) {
switch (resultType) {
case JSCallResultType.Default:
Expand Down
38 changes: 38 additions & 0 deletions src/JSInterop/Microsoft.JSInterop/src/DotNetStreamReference.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.

using System;
using System.IO;

namespace Microsoft.JSInterop
{
/// <summary>
///
/// </summary>
public sealed class DotNetStreamReference : IDisposable
{
/// <summary>
///
/// </summary>
/// <param name="stream"></param>
/// <param name="leaveOpen"></param>
public DotNetStreamReference(Stream stream, bool leaveOpen = false)
{
Stream = stream ?? throw new ArgumentNullException(nameof(stream));
LeaveOpen = leaveOpen;
}

internal Stream Stream { get; }

internal bool LeaveOpen { get; }

/// <inheritdoc />
public void Dispose()
{
if (!LeaveOpen)
{
Stream.Dispose();
}
}
}
}
Loading