Skip to content
This repository was archived by the owner on Dec 27, 2022. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions modules/protocol/src/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ export class ValidationError extends ProtocolError {
InvalidChannelAddress: "Provided channel address is invalid",
InvalidCounterparty: "Channel counterparty is invalid",
InvalidInitialState: "Initial transfer state is invalid",
InvalidProtocolVersion: "Protocol version is invalid",
InvalidResolver: "Transfer resolver must be an object",
LongChannelTimeout: `Channel timeout above maximum of ${MAXIMUM_CHANNEL_TIMEOUT.toString()}s`,
OnlyResponderCanInitiateResolve: "Only transfer responder may initiate resolve update",
Expand Down
2 changes: 2 additions & 0 deletions modules/protocol/src/sync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import {
IExternalValidation,
MessagingError,
jsonifyError,
PROTOCOL_VERSION,
} from "@connext/vector-types";
import { getRandomBytes32 } from "@connext/vector-utils";
import pino from "pino";
Expand Down Expand Up @@ -96,6 +97,7 @@ export async function outbound(
// Send and wait for response
logger.debug({ method, methodId, to: update.toIdentifier, type: update.type }, "Sending protocol message");
let counterpartyResult = await messagingService.sendProtocolMessage(
PROTOCOL_VERSION,
update,
previousState?.latestUpdate,
// LOCK_TTL / 10,
Expand Down
45 changes: 42 additions & 3 deletions modules/protocol/src/vector.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import {
jsonifyError,
Values,
UpdateIdentifier,
PROTOCOL_VERSION,
} from "@connext/vector-types";
import { v4 as uuidV4 } from "uuid";
import {
Expand All @@ -32,7 +33,7 @@ import {
import { Evt } from "evt";
import pino from "pino";

import { QueuedUpdateError, RestoreError } from "./errors";
import { QueuedUpdateError, RestoreError, ValidationError } from "./errors";
import { Cancellable, OtherUpdate, SelfUpdate, SerializedQueue } from "./queue";
import { outbound, inbound, OtherUpdateResult, SelfUpdateResult } from "./sync";
import {
Expand Down Expand Up @@ -448,6 +449,7 @@ export class Vector implements IVectorProtocol {
}
await this.messagingService.respondToProtocolMessage(
received.inbox,
PROTOCOL_VERSION,
updatedChannel.latestUpdate,
(channelState as FullChannelState | undefined)?.latestUpdate,
);
Expand Down Expand Up @@ -558,6 +560,28 @@ export class Vector implements IVectorProtocol {
}

private async setupServices(): Promise<Vector> {
// TODO: REMOVE THIS!
await this.messagingService.onReceiveLockMessage(
this.publicIdentifier,
async (lockInfo: Result<any>, from: string, inbox: string) => {
if (from === this.publicIdentifier) {
return;
}
const method = "onReceiveProtocolMessage";
const methodId = getRandomBytes32();

this.logger.error({ method, methodId }, "Counterparty using incompatible version");
await this.messagingService.respondToLockMessage(
inbox,
Result.fail(
new ValidationError(ValidationError.reasons.InvalidProtocolVersion, {} as any, undefined, {
compatible: PROTOCOL_VERSION,
}),
),
);
},
);

// response to incoming message where we are not the leader
// steps:
// - validate and save state
Expand All @@ -566,7 +590,7 @@ export class Vector implements IVectorProtocol {
await this.messagingService.onReceiveProtocolMessage(
this.publicIdentifier,
async (
msg: Result<{ update: ChannelUpdate; previousUpdate: ChannelUpdate }, ProtocolError>,
msg: Result<{ update: ChannelUpdate; previousUpdate: ChannelUpdate; protocolVersion: string }, ProtocolError>,
from: string,
inbox: string,
) => {
Expand All @@ -587,9 +611,24 @@ export class Vector implements IVectorProtocol {

const received = msg.getValue();

// Check the protocol version is compatible
const theirVersion = (received.protocolVersion ?? "0.0.0").split(".");
const ourVersion = PROTOCOL_VERSION.split(".");
if (theirVersion[0] !== ourVersion[0] || theirVersion[1] !== ourVersion[1]) {
this.logger.error({ method, methodId, theirVersion, ourVersion }, "Counterparty using incompatible version");
await this.messagingService.respondWithProtocolError(
inbox,
new ValidationError(ValidationError.reasons.InvalidProtocolVersion, received.update, undefined, {
responderVersion: ourVersion,
initiatorVersion: theirVersion,
}),
);
return;
}

// Verify that the message has the correct structure
const keys = Object.keys(received);
if (!keys.includes("update") || !keys.includes("previousUpdate")) {
if (!keys.includes("update") || !keys.includes("previousUpdate") || !keys.includes("protocolVersion")) {
this.logger.warn({ method, methodId, received: Object.keys(received) }, "Message malformed");
return;
}
Expand Down
17 changes: 12 additions & 5 deletions modules/server-node/src/services/messaging.spec.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,12 @@
import { IChannelSigner, Result, jsonifyError, MessagingError, UpdateType, VectorError } from "@connext/vector-types";
import {
IChannelSigner,
Result,
jsonifyError,
MessagingError,
UpdateType,
VectorError,
PROTOCOL_VERSION,
} from "@connext/vector-types";
import {
createTestChannelUpdate,
delay,
Expand All @@ -12,7 +20,6 @@ import {
import pino from "pino";

import { config } from "../config";
import { ServerNodeLockError } from "../helpers/errors";

describe("messaging", () => {
const { log: logger } = getTestLoggers("messaging", (config.logLevel ?? "fatal") as pino.Level);
Expand Down Expand Up @@ -57,13 +64,13 @@ describe("messaging", () => {
expect(result.isError).to.not.be.ok;
expect(result.getValue()).to.containSubset({ update });
expect(inbox).to.be.a("string");
await messagingB.respondToProtocolMessage(inbox, update);
await messagingB.respondToProtocolMessage(inbox, PROTOCOL_VERSION, update);
},
);

await delay(1_000);

const res = await messagingA.sendProtocolMessage(update);
const res = await messagingA.sendProtocolMessage(PROTOCOL_VERSION, update);
expect(res.isError).to.not.be.ok;
expect(res.getValue()).to.containSubset({ update });
});
Expand All @@ -88,7 +95,7 @@ describe("messaging", () => {

await delay(1_000);

const res = await messagingA.sendProtocolMessage(update);
const res = await messagingA.sendProtocolMessage(PROTOCOL_VERSION, update);
expect(res.isError).to.be.true;
const errReceived = res.getError()!;
const expected = VectorError.fromJson(jsonifyError(err));
Expand Down
1 change: 1 addition & 0 deletions modules/types/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,4 @@ export * from "./store";
export * from "./transferDefinitions";
export * from "./utils";
export * from "./vectorProvider";
export * from "./version";
17 changes: 15 additions & 2 deletions modules/types/src/messaging.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { ChannelUpdate, FullChannelState, FullTransferState } from "./channel";
import { ConditionalTransferCreatedPayload, ConditionalTransferRoutingCompletePayload } from "./engine";
import { ConditionalTransferRoutingCompletePayload } from "./engine";
import { EngineError, NodeError, MessagingError, ProtocolError, Result, RouterError, VectorError } from "./error";
import { EngineParams, NodeResponses } from "./schemas";

Expand Down Expand Up @@ -27,12 +27,16 @@ export interface IMessagingService extends IBasicMessaging {
onReceiveProtocolMessage(
myPublicIdentifier: string,
callback: (
result: Result<{ update: ChannelUpdate<any>; previousUpdate: ChannelUpdate<any> }, ProtocolError>,
result: Result<
{ update: ChannelUpdate<any>; previousUpdate: ChannelUpdate<any>; protocolVersion: string },
ProtocolError
>,
from: string,
inbox: string,
) => void,
): Promise<void>;
sendProtocolMessage(
protocolVersion: string,
channelUpdate: ChannelUpdate<any>,
previousUpdate?: ChannelUpdate<any>,
timeout?: number,
Expand All @@ -42,11 +46,20 @@ export interface IMessagingService extends IBasicMessaging {
>;
respondToProtocolMessage(
inbox: string,
protocolVersion: string,
channelUpdate: ChannelUpdate<any>,
previousUpdate?: ChannelUpdate<any>,
): Promise<void>;
respondWithProtocolError(inbox: string, error: ProtocolError): Promise<void>;

// TODO: remove these!
onReceiveLockMessage(
publicIdentifier: string,
callback: (lockInfo: Result<any, NodeError>, from: string, inbox: string) => void,
): Promise<void>;

respondToLockMessage(inbox: string, lockInformation: Result<any, NodeError>): Promise<void>;

sendSetupMessage(
setupInfo: Result<Omit<EngineParams.Setup, "counterpartyIdentifier">, EngineError>,
to: string,
Expand Down
1 change: 1 addition & 0 deletions modules/types/src/version.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export const PROTOCOL_VERSION = "0.3.0-dev.0";
26 changes: 23 additions & 3 deletions modules/utils/src/messaging.ts
Original file line number Diff line number Diff line change
Expand Up @@ -334,13 +334,14 @@ export class NatsMessagingService extends NatsBasicMessagingService implements I

// PROTOCOL METHODS
async sendProtocolMessage(
protocolVersion: string,
channelUpdate: ChannelUpdate<any>,
previousUpdate?: ChannelUpdate<any>,
timeout = 60_000,
numRetries = 0,
): Promise<Result<{ update: ChannelUpdate<any>; previousUpdate: ChannelUpdate<any> }, ProtocolError>> {
return this.sendMessageWithRetries(
Result.ok({ update: channelUpdate, previousUpdate }),
Result.ok({ update: channelUpdate, previousUpdate, protocolVersion }),
"protocol",
channelUpdate.toIdentifier,
channelUpdate.fromIdentifier,
Expand All @@ -353,7 +354,10 @@ export class NatsMessagingService extends NatsBasicMessagingService implements I
async onReceiveProtocolMessage(
myPublicIdentifier: string,
callback: (
result: Result<{ update: ChannelUpdate<any>; previousUpdate: ChannelUpdate<any> }, ProtocolError>,
result: Result<
{ update: ChannelUpdate<any>; previousUpdate: ChannelUpdate<any>; protocolVersion: string },
ProtocolError
>,
from: string,
inbox: string,
) => void,
Expand All @@ -363,12 +367,13 @@ export class NatsMessagingService extends NatsBasicMessagingService implements I

async respondToProtocolMessage(
inbox: string,
protocolVersion: string,
channelUpdate: ChannelUpdate<any>,
previousUpdate?: ChannelUpdate<any>,
): Promise<void> {
return this.respondToMessage(
inbox,
Result.ok({ update: channelUpdate, previousUpdate }),
Result.ok({ update: channelUpdate, previousUpdate, protocolVersion }),
"respondToProtocolMessage",
);
}
Expand All @@ -378,6 +383,21 @@ export class NatsMessagingService extends NatsBasicMessagingService implements I
}
////////////

// LOCK MESSAGE
// TODO: remove these!
async onReceiveLockMessage(
publicIdentifier: string,
callback: (lockInfo: Result<any, NodeError>, from: string, inbox: string) => void,
): Promise<void> {
return this.registerCallback(`${publicIdentifier}.*.lock`, callback, "onReceiveLockMessage");
}

async respondToLockMessage(inbox: string, lockInformation: Result<any, NodeError>): Promise<void> {
return this.respondToMessage(inbox, lockInformation, "respondToLockMessage");
}

////////////

// RESTORE METHODS
async sendRestoreStateMessage(
restoreData: Result<{ chainId: number }, EngineError>,
Expand Down
32 changes: 28 additions & 4 deletions modules/utils/src/test/services/messaging.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ export class MemoryMessagingService implements IMessagingService {
inbox?: string;
replyTo?: string;
data: {
protocolVersion?: string;
update?: ChannelUpdate<any>;
previousUpdate?: ChannelUpdate<any>;
error?: ProtocolError;
Expand All @@ -33,7 +34,12 @@ export class MemoryMessagingService implements IMessagingService {
to?: string;
from: string;
inbox?: string;
data: { update?: ChannelUpdate<any>; previousUpdate?: ChannelUpdate<any>; error?: ProtocolError };
data: {
update?: ChannelUpdate<any>;
previousUpdate?: ChannelUpdate<any>;
error?: ProtocolError;
protocolVersion?: string;
};
replyTo?: string;
}>();

Expand Down Expand Up @@ -67,7 +73,20 @@ export class MemoryMessagingService implements IMessagingService {
this.protocolEvt.detach();
}

// TODO: remove these!
async onReceiveLockMessage(
publicIdentifier: string,
callback: (lockInfo: Result<any, NodeError>, from: string, inbox: string) => void,
): Promise<void> {
console.warn("Method to be deprecated");
}

async respondToLockMessage(inbox: string, lockInformation: Result<any, NodeError>): Promise<void> {
console.warn("Method to be deprecated");
}

async sendProtocolMessage(
protocolVersion: string,
channelUpdate: ChannelUpdate<any>,
previousUpdate?: ChannelUpdate<any>,
timeout = 20_000,
Expand All @@ -79,7 +98,7 @@ export class MemoryMessagingService implements IMessagingService {
to: channelUpdate.toIdentifier,
from: channelUpdate.fromIdentifier,
replyTo: inbox,
data: { update: channelUpdate, previousUpdate },
data: { update: channelUpdate, previousUpdate, protocolVersion },
});
const res = await responsePromise;
if (res.data.error) {
Expand All @@ -90,12 +109,13 @@ export class MemoryMessagingService implements IMessagingService {

async respondToProtocolMessage(
inbox: string,
protocolVersion: string,
channelUpdate: ChannelUpdate<any>,
previousUpdate?: ChannelUpdate<any>,
): Promise<void> {
this.protocolEvt.post({
inbox,
data: { update: channelUpdate, previousUpdate },
data: { update: channelUpdate, previousUpdate, protocolVersion },
from: channelUpdate.toIdentifier,
});
}
Expand All @@ -111,7 +131,10 @@ export class MemoryMessagingService implements IMessagingService {
async onReceiveProtocolMessage(
myPublicIdentifier: string,
callback: (
result: Result<{ update: ChannelUpdate<any>; previousUpdate: ChannelUpdate<any> }, ProtocolError>,
result: Result<
{ update: ChannelUpdate<any>; previousUpdate: ChannelUpdate<any>; protocolVersion: string },
ProtocolError
>,
from: string,
inbox: string,
) => void,
Expand All @@ -123,6 +146,7 @@ export class MemoryMessagingService implements IMessagingService {
Result.ok({
previousUpdate: data.previousUpdate!,
update: data.update!,
protocolVersion: data.protocolVersion!,
}),
from,
replyTo!,
Expand Down
4 changes: 2 additions & 2 deletions ops/npm-publish.sh
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ if [[ ! "$(pwd | sed 's|.*/\(.*\)|\1|')" =~ $project ]]
then echo "Aborting: Make sure you're in the $project project root" && exit 1
fi

make all

echo "Did you update the changelog.md before publishing (y/n)?"
read -p "> " -r
echo
Expand Down Expand Up @@ -91,6 +89,8 @@ fi

( # () designates a subshell so we don't have to cd back to where we started afterwards
echo "Let's go"
echo "export const PROTOCOL_VERSION='${target_version}'" > "${root}/modules/types/src/version.ts"
make all
cd modules

for package in $package_names
Expand Down