Skip to content

Commit f52882b

Browse files
committed
Changing streaming things
1 parent 34bb352 commit f52882b

File tree

26 files changed

+669
-371
lines changed

26 files changed

+669
-371
lines changed

clients/ts/signalr/src/UploadStream.ts

Lines changed: 0 additions & 27 deletions
This file was deleted.

src/SignalR/clients/ts/FunctionalTests/TestHub.cs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
using System;
55
using System.Reactive.Linq;
6+
using System.Text;
67
using System.Threading.Channels;
78
using System.Threading.Tasks;
89
using Microsoft.AspNetCore.Http.Connections;
@@ -50,6 +51,21 @@ public ChannelReader<string> Stream()
5051
return channel.Reader;
5152
}
5253

54+
public async Task<string> StreamingConcat(ChannelReader<string> stream)
55+
{
56+
var sb = new StringBuilder();
57+
58+
while (await stream.WaitToReadAsync())
59+
{
60+
while (stream.TryRead(out var item))
61+
{
62+
sb.Append(item);
63+
}
64+
}
65+
66+
return sb.ToString();
67+
}
68+
5369
public ChannelReader<int> EmptyStream()
5470
{
5571
var channel = Channel.CreateUnbounded<int>();

src/SignalR/clients/ts/FunctionalTests/package-lock.json

Lines changed: 22 additions & 21 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/SignalR/clients/ts/FunctionalTests/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
"karma-sauce-launcher": "^1.2.0",
3131
"karma-sourcemap-loader": "^0.3.7",
3232
"karma-summary-reporter": "^1.5.0",
33+
"rxjs": "^6.3.3",
3334
"ts-node": "^4.1.0",
3435
"typescript": "^3.0.1",
3536
"ws": " ^6.0.0"

src/SignalR/clients/ts/FunctionalTests/ts/HubConnectionTests.ts

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ import { eachTransport, eachTransportAndProtocol, ENDPOINT_BASE_HTTPS_URL, ENDPO
1111
import "./LogBannerReporter";
1212
import { TestLogger } from "./TestLogger";
1313

14+
import * as RX from "rxjs";
15+
1416
const TESTHUBENDPOINT_URL = ENDPOINT_BASE_URL + "/testhub";
1517
const TESTHUBENDPOINT_HTTPS_URL = ENDPOINT_BASE_HTTPS_URL ? (ENDPOINT_BASE_HTTPS_URL + "/testhub") : undefined;
1618

@@ -531,6 +533,46 @@ describe("hubConnection", () => {
531533
done();
532534
});
533535
});
536+
537+
it("can stream from client to server with rxjs", async (done) => {
538+
const hubConnection = getConnectionBuilder(transportType)
539+
.withHubProtocol(protocol)
540+
.build();
541+
542+
await hubConnection.start();
543+
const subject = new RX.Subject<string>();
544+
const resultPromise = hubConnection.invoke<string>("StreamingConcat", subject.asObservable());
545+
subject.next("Hello ");
546+
subject.next("world");
547+
subject.next("!");
548+
subject.complete();
549+
expect(await resultPromise).toBe("Hello world!");
550+
await hubConnection.stop();
551+
done();
552+
});
553+
554+
it("can stream from client to server and close with error with rxjs", async (done) => {
555+
const hubConnection = getConnectionBuilder(transportType)
556+
.withHubProtocol(protocol)
557+
.build();
558+
559+
await hubConnection.start();
560+
const subject = new RX.Subject<string>();
561+
const resultPromise = hubConnection.invoke<string>("StreamingConcat", subject.asObservable());
562+
subject.next("Hello ");
563+
subject.next("world");
564+
subject.next("!");
565+
subject.error(new Error("Something bad"));
566+
try {
567+
await resultPromise;
568+
expect(false).toBe(true);
569+
} catch (err) {
570+
expect(err.message).toEqual("An unexpected error occurred invoking 'StreamingConcat' on the server. Exception: Something bad");
571+
} finally {
572+
await hubConnection.stop();
573+
}
574+
done();
575+
});
534576
});
535577
});
536578

src/SignalR/clients/ts/signalr-protocol-msgpack/src/MessagePackHubProtocol.ts

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
import { Buffer } from "buffer";
55
import * as msgpack5 from "msgpack5";
66

7-
import { CompletionMessage, HubMessage, IHubProtocol, ILogger, InvocationMessage, LogLevel, MessageHeaders, MessageType, NullLogger, StreamInvocationMessage, StreamItemMessage, TransferFormat } from "@aspnet/signalr";
7+
import { CompletionMessage, HubMessage, IHubProtocol, ILogger, InvocationMessage, LogLevel, MessageHeaders, MessageType, NullLogger, StreamCompleteMessage, StreamDataMessage, StreamInvocationMessage, StreamItemMessage, TransferFormat } from "@aspnet/signalr";
88

99
import { BinaryMessageFormat } from "./BinaryMessageFormat";
1010
import { isArrayBuffer } from "./Utils";
@@ -65,11 +65,15 @@ export class MessagePackHubProtocol implements IHubProtocol {
6565
return this.writeInvocation(message as InvocationMessage);
6666
case MessageType.StreamInvocation:
6767
return this.writeStreamInvocation(message as StreamInvocationMessage);
68+
case MessageType.StreamData:
69+
return this.writeStreamData(message as StreamDataMessage);
6870
case MessageType.StreamItem:
6971
case MessageType.Completion:
7072
throw new Error(`Writing messages of type '${message.type}' is not supported.`);
7173
case MessageType.Ping:
7274
return BinaryMessageFormat.write(SERIALIZED_PING_MESSAGE);
75+
case MessageType.StreamComplete:
76+
return this.writeStreamComplete(message as StreamCompleteMessage);
7377
default:
7478
throw new Error("Invalid message type.");
7579
}
@@ -226,6 +230,22 @@ export class MessagePackHubProtocol implements IHubProtocol {
226230
return BinaryMessageFormat.write(payload.slice());
227231
}
228232

233+
private writeStreamData(streamDataMessage: StreamDataMessage): ArrayBuffer {
234+
const msgpack = msgpack5();
235+
const payload = msgpack.encode([MessageType.StreamData, streamDataMessage.streamId,
236+
streamDataMessage.item]);
237+
238+
return BinaryMessageFormat.write(payload.slice());
239+
}
240+
241+
private writeStreamComplete(streamCompleteMessage: StreamCompleteMessage): ArrayBuffer {
242+
const msgpack = msgpack5();
243+
const payload = msgpack.encode([MessageType.StreamComplete, streamCompleteMessage.streamId,
244+
streamCompleteMessage.error || null]);
245+
246+
return BinaryMessageFormat.write(payload.slice());
247+
}
248+
229249
private readHeaders(properties: any): MessageHeaders {
230250
const headers: MessageHeaders = properties[1] as MessageHeaders;
231251
if (typeof headers !== "object") {

0 commit comments

Comments
 (0)