-
Notifications
You must be signed in to change notification settings - Fork 10.5k
Blazor Streaming Interop | JS to DotNet
#33491
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 28 commits
1d32e1e
66dbaf5
2509ce5
50bcb31
5b69105
519f854
b35027b
46f4ddf
c77fd3e
e1635e6
1b76862
c97360c
b266fba
7133bd6
e3956fe
639b34f
7f89312
115d935
1e01c62
80a2171
6a931ad
e799997
9296a36
e07686d
62f2338
e7f44a2
12f7b89
e37413b
eafd3f8
903b16d
0bb3e6c
381c53d
6e09532
350f4de
b5615c7
53f0392
3cd7619
d549784
7589cd0
9e63bb6
f565366
1b8ed19
c1e0873
6e91d84
20e8489
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,173 @@ | ||
| // Copyright (c) .NET Foundation. All rights reserved. | ||
| // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. | ||
|
|
||
| using System; | ||
| using System.Buffers; | ||
| using System.IO; | ||
| using System.IO.Pipelines; | ||
| using System.Threading; | ||
| using System.Threading.Tasks; | ||
| using Microsoft.JSInterop; | ||
|
|
||
| namespace Microsoft.AspNetCore.Components.Server.Circuits | ||
| { | ||
| internal sealed class RemoteJSDataStream : Stream | ||
| { | ||
| private readonly RemoteJSRuntime _runtime; | ||
| private readonly long _streamId; | ||
| private readonly long _totalLength; | ||
| private readonly CancellationToken _streamCancellationToken; | ||
| private readonly Stream _pipeReaderStream; | ||
| private readonly Pipe _pipe; | ||
| private long _bytesRead; | ||
|
|
||
| public static async Task<bool> ReceiveData(RemoteJSRuntime runtime, long streamId, byte[] chunk, string error) | ||
| { | ||
| if (!runtime.RemoteJSDataStreamInstances.TryGetValue(streamId, out var instance)) | ||
| { | ||
| // There is no data stream with the given identifier. It may have already been disposed. | ||
| // We notify JS that the stream has been cancelled/disposed. | ||
| return false; | ||
| } | ||
|
|
||
| return await instance.ReceiveData(chunk, error); | ||
| } | ||
|
|
||
| public static async ValueTask<RemoteJSDataStream> CreateRemoteJSDataStreamAsync( | ||
| RemoteJSRuntime runtime, | ||
| IJSDataReference jsDataReference, | ||
| long totalLength, | ||
| long maxBufferSize, | ||
| long maximumIncomingBytes, | ||
| CancellationToken cancellationToken = default) | ||
| { | ||
| var streamId = runtime.RemoteJSDataStreamNextInstanceId++; | ||
TanayParikh marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| var remoteJSDataStream = new RemoteJSDataStream(runtime, streamId, totalLength, maxBufferSize, cancellationToken); | ||
| await runtime.InvokeVoidAsync("Blazor._internal.sendJSDataStream", jsDataReference, streamId, maximumIncomingBytes); | ||
TanayParikh marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
TanayParikh marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| return remoteJSDataStream; | ||
| } | ||
|
|
||
| private RemoteJSDataStream( | ||
| RemoteJSRuntime runtime, | ||
| long streamId, | ||
| long totalLength, | ||
| long maxBufferSize, | ||
| CancellationToken cancellationToken) | ||
| { | ||
| _runtime = runtime; | ||
| _streamId = streamId; | ||
| _totalLength = totalLength; | ||
| _streamCancellationToken = cancellationToken; | ||
|
|
||
| _runtime.RemoteJSDataStreamInstances.Add(_streamId, this); | ||
|
|
||
| _pipe = new Pipe(new PipeOptions(pauseWriterThreshold: maxBufferSize, resumeWriterThreshold: maxBufferSize / 2)); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @davidfowl since you are the pipelines expert, can you review our usage in this PR?
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. From @davidfowl
|
||
| _pipeReaderStream = _pipe.Reader.AsStream(); | ||
| } | ||
|
|
||
| // Ideally we'd have IAsyncEnumerable<ReadOnlySequence<byte>> here so we can pass through the | ||
| // data without having to copy it into a temporary buffer in BlazorPackHubProtocolWorker. But trying | ||
| // this gives strange errors; sometimes the "chunk" variable below has a negative length, even though | ||
| // the logic never returns a corrupted item as far as I can tell. | ||
TanayParikh marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| private async Task<bool> ReceiveData(byte[] chunk, string error) | ||
| { | ||
| try | ||
| { | ||
| if (!string.IsNullOrEmpty(error)) | ||
| { | ||
| throw new InvalidOperationException($"An error occurred while reading the remote stream: {error}"); | ||
| } | ||
|
|
||
| if (chunk.Length == 0) | ||
| { | ||
| throw new InvalidOperationException($"The incoming data chunk cannot be empty."); | ||
| } | ||
|
|
||
| _bytesRead += chunk.Length; | ||
|
|
||
| if (_bytesRead > _totalLength) | ||
| { | ||
| throw new InvalidOperationException($"The incoming data stream declared a length {_totalLength}, but {_bytesRead} bytes were read."); | ||
| } | ||
TanayParikh marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| await _pipe.Writer.WriteAsync(chunk, _streamCancellationToken); | ||
|
|
||
| if (_bytesRead == _totalLength) | ||
| { | ||
| await _pipe.Writer.CompleteAsync(); | ||
| } | ||
|
|
||
| return true; | ||
TanayParikh marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| } | ||
| catch (Exception e) | ||
| { | ||
| await _pipe.Writer.CompleteAsync(e); | ||
| Dispose(true); | ||
| return false; | ||
| } | ||
| } | ||
|
|
||
| public override bool CanRead => true; | ||
|
|
||
| public override bool CanSeek => false; | ||
|
|
||
| public override bool CanWrite => false; | ||
|
|
||
| public override long Length => _totalLength; | ||
|
|
||
| public override long Position | ||
| { | ||
| get => _pipeReaderStream.Position; | ||
| set => throw new NotSupportedException(); | ||
| } | ||
|
|
||
| public override void Flush() | ||
| => throw new NotSupportedException(); | ||
|
|
||
| public override int Read(byte[] buffer, int offset, int count) | ||
| => throw new NotSupportedException("Synchronous reads are not supported."); | ||
|
|
||
| public override long Seek(long offset, SeekOrigin origin) | ||
| => throw new NotSupportedException(); | ||
|
|
||
| public override void SetLength(long value) | ||
| => throw new NotSupportedException(); | ||
|
|
||
| public override void Write(byte[] buffer, int offset, int count) | ||
| => throw new NotSupportedException(); | ||
|
|
||
| public override async Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) | ||
| { | ||
| var linkedCancellationToken = GetLinkedCancellationToken(_streamCancellationToken, cancellationToken); | ||
| return await _pipeReaderStream.ReadAsync(buffer.AsMemory(offset, count), linkedCancellationToken); | ||
| } | ||
|
|
||
| public override async ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default) | ||
| { | ||
| var linkedCancellationToken = GetLinkedCancellationToken(_streamCancellationToken, cancellationToken); | ||
| return await _pipeReaderStream.ReadAsync(buffer, linkedCancellationToken); | ||
| } | ||
|
|
||
| private static CancellationToken GetLinkedCancellationToken(CancellationToken a, CancellationToken b) | ||
| { | ||
| if (a.CanBeCanceled && b.CanBeCanceled) | ||
| { | ||
| return CancellationTokenSource.CreateLinkedTokenSource(a, b).Token; | ||
| } | ||
| else if (a.CanBeCanceled) | ||
| { | ||
| return a; | ||
| } | ||
|
|
||
| return b; | ||
| } | ||
|
|
||
| protected override void Dispose(bool disposing) | ||
| { | ||
| if (disposing) | ||
| { | ||
| _runtime.RemoteJSDataStreamInstances.Remove(_streamId); | ||
| } | ||
| } | ||
| } | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.