diff --git a/modules/protocol/src/errors.ts b/modules/protocol/src/errors.ts index 078714fd8..7a9dc435f 100644 --- a/modules/protocol/src/errors.ts +++ b/modules/protocol/src/errors.ts @@ -58,6 +58,7 @@ export class ValidationError extends ProtocolError { InvalidChannelAddress: "Provided channel address is invalid", InvalidCounterparty: "Channel counterparty is invalid", InvalidInitialState: "Initial transfer state is invalid", + InvalidProtocolVersion: "Protocol version is invalid", InvalidResolver: "Transfer resolver must be an object", LongChannelTimeout: `Channel timeout above maximum of ${MAXIMUM_CHANNEL_TIMEOUT.toString()}s`, OnlyResponderCanInitiateResolve: "Only transfer responder may initiate resolve update", diff --git a/modules/protocol/src/sync.ts b/modules/protocol/src/sync.ts index 7abe0804d..c93e5997b 100644 --- a/modules/protocol/src/sync.ts +++ b/modules/protocol/src/sync.ts @@ -12,6 +12,7 @@ import { IExternalValidation, MessagingError, jsonifyError, + PROTOCOL_VERSION, } from "@connext/vector-types"; import { getRandomBytes32 } from "@connext/vector-utils"; import pino from "pino"; @@ -96,6 +97,7 @@ export async function outbound( // Send and wait for response logger.debug({ method, methodId, to: update.toIdentifier, type: update.type }, "Sending protocol message"); let counterpartyResult = await messagingService.sendProtocolMessage( + PROTOCOL_VERSION, update, previousState?.latestUpdate, // LOCK_TTL / 10, diff --git a/modules/protocol/src/vector.ts b/modules/protocol/src/vector.ts index 4c81c2b6c..8cdf59888 100644 --- a/modules/protocol/src/vector.ts +++ b/modules/protocol/src/vector.ts @@ -20,6 +20,7 @@ import { jsonifyError, Values, UpdateIdentifier, + PROTOCOL_VERSION, } from "@connext/vector-types"; import { v4 as uuidV4 } from "uuid"; import { @@ -32,7 +33,7 @@ import { import { Evt } from "evt"; import pino from "pino"; -import { QueuedUpdateError, RestoreError } from "./errors"; +import { QueuedUpdateError, RestoreError, ValidationError } from "./errors"; import { Cancellable, OtherUpdate, SelfUpdate, SerializedQueue } from "./queue"; import { outbound, inbound, OtherUpdateResult, SelfUpdateResult } from "./sync"; import { @@ -448,6 +449,7 @@ export class Vector implements IVectorProtocol { } await this.messagingService.respondToProtocolMessage( received.inbox, + PROTOCOL_VERSION, updatedChannel.latestUpdate, (channelState as FullChannelState | undefined)?.latestUpdate, ); @@ -558,6 +560,28 @@ export class Vector implements IVectorProtocol { } private async setupServices(): Promise { + // TODO: REMOVE THIS! + await this.messagingService.onReceiveLockMessage( + this.publicIdentifier, + async (lockInfo: Result, from: string, inbox: string) => { + if (from === this.publicIdentifier) { + return; + } + const method = "onReceiveProtocolMessage"; + const methodId = getRandomBytes32(); + + this.logger.error({ method, methodId }, "Counterparty using incompatible version"); + await this.messagingService.respondToLockMessage( + inbox, + Result.fail( + new ValidationError(ValidationError.reasons.InvalidProtocolVersion, {} as any, undefined, { + compatible: PROTOCOL_VERSION, + }), + ), + ); + }, + ); + // response to incoming message where we are not the leader // steps: // - validate and save state @@ -566,7 +590,7 @@ export class Vector implements IVectorProtocol { await this.messagingService.onReceiveProtocolMessage( this.publicIdentifier, async ( - msg: Result<{ update: ChannelUpdate; previousUpdate: ChannelUpdate }, ProtocolError>, + msg: Result<{ update: ChannelUpdate; previousUpdate: ChannelUpdate; protocolVersion: string }, ProtocolError>, from: string, inbox: string, ) => { @@ -587,9 +611,24 @@ export class Vector implements IVectorProtocol { const received = msg.getValue(); + // Check the protocol version is compatible + const theirVersion = (received.protocolVersion ?? "0.0.0").split("."); + const ourVersion = PROTOCOL_VERSION.split("."); + if (theirVersion[0] !== ourVersion[0] || theirVersion[1] !== ourVersion[1]) { + this.logger.error({ method, methodId, theirVersion, ourVersion }, "Counterparty using incompatible version"); + await this.messagingService.respondWithProtocolError( + inbox, + new ValidationError(ValidationError.reasons.InvalidProtocolVersion, received.update, undefined, { + responderVersion: ourVersion, + initiatorVersion: theirVersion, + }), + ); + return; + } + // Verify that the message has the correct structure const keys = Object.keys(received); - if (!keys.includes("update") || !keys.includes("previousUpdate")) { + if (!keys.includes("update") || !keys.includes("previousUpdate") || !keys.includes("protocolVersion")) { this.logger.warn({ method, methodId, received: Object.keys(received) }, "Message malformed"); return; } diff --git a/modules/server-node/src/services/messaging.spec.ts b/modules/server-node/src/services/messaging.spec.ts index 037fe0ef1..d085c0c20 100644 --- a/modules/server-node/src/services/messaging.spec.ts +++ b/modules/server-node/src/services/messaging.spec.ts @@ -1,4 +1,12 @@ -import { IChannelSigner, Result, jsonifyError, MessagingError, UpdateType, VectorError } from "@connext/vector-types"; +import { + IChannelSigner, + Result, + jsonifyError, + MessagingError, + UpdateType, + VectorError, + PROTOCOL_VERSION, +} from "@connext/vector-types"; import { createTestChannelUpdate, delay, @@ -12,7 +20,6 @@ import { import pino from "pino"; import { config } from "../config"; -import { ServerNodeLockError } from "../helpers/errors"; describe("messaging", () => { const { log: logger } = getTestLoggers("messaging", (config.logLevel ?? "fatal") as pino.Level); @@ -57,13 +64,13 @@ describe("messaging", () => { expect(result.isError).to.not.be.ok; expect(result.getValue()).to.containSubset({ update }); expect(inbox).to.be.a("string"); - await messagingB.respondToProtocolMessage(inbox, update); + await messagingB.respondToProtocolMessage(inbox, PROTOCOL_VERSION, update); }, ); await delay(1_000); - const res = await messagingA.sendProtocolMessage(update); + const res = await messagingA.sendProtocolMessage(PROTOCOL_VERSION, update); expect(res.isError).to.not.be.ok; expect(res.getValue()).to.containSubset({ update }); }); @@ -88,7 +95,7 @@ describe("messaging", () => { await delay(1_000); - const res = await messagingA.sendProtocolMessage(update); + const res = await messagingA.sendProtocolMessage(PROTOCOL_VERSION, update); expect(res.isError).to.be.true; const errReceived = res.getError()!; const expected = VectorError.fromJson(jsonifyError(err)); diff --git a/modules/types/src/index.ts b/modules/types/src/index.ts index c03222325..a9598e6bc 100644 --- a/modules/types/src/index.ts +++ b/modules/types/src/index.ts @@ -19,3 +19,4 @@ export * from "./store"; export * from "./transferDefinitions"; export * from "./utils"; export * from "./vectorProvider"; +export * from "./version"; diff --git a/modules/types/src/messaging.ts b/modules/types/src/messaging.ts index f45b6b28d..506f9a69f 100644 --- a/modules/types/src/messaging.ts +++ b/modules/types/src/messaging.ts @@ -1,5 +1,5 @@ import { ChannelUpdate, FullChannelState, FullTransferState } from "./channel"; -import { ConditionalTransferCreatedPayload, ConditionalTransferRoutingCompletePayload } from "./engine"; +import { ConditionalTransferRoutingCompletePayload } from "./engine"; import { EngineError, NodeError, MessagingError, ProtocolError, Result, RouterError, VectorError } from "./error"; import { EngineParams, NodeResponses } from "./schemas"; @@ -27,12 +27,16 @@ export interface IMessagingService extends IBasicMessaging { onReceiveProtocolMessage( myPublicIdentifier: string, callback: ( - result: Result<{ update: ChannelUpdate; previousUpdate: ChannelUpdate }, ProtocolError>, + result: Result< + { update: ChannelUpdate; previousUpdate: ChannelUpdate; protocolVersion: string }, + ProtocolError + >, from: string, inbox: string, ) => void, ): Promise; sendProtocolMessage( + protocolVersion: string, channelUpdate: ChannelUpdate, previousUpdate?: ChannelUpdate, timeout?: number, @@ -42,11 +46,20 @@ export interface IMessagingService extends IBasicMessaging { >; respondToProtocolMessage( inbox: string, + protocolVersion: string, channelUpdate: ChannelUpdate, previousUpdate?: ChannelUpdate, ): Promise; respondWithProtocolError(inbox: string, error: ProtocolError): Promise; + // TODO: remove these! + onReceiveLockMessage( + publicIdentifier: string, + callback: (lockInfo: Result, from: string, inbox: string) => void, + ): Promise; + + respondToLockMessage(inbox: string, lockInformation: Result): Promise; + sendSetupMessage( setupInfo: Result, EngineError>, to: string, diff --git a/modules/types/src/version.ts b/modules/types/src/version.ts new file mode 100644 index 000000000..add59a974 --- /dev/null +++ b/modules/types/src/version.ts @@ -0,0 +1 @@ +export const PROTOCOL_VERSION = "0.3.0-dev.0"; diff --git a/modules/utils/src/messaging.ts b/modules/utils/src/messaging.ts index ca7332748..e688f5a43 100644 --- a/modules/utils/src/messaging.ts +++ b/modules/utils/src/messaging.ts @@ -334,13 +334,14 @@ export class NatsMessagingService extends NatsBasicMessagingService implements I // PROTOCOL METHODS async sendProtocolMessage( + protocolVersion: string, channelUpdate: ChannelUpdate, previousUpdate?: ChannelUpdate, timeout = 60_000, numRetries = 0, ): Promise; previousUpdate: ChannelUpdate }, ProtocolError>> { return this.sendMessageWithRetries( - Result.ok({ update: channelUpdate, previousUpdate }), + Result.ok({ update: channelUpdate, previousUpdate, protocolVersion }), "protocol", channelUpdate.toIdentifier, channelUpdate.fromIdentifier, @@ -353,7 +354,10 @@ export class NatsMessagingService extends NatsBasicMessagingService implements I async onReceiveProtocolMessage( myPublicIdentifier: string, callback: ( - result: Result<{ update: ChannelUpdate; previousUpdate: ChannelUpdate }, ProtocolError>, + result: Result< + { update: ChannelUpdate; previousUpdate: ChannelUpdate; protocolVersion: string }, + ProtocolError + >, from: string, inbox: string, ) => void, @@ -363,12 +367,13 @@ export class NatsMessagingService extends NatsBasicMessagingService implements I async respondToProtocolMessage( inbox: string, + protocolVersion: string, channelUpdate: ChannelUpdate, previousUpdate?: ChannelUpdate, ): Promise { return this.respondToMessage( inbox, - Result.ok({ update: channelUpdate, previousUpdate }), + Result.ok({ update: channelUpdate, previousUpdate, protocolVersion }), "respondToProtocolMessage", ); } @@ -378,6 +383,21 @@ export class NatsMessagingService extends NatsBasicMessagingService implements I } //////////// + // LOCK MESSAGE + // TODO: remove these! + async onReceiveLockMessage( + publicIdentifier: string, + callback: (lockInfo: Result, from: string, inbox: string) => void, + ): Promise { + return this.registerCallback(`${publicIdentifier}.*.lock`, callback, "onReceiveLockMessage"); + } + + async respondToLockMessage(inbox: string, lockInformation: Result): Promise { + return this.respondToMessage(inbox, lockInformation, "respondToLockMessage"); + } + + //////////// + // RESTORE METHODS async sendRestoreStateMessage( restoreData: Result<{ chainId: number }, EngineError>, diff --git a/modules/utils/src/test/services/messaging.ts b/modules/utils/src/test/services/messaging.ts index 64ac992de..d4ef75d94 100644 --- a/modules/utils/src/test/services/messaging.ts +++ b/modules/utils/src/test/services/messaging.ts @@ -25,6 +25,7 @@ export class MemoryMessagingService implements IMessagingService { inbox?: string; replyTo?: string; data: { + protocolVersion?: string; update?: ChannelUpdate; previousUpdate?: ChannelUpdate; error?: ProtocolError; @@ -33,7 +34,12 @@ export class MemoryMessagingService implements IMessagingService { to?: string; from: string; inbox?: string; - data: { update?: ChannelUpdate; previousUpdate?: ChannelUpdate; error?: ProtocolError }; + data: { + update?: ChannelUpdate; + previousUpdate?: ChannelUpdate; + error?: ProtocolError; + protocolVersion?: string; + }; replyTo?: string; }>(); @@ -67,7 +73,20 @@ export class MemoryMessagingService implements IMessagingService { this.protocolEvt.detach(); } + // TODO: remove these! + async onReceiveLockMessage( + publicIdentifier: string, + callback: (lockInfo: Result, from: string, inbox: string) => void, + ): Promise { + console.warn("Method to be deprecated"); + } + + async respondToLockMessage(inbox: string, lockInformation: Result): Promise { + console.warn("Method to be deprecated"); + } + async sendProtocolMessage( + protocolVersion: string, channelUpdate: ChannelUpdate, previousUpdate?: ChannelUpdate, timeout = 20_000, @@ -79,7 +98,7 @@ export class MemoryMessagingService implements IMessagingService { to: channelUpdate.toIdentifier, from: channelUpdate.fromIdentifier, replyTo: inbox, - data: { update: channelUpdate, previousUpdate }, + data: { update: channelUpdate, previousUpdate, protocolVersion }, }); const res = await responsePromise; if (res.data.error) { @@ -90,12 +109,13 @@ export class MemoryMessagingService implements IMessagingService { async respondToProtocolMessage( inbox: string, + protocolVersion: string, channelUpdate: ChannelUpdate, previousUpdate?: ChannelUpdate, ): Promise { this.protocolEvt.post({ inbox, - data: { update: channelUpdate, previousUpdate }, + data: { update: channelUpdate, previousUpdate, protocolVersion }, from: channelUpdate.toIdentifier, }); } @@ -111,7 +131,10 @@ export class MemoryMessagingService implements IMessagingService { async onReceiveProtocolMessage( myPublicIdentifier: string, callback: ( - result: Result<{ update: ChannelUpdate; previousUpdate: ChannelUpdate }, ProtocolError>, + result: Result< + { update: ChannelUpdate; previousUpdate: ChannelUpdate; protocolVersion: string }, + ProtocolError + >, from: string, inbox: string, ) => void, @@ -123,6 +146,7 @@ export class MemoryMessagingService implements IMessagingService { Result.ok({ previousUpdate: data.previousUpdate!, update: data.update!, + protocolVersion: data.protocolVersion!, }), from, replyTo!, diff --git a/ops/npm-publish.sh b/ops/npm-publish.sh index e3e021ec8..a595b4821 100644 --- a/ops/npm-publish.sh +++ b/ops/npm-publish.sh @@ -24,8 +24,6 @@ if [[ ! "$(pwd | sed 's|.*/\(.*\)|\1|')" =~ $project ]] then echo "Aborting: Make sure you're in the $project project root" && exit 1 fi -make all - echo "Did you update the changelog.md before publishing (y/n)?" read -p "> " -r echo @@ -91,6 +89,8 @@ fi ( # () designates a subshell so we don't have to cd back to where we started afterwards echo "Let's go" + echo "export const PROTOCOL_VERSION='${target_version}'" > "${root}/modules/types/src/version.ts" + make all cd modules for package in $package_names