Skip to content
This repository was archived by the owner on Dec 27, 2022. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 0 additions & 30 deletions modules/engine/src/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,36 +46,6 @@ export class CheckInError extends EngineError {
}
}

export class RestoreError extends EngineError {
static readonly type = "RestoreError";

static readonly reasons = {
AckFailed: "Could not send restore ack",
AcquireLockError: "Failed to acquire restore lock",
ChannelNotFound: "Channel not found",
CouldNotGetActiveTransfers: "Failed to retrieve active transfers from store",
CouldNotGetChannel: "Failed to retrieve channel from store",
GetChannelAddressFailed: "Failed to calculate channel address for verification",
InvalidChannelAddress: "Failed to verify channel address",
InvalidMerkleRoot: "Failed to validate merkleRoot for restoration",
InvalidSignatures: "Failed to validate sigs on latestUpdate",
NoData: "No data sent from counterparty to restore",
ReceivedError: "Got restore error from counterparty",
ReleaseLockError: "Failed to release restore lock",
SaveChannelFailed: "Failed to save channel state",
SyncableState: "Cannot restore, state is syncable. Try reconcileDeposit",
} as const;

constructor(
public readonly message: Values<typeof RestoreError.reasons>,
channelAddress: string,
publicIdentifier: string,
context: any = {},
) {
super(message, channelAddress, publicIdentifier, context, RestoreError.type);
}
}

export class IsAliveError extends EngineError {
static readonly type = "IsAliveError";

Expand Down
192 changes: 38 additions & 154 deletions modules/engine/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { WithdrawCommitment } from "@connext/vector-contracts";
import { Vector } from "@connext/vector-protocol";
import {
ChainAddresses,
Expand All @@ -19,31 +20,20 @@ import {
IExternalValidation,
AUTODEPLOY_CHAIN_IDS,
EngineError,
UpdateType,
Values,
VectorError,
jsonifyError,
MinimalTransaction,
WITHDRAWAL_RESOLVED_EVENT,
VectorErrorJson,
ProtocolError,
} from "@connext/vector-types";
import {
generateMerkleRoot,
validateChannelUpdateSignatures,
getSignerAddressFromPublicIdentifier,
getRandomBytes32,
getParticipant,
hashWithdrawalQuote,
delay,
} from "@connext/vector-utils";
import { getRandomBytes32, getParticipant, hashWithdrawalQuote, delay } from "@connext/vector-utils";
import pino from "pino";
import Ajv from "ajv";
import { Evt } from "evt";

import { version } from "../package.json";

import { DisputeError, IsAliveError, RestoreError, RpcError } from "./errors";
import { DisputeError, IsAliveError, RpcError } from "./errors";
import {
convertConditionalTransferParams,
convertResolveConditionParams,
Expand All @@ -53,8 +43,6 @@ import {
import { setupEngineListeners } from "./listeners";
import { getEngineEvtContainer, withdrawRetryForTransferId, addTransactionToCommitment } from "./utils";
import { sendIsAlive } from "./isAlive";
import { WithdrawCommitment } from "@connext/vector-contracts";
import { FullChannelState } from "../../types/dist/src";

export const ajv = new Ajv();

Expand Down Expand Up @@ -578,10 +566,7 @@ export class VectorEngine implements IVectorEngine {
if (setupParamsResult.isError) {
return Result.fail(setupParamsResult.getError()!);
}
const setupRes = await this.runProtocolMethodWithRetries<FullChannelState>(
() => this.vector.setup(setupParamsResult.getValue()),
"",
);
const setupRes = await this.vector.setup(setupParamsResult.getValue());

if (setupRes.isError) {
return Result.fail(setupRes.getError()!);
Expand Down Expand Up @@ -683,10 +668,30 @@ export class VectorEngine implements IVectorEngine {
// leaving all 8 out of the channel.

// This race condition should be handled by the protocol retries
const depositRes = await this.runProtocolMethodWithRetries<FullChannelState>(
() => this.vector.deposit(params),
params.channelAddress,
);
const timeout = 500;
let depositRes = await this.vector.deposit(params);
let count = 1;
for (const _ of Array(3).fill(0)) {
// If its not an error, do not retry
if (!depositRes.isError) {
break;
}
const error = depositRes.getError()!;
// IFF deposit fails because you or the counterparty fails to recover
// signatures, retry
// This should be the message from *.reasons.BadSignatures in the protocol
// errors
const recoveryErr = "Could not recover signers";
const recoveryFailed = error.message === recoveryErr || error.context?.counterpartyError?.message === recoveryErr;

if (!recoveryFailed) {
break;
}
this.logger.warn({ attempt: count, channelAddress: params.channelAddress }, "Retrying deposit reconciliation");
depositRes = await this.vector.deposit(params);
count++;
await delay(timeout);
}
this.logger.info(
{
result: depositRes.isError ? jsonifyError(depositRes.getError()!) : depositRes.getValue().channelAddress,
Expand Down Expand Up @@ -786,10 +791,7 @@ export class VectorEngine implements IVectorEngine {
}
const createParams = createResult.getValue();
this.logger.info({ transferParams: createParams, method, methodId }, "Created conditional transfer params");
const protocolRes = await this.runProtocolMethodWithRetries<FullChannelState>(
() => this.vector.create(createParams),
createParams.channelAddress,
);
const protocolRes = await this.vector.create(createParams);
if (protocolRes.isError) {
return Result.fail(protocolRes.getError()!);
}
Expand Down Expand Up @@ -835,10 +837,7 @@ export class VectorEngine implements IVectorEngine {
return Result.fail(resolveResult.getError()!);
}
const resolveParams = resolveResult.getValue();
const protocolRes = await this.runProtocolMethodWithRetries<FullChannelState>(
() => this.vector.resolve(resolveParams),
resolveParams.channelAddress,
);
const protocolRes = await this.vector.resolve(resolveParams);
if (protocolRes.isError) {
return Result.fail(protocolRes.getError()!);
}
Expand Down Expand Up @@ -902,10 +901,7 @@ export class VectorEngine implements IVectorEngine {
);

// create withdrawal transfer
const protocolRes = await this.runProtocolMethodWithRetries<FullChannelState>(
() => this.vector.create(createParams),
createParams.channelAddress,
);
const protocolRes = await this.vector.create(createParams);
if (protocolRes.isError) {
return Result.fail(protocolRes.getError()!);
}
Expand Down Expand Up @@ -1195,119 +1191,25 @@ export class VectorEngine implements IVectorEngine {
);
}

// Send message to counterparty, they will grab lock and
// return information under lock, initiator will update channel,
// then send confirmation message to counterparty, who will release the lock
const { chainId, counterpartyIdentifier } = params;
const restoreDataRes = await this.messaging.sendRestoreStateMessage(
Result.ok({ chainId }),
counterpartyIdentifier,
this.signer.publicIdentifier,
);
if (restoreDataRes.isError) {
return Result.fail(restoreDataRes.getError()!);
// Request protocol restore
const restoreResult = await this.vector.restoreState(params);
if (restoreResult.isError) {
return Result.fail(restoreResult.getError()!);
}

const { channel, activeTransfers } = restoreDataRes.getValue() ?? ({} as any);

// Create helper to generate error
const generateRestoreError = (
error: Values<typeof RestoreError.reasons>,
context: any = {},
): Result<FullChannelState, RestoreError> => {
// handle error by returning it to counterparty && returning result
const err = new RestoreError(error, channel?.channelAddress ?? "", this.publicIdentifier, {
...context,
method,
params,
});
return Result.fail(err);
};

// Verify data exists
if (!channel || !activeTransfers) {
return generateRestoreError(RestoreError.reasons.NoData);
}

// Verify channel address is same as calculated
const counterparty = getSignerAddressFromPublicIdentifier(counterpartyIdentifier);
const calculated = await this.chainService.getChannelAddress(
channel.alice === this.signer.address ? this.signer.address : counterparty,
channel.bob === this.signer.address ? this.signer.address : counterparty,
channel.networkContext.channelFactoryAddress,
chainId,
);
if (calculated.isError) {
return generateRestoreError(RestoreError.reasons.GetChannelAddressFailed, {
getChannelAddressError: jsonifyError(calculated.getError()!),
});
}
if (calculated.getValue() !== channel.channelAddress) {
return generateRestoreError(RestoreError.reasons.InvalidChannelAddress, {
calculated: calculated.getValue(),
});
}

// Verify signatures on latest update
const sigRes = await validateChannelUpdateSignatures(
channel,
channel.latestUpdate.aliceSignature,
channel.latestUpdate.bobSignature,
"both",
);
if (sigRes.isError) {
return generateRestoreError(RestoreError.reasons.InvalidSignatures, {
recoveryError: sigRes.getError().message,
});
}

// Verify transfers match merkleRoot
const root = generateMerkleRoot(activeTransfers);
if (root !== channel.merkleRoot) {
return generateRestoreError(RestoreError.reasons.InvalidMerkleRoot, {
calculated: root,
merkleRoot: channel.merkleRoot,
activeTransfers: activeTransfers.map((t) => t.transferId),
});
}

// Verify nothing with a sync-able nonce exists in store
const existing = await this.getChannelState({ channelAddress: channel.channelAddress });
if (existing.isError) {
return generateRestoreError(RestoreError.reasons.CouldNotGetChannel, {
getChannelStateError: jsonifyError(existing.getError()!),
});
}
const nonce = existing.getValue()?.nonce ?? 0;
const diff = channel.nonce - nonce;
if (diff <= 1 && channel.latestUpdate.type !== UpdateType.setup) {
return generateRestoreError(RestoreError.reasons.SyncableState, {
existing: nonce,
toRestore: channel.nonce,
});
}

// Save channel
try {
await this.store.saveChannelStateAndTransfers(channel, activeTransfers);
} catch (e) {
return generateRestoreError(RestoreError.reasons.SaveChannelFailed, {
saveChannelStateAndTransfersError: e.message,
});
}
const channel = restoreResult.getValue();

// Post to evt
this.evts[EngineEvents.RESTORE_STATE_EVENT].post({
channelAddress: channel.channelAddress,
aliceIdentifier: channel.aliceIdentifier,
bobIdentifier: channel.bobIdentifier,
chainId,
chainId: channel.networkContext.chainId,
});

this.logger.info(
{
channel,
transfers: activeTransfers.map((t) => t.transferId),
channel: channel.channelAddress,
method,
methodId,
},
Expand Down Expand Up @@ -1585,24 +1487,6 @@ export class VectorEngine implements IVectorEngine {
}
}

private async runProtocolMethodWithRetries<T = any>(
fn: () => Promise<Result<T, ProtocolError>>,
channelAddress: string,
retryCount = 5,
) {
const result = await fn();
// let result: Result<T> | undefined;
// for (let i = 0; i < retryCount; i++) {
// result = await fn();
// if (!result.isError) {
// return result;
// }
// this.logger.warn({ attempt: i, error: result.getError().message, channelAddress }, "Protocol method failed");
// await delay(500);
// }
return result as Result<T, ProtocolError>;
}

// JSON RPC interface -- this will accept:
// - "chan_deposit"
// - "chan_createTransfer"
Expand Down
Loading