diff --git a/modules/engine/src/index.ts b/modules/engine/src/index.ts index 63ea20764..2a0ef751f 100644 --- a/modules/engine/src/index.ts +++ b/modules/engine/src/index.ts @@ -27,6 +27,7 @@ import { WITHDRAWAL_RESOLVED_EVENT, VectorErrorJson, getConfirmationsForChain, + ProtocolError, } from "@connext/vector-types"; import { generateMerkleTreeData, @@ -54,6 +55,7 @@ import { setupEngineListeners } from "./listeners"; import { getEngineEvtContainer } from "./utils"; import { sendIsAlive } from "./isAlive"; import { WithdrawCommitment } from "@connext/vector-contracts"; +import { FullChannelState } from "../../types/dist/src"; export const ajv = new Ajv(); @@ -63,8 +65,6 @@ export class VectorEngine implements IVectorEngine { // Setup event container to emit events from vector private readonly evts: EngineEvtContainer = getEngineEvtContainer(); - private readonly restoreLocks: { [channelAddress: string]: string } = {}; - private constructor( private readonly signer: IChannelSigner, private readonly messaging: IMessagingService, @@ -579,7 +579,9 @@ export class VectorEngine implements IVectorEngine { if (setupParamsResult.isError) { return Result.fail(setupParamsResult.getError()!); } - const setupRes = await this.vector.setup(setupParamsResult.getValue()); + const setupRes = await this.runProtocolMethodWithRetries(() => + this.vector.setup(setupParamsResult.getValue()), + ); if (setupRes.isError) { return Result.fail(setupRes.getError()!); @@ -681,30 +683,8 @@ export class VectorEngine implements IVectorEngine { // own. Bob reconciles 8 and fails to recover Alice's signature properly // leaving all 8 out of the channel. - // There is no way to eliminate this race condition, so instead just retry - // depositing if a signature validation error is detected. - let depositRes = await this.vector.deposit(params); - let count = 1; - for (const _ of Array(3).fill(0)) { - // If its not an error, do not retry - if (!depositRes.isError) { - break; - } - const error = depositRes.getError()!; - // IFF deposit fails because you or the counterparty fails to recover - // signatures, retry - // This should be the message from *.reasons.BadSignatures in the protocol - // errors - const recoveryErr = "Could not recover signers"; - const recoveryFailed = error.message === recoveryErr || error.context?.counterpartyError?.message === recoveryErr; - - if (!recoveryFailed) { - break; - } - this.logger.warn({ attempt: count, channelAddress: params.channelAddress }, "Retrying deposit reconciliation"); - depositRes = await this.vector.deposit(params); - count++; - } + // This race condition should be handled by the protocol retries + const depositRes = await this.runProtocolMethodWithRetries(() => this.vector.deposit(params)); this.logger.info( { result: depositRes.isError ? jsonifyError(depositRes.getError()!) : depositRes.getValue(), @@ -804,7 +784,9 @@ export class VectorEngine implements IVectorEngine { } const createParams = createResult.getValue(); this.logger.info({ transferParams: createParams, method, methodId }, "Created conditional transfer params"); - const protocolRes = await this.vector.create(createParams); + const protocolRes = await this.runProtocolMethodWithRetries(() => + this.vector.create(createParams), + ); if (protocolRes.isError) { return Result.fail(protocolRes.getError()!); } @@ -850,7 +832,9 @@ export class VectorEngine implements IVectorEngine { return Result.fail(resolveResult.getError()!); } const resolveParams = resolveResult.getValue(); - const protocolRes = await this.vector.resolve(resolveParams); + const protocolRes = await this.runProtocolMethodWithRetries(() => + this.vector.resolve(resolveParams), + ); if (protocolRes.isError) { return Result.fail(protocolRes.getError()!); } @@ -919,7 +903,9 @@ export class VectorEngine implements IVectorEngine { ]); // create withdrawal transfer - const protocolRes = await this.vector.create(createParams); + const protocolRes = await this.runProtocolMethodWithRetries(() => + this.vector.create(createParams), + ); if (protocolRes.isError) { return Result.fail(protocolRes.getError()!); } @@ -1046,7 +1032,11 @@ export class VectorEngine implements IVectorEngine { } // RESTORE STATE - // NOTE: MUST be under protocol lock + // NOTE: this is not added to the protocol queue. That is because if your + // channel needs to be restored, any updates you are sent or try to send + // will fail until your store is properly updated. The failures create + // a natural lock. However, it is due to these failures that the protocol + // methods are retried. private async restoreState( params: EngineParams.RestoreState, ): Promise> { @@ -1079,42 +1069,23 @@ export class VectorEngine implements IVectorEngine { const { channel, activeTransfers } = restoreDataRes.getValue() ?? ({} as any); - // Here you are under lock, verify things about channel - // Create helper to send message allowing a release lock - const sendResponseToCounterparty = async (error?: Values, context: any = {}) => { - if (!error) { - const res = await this.messaging.sendRestoreStateMessage( - Result.ok({ - channelAddress: channel.channelAddress, - }), - counterpartyIdentifier, - this.signer.publicIdentifier, - ); - if (res.isError) { - error = RestoreError.reasons.AckFailed; - context = { error: jsonifyError(res.getError()!) }; - } else { - return Result.ok(channel); - } - } - + // Create helper to generate error + const generateRestoreError = ( + error: Values, + context: any = {}, + ): Result => { // handle error by returning it to counterparty && returning result const err = new RestoreError(error, channel?.channelAddress ?? "", this.publicIdentifier, { ...context, method, params, }); - await this.messaging.sendRestoreStateMessage( - Result.fail(err), - counterpartyIdentifier, - this.signer.publicIdentifier, - ); return Result.fail(err); }; // Verify data exists if (!channel || !activeTransfers) { - return sendResponseToCounterparty(RestoreError.reasons.NoData); + return generateRestoreError(RestoreError.reasons.NoData); } // Verify channel address is same as calculated @@ -1126,12 +1097,12 @@ export class VectorEngine implements IVectorEngine { chainId, ); if (calculated.isError) { - return sendResponseToCounterparty(RestoreError.reasons.GetChannelAddressFailed, { + return generateRestoreError(RestoreError.reasons.GetChannelAddressFailed, { getChannelAddressError: jsonifyError(calculated.getError()!), }); } if (calculated.getValue() !== channel.channelAddress) { - return sendResponseToCounterparty(RestoreError.reasons.InvalidChannelAddress, { + return generateRestoreError(RestoreError.reasons.InvalidChannelAddress, { calculated: calculated.getValue(), }); } @@ -1144,7 +1115,7 @@ export class VectorEngine implements IVectorEngine { "both", ); if (sigRes.isError) { - return sendResponseToCounterparty(RestoreError.reasons.InvalidSignatures, { + return generateRestoreError(RestoreError.reasons.InvalidSignatures, { recoveryError: sigRes.getError().message, }); } @@ -1152,7 +1123,7 @@ export class VectorEngine implements IVectorEngine { // Verify transfers match merkleRoot const { root } = generateMerkleTreeData(activeTransfers); if (root !== channel.merkleRoot) { - return sendResponseToCounterparty(RestoreError.reasons.InvalidMerkleRoot, { + return generateRestoreError(RestoreError.reasons.InvalidMerkleRoot, { calculated: root, merkleRoot: channel.merkleRoot, activeTransfers: activeTransfers.map((t) => t.transferId), @@ -1162,14 +1133,14 @@ export class VectorEngine implements IVectorEngine { // Verify nothing with a sync-able nonce exists in store const existing = await this.getChannelState({ channelAddress: channel.channelAddress }); if (existing.isError) { - return sendResponseToCounterparty(RestoreError.reasons.CouldNotGetChannel, { + return generateRestoreError(RestoreError.reasons.CouldNotGetChannel, { getChannelStateError: jsonifyError(existing.getError()!), }); } const nonce = existing.getValue()?.nonce ?? 0; const diff = channel.nonce - nonce; if (diff <= 1 && channel.latestUpdate.type !== UpdateType.setup) { - return sendResponseToCounterparty(RestoreError.reasons.SyncableState, { + return generateRestoreError(RestoreError.reasons.SyncableState, { existing: nonce, toRestore: channel.nonce, }); @@ -1179,14 +1150,11 @@ export class VectorEngine implements IVectorEngine { try { await this.store.saveChannelStateAndTransfers(channel, activeTransfers); } catch (e) { - return sendResponseToCounterparty(RestoreError.reasons.SaveChannelFailed, { + return generateRestoreError(RestoreError.reasons.SaveChannelFailed, { saveChannelStateAndTransfersError: e.message, }); } - // Respond by saying this was a success - const returnVal = await sendResponseToCounterparty(); - // Post to evt this.evts[EngineEvents.RESTORE_STATE_EVENT].post({ channelAddress: channel.channelAddress, @@ -1197,13 +1165,14 @@ export class VectorEngine implements IVectorEngine { this.logger.info( { - result: returnVal.isError ? jsonifyError(returnVal.getError()!) : returnVal.getValue(), + channel, + transfers: activeTransfers.map((t) => t.transferId), method, methodId, }, "Method complete", ); - return returnVal; + return Result.ok(channel); } // DISPUTE METHODS @@ -1469,6 +1438,8 @@ export class VectorEngine implements IVectorEngine { return Result.ok(results); } + // NOTE: no need to retry here because this method is not relevant + // to restoreState conditions private async syncDisputes(): Promise> { try { await this.vector.syncDisputes(); @@ -1483,6 +1454,18 @@ export class VectorEngine implements IVectorEngine { } } + private async runProtocolMethodWithRetries(fn: () => Promise>, retryCount = 5) { + let result: Result | undefined; + for (let i = 0; i < retryCount; i++) { + result = await fn(); + if (!result.isError) { + return result; + } + this.logger.warn({ attempt: i, error: result.getError().message }, "Protocol method failed"); + } + return result as Result; + } + // JSON RPC interface -- this will accept: // - "chan_deposit" // - "chan_createTransfer" diff --git a/modules/engine/src/listeners.ts b/modules/engine/src/listeners.ts index 5f2a84520..d412136df 100644 --- a/modules/engine/src/listeners.ts +++ b/modules/engine/src/listeners.ts @@ -161,11 +161,7 @@ export async function setupEngineListeners( await messaging.onReceiveRestoreStateMessage( signer.publicIdentifier, - async ( - restoreData: Result<{ chainId: number } | { channelAddress: string }, EngineError>, - from: string, - inbox: string, - ) => { + async (restoreData: Result<{ chainId: number }, EngineError>, from: string, inbox: string) => { // If it is from yourself, do nothing if (from === signer.publicIdentifier) { return; @@ -173,7 +169,65 @@ export async function setupEngineListeners( const method = "onReceiveRestoreStateMessage"; logger.debug({ method }, "Handling message"); - throw new Error("call to protocol to add to internal queue"); + // Received error from counterparty + if (restoreData.isError) { + logger.error({ message: restoreData.getError()!.message, method }, "Error received from counterparty restore"); + return; + } + + const data = restoreData.getValue(); + const [key] = Object.keys(data ?? []); + if (key !== "chainId") { + logger.error({ data }, "Message malformed"); + return; + } + + // Counterparty looking to initiate a restore + let channel: FullChannelState | undefined; + const sendCannotRestoreFromError = (error: Values, context: any = {}) => { + return messaging.respondToRestoreStateMessage( + inbox, + Result.fail( + new RestoreError(error, channel?.channelAddress ?? "", signer.publicIdentifier, { ...context, method }), + ), + ); + }; + + // Get info from store to send to counterparty + const { chainId } = data as any; + try { + channel = await store.getChannelStateByParticipants(signer.publicIdentifier, from, chainId); + } catch (e) { + return sendCannotRestoreFromError(RestoreError.reasons.CouldNotGetChannel, { + storeMethod: "getChannelStateByParticipants", + chainId, + identifiers: [signer.publicIdentifier, from], + }); + } + if (!channel) { + return sendCannotRestoreFromError(RestoreError.reasons.ChannelNotFound, { chainId }); + } + let activeTransfers: FullTransferState[]; + try { + activeTransfers = await store.getActiveTransfers(channel.channelAddress); + } catch (e) { + return sendCannotRestoreFromError(RestoreError.reasons.CouldNotGetActiveTransfers, { + storeMethod: "getActiveTransfers", + chainId, + channelAddress: channel.channelAddress, + }); + } + + // Send info to counterparty + logger.debug( + { + channel: channel.channelAddress, + nonce: channel.nonce, + activeTransfers: activeTransfers.map((a) => a.transferId), + }, + "Sending counterparty state to sync", + ); + await messaging.respondToRestoreStateMessage(inbox, Result.ok({ channel, activeTransfers })); }, ); diff --git a/modules/types/src/messaging.ts b/modules/types/src/messaging.ts index 7e606f7d4..ca64288d3 100644 --- a/modules/types/src/messaging.ts +++ b/modules/types/src/messaging.ts @@ -70,11 +70,8 @@ export interface IMessagingService extends IBasicMessaging { // 2. sends restore data // - counterparty responds // - restore-r restores - // - restore-r sends result (err or success) to counterparty - // - counterparty receives - // 1. releases lock sendRestoreStateMessage( - restoreData: Result<{ chainId: number } | { channelAddress: string }, EngineError>, + restoreData: Result<{ chainId: number }, EngineError>, to: string, from: string, timeout?: number, @@ -84,11 +81,7 @@ export interface IMessagingService extends IBasicMessaging { >; onReceiveRestoreStateMessage( publicIdentifier: string, - callback: ( - restoreData: Result<{ chainId: number } | { channelAddress: string }, EngineError>, - from: string, - inbox: string, - ) => void, + callback: (restoreData: Result<{ chainId: number }, EngineError>, from: string, inbox: string) => void, ): Promise; respondToRestoreStateMessage( inbox: string, diff --git a/modules/utils/src/messaging.ts b/modules/utils/src/messaging.ts index 19513cf4a..ba16a91bd 100644 --- a/modules/utils/src/messaging.ts +++ b/modules/utils/src/messaging.ts @@ -381,7 +381,7 @@ export class NatsMessagingService extends NatsBasicMessagingService implements I // RESTORE METHODS async sendRestoreStateMessage( - restoreData: Result<{ chainId: number } | { channelAddress: string }, EngineError>, + restoreData: Result<{ chainId: number }, EngineError>, to: string, from: string, timeout = 30_000, @@ -400,11 +400,7 @@ export class NatsMessagingService extends NatsBasicMessagingService implements I async onReceiveRestoreStateMessage( publicIdentifier: string, - callback: ( - restoreData: Result<{ chainId: number } | { channelAddress: string }, EngineError>, - from: string, - inbox: string, - ) => void, + callback: (restoreData: Result<{ chainId: number }, EngineError>, from: string, inbox: string) => void, ): Promise { await this.registerCallback(`${publicIdentifier}.*.restore`, callback, "onReceiveRestoreStateMessage"); } diff --git a/modules/utils/src/test/services/messaging.ts b/modules/utils/src/test/services/messaging.ts index 5e724f19f..c27b0456e 100644 --- a/modules/utils/src/test/services/messaging.ts +++ b/modules/utils/src/test/services/messaging.ts @@ -158,7 +158,7 @@ export class MemoryMessagingService implements IMessagingService { } sendRestoreStateMessage( - restoreData: Result<{ chainId: number } | { channelAddress: string }, EngineError>, + restoreData: Result<{ chainId: number }, EngineError>, to: string, from: string, timeout?: number, @@ -168,11 +168,7 @@ export class MemoryMessagingService implements IMessagingService { } onReceiveRestoreStateMessage( publicIdentifier: string, - callback: ( - restoreData: Result<{ chainId: number } | { channelAddress: string }, EngineError>, - from: string, - inbox: string, - ) => void, + callback: (restoreData: Result<{ chainId: number }, EngineError>, from: string, inbox: string) => void, ): Promise { throw new Error("Method not implemented."); }