diff --git a/src/SignalR/clients/ts/signalr-protocol-msgpack/src/MessagePackHubProtocol.ts b/src/SignalR/clients/ts/signalr-protocol-msgpack/src/MessagePackHubProtocol.ts index 66b0a893afee..edf8d8dc72e2 100644 --- a/src/SignalR/clients/ts/signalr-protocol-msgpack/src/MessagePackHubProtocol.ts +++ b/src/SignalR/clients/ts/signalr-protocol-msgpack/src/MessagePackHubProtocol.ts @@ -221,16 +221,28 @@ export class MessagePackHubProtocol implements IHubProtocol { private writeInvocation(invocationMessage: InvocationMessage): ArrayBuffer { const msgpack = msgpack5(); - const payload = msgpack.encode([MessageType.Invocation, invocationMessage.headers || {}, invocationMessage.invocationId || null, - invocationMessage.target, invocationMessage.arguments, invocationMessage.streamIds]); + let payload: any; + if (invocationMessage.streamIds) { + payload = msgpack.encode([MessageType.Invocation, invocationMessage.headers || {}, invocationMessage.invocationId || null, + invocationMessage.target, invocationMessage.arguments, invocationMessage.streamIds]); + } else { + payload = msgpack.encode([MessageType.Invocation, invocationMessage.headers || {}, invocationMessage.invocationId || null, + invocationMessage.target, invocationMessage.arguments]); + } return BinaryMessageFormat.write(payload.slice()); } private writeStreamInvocation(streamInvocationMessage: StreamInvocationMessage): ArrayBuffer { const msgpack = msgpack5(); - const payload = msgpack.encode([MessageType.StreamInvocation, streamInvocationMessage.headers || {}, streamInvocationMessage.invocationId, - streamInvocationMessage.target, streamInvocationMessage.arguments, streamInvocationMessage.streamIds]); + let payload: any; + if (streamInvocationMessage.streamIds) { + payload = msgpack.encode([MessageType.StreamInvocation, streamInvocationMessage.headers || {}, streamInvocationMessage.invocationId, + streamInvocationMessage.target, streamInvocationMessage.arguments, streamInvocationMessage.streamIds]); + } else { + payload = msgpack.encode([MessageType.StreamInvocation, streamInvocationMessage.headers || {}, streamInvocationMessage.invocationId, + streamInvocationMessage.target, streamInvocationMessage.arguments]); + } return BinaryMessageFormat.write(payload.slice()); } diff --git a/src/SignalR/clients/ts/signalr/src/HubConnection.ts b/src/SignalR/clients/ts/signalr/src/HubConnection.ts index 0435a92300f2..505631fa8d63 100644 --- a/src/SignalR/clients/ts/signalr/src/HubConnection.ts +++ b/src/SignalR/clients/ts/signalr/src/HubConnection.ts @@ -818,23 +818,40 @@ export class HubConnection { private createInvocation(methodName: string, args: any[], nonblocking: boolean, streamIds: string[]): InvocationMessage { if (nonblocking) { - return { - arguments: args, - streamIds, - target: methodName, - type: MessageType.Invocation, - }; + if (streamIds.length !== 0) { + return { + arguments: args, + streamIds, + target: methodName, + type: MessageType.Invocation, + }; + } else { + return { + arguments: args, + target: methodName, + type: MessageType.Invocation, + }; + } } else { const invocationId = this.invocationId; this.invocationId++; - return { - arguments: args, - invocationId: invocationId.toString(), - streamIds, - target: methodName, - type: MessageType.Invocation, - }; + if (streamIds.length !== 0) { + return { + arguments: args, + invocationId: invocationId.toString(), + streamIds, + target: methodName, + type: MessageType.Invocation, + }; + } else { + return { + arguments: args, + invocationId: invocationId.toString(), + target: methodName, + type: MessageType.Invocation, + }; + } } } @@ -903,13 +920,22 @@ export class HubConnection { const invocationId = this.invocationId; this.invocationId++; - return { - arguments: args, - invocationId: invocationId.toString(), - streamIds, - target: methodName, - type: MessageType.StreamInvocation, - }; + if (streamIds.length !== 0) { + return { + arguments: args, + invocationId: invocationId.toString(), + streamIds, + target: methodName, + type: MessageType.StreamInvocation, + }; + } else { + return { + arguments: args, + invocationId: invocationId.toString(), + target: methodName, + type: MessageType.StreamInvocation, + }; + } } private createCancelInvocation(id: string): CancelInvocationMessage { diff --git a/src/SignalR/clients/ts/signalr/src/IHubProtocol.ts b/src/SignalR/clients/ts/signalr/src/IHubProtocol.ts index 7a250dbc41b5..9ed7338b6398 100644 --- a/src/SignalR/clients/ts/signalr/src/IHubProtocol.ts +++ b/src/SignalR/clients/ts/signalr/src/IHubProtocol.ts @@ -65,7 +65,7 @@ export interface InvocationMessage extends HubInvocationMessage { /** The target method arguments. */ readonly arguments: any[]; /** The target methods stream IDs. */ - readonly streamIds: string[]; + readonly streamIds?: string[]; } /** A hub message representing a streaming invocation. */ @@ -80,7 +80,7 @@ export interface StreamInvocationMessage extends HubInvocationMessage { /** The target method arguments. */ readonly arguments: any[]; /** The target methods stream IDs. */ - readonly streamIds: string[]; + readonly streamIds?: string[]; } /** A hub message representing a single item produced as part of a result stream. */ diff --git a/src/SignalR/clients/ts/signalr/tests/HubConnection.test.ts b/src/SignalR/clients/ts/signalr/tests/HubConnection.test.ts index a31f4eeee9a7..797e0e1ea1a7 100644 --- a/src/SignalR/clients/ts/signalr/tests/HubConnection.test.ts +++ b/src/SignalR/clients/ts/signalr/tests/HubConnection.test.ts @@ -184,7 +184,6 @@ describe("HubConnection", () => { "arg", 42, ], - streamIds: [], target: "testMethod", type: MessageType.Invocation, }); @@ -213,7 +212,6 @@ describe("HubConnection", () => { "arg", null, ], - streamIds: [], target: "testMethod", type: MessageType.Invocation, }); @@ -245,7 +243,6 @@ describe("HubConnection", () => { 42, ], invocationId: connection.lastInvocationId, - streamIds: [], target: "testMethod", type: MessageType.Invocation, }); @@ -998,7 +995,6 @@ describe("HubConnection", () => { 42, ], invocationId: connection.lastInvocationId, - streamIds: [], target: "testStream", type: MessageType.StreamInvocation, }); diff --git a/src/SignalR/clients/ts/signalr/tests/MessageSize.test.ts b/src/SignalR/clients/ts/signalr/tests/MessageSize.test.ts new file mode 100644 index 000000000000..1d06879acf7f --- /dev/null +++ b/src/SignalR/clients/ts/signalr/tests/MessageSize.test.ts @@ -0,0 +1,175 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +import { HubConnection } from "../src/HubConnection"; +import { IConnection } from "../src/IConnection"; +import { IHubProtocol, MessageType } from "../src/IHubProtocol"; +import { ILogger } from "../src/ILogger"; +import { JsonHubProtocol } from "../src/JsonHubProtocol"; +import { NullLogger } from "../src/Loggers"; +import { Subject } from "../src/Subject"; +import { VerifyLogger } from "./Common"; +import { TestConnection } from "./TestConnection"; +import { delayUntil, registerUnhandledRejectionHandler } from "./Utils"; + +registerUnhandledRejectionHandler(); + +function createHubConnection(connection: IConnection, logger?: ILogger | null, protocol?: IHubProtocol | null) { + return HubConnection.create(connection, logger || NullLogger.instance, protocol || new JsonHubProtocol()); +} + +// These tests check that the message size doesn't change without us being aware of it and making a conscious decision to increase the size + +describe("Message size", () => { + it("send invocation", async () => { + await VerifyLogger.run(async (logger) => { + const connection = new TestConnection(); + + const hubConnection = createHubConnection(connection, logger); + try { + // We don't actually care to wait for the send. + // tslint:disable-next-line:no-floating-promises + hubConnection.send("target", 1) + .catch((_) => { }); // Suppress exception and unhandled promise rejection warning. + + // Verify the message is sent + expect(connection.sentData.length).toBe(1); + expect(connection.parsedSentData[0].type).toEqual(MessageType.Invocation); + expect((connection.sentData[0] as string).length).toEqual(44); + } finally { + // Close the connection + await hubConnection.stop(); + } + }); + }); + + it("invoke invocation", async () => { + await VerifyLogger.run(async (logger) => { + const connection = new TestConnection(); + + const hubConnection = createHubConnection(connection, logger); + try { + // We don't actually care to wait for the invoke. + // tslint:disable-next-line:no-floating-promises + hubConnection.invoke("target", 1) + .catch((_) => { }); // Suppress exception and unhandled promise rejection warning. + + // Verify the message is sent + expect(connection.sentData.length).toBe(1); + expect(connection.parsedSentData[0].type).toEqual(MessageType.Invocation); + expect((connection.sentData[0] as string).length).toEqual(63); + } finally { + // Close the connection + await hubConnection.stop(); + } + }); + }); + + it("stream invocation", async () => { + await VerifyLogger.run(async (logger) => { + const connection = new TestConnection(); + + const hubConnection = createHubConnection(connection, logger); + try { + hubConnection.stream("target", 1); + + // Verify the message is sent + expect(connection.sentData.length).toBe(1); + expect(connection.parsedSentData[0].type).toEqual(MessageType.StreamInvocation); + expect((connection.sentData[0] as string).length).toEqual(63); + } finally { + // Close the connection + await hubConnection.stop(); + } + }); + }); + + it("upload invocation", async () => { + await VerifyLogger.run(async (logger) => { + const connection = new TestConnection(); + + const hubConnection = createHubConnection(connection, logger); + try { + // We don't actually care to wait for the invoke. + // tslint:disable-next-line:no-floating-promises + hubConnection.invoke("target", 1, new Subject()) + .catch((_) => { }); // Suppress exception and unhandled promise rejection warning. + + // Verify the message is sent + expect(connection.sentData.length).toBe(1); + expect(connection.parsedSentData[0].type).toEqual(MessageType.Invocation); + expect((connection.sentData[0] as string).length).toEqual(81); + } finally { + // Close the connection + await hubConnection.stop(); + } + }); + }); + + it("upload stream invocation", async () => { + await VerifyLogger.run(async (logger) => { + const connection = new TestConnection(); + + const hubConnection = createHubConnection(connection, logger); + try { + hubConnection.stream("target", 1, new Subject()); + + // Verify the message is sent + expect(connection.sentData.length).toBe(1); + expect(connection.parsedSentData[0].type).toEqual(MessageType.StreamInvocation); + expect((connection.sentData[0] as string).length).toEqual(81); + } finally { + // Close the connection + await hubConnection.stop(); + } + }); + }); + + it("completion message", async () => { + await VerifyLogger.run(async (logger) => { + const connection = new TestConnection(); + + const hubConnection = createHubConnection(connection, logger); + try { + const subject = new Subject(); + hubConnection.stream("target", 1, subject); + subject.complete(); + + await delayUntil(1000, () => connection.sentData.length === 2); + + // Verify the message is sent + expect(connection.sentData.length).toBe(2); + expect(connection.parsedSentData[1].type).toEqual(MessageType.Completion); + expect((connection.sentData[1] as string).length).toEqual(29); + } finally { + // Close the connection + await hubConnection.stop(); + } + }); + }); + + it("cancel message", async () => { + await VerifyLogger.run(async (logger) => { + const connection = new TestConnection(); + + const hubConnection = createHubConnection(connection, logger); + try { + hubConnection.stream("target", 1).subscribe({ + complete: () => {}, + error: () => {}, + next: () => {}, + }).dispose(); + + await delayUntil(1000, () => connection.sentData.length === 2); + + // Verify the message is sent + expect(connection.sentData.length).toBe(2); + expect(connection.parsedSentData[1].type).toEqual(MessageType.CancelInvocation); + expect((connection.sentData[1] as string).length).toEqual(29); + } finally { + // Close the connection + await hubConnection.stop(); + } + }); + }); +}); diff --git a/src/SignalR/clients/ts/signalr/tests/TestConnection.ts b/src/SignalR/clients/ts/signalr/tests/TestConnection.ts index e4f35281ebc4..0506d76ea925 100644 --- a/src/SignalR/clients/ts/signalr/tests/TestConnection.ts +++ b/src/SignalR/clients/ts/signalr/tests/TestConnection.ts @@ -13,6 +13,7 @@ export class TestConnection implements IConnection { public onclose: ((error?: Error) => void) | null; public sentData: any[]; + public parsedSentData: any[]; public lastInvocationId: string | null; private autoHandshake: boolean | null; @@ -21,6 +22,7 @@ export class TestConnection implements IConnection { this.onreceive = null; this.onclose = null; this.sentData = []; + this.parsedSentData = []; this.lastInvocationId = null; this.autoHandshake = autoHandshake; this.baseUrl = "http://example.com"; @@ -43,8 +45,10 @@ export class TestConnection implements IConnection { } if (this.sentData) { this.sentData.push(invocation); + this.parsedSentData.push(parsedInvocation); } else { this.sentData = [invocation]; + this.parsedSentData = [parsedInvocation]; } return Promise.resolve(); } diff --git a/src/SignalR/common/SignalR.Common/test/Internal/Protocol/JsonHubProtocolTestsBase.cs b/src/SignalR/common/SignalR.Common/test/Internal/Protocol/JsonHubProtocolTestsBase.cs index 0465959d1a7c..a38ad6e3e7b6 100644 --- a/src/SignalR/common/SignalR.Common/test/Internal/Protocol/JsonHubProtocolTestsBase.cs +++ b/src/SignalR/common/SignalR.Common/test/Internal/Protocol/JsonHubProtocolTestsBase.cs @@ -342,6 +342,7 @@ public void ReadCaseInsensitivePropertiesByDefault() [Theory] [MemberData(nameof(MessageSizeDataNames))] + // These tests check that the message size doesn't change without us being aware of it and making a conscious decision to increase the size public void VerifyMessageSize(string testDataName) { var testData = MessageSizeData[testDataName]; diff --git a/src/SignalR/common/SignalR.Common/test/Internal/Protocol/MessagePackHubProtocolTestBase.cs b/src/SignalR/common/SignalR.Common/test/Internal/Protocol/MessagePackHubProtocolTestBase.cs index 34d97e10f3a8..0192641dadc0 100644 --- a/src/SignalR/common/SignalR.Common/test/Internal/Protocol/MessagePackHubProtocolTestBase.cs +++ b/src/SignalR/common/SignalR.Common/test/Internal/Protocol/MessagePackHubProtocolTestBase.cs @@ -375,6 +375,57 @@ public void ParserDoesNotConsumePartialData(byte[] payload) Assert.Null(message); } + public static IDictionary MessageSizeData => new[] + { + new MessageSizeTestData("InvocationMessage_WithoutInvocationId", new InvocationMessage("Target", new object[] { 1 }), 15), + new MessageSizeTestData("InvocationMessage_WithInvocationId", new InvocationMessage("1", "Target", new object[] { 1 }), 16), + new MessageSizeTestData("InvocationMessage_WithInvocationIdAndStreamId", new InvocationMessage("1", "Target", new object[] { 1 }, new string[] { "2" }), 18), + + new MessageSizeTestData("CloseMessage_Empty", CloseMessage.Empty, 5), + new MessageSizeTestData("CloseMessage_WithError", new CloseMessage("error"), 10), + + new MessageSizeTestData("StreamItemMessage_WithNullItem", new StreamItemMessage("1", null), 7), + new MessageSizeTestData("StreamItemMessage_WithItem", new StreamItemMessage("1", 1), 7), + + new MessageSizeTestData("CompletionMessage_Empty", CompletionMessage.Empty("1"), 7), + new MessageSizeTestData("CompletionMessage_WithResult", CompletionMessage.WithResult("1", 1), 8), + new MessageSizeTestData("CompletionMessage_WithError", CompletionMessage.WithError("1", "error"), 13), + + new MessageSizeTestData("StreamInvocationMessage", new StreamInvocationMessage("1", "target", Array.Empty()), 15), + new MessageSizeTestData("StreamInvocationMessage_WithStreamId", new StreamInvocationMessage("1", "target", Array.Empty(), new [] { "2" }), 17), + + new MessageSizeTestData("CancelInvocationMessage", new CancelInvocationMessage("1"), 6), + + new MessageSizeTestData("PingMessage", PingMessage.Instance, 3), + }.ToDictionary(t => t.Name); + + public static IEnumerable MessageSizeDataNames => MessageSizeData.Keys.Select(name => new object[] { name }); + + [Theory] + [MemberData(nameof(MessageSizeDataNames))] + // These tests check that the message size doesn't change without us being aware of it and making a conscious decision to increase the size + public void VerifyMessageSize(string testDataName) + { + var testData = MessageSizeData[testDataName]; + Assert.Equal(testData.Size, Write(testData.Message).Length); + } + + public class MessageSizeTestData + { + public string Name { get; } + public HubMessage Message { get; } + public int Size { get; } + + public MessageSizeTestData(string name, HubMessage message, int size) + { + Name = name; + Message = message; + Size = size; + } + + public override string ToString() => Name; + } + protected byte ArrayBytes(int size) { Debug.Assert(size < 16, "Test code doesn't support array sizes greater than 15"); diff --git a/src/SignalR/common/SignalR.Common/test/Internal/Protocol/MessagePackHubProtocolTests.cs b/src/SignalR/common/SignalR.Common/test/Internal/Protocol/MessagePackHubProtocolTests.cs index 2355228bd9c0..cf193576d77d 100644 --- a/src/SignalR/common/SignalR.Common/test/Internal/Protocol/MessagePackHubProtocolTests.cs +++ b/src/SignalR/common/SignalR.Common/test/Internal/Protocol/MessagePackHubProtocolTests.cs @@ -203,55 +203,5 @@ public void WriteMessages(string testDataName) TestWriteMessages(testData); } - - public static IDictionary MessageSizeData => new[] - { - new MessageSizeTestData("InvocationMessage_WithoutInvocationId", new InvocationMessage("Target", new object[] { 1 }), 15), - new MessageSizeTestData("InvocationMessage_WithInvocationId", new InvocationMessage("1", "Target", new object[] { 1 }), 16), - new MessageSizeTestData("InvocationMessage_WithInvocationIdAndStreamId", new InvocationMessage("1", "Target", new object[] { 1 }, new string[] { "2" }), 18), - - new MessageSizeTestData("CloseMessage_Empty", CloseMessage.Empty, 5), - new MessageSizeTestData("CloseMessage_WithError", new CloseMessage("error"), 10), - - new MessageSizeTestData("StreamItemMessage_WithNullItem", new StreamItemMessage("1", null), 7), - new MessageSizeTestData("StreamItemMessage_WithItem", new StreamItemMessage("1", 1), 7), - - new MessageSizeTestData("CompletionMessage_Empty", CompletionMessage.Empty("1"), 7), - new MessageSizeTestData("CompletionMessage_WithResult", CompletionMessage.WithResult("1", 1), 8), - new MessageSizeTestData("CompletionMessage_WithError", CompletionMessage.WithError("1", "error"), 13), - - new MessageSizeTestData("StreamInvocationMessage", new StreamInvocationMessage("1", "target", Array.Empty()), 15), - new MessageSizeTestData("StreamInvocationMessage_WithStreamId", new StreamInvocationMessage("1", "target", Array.Empty(), new [] { "2" }), 17), - - new MessageSizeTestData("CancelInvocationMessage", new CancelInvocationMessage("1"), 6), - - new MessageSizeTestData("PingMessage", PingMessage.Instance, 3), - }.ToDictionary(t => t.Name); - - public static IEnumerable MessageSizeDataNames => MessageSizeData.Keys.Select(name => new object[] { name }); - - [Theory] - [MemberData(nameof(MessageSizeDataNames))] - public void VerifyMessageSize(string testDataName) - { - var testData = MessageSizeData[testDataName]; - Assert.Equal(testData.Size, Write(testData.Message).Length); - } - - public class MessageSizeTestData - { - public string Name { get; } - public HubMessage Message { get; } - public int Size { get; } - - public MessageSizeTestData(string name, HubMessage message, int size) - { - Name = name; - Message = message; - Size = size; - } - - public override string ToString() => Name; - } } }