Skip to content
This repository was archived by the owner on Dec 27, 2022. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 51 additions & 68 deletions modules/engine/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import {
WITHDRAWAL_RESOLVED_EVENT,
VectorErrorJson,
getConfirmationsForChain,
ProtocolError,
} from "@connext/vector-types";
import {
generateMerkleTreeData,
Expand Down Expand Up @@ -54,6 +55,7 @@ import { setupEngineListeners } from "./listeners";
import { getEngineEvtContainer } from "./utils";
import { sendIsAlive } from "./isAlive";
import { WithdrawCommitment } from "@connext/vector-contracts";
import { FullChannelState } from "../../types/dist/src";

export const ajv = new Ajv();

Expand All @@ -63,8 +65,6 @@ export class VectorEngine implements IVectorEngine {
// Setup event container to emit events from vector
private readonly evts: EngineEvtContainer = getEngineEvtContainer();

private readonly restoreLocks: { [channelAddress: string]: string } = {};

private constructor(
private readonly signer: IChannelSigner,
private readonly messaging: IMessagingService,
Expand Down Expand Up @@ -579,7 +579,9 @@ export class VectorEngine implements IVectorEngine {
if (setupParamsResult.isError) {
return Result.fail(setupParamsResult.getError()!);
}
const setupRes = await this.vector.setup(setupParamsResult.getValue());
const setupRes = await this.runProtocolMethodWithRetries<FullChannelState>(() =>
this.vector.setup(setupParamsResult.getValue()),
);

if (setupRes.isError) {
return Result.fail(setupRes.getError()!);
Expand Down Expand Up @@ -681,30 +683,8 @@ export class VectorEngine implements IVectorEngine {
// own. Bob reconciles 8 and fails to recover Alice's signature properly
// leaving all 8 out of the channel.

// There is no way to eliminate this race condition, so instead just retry
// depositing if a signature validation error is detected.
let depositRes = await this.vector.deposit(params);
let count = 1;
for (const _ of Array(3).fill(0)) {
// If its not an error, do not retry
if (!depositRes.isError) {
break;
}
const error = depositRes.getError()!;
// IFF deposit fails because you or the counterparty fails to recover
// signatures, retry
// This should be the message from *.reasons.BadSignatures in the protocol
// errors
const recoveryErr = "Could not recover signers";
const recoveryFailed = error.message === recoveryErr || error.context?.counterpartyError?.message === recoveryErr;

if (!recoveryFailed) {
break;
}
this.logger.warn({ attempt: count, channelAddress: params.channelAddress }, "Retrying deposit reconciliation");
depositRes = await this.vector.deposit(params);
count++;
}
// This race condition should be handled by the protocol retries
const depositRes = await this.runProtocolMethodWithRetries<FullChannelState>(() => this.vector.deposit(params));
this.logger.info(
{
result: depositRes.isError ? jsonifyError(depositRes.getError()!) : depositRes.getValue(),
Expand Down Expand Up @@ -804,7 +784,9 @@ export class VectorEngine implements IVectorEngine {
}
const createParams = createResult.getValue();
this.logger.info({ transferParams: createParams, method, methodId }, "Created conditional transfer params");
const protocolRes = await this.vector.create(createParams);
const protocolRes = await this.runProtocolMethodWithRetries<FullChannelState>(() =>
this.vector.create(createParams),
);
if (protocolRes.isError) {
return Result.fail(protocolRes.getError()!);
}
Expand Down Expand Up @@ -850,7 +832,9 @@ export class VectorEngine implements IVectorEngine {
return Result.fail(resolveResult.getError()!);
}
const resolveParams = resolveResult.getValue();
const protocolRes = await this.vector.resolve(resolveParams);
const protocolRes = await this.runProtocolMethodWithRetries<FullChannelState>(() =>
this.vector.resolve(resolveParams),
);
if (protocolRes.isError) {
return Result.fail(protocolRes.getError()!);
}
Expand Down Expand Up @@ -919,7 +903,9 @@ export class VectorEngine implements IVectorEngine {
]);

// create withdrawal transfer
const protocolRes = await this.vector.create(createParams);
const protocolRes = await this.runProtocolMethodWithRetries<FullChannelState>(() =>
this.vector.create(createParams),
);
if (protocolRes.isError) {
return Result.fail(protocolRes.getError()!);
}
Expand Down Expand Up @@ -1046,7 +1032,11 @@ export class VectorEngine implements IVectorEngine {
}

// RESTORE STATE
// NOTE: MUST be under protocol lock
// NOTE: this is not added to the protocol queue. That is because if your
// channel needs to be restored, any updates you are sent or try to send
// will fail until your store is properly updated. The failures create
// a natural lock. However, it is due to these failures that the protocol
// methods are retried.
private async restoreState(
params: EngineParams.RestoreState,
): Promise<Result<ChannelRpcMethodsResponsesMap["chan_restoreState"], EngineError>> {
Expand Down Expand Up @@ -1079,42 +1069,23 @@ export class VectorEngine implements IVectorEngine {

const { channel, activeTransfers } = restoreDataRes.getValue() ?? ({} as any);

// Here you are under lock, verify things about channel
// Create helper to send message allowing a release lock
const sendResponseToCounterparty = async (error?: Values<typeof RestoreError.reasons>, context: any = {}) => {
if (!error) {
const res = await this.messaging.sendRestoreStateMessage(
Result.ok({
channelAddress: channel.channelAddress,
}),
counterpartyIdentifier,
this.signer.publicIdentifier,
);
if (res.isError) {
error = RestoreError.reasons.AckFailed;
context = { error: jsonifyError(res.getError()!) };
} else {
return Result.ok(channel);
}
}

// Create helper to generate error
const generateRestoreError = (
error: Values<typeof RestoreError.reasons>,
context: any = {},
): Result<FullChannelState, RestoreError> => {
// handle error by returning it to counterparty && returning result
const err = new RestoreError(error, channel?.channelAddress ?? "", this.publicIdentifier, {
...context,
method,
params,
});
await this.messaging.sendRestoreStateMessage(
Result.fail(err),
counterpartyIdentifier,
this.signer.publicIdentifier,
);
return Result.fail(err);
};

// Verify data exists
if (!channel || !activeTransfers) {
return sendResponseToCounterparty(RestoreError.reasons.NoData);
return generateRestoreError(RestoreError.reasons.NoData);
}

// Verify channel address is same as calculated
Expand All @@ -1126,12 +1097,12 @@ export class VectorEngine implements IVectorEngine {
chainId,
);
if (calculated.isError) {
return sendResponseToCounterparty(RestoreError.reasons.GetChannelAddressFailed, {
return generateRestoreError(RestoreError.reasons.GetChannelAddressFailed, {
getChannelAddressError: jsonifyError(calculated.getError()!),
});
}
if (calculated.getValue() !== channel.channelAddress) {
return sendResponseToCounterparty(RestoreError.reasons.InvalidChannelAddress, {
return generateRestoreError(RestoreError.reasons.InvalidChannelAddress, {
calculated: calculated.getValue(),
});
}
Expand All @@ -1144,15 +1115,15 @@ export class VectorEngine implements IVectorEngine {
"both",
);
if (sigRes.isError) {
return sendResponseToCounterparty(RestoreError.reasons.InvalidSignatures, {
return generateRestoreError(RestoreError.reasons.InvalidSignatures, {
recoveryError: sigRes.getError().message,
});
}

// Verify transfers match merkleRoot
const { root } = generateMerkleTreeData(activeTransfers);
if (root !== channel.merkleRoot) {
return sendResponseToCounterparty(RestoreError.reasons.InvalidMerkleRoot, {
return generateRestoreError(RestoreError.reasons.InvalidMerkleRoot, {
calculated: root,
merkleRoot: channel.merkleRoot,
activeTransfers: activeTransfers.map((t) => t.transferId),
Expand All @@ -1162,14 +1133,14 @@ export class VectorEngine implements IVectorEngine {
// Verify nothing with a sync-able nonce exists in store
const existing = await this.getChannelState({ channelAddress: channel.channelAddress });
if (existing.isError) {
return sendResponseToCounterparty(RestoreError.reasons.CouldNotGetChannel, {
return generateRestoreError(RestoreError.reasons.CouldNotGetChannel, {
getChannelStateError: jsonifyError(existing.getError()!),
});
}
const nonce = existing.getValue()?.nonce ?? 0;
const diff = channel.nonce - nonce;
if (diff <= 1 && channel.latestUpdate.type !== UpdateType.setup) {
return sendResponseToCounterparty(RestoreError.reasons.SyncableState, {
return generateRestoreError(RestoreError.reasons.SyncableState, {
existing: nonce,
toRestore: channel.nonce,
});
Expand All @@ -1179,14 +1150,11 @@ export class VectorEngine implements IVectorEngine {
try {
await this.store.saveChannelStateAndTransfers(channel, activeTransfers);
} catch (e) {
return sendResponseToCounterparty(RestoreError.reasons.SaveChannelFailed, {
return generateRestoreError(RestoreError.reasons.SaveChannelFailed, {
saveChannelStateAndTransfersError: e.message,
});
}

// Respond by saying this was a success
const returnVal = await sendResponseToCounterparty();

// Post to evt
this.evts[EngineEvents.RESTORE_STATE_EVENT].post({
channelAddress: channel.channelAddress,
Expand All @@ -1197,13 +1165,14 @@ export class VectorEngine implements IVectorEngine {

this.logger.info(
{
result: returnVal.isError ? jsonifyError(returnVal.getError()!) : returnVal.getValue(),
channel,
transfers: activeTransfers.map((t) => t.transferId),
method,
methodId,
},
"Method complete",
);
return returnVal;
return Result.ok(channel);
}

// DISPUTE METHODS
Expand Down Expand Up @@ -1469,6 +1438,8 @@ export class VectorEngine implements IVectorEngine {
return Result.ok(results);
}

// NOTE: no need to retry here because this method is not relevant
// to restoreState conditions
private async syncDisputes(): Promise<Result<void, EngineError>> {
try {
await this.vector.syncDisputes();
Expand All @@ -1483,6 +1454,18 @@ export class VectorEngine implements IVectorEngine {
}
}

private async runProtocolMethodWithRetries<T = any>(fn: () => Promise<Result<T, ProtocolError>>, retryCount = 5) {
let result: Result<T> | undefined;
for (let i = 0; i < retryCount; i++) {
result = await fn();
if (!result.isError) {
return result;
}
this.logger.warn({ attempt: i, error: result.getError().message }, "Protocol method failed");
}
return result as Result<T, ProtocolError>;
}

// JSON RPC interface -- this will accept:
// - "chan_deposit"
// - "chan_createTransfer"
Expand Down
66 changes: 60 additions & 6 deletions modules/engine/src/listeners.ts
Original file line number Diff line number Diff line change
Expand Up @@ -161,19 +161,73 @@ export async function setupEngineListeners(

await messaging.onReceiveRestoreStateMessage(
signer.publicIdentifier,
async (
restoreData: Result<{ chainId: number } | { channelAddress: string }, EngineError>,
from: string,
inbox: string,
) => {
async (restoreData: Result<{ chainId: number }, EngineError>, from: string, inbox: string) => {
// If it is from yourself, do nothing
if (from === signer.publicIdentifier) {
return;
}
const method = "onReceiveRestoreStateMessage";
logger.debug({ method }, "Handling message");

throw new Error("call to protocol to add to internal queue");
// Received error from counterparty
if (restoreData.isError) {
logger.error({ message: restoreData.getError()!.message, method }, "Error received from counterparty restore");
return;
}

const data = restoreData.getValue();
const [key] = Object.keys(data ?? []);
if (key !== "chainId") {
logger.error({ data }, "Message malformed");
return;
}

// Counterparty looking to initiate a restore
let channel: FullChannelState | undefined;
const sendCannotRestoreFromError = (error: Values<typeof RestoreError.reasons>, context: any = {}) => {
return messaging.respondToRestoreStateMessage(
inbox,
Result.fail(
new RestoreError(error, channel?.channelAddress ?? "", signer.publicIdentifier, { ...context, method }),
),
);
};

// Get info from store to send to counterparty
const { chainId } = data as any;
try {
channel = await store.getChannelStateByParticipants(signer.publicIdentifier, from, chainId);
} catch (e) {
return sendCannotRestoreFromError(RestoreError.reasons.CouldNotGetChannel, {
storeMethod: "getChannelStateByParticipants",
chainId,
identifiers: [signer.publicIdentifier, from],
});
}
if (!channel) {
return sendCannotRestoreFromError(RestoreError.reasons.ChannelNotFound, { chainId });
}
let activeTransfers: FullTransferState[];
try {
activeTransfers = await store.getActiveTransfers(channel.channelAddress);
} catch (e) {
return sendCannotRestoreFromError(RestoreError.reasons.CouldNotGetActiveTransfers, {
storeMethod: "getActiveTransfers",
chainId,
channelAddress: channel.channelAddress,
});
}

// Send info to counterparty
logger.debug(
{
channel: channel.channelAddress,
nonce: channel.nonce,
activeTransfers: activeTransfers.map((a) => a.transferId),
},
"Sending counterparty state to sync",
);
await messaging.respondToRestoreStateMessage(inbox, Result.ok({ channel, activeTransfers }));
},
);

Expand Down
11 changes: 2 additions & 9 deletions modules/types/src/messaging.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,8 @@ export interface IMessagingService extends IBasicMessaging {
// 2. sends restore data
// - counterparty responds
// - restore-r restores
// - restore-r sends result (err or success) to counterparty
// - counterparty receives
// 1. releases lock
sendRestoreStateMessage(
restoreData: Result<{ chainId: number } | { channelAddress: string }, EngineError>,
restoreData: Result<{ chainId: number }, EngineError>,
to: string,
from: string,
timeout?: number,
Expand All @@ -84,11 +81,7 @@ export interface IMessagingService extends IBasicMessaging {
>;
onReceiveRestoreStateMessage(
publicIdentifier: string,
callback: (
restoreData: Result<{ chainId: number } | { channelAddress: string }, EngineError>,
from: string,
inbox: string,
) => void,
callback: (restoreData: Result<{ chainId: number }, EngineError>, from: string, inbox: string) => void,
): Promise<void>;
respondToRestoreStateMessage(
inbox: string,
Expand Down
Loading